Aric Huang

02/19/2022, 1:38 AM
I have a question about the expected behavior of `map`. With the following sample flow, I was expecting the mapped task `f` to run concurrently with `wait`, because there are no dependencies between them and `LocalDaskExecutor` is being used. However, what I see is that only `wait`'s mapped tasks get executed at first, so `f` is not executed until all the `wait` tasks return.
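import time

from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

@task
def f(x):
    # Fast task: returns immediately
    return x * 2

@task
def wait(x):
    # Slow task: sleeps for x seconds
    time.sleep(x)

with Flow("test") as flow:
    a = list(range(4))
    # Two independent mapped tasks over the same inputs; neither uses the other's output
    wait.map(a)
    result = f.map(a)

flow.executor = LocalDaskExecutor()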
Here are the logs. You can see the `wait[i]` tasks all getting executed first, and `f[i]` not running until all the `wait`s are complete:
Retrieving local flow... Done
Running flow locally...
└── 01:37:45 | INFO    | Beginning Flow run for 'test'
└── 01:37:45 | INFO    | Task 'wait': Starting task run...
└── 01:37:45 | INFO    | Task 'wait': Finished task run for task with final state: 'Mapped'
└── 01:37:45 | INFO    | Task 'f': Starting task run...
└── 01:37:45 | INFO    | Task 'f': Finished task run for task with final state: 'Mapped'
└── 01:37:45 | INFO    | Task 'wait[0]': Starting task run...
└── 01:37:45 | INFO    | Task 'wait[1]': Starting task run...
└── 01:37:45 | INFO    | Task 'wait[3]': Starting task run...
└── 01:37:45 | INFO    | Task 'wait[0]': Finished task run for task with final state: 'Success'
└── 01:37:45 | INFO    | Task 'wait[2]': Starting task run...
└── 01:37:46 | INFO    | Task 'wait[1]': Finished task run for task with final state: 'Success'
└── 01:37:47 | INFO    | Task 'wait[2]': Finished task run for task with final state: 'Success'
└── 01:37:48 | INFO    | Task 'wait[3]': Finished task run for task with final state: 'Success'
└── 01:37:48 | INFO    | Task 'f[0]': Starting task run...
└── 01:37:48 | INFO    | Task 'f[2]': Starting task run...
└── 01:37:48 | INFO    | Task 'f[2]': Finished task run for task with final state: 'Success'
└── 01:37:48 | INFO    | Task 'f[0]': Finished task run for task with final state: 'Success'
└── 01:37:48 | INFO    | Task 'f[3]': Starting task run...
└── 01:37:48 | INFO    | Task 'f[1]': Starting task run...
└── 01:37:48 | INFO    | Task 'f[3]': Finished task run for task with final state: 'Success'
└── 01:37:48 | INFO    | Task 'f[1]': Finished task run for task with final state: 'Success'
└── 01:37:48 | INFO    | Flow run SUCCESS: all reference tasks succeeded
Is this the expected behavior for `map`? I tried adjusting `num_workers` for `LocalDaskExecutor`, but the behavior stayed the same.

Kevin Kho

02/19/2022, 3:29 AM
Hey @Aric Huang, the question here is breadth-first execution versus depth-first execution. Dask, the engine, prefers depth-first, but it can't be forced. From what I have seen, `DaskExecutor` prefers depth-first while `LocalDaskExecutor` can run breadth-first.

Aric Huang

02/19/2022, 4:36 AM
Interesting, I read the Medium post about depth-first vs. breadth-first execution in Prefect (https://link.medium.com/MSvqyfqwLnb), but I'm not sure I understand how or if that applies here. Since the two maps in my example are on the same level and don't pass data to each other, shouldn't they both be able to run at the same time? Right now it seems to behave as if one map must fully complete before the next map on the same level can start.
My thinking here was that as long as `LocalDaskExecutor` has more worker threads than the number of `wait` tasks, it would be able to start running some `f` tasks without having to wait. But varying `num_workers` had no effect, so I must be missing something.
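For illustration, the expectation described above can be sketched with the standard library's `ThreadPoolExecutor` instead of Prefect. This is only a sketch under assumptions: `run_all_at_once`, `timed_f`, and the `unit` scale factor are hypothetical names introduced here, and the sleeps are scaled down from whole seconds. With more worker threads than `wait` calls, every `f` call finishes long before the slowest `wait` call does.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def f(x):
    return x * 2


def wait(x, unit=0.05):
    # Sleep x * unit seconds; `unit` is a hypothetical scale factor to keep the demo short.
    time.sleep(x * unit)


def run_all_at_once(inputs, num_workers=8):
    """Submit every wait and f call up front on one thread pool (not Prefect)."""
    start = time.monotonic()
    f_finished = []  # elapsed time at which each f call completed

    def timed_f(x):
        result = f(x)
        f_finished.append(time.monotonic() - start)
        return result

    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        wait_futures = [pool.submit(wait, x) for x in inputs]
        f_futures = [pool.submit(timed_f, x) for x in inputs]
        results = [fut.result() for fut in f_futures]
        for fut in wait_futures:
            fut.result()  # re-raise any exception from a wait call

    total = time.monotonic() - start
    return results, max(f_finished), total


if __name__ == "__main__":
    results, last_f, total = run_all_at_once(list(range(4)))
    print(results, f"last f at {last_f:.3f}s, all done at {total:.3f}s")
```

In this model the `f` calls are never held back by the `wait` calls, which is the behavior the sample flow was expected to show.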

Kevin Kho

02/19/2022, 4:45 AM
Ah, my bad, I misread the code then. Current Prefect can only handle one mapped task at a time. There is an open issue for it. I think any fix for this would happen in Orion.
That is, one distinct mapped task at a time. You are right, though, that it's not efficient.
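A toy model of "one distinct mapped task at a time", again using the standard library's `ThreadPoolExecutor` rather than Prefect's actual scheduling code, might look like the sketch below. `run_one_map_at_a_time` and the `unit` scale factor are hypothetical names introduced for illustration. Every child of the first map has to finish before any child of the second map is submitted, so extra worker threads do not help.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def f(x):
    return x * 2


def wait(x, unit=0.05):
    # Sleep x * unit seconds; `unit` is a hypothetical scale factor to keep the demo short.
    time.sleep(x * unit)


def run_one_map_at_a_time(inputs, num_workers=8):
    """Run each map to completion before starting the next one (toy model only)."""
    start = time.monotonic()
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # list() drains the iterator, so this blocks until every wait child is done.
        list(pool.map(wait, inputs))
        f_started = time.monotonic() - start
        # Only now are the f children submitted.
        results = list(pool.map(f, inputs))
    return results, f_started


if __name__ == "__main__":
    results, f_started = run_one_map_at_a_time(list(range(4)))
    print(results, f"first f submitted after {f_started:.3f}s")
```

Here the first `f` call cannot start until the longest `wait` call has returned, which lines up with the logs above, where `f[0]` starts in the same second that `wait[3]` finishes.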

Aric Huang

02/19/2022, 4:47 AM
Got it, that explains it - thanks! Appreciate the response, have a great weekend