# ask-marvin
j
@Marvin can you show me an example of using asynchronous sub-flows where each sub-flow has multiple tasks?
m
Asynchronous sub-flows enable you to create modular components within your workflow with Prefect 2.x. When using sub-flows, each can contain multiple tasks that are organized in a flexible way without the need for a rigid Directed Acyclic Graph (DAG) structure that was typical in Prefect Core 1.0. Sub-flows can work in parallel or sequentially, based on the logic you define in your workflow. Regretfully, I couldn't find a direct example in the documentation through the search. However, I can conceptualize how you might set up an asynchronous sub-flow with multiple tasks:
from prefect import flow, task
from asyncio import gather

@task
def task1():
    # perform some operation
    return "result from task 1"

@task
def task2():
    # perform some operation
    return "result from task 2"

@flow
def sub_flow():
    # async calling multiple tasks within the sub-flow
    result1 = task1.submit()
    result2 = task2.submit()
    return gather(result1, result2)

@flow
def main_flow():
    # Running sub-flows asynchronously
    sub_flow_1_future = sub_flow.submit()
    sub_flow_2_future = sub_flow.submit()

    # Optionally, you can await their results together, if needed
    results = gather(sub_flow_1_future, sub_flow_2_future)
    return results

# Call main flow to run everything
main_flow()
In this example:
- We have two simple tasks, `task1` and `task2`, which return some results.
- The `sub_flow` is defined as a flow where `task1` and `task2` are called asynchronously using the `submit` method on tasks. We use `gather` to run them concurrently within the sub-flow.
- The `main_flow` represents the primary flow, where multiple `sub_flow` instances are launched asynchronously with the `submit` method on the flow itself. For waiting on all results, we again use `gather`.

This is a highly simplified example of how you might structure your flows and tasks to operate asynchronously, which can be especially useful when performing parallel data processing tasks. You can adapt this pattern to more complex scenarios as needed.
🙅 1
k
Almost! Flows don't have a `submit` method, but you can write an `async def flow_func` and gather the results of multiple async subflow calls.
from prefect import flow
from anyio import sleep, run
from asyncio import gather


@flow
async def hello():
    results = await gather(subflow1(), subflow2())
    print(results)

    print("both subflows finished")


@flow
async def subflow1():
    await sleep(3)
    print("hello from subflow 1")
    return 1


@flow
async def subflow2():
    await sleep(6)
    print("hello from subflow 2")
    return 2


if __name__ == "__main__":
    run(hello)
Output:
12:55:40.642 | INFO    | prefect.engine - Created flow run 'papaya-ladybug' for flow 'hello'
12:55:41.279 | INFO    | Flow run 'papaya-ladybug' - Created subflow run 'proficient-alpaca' for flow 'subflow1'
12:55:41.315 | INFO    | Flow run 'papaya-ladybug' - Created subflow run 'brown-boar' for flow 'subflow2'
hello from subflow 1
12:55:45.132 | INFO    | Flow run 'proficient-alpaca' - Finished in state Completed()
hello from subflow 2
12:55:47.704 | INFO    | Flow run 'brown-boar' - Finished in state Completed()
[1, 2]
both subflows finished
12:55:47.866 | INFO    | Flow run 'papaya-ladybug' - Finished in state Completed('All states completed.')
j
@Kevin Grismore thanks for the example! I think I'm going to get into some trouble because I need to do some things synchronously so I can pass their return values to both sub-flows. Is this possible? I hardly ever use asyncio due to the complexity
@flow(log_prints=True, name='HQ - Locations')
async def locations_flow(creds, snowflake_session):
    # these are both sync tasks but must be called sequentially
    locations = get_locations(creds)
    load_locations(locations, snowflake_session)


@flow(log_prints=True, name="HQ - Vehicles")
async def vehicles_flow(creds, snowflake_session):
    # these are both sync tasks but must be called sequentially
    vehicles = get_vehicles(creds)
    load_vehicles(vehicles, snowflake_session)


@flow(log_prints=True, name='HQ Integration')
async def main():
    # these two are not tasks, and they must be called sequentially
    # so they can be passed to the 2 sub-flows
    creds = get_private_keys()
    s = snow.Session.builder.configs(
            {
               ... (creds is used here)
            }
    ).create()

    await asyncio.gather(locations_flow(creds, s), vehicles_flow(creds, s))
k
So, something like this? I wrote some sync tasks, want them to execute in the order I'm calling them in the async subflows.
from prefect import flow, task
from anyio import run
from asyncio import gather


@flow
async def hello():
    print("this should be first")
    results = await gather(subflow1(), subflow2())
    print(results)

    print("both subflows finished")


@flow
async def subflow1():
    print("hello from subflow 1")
    howdy()
    whatup()
    return 1


@flow
async def subflow2():
    print("hello from subflow 2")
    whatup()
    howdy()
    return 2


@task
def whatup():
    print("whatup")


@task
def howdy():
    print("howdy")


if __name__ == "__main__":
    run(hello)
🤩 1
Since the tasks themselves are sync, they're not going to give off the appearance of running at the same time across subflows
j
I think my issue is that I'm passing arguments to the sub-flows and that's what's giving me errors. My `creds` object relies on getting an AWS block.
k
You need to `await` your block loading.
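The advice to `await` the block load presumably comes down to ordinary asyncio behavior: calling an async function without `await` hands back a coroutine object rather than its result. Here is a minimal sketch of that using only the standard library. `load_creds` is a hypothetical stand-in for an async block load, not a Prefect API.

```python
import asyncio
import inspect


async def load_creds():
    # Hypothetical stand-in for an async credentials/block load (not a Prefect call).
    await asyncio.sleep(0)
    return {"key": "example"}


async def main():
    not_awaited = load_creds()  # a coroutine object, not the dict
    is_coroutine = inspect.iscoroutine(not_awaited)
    creds = await not_awaited  # awaiting it produces the actual value
    return is_coroutine, creds


is_coroutine, creds = asyncio.run(main())
print(is_coroutine, creds)
```

Passing the un-awaited object into a sub-flow would hand it a coroutine where it expects credentials, which might explain errors like the ones described above.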
What are you looking to gain by using async? In my above example with only sync tasks, it would be no different if I just ran both subflows in sequence, since there's no awaitable function being called to take advantage of the async context
j
I'm trying to run separate, independent groups of tasks at the same time. I'm hitting an endpoint and pushing the data to snowflake, and I'm doing this for multiple endpoints.
k
"at the same time" can mean different things depending on what you're doing. The tangible benefit async gives you is that you can have your code do other things while you're waiting for something else (like a network data transfer) to happen
but if the requests you're actually making here are synchronous, or in sync tasks, then in the end your tasks are all gonna run sequentially anyway
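To make that distinction concrete, here is a small standard-library sketch with no Prefect involved. It gathers two coroutines that await while waiting, then two that block with a synchronous call. The function names and the 0.2 second delays are made up for illustration.

```python
import asyncio
import time


async def awaiting_group(name, delay):
    # asyncio.sleep yields to the event loop, so the other coroutine can run meanwhile.
    await asyncio.sleep(delay)
    return name


async def blocking_group(name, delay):
    # time.sleep never yields, so the event loop is stuck until it returns.
    time.sleep(delay)
    return name


async def timed(factory):
    # Gather two coroutines built by `factory` and measure the wall-clock time.
    start = time.perf_counter()
    await asyncio.gather(factory("a", 0.2), factory("b", 0.2))
    return time.perf_counter() - start


async def main():
    overlapped = await timed(awaiting_group)
    sequential = await timed(blocking_group)
    return overlapped, sequential


overlapped, sequential = asyncio.run(main())
print(f"awaiting: {overlapped:.2f}s, blocking: {sequential:.2f}s")
```

The awaiting pair should finish in roughly one delay, while the blocking pair takes roughly two, even though both pairs go through `gather`.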
j
The requests can all be done concurrently as well as pushing their results. The only synchronous part is that I need to get credentials that are the same for all endpoints and use those in each group of tasks. Do I just need to switch these to normal flows and schedule them all at the same time?
This is what my code looked like before I started trying to async the 3 chunks at the bottom:
@flow(log_prints=True, name='HQ Integration')
def main():
    creds = get_private_keys()
    s = snow.Session.builder.configs(
            {
                creds
            }
    ).create()

    locations = get_locations(creds)
    load_locations(locations, s)

    vehicles = get_vehicles(creds)
    load_vehicles(vehicles, s)

    reservations = get_reservations(creds, last_resv_date)
    load_reservations(reservations, s)
Those 3 sets at the bottom all use the same creds and Snowflake session, so I'd like to run each set at the same time while wrapped in this parent flow. I really don't want to make 3 deployments with different entry points.
Now that I fixed the credentials part I'm not getting any errors, and the flows are created at the same time like I expect, but the tasks inside them all execute sequentially. Really weird - I must not understand how the tasks block each other
It's probably because they're all `sync` and I'm not awaiting anything in the sub-flows. I just assumed that each sub-flow and their tasks would run totally independently of any others. I'll keep playing with it.
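One generic way to get overlap out of synchronous work is to push each group onto a worker thread and await the threads together. The sketch below uses plain asyncio (`asyncio.to_thread`, Python 3.9+) rather than Prefect. It is not necessarily the fix that was eventually used in this thread, and every function in it is a hypothetical placeholder.

```python
import asyncio
import time


def get_creds():
    # Hypothetical placeholder: fetched once and shared by every group.
    return {"token": "example"}


def fetch(endpoint, creds):
    time.sleep(0.2)  # stands in for a blocking HTTP request
    return f"{endpoint} rows"


def load(rows, creds):
    time.sleep(0.2)  # stands in for a blocking database write
    return f"loaded {rows}"


def sync_group(endpoint, creds):
    # Fetch then load, strictly in order within one group.
    return load(fetch(endpoint, creds), creds)


async def main():
    creds = get_creds()  # done once, before any group starts
    start = time.perf_counter()
    # Each group runs in its own worker thread; gather waits for all of them.
    results = await asyncio.gather(
        *(asyncio.to_thread(sync_group, name, creds)
          for name in ("locations", "vehicles", "reservations"))
    )
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

Each group still runs its two steps in order, but the three groups overlap, so the total time is close to one group's time rather than three. This assumes any shared object, such as a database session, is safe to use from several threads. If it is not, calls through it may still end up serialized.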
👀 1
@Kevin Grismore I got it to work, thanks again*10! I've got some tweaks to make because I think my Snowflake session is blocking me, but this screenshot makes me real happy. @Marvin you need to take lessons from Kevin 😄
🤩 1
k
Nice!!!!!!!!