Pascal Jahan Elahi (Pawsey)
12/13/2024, 7:42 AM
Marvin
12/13/2024, 7:43 AM
`.submit()` method within your main flow to trigger these subflows asynchronously. This does not block the main flow from continuing its execution while the subflows are running.
3. Utilize Concurrency Context: Prefect has a concurrency context manager that you can use to manage your concurrent tasks more precisely. Use the async version if your tasks are I/O bound:
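```python
import asyncio

from prefect import flow, task
from prefect.concurrency.asyncio import concurrency


@task
async def subflow_task(x):
    # Your task logic here
    return x


@flow
async def main_flow():
    # Assumes a concurrency limit named "subflow_concurrency" has been configured
    async with concurrency("subflow_concurrency"):
        # Calling an async task inside an async flow returns a coroutine,
        # so gather() awaits both tasks concurrently and returns their results
        results = await asyncio.gather(
            subflow_task(1),
            subflow_task(2),
        )
    return results
```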
4. Handle Results: Collect and process the results once the subflows are complete.
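To make steps 3 and 4 concrete without depending on a Prefect installation, here is a minimal plain-asyncio sketch of the same pattern. It is an analogy, not Prefect's API: `fake_subflow`, `main`, and `max_concurrent` are hypothetical names, and an `asyncio.Semaphore` stands in for a named concurrency limit.

```python
import asyncio


async def fake_subflow(x, limit):
    # `limit` is an asyncio.Semaphore standing in for a named concurrency limit
    async with limit:
        await asyncio.sleep(0.01)  # simulated I/O-bound work
        return x * 2


async def main(inputs, max_concurrent=2):
    limit = asyncio.Semaphore(max_concurrent)
    # gather() starts all coroutines and returns results in input order
    return await asyncio.gather(*(fake_subflow(x, limit) for x in inputs))


# Collect the results once every simulated subflow has finished
results = asyncio.run(main([1, 2, 3]))
print(results)
```

At most `max_concurrent` coroutines hold the semaphore at once, and the caller only processes `results` after all of them have completed.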
By leveraging these tools, you can achieve parallel execution of your subflows in Prefect 2.x. For more details, you can check the Prefect Flow Concepts and Concurrency Guide.
It's a bit like trying to teach cats to fetch; they don't exactly block your path, they're just off doing their own thing until they're done.
Pascal Jahan Elahi (Pawsey)
12/13/2024, 7:55 AM
Marvin
12/13/2024, 7:55 AM
`ConcurrentTaskRunner` and `SequentialTaskRunner`.
2. Assign Task Runners to Subflows: When defining a subflow, pass a task runner through the `task_runner` argument of the `@flow` decorator so that tasks within that subflow use it. Call `.submit()` on each task to execute it with the assigned task runner.
3. Example Setup:
```python
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner, SequentialTaskRunner


@task
def my_task(x):
    return x * 2


@flow(task_runner=ConcurrentTaskRunner())
def subflow_concurrent():
    for i in range(5):
        my_task.submit(i)


@flow(task_runner=SequentialTaskRunner())
def subflow_sequential():
    for i in range(5):
        my_task.submit(i)


@flow
def main_flow():
    subflow_concurrent()
    subflow_sequential()


if __name__ == "__main__":
    main_flow()
```
In this example, `subflow_concurrent` will run its tasks using a concurrent task runner, meaning tasks will be executed simultaneously where possible. Meanwhile, `subflow_sequential` will execute its tasks one after another using a sequential runner.
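The difference between the two runners can be illustrated with the standard library alone. The sketch below is an analogy and makes no claim about how Prefect implements its runners: a `ThreadPoolExecutor` with several workers stands in for a concurrent runner, and one with a single worker stands in for a sequential runner. `run_tasks` and its `peak` counter are hypothetical helpers that record how many tasks were in flight at the same time.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_tasks(max_workers, n=5):
    active = 0  # tasks currently running
    peak = 0    # highest number of tasks seen running at once
    lock = threading.Lock()

    def my_task(x):
        nonlocal active, peak
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # simulated work
        with lock:
            active -= 1
        return x * 2

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # submit() returns immediately with a future; result() waits for it
        futures = [pool.submit(my_task, i) for i in range(n)]
        results = [f.result() for f in futures]
    return results, peak


concurrent_results, concurrent_peak = run_tasks(max_workers=5)
sequential_results, sequential_peak = run_tasks(max_workers=1)
print(concurrent_results, concurrent_peak)
print(sequential_results, sequential_peak)
```

Both calls produce the same results; only the number of tasks running at once differs, which is the distinction the two subflows above are meant to show.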
For further information, you can refer to the Prefect documentation on Task Runners which covers various configurations and examples of task runners.
It's like giving each subflow its own special pair of sneakers to sprint or meander, however they like!