<@ULVA73B9P>, can you provide an example of two co...
# ask-marvin
p
@Marvin, can you provide an example of two concurrently running subflows?
m
It seems that the documentation and code excerpts don't provide a straightforward example of running subflows concurrently. However, based on the typical usage of Prefect 3.x, you can achieve concurrent subflows using asynchronous execution within your main flow. Here's a conceptual example:
```python
from prefect import flow, task
import asyncio

@task
async def subflow_1():
    print("Running Subflow 1")
    await asyncio.sleep(1)  # Simulate some work
    return "Subflow 1 Complete"

@task
async def subflow_2():
    print("Running Subflow 2")
    await asyncio.sleep(1)  # Simulate some work
    return "Subflow 2 Complete"

@flow
async def main_flow():
    # Run both coroutines concurrently and collect results in argument order
    result_1, result_2 = await asyncio.gather(subflow_1(), subflow_2())
    print(result_1, result_2)

if __name__ == "__main__":
    # Calling an async flow returns a coroutine, so it has to be run on an event loop
    asyncio.run(main_flow())
```
This example uses Python's `asyncio.gather` to run `subflow_1` and `subflow_2` concurrently within `main_flow`. Despite their names, both are defined as asynchronous tasks with `@task`, not as subflows. Because `asyncio.gather` interleaves coroutines on a single event loop, the execution is concurrent rather than truly parallel. For more details on Prefect's flow execution, you might want to explore the Prefect documentation further.
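To see what `asyncio.gather` contributes on its own, here is a minimal standard-library sketch. `fake_subflow` and `run_concurrently` are hypothetical stand-ins, not Prefect APIs; the assumption is that an async Prefect task or flow awaits in the same way a plain coroutine does.

```python
import asyncio
import time


async def fake_subflow(name: str, delay: float) -> str:
    # Hypothetical stand-in for an async Prefect task or flow
    await asyncio.sleep(delay)  # Yields to the event loop while "working"
    return f"{name} complete"


async def run_concurrently() -> tuple[list[str], float]:
    start = time.perf_counter()
    # gather schedules both coroutines together and returns results
    # in argument order, regardless of which one finishes first
    results = await asyncio.gather(
        fake_subflow("Subflow 1", 0.6),
        fake_subflow("Subflow 2", 0.3),
    )
    elapsed = time.perf_counter() - start
    return list(results), elapsed


if __name__ == "__main__":
    results, elapsed = asyncio.run(run_concurrently())
    print(results, f"{elapsed:.2f}s")
```

The total time should be close to the longest delay rather than the sum of both, which is the practical sign that the two coroutines overlapped.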
p
@Marvin, how can you do this using `@flow` rather than `@task`?
m
To run subflows concurrently using the `@flow` decorator in Prefect 3.x, you can leverage asynchronous execution. Here's an example:
```python
from prefect import flow
import asyncio

@flow
async def subflow_1():
    print("Running Subflow 1")
    await asyncio.sleep(1)  # Simulate some work
    return "Subflow 1 Complete"

@flow
async def subflow_2():
    print("Running Subflow 2")
    await asyncio.sleep(1)  # Simulate some work
    return "Subflow 2 Complete"

@flow
async def main_flow():
    # Run both subflows concurrently and collect results in argument order
    result_1, result_2 = await asyncio.gather(subflow_1(), subflow_2())
    print(result_1, result_2)

if __name__ == "__main__":
    # Calling an async flow returns a coroutine, so it has to be run on an event loop
    asyncio.run(main_flow())
```
In this setup, both `subflow_1` and `subflow_2` are defined as flows using the `@flow` decorator. `main_flow` orchestrates their concurrent execution using `asyncio.gather`, which lets one subflow make progress while the other is waiting at an `await`. This is concurrency on a single event loop rather than parallel execution across CPU cores, so it helps most when the subflows spend their time waiting on I/O. For more detailed information on writing flows and using concurrency with Prefect, you can refer to the Prefect documentation.
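One caveat is worth illustrating: `asyncio.gather` only overlaps work that actually awaits. The sketch below uses plain coroutines as hypothetical stand-ins for subflows (`blocking_subflow`, `offloaded_subflow`, and `timed` are illustrative names, not Prefect APIs) and assumes Python 3.9+ for `asyncio.to_thread`.

```python
import asyncio
import time


async def blocking_subflow(delay: float) -> str:
    time.sleep(delay)  # Blocks the event loop; no other coroutine can run meanwhile
    return "done"


async def offloaded_subflow(delay: float) -> str:
    # Run the blocking call in a worker thread so the event loop stays free
    await asyncio.to_thread(time.sleep, delay)
    return "done"


async def timed(factory, delay: float) -> float:
    # Run two copies of the same coroutine function under gather and time them
    start = time.perf_counter()
    await asyncio.gather(factory(delay), factory(delay))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"blocking:  {asyncio.run(timed(blocking_subflow, 0.3)):.2f}s")
    print(f"offloaded: {asyncio.run(timed(offloaded_subflow, 0.3)):.2f}s")
```

The blocking version should take roughly twice the delay because the two calls run back to back, while the offloaded version should finish in about one delay. If a subflow wraps synchronous, blocking code, something along these lines may be needed before `asyncio.gather` gives any speedup.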