merlin
01/07/2025, 6:44 PM
I have a subflow `extract` that is called multiple times with different parameters:
```python
from prefect import flow

# `extract` is a flow defined elsewhere
@flow
def main():
    extract("file1.sql")
    extract("file2.sql")
    extract("file3.sql")

main()
```
I want the `main` flow to wait for all three to complete, and I want them all to run concurrently, not sequentially.

Marvin
01/07/2025, 6:44 PM
You can use the `submit` method on tasks or subflows. This allows them to execute asynchronously, and the `main` flow will wait for all of them to complete by collecting their futures.
Here's how you can modify your `main` flow to achieve this:
```python
from prefect import flow, task

@flow
def extract(file_name):
    print(f"Processing {file_name}")

@flow
def main():
    # Submit subflows to run concurrently
    future1 = extract.submit("file1.sql")
    future2 = extract.submit("file2.sql")
    future3 = extract.submit("file3.sql")
    # Wait for all subflows to complete
    future1.result()
    future2.result()
    future3.result()

main()
```
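The submit-then-collect pattern described here is the one Prefect tasks follow: `.submit()` returns a future immediately, and `.result()` blocks until that particular run has finished. As a minimal, Prefect-free sketch of the same idea, the standard library's `concurrent.futures` can stand in; `process_file` and `run_all` are hypothetical names, and the sleep only simulates work.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def process_file(file_name: str) -> str:
    # Hypothetical stand-in for real work: sleep to simulate I/O
    time.sleep(0.2)
    return f"processed {file_name}"


def run_all(file_names: list[str]) -> list[str]:
    with ThreadPoolExecutor() as pool:
        # submit() schedules each call and returns a Future right away
        futures = [pool.submit(process_file, name) for name in file_names]
        # result() blocks until the corresponding call has finished
        return [future.result() for future in futures]


start = time.perf_counter()
results = run_all(["file1.sql", "file2.sql", "file3.sql"])
elapsed = time.perf_counter() - start
print(results)
# The three calls overlap, so this stays well under the 0.6s a plain loop needs
print(f"elapsed: {elapsed:.2f}s")
```

Because `run_all` reads the futures in submission order, the results line up with the input list even if the calls finish in a different order.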
By using `.submit()`, each call to `extract` is run asynchronously, and `.result()` waits for its completion. For further details, you can refer to the Prefect 2.x documentation on flows.

merlin
01/07/2025, 6:45 PM
`submit()` is defined for tasks, not flows.

Marvin
01/07/2025, 6:45 PM
The `submit()` method is indeed intended for tasks, not flows. To run subflows separately, you can use the `create_flow_run` method from the Prefect client to create a separate flow run for each subflow. Here's how you can achieve that:
```python
import asyncio

from prefect import flow
from prefect.client import get_client

@flow
def extract(file_name):
    print(f"Processing {file_name}")

@flow
async def main():
    # get_client() returns an async client meant to be used as a context manager
    async with get_client() as client:
        # Create a flow run for each subflow
        flow_run1 = await client.create_flow_run(
            flow=extract, parameters={"file_name": "file1.sql"}
        )
        flow_run2 = await client.create_flow_run(
            flow=extract, parameters={"file_name": "file2.sql"}
        )
        flow_run3 = await client.create_flow_run(
            flow=extract, parameters={"file_name": "file3.sql"}
        )
    # These runs are only registered with the API: nothing in this process
    # executes them, and FlowRun objects have no .wait() method to block on.

# Calling an async flow returns a coroutine, so run it on an event loop
asyncio.run(main())
```
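Be aware that `create_flow_run` only creates the flow runs through the Prefect API: it does not execute `extract` in this process, and the returned `FlowRun` objects have no `wait()` method. Runs created this way are not tied to a deployment, so no worker will pick them up; to trigger runs that actually execute elsewhere, you would typically go through a deployment (for example with `run_deployment`). You can find more details on flow concurrency in the Prefect 2.x documentation.

merlin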
01/07/2025, 6:47 PM
Instead of `create_flow_run`, why not just call `await extract("file1.sql")`, etc.?

Marvin
01/07/2025, 6:47 PM
Calling a subflow directly (e.g., `await extract("file1.sql")`) executes it within the parent flow, and each `await` blocks until that subflow completes. Awaiting the calls one after another therefore runs them sequentially.
To run them concurrently, `extract` must be an async flow, and the calls have to be scheduled together rather than awaited one at a time, for example with `await asyncio.gather(extract("file1.sql"), extract("file2.sql"), extract("file3.sql"))`. Separate flow runs via `create_flow_run` are not needed for this. A synchronous subflow called directly, however, always blocks until it returns.
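To illustrate the difference without Prefect, here is a minimal sketch in which `fake_extract` is a hypothetical coroutine that sleeps to simulate work: awaiting three calls in turn takes roughly the sum of their durations, while scheduling them together with `asyncio.gather` overlaps them.

```python
import asyncio
import time


async def fake_extract(file_name: str) -> str:
    # Hypothetical stand-in for an async subflow: simulate 0.2s of I/O
    await asyncio.sleep(0.2)
    return file_name


async def run_sequentially(files: list[str]) -> list[str]:
    # Each await finishes before the next call even starts
    return [await fake_extract(f) for f in files]


async def run_concurrently(files: list[str]) -> list[str]:
    # All coroutines are scheduled at once; gather preserves input order
    return await asyncio.gather(*(fake_extract(f) for f in files))


def timed(coro_fn, files: list[str]):
    start = time.perf_counter()
    result = asyncio.run(coro_fn(files))
    return result, time.perf_counter() - start


files = ["file1.sql", "file2.sql", "file3.sql"]
seq_result, seq_time = timed(run_sequentially, files)
con_result, con_time = timed(run_concurrently, files)
print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

`asyncio.gather` returns results in the order the coroutines were passed, regardless of which one finishes first.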
For more in-depth information on flow concurrency, you can refer to the Prefect 2.x documentation on flows.

merlin
01/07/2025, 6:52 PM
```python
async with asyncio.TaskGroup() as tg:
    flow_runs = []
    for file in file_list:
        task = tg.create_task(
            extract(
                filename=file
            )
        )
        flow_runs.append(task)
```

Marvin
01/07/2025, 6:52 PM
You can use `asyncio.TaskGroup` (Python 3.11 and later) to run multiple asynchronous subflows concurrently in Prefect 2.x. This approach is compatible with Prefect's support for asynchronous task and flow definitions. Here's how you can structure your code:
```python
import asyncio

from prefect import flow

@flow
async def extract(filename):
    print(f"Processing {filename}")

@flow
async def main(file_list):
    flow_runs = []
    # Leaving the `async with` block waits for every task,
    # so no separate asyncio.gather() call is needed afterwards
    async with asyncio.TaskGroup() as tg:
        for file in file_list:
            task = tg.create_task(extract(filename=file))
            flow_runs.append(task)

file_list = ["file1.sql", "file2.sql", "file3.sql"]
# Top-level `await` is not valid in a script, so start the event loop explicitly
asyncio.run(main(file_list))
```
This `main` flow runs one `extract` subflow per file in `file_list`, all concurrently. The `asyncio.TaskGroup` ensures that all tasks are awaited and completed before execution continues past the `async with` block. For more details on using async patterns with Prefect, you can refer to the Prefect 2.x documentation on tasks.

merlin
01/08/2025, 2:55 AM
Do I have to define `extract` with `async def extract(...)`? This flow has many nested flows and tasks that I don't also want to add `async` definitions for.

Marvin
01/08/2025, 2:56 AM
You don't need to define the `extract` flow as `async def` if you prefer not to. You can still use `asyncio` to run multiple instances of a synchronous flow concurrently by handing each call to a worker thread with `loop.run_in_executor` or similar constructs, at the cost of running every call in its own thread.
Only the parent flow needs to be `async`; `extract` and everything it calls can stay synchronous. Here's how you can manage it:
```python
import asyncio

from prefect import flow

@flow
def extract(filename):
    print(f"Processing {filename}")

@flow
async def main(file_list):
    # Use the event loop that is running this coroutine
    loop = asyncio.get_running_loop()
    # Run each synchronous extract call in the default thread pool executor
    tasks = [
        loop.run_in_executor(None, extract, file)
        for file in file_list
    ]
    # Wait for all tasks to complete
    await asyncio.gather(*tasks)

file_list = ["file1.sql", "file2.sql", "file3.sql"]
asyncio.run(main(file_list))
```
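One detail worth checking with threads: `loop.run_in_executor` does not copy the caller's `contextvars` context into the worker thread, whereas `asyncio.to_thread` does. Prefect tracks the current flow run in context variables, so, as an assumption to verify against your Prefect version, `asyncio.to_thread` may be the safer choice if you want each threaded `extract` call to see `main` as its parent. The sketch below uses a hypothetical `current_run` variable in place of Prefect's context.

```python
import asyncio
import contextvars

# Hypothetical stand-in for the context Prefect uses to track the parent flow run
current_run = contextvars.ContextVar("current_run", default="<none>")


def read_context(_file_name: str) -> str:
    # Runs in a worker thread; reports which parent run it can see
    return current_run.get()


async def compare(file_name: str) -> tuple[str, str]:
    current_run.set("main-flow-run")
    loop = asyncio.get_running_loop()
    # run_in_executor hands the call to a thread without copying the context
    via_executor = await loop.run_in_executor(None, read_context, file_name)
    # asyncio.to_thread copies the current context into the worker thread
    via_to_thread = await asyncio.to_thread(read_context, file_name)
    return via_executor, via_to_thread


print(asyncio.run(compare("file1.sql")))
```

If that turns out to matter, replacing `loop.run_in_executor(None, extract, file)` with `asyncio.to_thread(extract, file)` leaves the rest of `main` unchanged, since `asyncio.gather` accepts the resulting coroutines as well.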
In this setup, `loop.run_in_executor` runs each synchronous `extract` call in a separate thread from the default thread pool, so the calls run concurrently. This way, you don't have to modify the `extract` flow or its nested components to be asynchronous.

merlin
01/08/2025, 3:09 AM
What about using `async with asyncio.TaskGroup() as tg:`?

Marvin
01/08/2025, 3:09 AM
`asyncio.TaskGroup` is designed to work with coroutines, not synchronous functions. Writing `tg.create_task(extract(file))` with a synchronous `extract` would run the flow immediately, blocking until it finishes, and then pass its return value (not a coroutine) to `create_task`.
However, you can wrap each synchronous flow call in an asynchronous function that uses `loop.run_in_executor`, and then schedule those wrappers with the task group. Here's how you can structure it:
```python
import asyncio

from prefect import flow

@flow
def extract(filename):
    print(f"Processing {filename}")

async def run_extract(filename):
    # Run the synchronous flow in the default thread pool without blocking the event loop
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, extract, filename)

@flow
async def main(file_list):
    # Requires Python 3.11+; the block exits only after every task has finished
    async with asyncio.TaskGroup() as tg:
        for file in file_list:
            tg.create_task(run_extract(file))

file_list = ["file1.sql", "file2.sql", "file3.sql"]
asyncio.run(main(file_list))
```
In this setup, `run_extract` is an asynchronous wrapper around the `extract` flow, which lets you use `asyncio.TaskGroup` with it. Each flow run is managed as an asyncio task, and `main` returns only after all of them have completed.