
Mac

11/16/2022, 4:29 PM
Hi, is there an equivalent of `create_flow_run.map()` from Prefect v1 in v2? I am trying to run a few subflows at once, then have the flow wait for successful responses from all of the subflows before moving on to the next task.

Mason Menges

11/16/2022, 10:19 PM
Hey Bo, this can depend on your use case. Would you be willing to share more about what you're trying to accomplish with this pattern? Are these subflows separate deployments, or just subflows you're calling as part of a main flow in the same file/directory?

Mac

11/16/2022, 10:43 PM
I have a deployment that I can reuse for multiple EL pipelines by passing different environment variables. I want to create a new deployment flow that will kick off multiple EL pipelines (say 3), wait for all of them to finish, then start a dbt job to finish off the “T”. I accomplished this in Prefect 1.

Mason Menges

11/16/2022, 10:50 PM
You should be able to use `run_deployment` to accomplish this. By default it waits for the flow run to finish before returning a value, so you could then append the returned flow run to a list. `run_deployment` returns the flow run object, which includes the state of the flow run as well. I believe we have an example of this somewhere, I'll see if I can dig it up 😄

Mac

11/16/2022, 10:51 PM
I’m using the `run_deployment` method in some other flows, but I'm not sure how to run those async and then wait for all the results. An example would be excellent!
@Mason Menges Did you manage to find an example?

Mason Menges

11/22/2022, 5:45 PM
Hey @Mac, I actually didn't find one specifically, so I was going to write one up. I haven't had the chance to do so just yet, I've been putting out a few fires over the weekend 😅 I'll see if I can throw one together today though 😄

Mac

11/22/2022, 5:46 PM
That would be excellent! Please keep me posted, thank you!!

Mason Menges

11/23/2022, 1:43 AM
Hey @Mac, this is a simple example. Setting the timeout to 0 on `run_deployment` lets you kick off the other flow runs concurrently, and from there you are just using the API to query the state of the flow runs until they're all completed. There are likely other ways to achieve this as well, but this pattern should work pretty well. The logger statements are just there to help visualize the pattern the flow is following 😄
import asyncio

from prefect import flow, get_client, get_run_logger
from prefect.deployments import run_deployment


@flow
async def test_flow():
    logger = get_run_logger()

    list_inputs = ["grass", "fire"]

    # Kick off one flow run per input. timeout=0 makes run_deployment
    # return immediately instead of waiting for the run to finish.
    pending_ids = []
    for pk_type in list_inputs:
        deployed_run = await run_deployment(
            name="create-pk-team/pk-flow-dev",
            parameters={"pk_type": pk_type},
            flow_run_name=f"{pk_type} create-pk-team/pk-flow-dev",
            timeout=0,
        )
        pending_ids.append(deployed_run.id)

    # Poll the API until every flow run has reached a final state.
    async with get_client() as api_client:
        while pending_ids:
            logger.info("in while loop")
            # Iterate over a copy so removing from pending_ids is safe.
            for run_id in list(pending_ids):
                logger.info(f"checking flow run id: {run_id}")
                deployed_run = await api_client.read_flow_run(run_id)
                run_state = deployed_run.state
                logger.info(f"Run {run_id} state: {run_state}")
                if run_state.is_final():
                    logger.info("removed result")
                    pending_ids.remove(run_id)
                    logger.info(pending_ids)
            # Non-blocking sleep so the event loop is not stalled.
            await asyncio.sleep(5)

    logger.info("I waited")


if __name__ == "__main__":
    asyncio.run(test_flow())
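
The polling loop above can also be pulled out into a small helper. This is only a minimal sketch: `wait_until_final`, `read_is_final`, and `make_fake_reader` are hypothetical names, and `read_is_final` stands in for a call to `api_client.read_flow_run(run_id)` followed by `state.is_final()`. The fake reader exists only so the helper can be run without a Prefect API.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Iterable, List


async def wait_until_final(
    run_ids: Iterable[str],
    read_is_final: Callable[[str], Awaitable[bool]],
    poll_interval: float = 5.0,
) -> List[str]:
    """Poll until every run is final; return ids in the order they finished."""
    pending = list(run_ids)
    finished: List[str] = []
    while pending:
        # Iterate over a copy so removing from `pending` is safe.
        for run_id in list(pending):
            if await read_is_final(run_id):
                pending.remove(run_id)
                finished.append(run_id)
        if pending:
            # Non-blocking sleep between polling passes.
            await asyncio.sleep(poll_interval)
    return finished


def make_fake_reader(polls_needed: Dict[str, int]) -> Callable[[str], Awaitable[bool]]:
    """Hypothetical stand-in for the API: a run becomes final after N polls."""
    remaining = dict(polls_needed)

    async def read_is_final(run_id: str) -> bool:
        remaining[run_id] -= 1
        return remaining[run_id] <= 0

    return read_is_final


if __name__ == "__main__":
    reader = make_fake_reader({"grass": 2, "fire": 1})
    print(asyncio.run(wait_until_final(["grass", "fire"], reader, poll_interval=0.01)))
```

Keeping the state lookup behind a callable means the waiting logic can be exercised locally, and the real API call only has to be wired in at one place.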

Mac

11/23/2022, 2:13 PM
Thanks @Mason Menges, I will take a look and try this out!
This worked, thank you!!
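
For reference, the overall shape discussed in this thread (start several EL pipelines, wait for all of them, then run the dbt step) can also be expressed with plain `asyncio.gather`. The sketch below uses only the standard library; `start_el_pipeline` and `run_dbt_job` are hypothetical stand-ins, not Prefect functions. Whether awaiting `run_deployment(...)` calls through `asyncio.gather` inside an async flow behaves the same way is an assumption that was not confirmed in this thread.

```python
import asyncio
from typing import List


async def start_el_pipeline(name: str, seconds: float) -> str:
    """Hypothetical stand-in for one EL pipeline run."""
    await asyncio.sleep(seconds)
    return f"{name}: Completed"


async def run_dbt_job(upstream: List[str]) -> str:
    """Hypothetical stand-in for the dbt step that must run last."""
    return f"dbt ran after {len(upstream)} pipelines"


async def main() -> str:
    # gather() runs the three coroutines concurrently and returns their
    # results, in argument order, once every one of them has finished.
    results = await asyncio.gather(
        start_el_pipeline("el-a", 0.03),
        start_el_pipeline("el-b", 0.01),
        start_el_pipeline("el-c", 0.02),
    )
    # Only reached after all three pipelines are done.
    return await run_dbt_job(list(results))


if __name__ == "__main__":
    print(asyncio.run(main()))
```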