Pascal Jahan Elahi (Pawsey)
01/09/2025, 6:07 AM
Marvin
01/09/2025, 6:07 AM
```python
from prefect import flow, task
import asyncio


@task
async def subflow_1():
    print("Running Subflow 1")
    await asyncio.sleep(1)  # Simulate some work
    return "Subflow 1 Complete"


@task
async def subflow_2():
    print("Running Subflow 2")
    await asyncio.sleep(1)  # Simulate some work
    return "Subflow 2 Complete"


@flow
async def main_flow():
    # Run both tasks concurrently and collect their return values
    result_1, result_2 = await asyncio.gather(subflow_1(), subflow_2())
    print(result_1, result_2)


if __name__ == "__main__":
    # main_flow is async, so calling it returns a coroutine;
    # asyncio.run drives that coroutine to completion
    asyncio.run(main_flow())
```
This example uses Python's asyncio.gather to run subflow_1 and subflow_2 concurrently within main_flow. Despite their names, both are defined as asynchronous tasks with @task, not as flows. asyncio interleaves the two coroutines while each one is waiting, so this is concurrent execution rather than true parallel execution on separate cores.
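To see what asyncio.gather contributes on its own, here is a minimal sketch in plain asyncio, without any Prefect decorators. The names simulated_work, run_sequentially and run_concurrently are hypothetical and exist only for this illustration; the delay stands in for real work.

```python
import asyncio
import time


async def simulated_work(name: str, delay: float) -> str:
    """Hypothetical stand-in for a task body: wait without blocking the loop."""
    await asyncio.sleep(delay)
    return f"{name} complete"


async def run_sequentially(delay: float) -> float:
    """Await each coroutine in turn; elapsed time is roughly 2 * delay."""
    start = time.perf_counter()
    await simulated_work("first", delay)
    await simulated_work("second", delay)
    return time.perf_counter() - start


async def run_concurrently(delay: float) -> float:
    """Hand both coroutines to gather; elapsed time is roughly delay."""
    start = time.perf_counter()
    await asyncio.gather(
        simulated_work("first", delay),
        simulated_work("second", delay),
    )
    return time.perf_counter() - start


if __name__ == "__main__":
    delay = 0.2
    print(f"sequential: {asyncio.run(run_sequentially(delay)):.2f}s")
    print(f"concurrent: {asyncio.run(run_concurrently(delay)):.2f}s")
```

The concurrent version should finish in about half the time, because the two waits overlap on one event loop. The saving comes from overlapping waits, so CPU-bound work would not speed up this way.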
For more details on Prefect's flow execution, you might want to explore the Prefect documentation further.
Pascal Jahan Elahi (Pawsey)
01/09/2025, 6:17 AM
@flow rather than @task?
Marvin
01/09/2025, 6:17 AM
@flow decorator in Prefect 3.x, you can leverage asynchronous execution. Here's an example:
```python
from prefect import flow
import asyncio


@flow
async def subflow_1():
    print("Running Subflow 1")
    await asyncio.sleep(1)  # Simulate some work
    return "Subflow 1 Complete"


@flow
async def subflow_2():
    print("Running Subflow 2")
    await asyncio.sleep(1)  # Simulate some work
    return "Subflow 2 Complete"


@flow
async def main_flow():
    # Run both subflows concurrently and collect their return values
    result_1, result_2 = await asyncio.gather(subflow_1(), subflow_2())
    print(result_1, result_2)


if __name__ == "__main__":
    # main_flow is async, so calling it returns a coroutine;
    # asyncio.run drives that coroutine to completion
    asyncio.run(main_flow())
```
In this setup, both subflow_1 and subflow_2 are defined as flows using the @flow decorator, so calling them from main_flow runs them as nested flows. main_flow awaits both through asyncio.gather, which lets them run concurrently instead of one after the other. The concurrency comes from Python's asyncio event loop: the subflows overlap while they wait, which is not the same as parallel execution on separate cores.
For more detailed information on writing flows and using concurrency with Prefect, you can refer to the Prefect documentation.
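One detail both examples rely on is that asyncio.gather returns results in the order the awaitables were passed in, not the order in which they finish, so result_1 always belongs to subflow_1. A small plain-asyncio sketch of that behaviour follows; finish_after and the delays are hypothetical names and values chosen only for illustration.

```python
import asyncio


async def finish_after(label: str, delay: float, finished: list[str]) -> str:
    """Hypothetical helper: wait, record when we actually finished, return label."""
    await asyncio.sleep(delay)
    finished.append(label)  # completion order, for comparison
    return label


async def main() -> tuple[list[str], list[str]]:
    finished: list[str] = []
    # "slow" is passed first but completes last
    results = await asyncio.gather(
        finish_after("slow", 0.2, finished),
        finish_after("fast", 0.05, finished),
    )
    return list(results), finished


if __name__ == "__main__":
    results, finished = asyncio.run(main())
    print("gather results:  ", results)
    print("completion order:", finished)
```

Here the gather results keep the argument order while the completion list shows the faster coroutine finishing first, which is why unpacking into result_1, result_2 is safe.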