Ross
03/07/2025, 9:13 PM
Marvin
03/07/2025, 9:13 PM
`ConcurrentTask` and `SequentialTask` subflows in Prefect 3.x. Here's a concise explanation:
1. Dask Flow: Prefect supports Dask as a task runner, allowing you to parallelize task execution across a cluster. You can set up a Dask flow by passing a `DaskTaskRunner` (from `prefect_dask`) as the `task_runner` argument of Prefect's `flow` decorator.
2. ConcurrentTask and SequentialTask: These concepts aren't explicitly defined in the Prefect documentation, but you can achieve concurrent execution using Dask's parallel capabilities, and sequential execution by managing task dependencies.
Here's a basic outline of how you might structure such a flow:
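```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def concurrent_task(item):
    # Task logic here
    return item

@task
def sequential_task():
    # Task logic here
    pass

# Pass a DaskTaskRunner instance so mapped and submitted tasks run on Dask
@flow(task_runner=DaskTaskRunner())
def my_dask_flow():
    # Concurrent execution: map the task over placeholder inputs,
    # then block until every mapped run has finished
    concurrent_task.map([1, 2, 3]).result()
    # Sequential execution: a direct call blocks until the task returns
    sequential_task()

if __name__ == "__main__":
    my_dask_flow()
```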
### Useful Links
- Prefect Dask Integration
- Prefect Task and Flow API
This structure allows you to use Dask for parallel execution while manually controlling task order for sequential execution. If you have specific patterns for `ConcurrentTask` and `SequentialTask`, you might need to adapt the flow logic accordingly.
Ross
03/07/2025, 9:19 PM
Nate
03/07/2025, 9:30 PM
Ross
03/07/2025, 9:58 PM
Ross
03/07/2025, 9:59 PM
Ross
03/07/2025, 9:59 PM
Marvin
03/07/2025, 9:59 PM
`.submit()` method. Instead, you can call the flow function directly to execute a flow. To achieve your design of running two concurrent flows within a Dask cluster, you can structure your code like this:
1. Use the `DaskTaskRunner` to run the parent flow, which will manage the execution of child flows concurrently.
2. Use `ConcurrentTaskRunner` within each child flow to manage the parallel execution of tasks within that flow.
Here's how you can implement this:
```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner
from prefect.task_runners import ConcurrentTaskRunner

@task
def task_a():
    # Task logic for task_a
    return "Result A"

@task
def task_b():
    # Task logic for task_b
    return "Result B"

@flow(task_runner=ConcurrentTaskRunner())
def child_flow(param):
    res_a = task_a.submit()
    res_b = task_b.submit()
    return res_a, res_b

@flow(task_runner=DaskTaskRunner())
def parent_flow():
    # Run child flows concurrently
    child_flow(param="a")
    child_flow(param="b")

# Execute the parent flow
parent_flow()
```
### Key Points:
- `DaskTaskRunner`: Manages the execution of the `parent_flow` on a Dask cluster.
- `ConcurrentTaskRunner`: Used within `child_flow` to run tasks `task_a` and `task_b` concurrently.
- Flow Execution: Instead of `.submit()`, use the flow name directly to execute it, as shown in `parent_flow`.
### Useful Links
- Prefect Task Runners
- Prefect Dask Integration
This setup will allow each `child_flow` to run its tasks concurrently, and both child flows will run simultaneously in the Dask cluster.
Ross
03/07/2025, 10:01 PM
Marvin
03/07/2025, 10:01 PM
`parent_flow` is managed by the task runner specified for the flow, in this case, `DaskTaskRunner`. When you call `child_flow(param="a")` and `child_flow(param="b")` within `parent_flow`, these calls are scheduled by the `DaskTaskRunner` to run concurrently.
### How It Works:
- Task Runner: The `DaskTaskRunner` is responsible for managing the execution of tasks and flows. It can execute multiple tasks or subflows concurrently if there are no explicit dependencies between them.
- Flow Calls: When you call a subflow within a parent flow, the task runner treats these calls as tasks. Since `DaskTaskRunner` is designed to handle parallel execution, it will execute these subflows concurrently.
Here's the revised example with comments:
```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner
from prefect.task_runners import ConcurrentTaskRunner

@task
def task_a():
    return "Result A"

@task
def task_b():
    return "Result B"

@flow(task_runner=ConcurrentTaskRunner())
def child_flow(param):
    res_a = task_a.submit()
    res_b = task_b.submit()
    return res_a, res_b

@flow(task_runner=DaskTaskRunner())
def parent_flow():
    # These child flow calls are managed by DaskTaskRunner
    # and will run concurrently if there are no dependencies.
    child_flow(param="a")
    child_flow(param="b")

# Execute the parent flow
parent_flow()
```
### Key Points:
- Concurrency Management: The `DaskTaskRunner` handles the scheduling and execution of tasks and subflows concurrently.
- No Explicit `.submit()`: You don't need to use `.submit()` for flows; calling the flow function is sufficient when using task runners that support concurrency.
This design leverages Prefect's ability to manage task concurrency through its task runners, allowing you to focus on the flow logic without dealing with the intricacies of task scheduling.
Nate
03/07/2025, 10:01 PM
Nate
03/07/2025, 10:02 PM
Ross
03/07/2025, 10:03 PM
Ross
03/07/2025, 10:05 PM
Nate
03/07/2025, 10:05 PM
Nate
03/07/2025, 10:05 PM
`.submit` on tasks (which you're free to nest)
Ross
03/07/2025, 10:34 PM
Ross
03/07/2025, 10:34 PM
Nate
03/07/2025, 10:35 PM
```python
thing_1()
thing_2()
```
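Assuming the point of the two bare calls is that they run one after the other, here is a minimal sketch of how that differs from submitting work and resolving futures afterwards. It uses only the standard library's `ThreadPoolExecutor` as a stand-in for a Prefect task runner (an assumption made so the snippet runs without Prefect installed). `thing_1` and `thing_2` keep the placeholder names from the message, with hypothetical bodies that just sleep.

```python
import time
from concurrent.futures import ThreadPoolExecutor

DELAY = 0.3  # seconds each placeholder "thing" pretends to work


def thing_1() -> str:
    time.sleep(DELAY)
    return "thing_1 done"


def thing_2() -> str:
    time.sleep(DELAY)
    return "thing_2 done"


def run_sequentially() -> tuple[list[str], float]:
    """Call each function directly; the second starts only after the first returns."""
    start = time.perf_counter()
    results = [thing_1(), thing_2()]
    return results, time.perf_counter() - start


def run_submitted() -> tuple[list[str], float]:
    """Submit both functions to a pool first, then wait on the futures."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(thing_1), pool.submit(thing_2)]
        results = [future.result() for future in futures]
    return results, time.perf_counter() - start


if __name__ == "__main__":
    seq_results, seq_elapsed = run_sequentially()
    sub_results, sub_elapsed = run_submitted()
    print(f"direct calls: {seq_elapsed:.2f}s, submitted: {sub_elapsed:.2f}s")
```

The analogous move in Prefect, per the thread, is calling `.submit` on tasks and resolving the returned futures once everything has been queued.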
Ross
03/07/2025, 10:39 PM
Ross
03/07/2025, 10:40 PM
Nate
03/07/2025, 10:49 PM
```python
import time

from prefect_dask import DaskTaskRunner

from prefect import flow, task


@task
def process_item(item: str) -> str:
    time.sleep(2)
    return f"Completed {item}"


@flow(task_runner=DaskTaskRunner())
def run_many_things_with_dask(things: list[str]) -> list[str]:
    return process_item.map(things).result()


@flow
def parent_flow():
    return (
        task(run_many_things_with_dask)
        .map(things=[["thing-a", "thing-b"], ["thing-c", "thing-d"]])
        .result()
    )


if __name__ == "__main__":
    start = time.time()
    results = parent_flow()
    end = time.time()
    print(f"\nResults: {results}")
    print(f"Total time: {end - start:.2f} seconds")
```
Nate
03/07/2025, 10:50 PM
Ross
03/07/2025, 10:54 PM
Nate
03/07/2025, 10:54 PM
Nate
03/07/2025, 10:55 PM
Nate
03/07/2025, 10:55 PM
Ross
03/07/2025, 10:56 PM
Ross
03/07/2025, 10:56 PM
Nate
03/07/2025, 10:57 PM
```python
@task
@flow
def foo(): ...
```
but I don't like that
Nate
03/07/2025, 10:58 PM
```
» python -c "from prefect import flow; flow(lambda: 42)()"
16:58:28.189 | INFO | Flow run 'honest-heron' - Beginning flow run 'honest-heron' for flow 'unknown-lambda'
16:58:28.191 | INFO | Flow run 'honest-heron' - View at <http://localhost:4200/runs/flow-run/e7a6765c-218f-4c78-9318-4d19dae24c6b>
16:58:28.203 | INFO | Flow run 'honest-heron' - Finished in state Completed()
```
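Both spellings in this exchange, stacking `@task` on `@flow` and calling `flow(lambda: 42)`, rest on the same Python rule: a decorator is an ordinary callable applied to a function. A minimal standard-library sketch of that rule follows. `label`, `as_task`, and `as_flow` are hypothetical stand-ins for Prefect's `task` and `flow`, not Prefect APIs.

```python
from functools import wraps
from typing import Callable


def label(name: str) -> Callable:
    """Hypothetical decorator factory that records which wrappers were applied."""

    def decorator(fn: Callable) -> Callable:
        @wraps(fn)
        def wrapper(*args, **kwargs):
            return fn(*args, **kwargs)

        # Keep a tuple of layer names, innermost wrapper first
        wrapper.layers = getattr(fn, "layers", ()) + (name,)
        return wrapper

    return decorator


as_task = label("task")
as_flow = label("flow")


# Decorator syntax: the decorator closest to the function is applied first
@as_task
@as_flow
def foo() -> int:
    return 42


# The same wrapping written as plain calls, which also works for a lambda
bar = as_task(as_flow(lambda: 42))

if __name__ == "__main__":
    print(foo(), foo.layers)
    print(bar(), bar.layers)
```

Written as plain calls, the wrapping can be applied at the call site without touching the original definition, which is what `task(run_many_things_with_dask)` does earlier in the thread.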
Ross
03/07/2025, 11:00 PM
Ross
03/07/2025, 11:40 PM
Nate
03/07/2025, 11:41 PM
Ross
03/07/2025, 11:43 PM
```
  File "/home/smds/share_drive/ross.leung/sandbox/prefect_sandbox/dask_test_advanced.py", line 22, in parent_flow
    task(run_many_things_with_dask)
AttributeError: 'list' object has no attribute 'result'
```
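The error message suggests that, in the installed version, `.map(...)` handed back a plain Python list of futures rather than an object with its own `.result()` method. One possible workaround, sketched here under that assumption, is to resolve each future individually. `resolve_all` is a hypothetical helper, and the standard library's futures stand in for Prefect's so the snippet runs on its own; both kinds expose a `.result()` method.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Iterable


def resolve_all(futures: Iterable[Any]) -> list[Any]:
    """Return the result of every future, in order.

    Works on any iterable whose items have a .result() method, so it does
    not matter whether the container itself offers .result().
    """
    return [future.result() for future in futures]


def process_item(item: str) -> str:
    return f"Completed {item}"


if __name__ == "__main__":
    with ThreadPoolExecutor() as pool:
        # A plain list of futures, like the 'list' named in the traceback
        futures = [pool.submit(process_item, item) for item in ["thing-a", "thing-b"]]
        print(resolve_all(futures))
```

Applied to the code from the thread, this would mean passing the return value of `task(run_many_things_with_dask).map(...)` to `resolve_all` instead of chaining `.result()` onto it. Whether a newer Prefect release makes the chained call work is not verified here.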
Nate
03/07/2025, 11:43 PM
Ross
03/07/2025, 11:43 PM
Ross
03/07/2025, 11:44 PM
Ross
03/07/2025, 11:44 PM
Nate
03/07/2025, 11:44 PM