<@ULVA73B9P> In Prefect 2.20, how can I run severa...
# ask-marvin
m
@Marvin In Prefect 2.20, how can I run several subflows asynchronously? For example, a defined flow called `extract`, called multiple times with different parameters.
@flow
def main():
     extract("file1.sql")
     extract("file2.sql")
     extract("file3.sql")

main()
I want the `main` flow to wait for all three to complete, and I want them all to run concurrently, not sequentially.
m
In Prefect 2.20, you can run subflows concurrently by using the `submit` method on tasks or subflows. This allows them to execute asynchronously, and the `main` flow will wait for all to complete by collecting their futures. Here's how you can modify your `main` flow to achieve this:
python
from prefect import flow, task

@flow
def extract(file_name):
    print(f"Processing {file_name}")

@flow
def main():
    # Submit subflows to run concurrently
    future1 = extract.submit("file1.sql")
    future2 = extract.submit("file2.sql")
    future3 = extract.submit("file3.sql")
    
    # Wait for all subflows to complete
    future1.result()
    future2.result()
    future3.result()

main()
By using `.submit()`, each call to `extract` is run asynchronously, and `.result()` waits for their completion. For further details, you can refer to the Prefect 2.x documentation on flows.
m
@Marvin But I thought `submit()` is defined for tasks, not flows.
m
I apologize for the confusion. In Prefect 2.x, the `submit()` method is indeed intended for tasks, not flows. One alternative is the `create_flow_run` method of the Prefect client, which registers a separate flow run for each subflow. Note that it only creates the flow run record in the API: it does not execute `extract` in the current process, and the returned `FlowRun` records have no `.wait()` method. Here's what that looks like:
python
import asyncio

from prefect import flow, get_client

@flow
def extract(file_name):
    print(f"Processing {file_name}")

@flow
async def main():
    # The client is used as an async context manager
    async with get_client() as client:
        # Each call only registers a flow run with the API;
        # it does not execute extract or wait for it
        flow_runs = [
            await client.create_flow_run(
                flow=extract, parameters={"file_name": file_name}
            )
            for file_name in ["file1.sql", "file2.sql", "file3.sql"]
        ]

    # The returned FlowRun records carry the IDs of the created runs
    for flow_run in flow_runs:
        print(f"Created flow run {flow_run.id}")

if __name__ == "__main__":
    asyncio.run(main())
This approach creates the flow runs asynchronously, but on its own it does not make `main` wait for them to finish. You can find more details on flow concurrency in the Prefect 2.x documentation.
m
@Marvin Do I need to use `create_flow_run`? Why not just call `await extract("file1.sql")` etc.?
m
In Prefect 2.x, awaiting a subflow directly (e.g., `await extract("file1.sql")`) blocks the parent flow until that subflow completes, so three consecutive awaits run one after another. It also requires `extract` to be defined with `async def`. You do not need `create_flow_run` for concurrency, though: asynchronous subflows can run concurrently when they are scheduled together instead of awaited one at a time, for example with `asyncio.gather`. For more in-depth information on flow concurrency, you can refer to the Prefect 2.x documentation on flows.
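The difference between awaiting calls one at a time and scheduling them together can be sketched with plain `asyncio`. This is only an illustration: `extract` here is a hypothetical coroutine standing in for an async subflow, and no Prefect APIs are involved.

```python
import asyncio
import time


async def extract(file_name: str) -> str:
    # Hypothetical stand-in for an async subflow; sleeps instead of doing work
    await asyncio.sleep(0.1)
    return f"processed {file_name}"


async def run_sequentially(files: list[str]) -> list[str]:
    # Each await finishes before the next call starts
    return [await extract(name) for name in files]


async def run_concurrently(files: list[str]) -> list[str]:
    # gather schedules all coroutines together and keeps the input order
    return await asyncio.gather(*(extract(name) for name in files))


if __name__ == "__main__":
    files = ["file1.sql", "file2.sql", "file3.sql"]
    for runner in (run_sequentially, run_concurrently):
        start = time.perf_counter()
        results = asyncio.run(runner(files))
        print(runner.__name__, f"{time.perf_counter() - start:.2f}s", results)
```

With three files, the sequential version should take roughly three sleeps and the concurrent version roughly one.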
m
@Marvin Could I use Python task groups to create multiple async flow runs, like:
async with asyncio.TaskGroup() as tg:
        flow_runs = []
        for file in file_list:
            task = tg.create_task(
                extract(
                    filename=file
                    )
                )
            flow_runs.append(task)
m
Yes, you can use Python's `asyncio.TaskGroup` to create multiple asynchronous flow runs in Prefect 2.x. This approach is compatible with Prefect's support for asynchronous task and flow definitions. Here's how you can structure your code:
python
import asyncio
from prefect import flow

@flow
async def extract(filename):
    print(f"Processing {filename}")

@flow
async def main(file_list):
    # asyncio.TaskGroup requires Python 3.11 or later
    async with asyncio.TaskGroup() as tg:
        flow_runs = []
        for file in file_list:
            task = tg.create_task(extract(filename=file))
            flow_runs.append(task)

    # Every task in flow_runs has finished once the block above exits,
    # so no extra asyncio.gather call is needed here

if __name__ == "__main__":
    file_list = ["file1.sql", "file2.sql", "file3.sql"]
    # Top-level await is not valid in a script, so start the event loop here
    asyncio.run(main(file_list))
This code will run the `extract` subflows concurrently for each file in `file_list`. The `asyncio.TaskGroup` ensures that all tasks are awaited and completed before proceeding. For more details on using async patterns with Prefect, you can refer to the Prefect 2.x documentation on tasks.
m
@Marvin Must I define the flow `extract` with `async def extract(...)`? This flow has many nested flows and tasks that I don't want to add `async` definitions to as well.
m
No, you don't need to define the `extract` flow as `async def` if you prefer not to. You can still run multiple instances of a synchronous flow concurrently from an `asyncio` context by handing each call to a worker thread with `loop.run_in_executor` or a similar construct. Calling a synchronous flow directly inside an async flow would block the event loop until it returns, so the calls would run one after another. Here's how you can manage it:
python
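import asyncio
from prefect import flow

@flow
def extract(filename):
    print(f"Processing {filename}")

@flow
async def main(file_list):
    # Use the event loop that is already running this coroutine
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default thread pool executor
    tasks = [
        loop.run_in_executor(None, extract, file)
        for file in file_list
    ]

    # Wait for all tasks to complete
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    file_list = ["file1.sql", "file2.sql", "file3.sql"]
    # Top-level await is not valid in a script, so start the event loop here
    asyncio.run(main(file_list))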
In this setup, `loop.run_in_executor` allows synchronous functions to be run concurrently in separate threads. This way, you don't have to modify the `extract` flow or its nested components to be asynchronous.
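One detail worth checking when moving synchronous calls onto threads is context propagation. The sketch below uses plain `asyncio` and a hypothetical context variable `current_run` to show that `loop.run_in_executor` does not copy context variables into the worker thread, while `asyncio.to_thread` (Python 3.9+) does. Prefect keeps its run context in context variables, so this may affect whether `extract` is recorded as a subflow of `main`; that part is an assumption and has not been verified here.

```python
import asyncio
import contextvars

# Hypothetical context variable standing in for library-managed run context
current_run = contextvars.ContextVar("current_run", default="unset")


def extract(filename: str) -> str:
    # Synchronous stand-in for a sync flow; reports the context it sees
    return f"{filename}: {current_run.get()}"


async def via_executor(file_list: list[str]) -> list[str]:
    loop = asyncio.get_running_loop()
    # The worker thread does not receive the caller's context variables
    futures = [loop.run_in_executor(None, extract, f) for f in file_list]
    return await asyncio.gather(*futures)


async def via_to_thread(file_list: list[str]) -> list[str]:
    # asyncio.to_thread copies the caller's context before running the call
    return await asyncio.gather(*(asyncio.to_thread(extract, f) for f in file_list))


async def main(file_list: list[str]) -> tuple[list[str], list[str]]:
    current_run.set("main")
    return await via_executor(file_list), await via_to_thread(file_list)


if __name__ == "__main__":
    print(asyncio.run(main(["file1.sql", "file2.sql"])))
```

On a standard CPython build, the executor results should show the default value and the `to_thread` results should show the value set in `main`.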
m
@Marvin Very good, is there a way to do this with a task group context like `async with asyncio.TaskGroup() as tg:`?
m
`asyncio.TaskGroup` is designed to work with coroutines, not synchronous functions, so you cannot pass synchronous flow or task calls to it directly. However, you can wrap each synchronous flow call in an asynchronous function that uses `loop.run_in_executor`, and then schedule those wrappers in the task group. Here's how you can structure it:
python
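import asyncio
from prefect import flow

@flow
def extract(filename):
    print(f"Processing {filename}")

async def run_extract(filename):
    # Run the synchronous flow in the loop's default thread pool
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, extract, filename)

@flow
async def main(file_list):
    # asyncio.TaskGroup requires Python 3.11 or later
    async with asyncio.TaskGroup() as tg:
        for file in file_list:
            tg.create_task(run_extract(file))
    # All wrapper tasks have finished once the block above exits

if __name__ == "__main__":
    file_list = ["file1.sql", "file2.sql", "file3.sql"]
    # Top-level await is not valid in a script, so start the event loop here
    asyncio.run(main(file_list))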
In this setup, `run_extract` is an asynchronous wrapper around the `extract` flow, allowing you to use `asyncio.TaskGroup` effectively. This ensures that all flow runs are managed as asynchronous tasks.