Hey all, I’m running into an issue with mapped tas...
# ask-community
s
Hey all, I’m running into an issue with mapped tasks not being executed in parallel. In the example below, if we set func4 to have an upstream dependency on func1, it executes at the same time as func2, as expected. However, if we make func4 a mapped task with the same upstream dependency on func1, it doesn’t actually execute until after func3 has finished. Is there a way to get mapped tasks to run in parallel?
a
@Sean Leakehe I think we shouldn’t confuse dependencies with actual parallelism. The fact that func4 only depends on func1 doesn’t necessarily mean that func4 will start immediately after func1. It only means that func4 won’t start until func1 has finished successfully. func4 has no knowledge of anything else in your flow’s local process, only that it can start once func1 is successful, and that’s the case here. If you want func4 to start directly after func1 finishes, you could call func4 directly after func1, but even then the only guarantee you get is that the dependencies will be respected. There is no guarantee about the exact order in which tasks that are eligible to run at the same time will actually run. Regardless of how you call those tasks, mapped tasks always run in parallel with a LocalDaskExecutor. Does that all make sense?
s
@Anna Geller Thanks for taking a look! I guess I’m still unclear why the flow doesn’t make use of all available workers to run tasks concurrently when using mapped tasks? After func1 has finished, we should have 4 available workers, so shouldn’t we be able to run func2 and func4 in parallel?
a
The thing is that parallel execution only starts for the mapped tasks. func1, func2 and func3 run sequentially. Only func4 runs on a LocalDaskExecutor because it’s a mapped task. You can think of it this way: the same way you could submit some function to a concurrent.futures executor, you can submit it to a LocalDaskExecutor using mapping. But “normal” tasks are not configured to run in parallel, so they run sequentially.
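The concurrent.futures analogy in the message above can be sketched with the standard library alone. This is only an illustration of the analogy, not Prefect code: the bodies of func1, func2 and func4 are hypothetical placeholders, and the pool size of 4 is an assumption borrowed from the 4 workers mentioned in the thread.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def func1():
    # Hypothetical upstream step: produces the items to map over.
    return [1, 2, 3, 4]


def func2():
    # Hypothetical "normal" step, called directly in the main thread.
    return "func2 done"


def func4(item):
    # Hypothetical mapped step: one call per item.
    time.sleep(0.1)
    return item * 10


def run_flow():
    items = func1()      # runs first
    second = func2()     # runs next, still sequentially
    # Only the mapped step is handed to the pool, so only its
    # per-item calls can overlap with each other.
    with ThreadPoolExecutor(max_workers=4) as pool:
        mapped = list(pool.map(func4, items))
    return second, mapped


if __name__ == "__main__":
    print(run_flow())
```

In this sketch func2 never overlaps with func4, because nothing ever submits func2 to the pool. That is the behavior the analogy describes, under the assumption that non-mapped steps are simply called one after another.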
s
Ahh okay that makes sense. Thanks for the clarification!
@Anna Geller Sorry, 1 more question. If I use a LocalDaskExecutor on a flow that only contains “normal” tasks such as
with Flow(name="dask_flow") as flow:
    func1_task = func1()
    func2_task = func2(upstream_tasks=[func1_task])
    func3_task = func3(upstream_tasks=[func2_task])
    func4_task = func4(upstream_tasks=[func1_task])
    func5_task = func5(upstream_tasks=[func1_task])
    func6_task = func6(upstream_tasks=[func1_task])
The log appears to show that we have 4 normal tasks running concurrently:
(base) EHFVFY90ZWHV2H:flows seanlekehe$ python test.py 
[2021-12-13 16:45:57-0700] INFO - prefect.FlowRunner | Beginning Flow run for 'dask_flow'
[2021-12-13 16:45:58-0700] INFO - prefect.TaskRunner | Task 'func1': Starting task run...
Running func1
[2021-12-13 16:46:03-0700] INFO - prefect.TaskRunner | Task 'func1': Finished task run for task with final state: 'Success'
[2021-12-13 16:46:03-0700] INFO - prefect.TaskRunner | Task 'func2': Starting task run...
Running func2
[2021-12-13 16:46:03-0700] INFO - prefect.TaskRunner | Task 'func4': Starting task run...
Running func4
[2021-12-13 16:46:03-0700] INFO - prefect.TaskRunner | Task 'func6': Starting task run...
Running func6
[2021-12-13 16:46:03-0700] INFO - prefect.TaskRunner | Task 'func5': Starting task run...
Running func5
[2021-12-13 16:46:04-0700] INFO - prefect.TaskRunner | Task 'func5': Finished task run for task with final state: 'Success'
[2021-12-13 16:46:04-0700] INFO - prefect.TaskRunner | Task 'func4': Finished task run for task with final state: 'Success'
[2021-12-13 16:46:04-0700] INFO - prefect.TaskRunner | Task 'func6': Finished task run for task with final state: 'Success'
[2021-12-13 16:46:08-0700] INFO - prefect.TaskRunner | Task 'func2': Finished task run for task with final state: 'Success'
[2021-12-13 16:46:08-0700] INFO - prefect.TaskRunner | Task 'func3': Starting task run...
Running func3
[2021-12-13 16:46:13-0700] INFO - prefect.TaskRunner | Task 'func3': Finished task run for task with final state
Is the behavior just different when a Flow has a mix of “normal” and mapped tasks?
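One way to check what the dependencies alone allow is to feed the upstream_tasks relationships from the flow definition above into Python's graphlib module (Python 3.9+). This is a rough sketch that only looks at the dependency graph. It says nothing about how any Prefect executor actually schedules work, and the helper name ready_waves is made up for this example.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it waits on, copied from the
# upstream_tasks arguments in the flow definition.
UPSTREAM = {
    "func1": set(),
    "func2": {"func1"},
    "func3": {"func2"},
    "func4": {"func1"},
    "func5": {"func1"},
    "func6": {"func1"},
}


def ready_waves(upstream):
    """Group tasks into waves whose dependencies are already satisfied."""
    sorter = TopologicalSorter(upstream)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        # Everything returned here could start at the same time.
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


if __name__ == "__main__":
    for wave in ready_waves(UPSTREAM):
        print(wave)
    # ['func1']
    # ['func2', 'func4', 'func5', 'func6']
    # ['func3']
```

The second wave matches the log above, where func2, func4, func5 and func6 all start at 16:46:03 right after func1 succeeds. The waves are a simplification: in the log, func3 starts as soon as func2 finishes, not when the whole wave is done.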
c
^ we’re on version 0.15.8 if that’s helpful
k
So this is a limitation of the current executor: only one mapped task can run in parallel at a time. See this thread. I think if tasks are not mapped and do not depend on each other, they can run in parallel.
In short, if you have 10 cores and task A maps over 6 items and task B maps over 4 items, Prefect fails to use all of the cores: it will run task A with the 6 items and then task B with the 4 items afterwards.
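The cost of that limitation can be estimated with a back-of-the-envelope model. The sketch below assumes every mapped item takes the same amount of time and that tasks A and B do not depend on each other. Both function names are hypothetical, and this is not how Prefect computes anything internally.

```python
import math


def stage_by_stage(workers, stages, seconds_per_item=1.0):
    """Total time when each mapped task must finish before the next starts."""
    return sum(math.ceil(n / workers) * seconds_per_item for n in stages)


def pooled(workers, stages, seconds_per_item=1.0):
    """Total time if items from all mapped tasks could share the workers."""
    return math.ceil(sum(stages) / workers) * seconds_per_item


if __name__ == "__main__":
    # 10 cores, task A maps over 6 items, task B maps over 4 items.
    print(stage_by_stage(10, [6, 4]))  # 2.0
    print(pooled(10, [6, 4]))          # 1.0
```

Under these assumptions, running A and then B takes two rounds and leaves 4 and then 6 cores idle, while a shared pool would finish all 10 items in a single round.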