Hi <@ULVA73B9P>, how can I run concurrent subflows...
# ask-marvin
p
Hi @Marvin, how can I run concurrent subflows in prefect 2.x?
m
To run subflows concurrently in Prefect 2.x, define them as async flows and await them together. A subflow is a flow called from inside another flow, and calling an async flow returns a coroutine, so `asyncio.gather` can run several of them at once. Here's a high-level approach:

1. Define your Subflows: Create the flows you want to run concurrently as `async def` functions decorated with `@flow`.
2. Run them together: Inside an async main flow, pass the subflow calls to `asyncio.gather`. Note that `.submit()` is a method on tasks, not flows; subflows are called directly.
3. Optionally use the Concurrency Context: Prefect has a `concurrency` context manager (the async version lives in `prefect.concurrency.asyncio`) that occupies a slot on a named global concurrency limit while its block runs. It throttles work rather than making it concurrent:

   ```python
   import asyncio
   from prefect import flow
   from prefect.concurrency.asyncio import concurrency

   @flow
   async def subflow(x):
       # Your subflow logic here
       return x

   @flow
   async def main_flow():
       # "subflow_concurrency" is an example name for a global concurrency limit
       async with concurrency("subflow_concurrency"):
           # Calling an async flow returns a coroutine; gather runs them concurrently
           return await asyncio.gather(subflow(1), subflow(2))
   ```
4. Handle Results: Collect and process the results once the subflows are complete. By leveraging these tools, you can achieve parallel execution of your subflows in Prefect 2.x. For more details, you can check the Prefect Flow Concepts and Concurrency Guide. It's a bit like trying to teach cats to fetch; they don't exactly block your path, they're just off doing their own thing until they're done.
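As a rough illustration of the "handle results" step, here is a minimal sketch in plain `asyncio`, with no Prefect involved. `fake_subflow` and `collect` are hypothetical names standing in for async subflows and the parent flow. It shows two things about `asyncio.gather`: results come back in the order the awaitables were passed, and `return_exceptions=True` keeps one failure from discarding the other results. The assumption is that awaited async subflows behave like any other coroutine here.

```python
import asyncio


async def fake_subflow(x: int) -> int:
    # Hypothetical stand-in for an async subflow; later inputs finish sooner
    await asyncio.sleep(0.01 * (3 - x))
    if x == 2:
        raise ValueError("subflow 2 failed")
    return x * 10


async def collect():
    # return_exceptions=True returns raised exceptions as values
    # instead of propagating the first one
    outcomes = await asyncio.gather(
        *(fake_subflow(x) for x in (1, 2, 3)),
        return_exceptions=True,
    )
    succeeded = [o for o in outcomes if not isinstance(o, Exception)]
    failed = [o for o in outcomes if isinstance(o, Exception)]
    return outcomes, succeeded, failed


if __name__ == "__main__":
    outcomes, succeeded, failed = asyncio.run(collect())
    print("succeeded:", succeeded)
    print("failed:", failed)
```

Without `return_exceptions=True`, the first exception would propagate out of `gather`, which may be what you want if any subflow failure should fail the parent.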
p
@Marvin, can I do the same but have the subflows use different task runners?
m
Yes, you can assign different task runners to your subflows in Prefect 2.x. The task runner controls how the tasks submitted inside each subflow are executed, for example sequentially or concurrently. Here's how you can achieve this:

1. Import Task Runners: Prefect provides task runners such as `ConcurrentTaskRunner` and `SequentialTaskRunner` in `prefect.task_runners`.
2. Assign Task Runners to Subflows: Pass a runner through the `task_runner` argument of the `@flow` decorator. Tasks that the subflow starts with `.submit()` then go through that runner.
3. Example Setup:
   ```python
   from prefect import flow, task
   from prefect.task_runners import ConcurrentTaskRunner, SequentialTaskRunner

   @task
   def my_task(x):
       return x * 2

   @flow(task_runner=ConcurrentTaskRunner())
   def subflow_concurrent():
       # Submitted tasks can run at the same time
       for i in range(5):
           my_task.submit(i)

   @flow(task_runner=SequentialTaskRunner())
   def subflow_sequential():
       # Submitted tasks run one at a time, in order
       for i in range(5):
           my_task.submit(i)

   @flow
   def main_flow():
       # The two subflows themselves are called one after the other
       subflow_concurrent()
       subflow_sequential()

   if __name__ == "__main__":
       main_flow()
   ```
In this example, `subflow_concurrent` runs its tasks on a concurrent task runner, so submitted tasks can overlap instead of waiting on each other. Meanwhile, `subflow_sequential` executes its tasks one after another using a sequential runner. Note that `main_flow` itself calls the two subflows one after the other; the task runners only affect the tasks inside each subflow. For further information, you can refer to the Prefect documentation on Task Runners, which covers various configurations and examples of task runners. It's like giving each subflow its own special pair of sneakers to sprint or meander, however they like!
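To make the difference between the two runners concrete, here is a plain-Python analogy. It is not Prefect's implementation: a `ThreadPoolExecutor` stands in for a concurrent runner and a simple loop stands in for a sequential one. `PeakCounter`, `double`, `run_sequentially` and `run_concurrently` are hypothetical names made up for this sketch. Both versions return the same results; the only difference is how many calls are in flight at once.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class PeakCounter:
    """Tracks the highest number of calls active at the same time."""

    def __init__(self):
        self._lock = threading.Lock()
        self.active = 0
        self.peak = 0

    def __enter__(self):
        with self._lock:
            self.active += 1
            self.peak = max(self.peak, self.active)
        return self

    def __exit__(self, *exc):
        with self._lock:
            self.active -= 1
        return False


def double(x, counter):
    # Stand-in for a task; the sleep simulates I/O-bound work
    with counter:
        time.sleep(0.05)
        return x * 2


def run_sequentially(inputs):
    # One call at a time, like a sequential runner
    counter = PeakCounter()
    results = [double(x, counter) for x in inputs]
    return results, counter.peak


def run_concurrently(inputs):
    # Calls overlap on worker threads, like a concurrent runner
    inputs = list(inputs)
    counter = PeakCounter()
    with ThreadPoolExecutor(max_workers=len(inputs)) as pool:
        futures = [pool.submit(double, x, counter) for x in inputs]
        results = [f.result() for f in futures]
    return results, counter.peak


if __name__ == "__main__":
    print(run_sequentially(range(5)))
    print(run_concurrently(range(5)))
```

In the sequential version the peak is always 1, while the threaded version lets several calls overlap, which is the behavior the two task runners are described as giving the tasks inside each subflow.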