Pascal Jahan Elahi (Pawsey)
01/09/2025, 6:07 AMMarvin
01/09/2025, 6:07 AMpython
from prefect import flow, task
import asyncio
@task
async def subflow_1():
print("Running Subflow 1")
await asyncio.sleep(1) # Simulate some work
return "Subflow 1 Complete"
@task
async def subflow_2():
print("Running Subflow 2")
await asyncio.sleep(1) # Simulate some work
return "Subflow 2 Complete"
@flow
async def main_flow():
# Run subflows concurrently
result_1, result_2 = await asyncio.gather(subflow_1(), subflow_2())
print(result_1, result_2)
if __name__ == "__main__":
main_flow()
This example uses Python's asyncio.gather
to run subflow_1
and subflow_2
concurrently within the main_flow
. Each subflow is defined as an asynchronous task. This approach leverages Python's async capabilities to achieve parallel execution.
For more details on Prefect's flow execution, you might want to explore the Prefect documentation further.Pascal Jahan Elahi (Pawsey)
01/09/2025, 6:17 AM@flow
rather than @task
?Marvin
01/09/2025, 6:17 AM@flow
decorator in Prefect 3.x, you can leverage asynchronous execution. Here's an example:
python
from prefect import flow
import asyncio
@flow
async def subflow_1():
print("Running Subflow 1")
await asyncio.sleep(1) # Simulate some work
return "Subflow 1 Complete"
@flow
async def subflow_2():
print("Running Subflow 2")
await asyncio.sleep(1) # Simulate some work
return "Subflow 2 Complete"
@flow
async def main_flow():
# Run subflows concurrently
result_1, result_2 = await asyncio.gather(subflow_1(), subflow_2())
print(result_1, result_2)
if __name__ == "__main__":
main_flow()
In this setup, both subflow_1
and subflow_2
are defined as flows using the @flow
decorator. The main_flow
orchestrates their concurrent execution using asyncio.gather
, which allows them to run in parallel. This approach utilizes Python's asynchronous capabilities to manage concurrency.
For more detailed information on writing flows and using concurrency with Prefect, you can refer to the Prefect documentation.