# ask-community
r
Hello. I’m having some issues with mapped task parallel execution. I’m testing locally using the LocalDaskExecutor(). Can you confirm that separate mapped tasks without a dependency will execute serially? In the example below, the mapped tasks within ‘a’ and ‘b’ execute in parallel, but ‘a’ and ‘b’ themselves execute serially.
with Flow("testing") as flow:
    a = poll.map(poll_interval=[5,10])
    b = poll.map(poll_interval=[4,9])

flow.run(executor=LocalDaskExecutor())
[2021-05-01 18:20:08-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'testing'
[2021-05-01 18:20:08-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState': Starting task run...
[2021-05-01 18:20:08-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState': Finished task run for task with final state: 'Mapped'
[2021-05-01 18:20:08-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState': Starting task run...
[2021-05-01 18:20:08-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState': Finished task run for task with final state: 'Mapped'
[2021-05-01 18:20:09-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState[0]': Starting task run...
[2021-05-01 18:20:09-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState[1]': Starting task run...
[2021-05-01 18:20:14-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState[0]': Finished task run for task with final state: 'Success'
[2021-05-01 18:20:19-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState[1]': Finished task run for task with final state: 'Success'
[2021-05-01 18:20:19-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState[0]': Starting task run...
[2021-05-01 18:20:19-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState[1]': Starting task run...
[2021-05-01 18:20:23-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState[0]': Finished task run for task with final state: 'Success'
[2021-05-01 18:20:28-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState[1]': Finished task run for task with final state: 'Success'
[2021-05-01 18:20:28-0500] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Thx!
r
There is no dependency between 'a' and 'b', so Dask can execute them at the same time. You should use set_dependencies(b, upstream_tasks=[a]) if you want 'b' to wait for 'a'.
r
In my test, b always waits for a to complete without having defined the upstream_tasks dependency. I have tested changing the num_workers parameter but it seems to only control the parallelism of a single mapped task, not both mapped tasks. Visualizing the flow shows them in “parallel”:
Here is the same flow running in the cloud:
k
Hi @Robert Bastian! What is the behavior you're expecting? Do you think they should run at the same time with the Dask executor? Do you have enough workers to run both simultaneously?
r
Yes, I’m expecting both A and B to run simultaneously. I do have enough workers. I can scale either task map up to 8 “sub tasks”, but b will always run after a:
with Flow("why-no-parallel") as flow:
    a = a.map(poll_interval=[30,30,30,30,30,30,30,30])
    b = b.map(poll_interval=[30,30,30])

flow.run(executor=LocalDaskExecutor(scheduler="threads", num_workers=16))
I think this explains it:
[2021-05-02 18:35:24+0000] INFO - prefect.TaskRunner | Task 'a[0]': Starting task run...
[2021-05-02 18:35:24+0000] INFO - prefect.TaskRunner | Task 'a[2]': Starting task run...
[2021-05-02 18:35:24+0000] INFO - prefect.TaskRunner | Task 'a[3]': Starting task run...
[2021-05-02 18:35:24+0000] INFO - prefect.TaskRunner | Task 'a[7]': Starting task run...
[2021-05-02 18:35:24+0000] INFO - prefect.TaskRunner | Task 'a[1]': Starting task run...
[2021-05-02 18:35:24+0000] INFO - prefect.TaskRunner | Task 'a[5]': Starting task run...
[2021-05-02 18:35:24+0000] INFO - prefect.TaskRunner | Task 'a[6]': Starting task run...
[2021-05-02 18:35:24+0000] INFO - prefect.TaskRunner | Task 'a[4]': Starting task run...
[2021-05-02 18:35:54+0000] INFO - prefect.TaskRunner | Task 'a[0]': Finished task run for task with final state: 'Success'
[2021-05-02 18:35:54+0000] INFO - prefect.TaskRunner | Task 'a[2]': Finished task run for task with final state: 'Success'
[2021-05-02 18:35:54+0000] INFO - prefect.TaskRunner | Task 'a[3]': Finished task run for task with final state: 'Success'
[2021-05-02 18:35:54+0000] INFO - prefect.TaskRunner | Task 'a[7]': Finished task run for task with final state: 'Success'
[2021-05-02 18:35:54+0000] INFO - prefect.TaskRunner | Task 'a[6]': Finished task run for task with final state: 'Success'
[2021-05-02 18:35:54+0000] INFO - prefect.TaskRunner | Task 'a[5]': Finished task run for task with final state: 'Success'
[2021-05-02 18:35:54+0000] INFO - prefect.TaskRunner | Task 'a[1]': Finished task run for task with final state: 'Success'
[2021-05-02 18:35:54+0000] INFO - prefect.TaskRunner | Task 'a[4]': Finished task run for task with final state: 'Success'
[2021-05-02 18:35:54+0000] INFO - prefect.TaskRunner | Task 'b[0]': Starting task run...
[2021-05-02 18:35:54+0000] INFO - prefect.TaskRunner | Task 'b[2]': Starting task run...
[2021-05-02 18:35:54+0000] INFO - prefect.TaskRunner | Task 'b[1]': Starting task run...
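A rough way to check the same thing from a log without reading it by eye is sketched below. It is only a stdlib illustration, not part of Prefect: the helper names `task_windows` and `ran_serially` and the regular expression are assumptions written for the log format shown above, and `SAMPLE_LOG` is a short excerpt of those lines. Because the timestamps have one-second resolution, a start in the same second as the previous finish is counted as serial.

```python
import re
from datetime import datetime

# Pattern for mapped-child lines such as:
# [2021-05-02 18:35:24+0000] INFO - prefect.TaskRunner | Task 'a[0]': Starting task run...
# (assumed format, taken from the log lines in this thread)
LOG_LINE = re.compile(
    r"\[(?P<ts>[^\]]+)\] INFO - prefect\.TaskRunner \| "
    r"Task '(?P<name>\w+)\[(?P<index>\d+)\]': "
    r"(?P<event>Starting|Finished) task run"
)

# Excerpt of the log above (two children of 'a', one child of 'b').
SAMPLE_LOG = """\
[2021-05-02 18:35:24+0000] INFO - prefect.TaskRunner | Task 'a[0]': Starting task run...
[2021-05-02 18:35:24+0000] INFO - prefect.TaskRunner | Task 'a[4]': Starting task run...
[2021-05-02 18:35:54+0000] INFO - prefect.TaskRunner | Task 'a[0]': Finished task run for task with final state: 'Success'
[2021-05-02 18:35:54+0000] INFO - prefect.TaskRunner | Task 'a[4]': Finished task run for task with final state: 'Success'
[2021-05-02 18:35:54+0000] INFO - prefect.TaskRunner | Task 'b[0]': Starting task run...
"""


def task_windows(log_text):
    """Map each mapped task name to (earliest child start, latest child finish).

    Either element is None when the log has no matching line for it.
    """
    windows = {}
    for line in log_text.splitlines():
        match = LOG_LINE.search(line)
        if match is None:
            continue  # not a mapped-child start/finish line
        ts = datetime.strptime(match["ts"], "%Y-%m-%d %H:%M:%S%z")
        first_start, last_finish = windows.get(match["name"], (None, None))
        if match["event"] == "Starting":
            first_start = ts if first_start is None else min(first_start, ts)
        else:
            last_finish = ts if last_finish is None else max(last_finish, ts)
        windows[match["name"]] = (first_start, last_finish)
    return windows


def ran_serially(windows, first, second):
    """True if no child of `second` started before the last child of `first` finished."""
    _, first_done = windows[first]
    second_start, _ = windows[second]
    if first_done is None or second_start is None:
        return False
    return second_start >= first_done


if __name__ == "__main__":
    print(ran_serially(task_windows(SAMPLE_LOG), "a", "b"))
```

For the excerpt, `ran_serially(..., "a", "b")` is true: every 'a' child finishes at 18:35:54 and the first 'b' child starts in that same second. The check does not work on the first log in this thread, because both mapped tasks there are named 'PollDSCCaptureState' and cannot be told apart by name.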
k
I have replicated this and raised it to the team. Unsure for now if it’s a bug or standard Dask behavior but I’ll let you know when I get an answer.
r
Thank you!