# ask-community
l
Hello :prefect:, I have a hard time understanding the depth-first execution (DFE) of mapped tasks with the Dask executor. From my experiments it looks like the execution is breadth-first (BFE), but maybe I'm missing something trivial, like a task trigger? More info with a code example in 🧵
I'm confused about the depth-first execution order of mapped tasks with the Dask executor. My code is based on this Map faster! post or this thread. Note that those examples use very small inputs: if you have more workers than inputs, the execution does look depth-first. With a much bigger input, all tasks in the first "layer" are started (not completed!) before execution proceeds to the next layer. Here is the code example:
import time
from datetime import datetime

from prefect import Flow, task
from prefect.executors import DaskExecutor

def n():
    # current wall-clock time, HH:MM:SS, for the log lines below
    return datetime.now().strftime("%H:%M:%S")

@task
def sleep_1(x):
    print(f'{n()} 1: sleep {x}')
    time.sleep(x)
    print(f'{n()} 1: done {x}')
    return x


@task
def sleep_2(x):
    print(f'{n()} 2: sleep {x}')
    time.sleep(x)
    print(f'{n()} 2: done {x}')
    return x


with Flow(
    name='dask_bfe', executor=DaskExecutor()
) as flow:
    a = sleep_1.map(range(5, 15))  # layer 1: sleeps of 5..14 seconds
    b = sleep_2.map(a)  # layer 2: depends element-wise on layer 1

if __name__ == '__main__':
    flow.run()
When I run it I get this output (the order is random, so you may get a different result, but the layer-by-layer execution stays the same). By layers I mean layer 1 = sleep_1, layer 2 = sleep_2:
20:38:46 1: sleep 5
20:38:46 1: sleep 6
20:38:46 1: sleep 13
20:38:46 1: sleep 14
20:38:46 1: sleep 12
20:38:46 1: sleep 11
20:38:46 1: sleep 8
20:38:46 1: sleep 7  # <- based on this I deduce I'm using 8 workers (because 8 tasks started at the same time)
20:38:51 1: done 5
20:38:51 1: sleep 9
20:38:52 1: done 6
20:38:52 1: sleep 10 # <- last task on the first "layer" is starting 
20:38:53 1: done 7
20:38:53 2: sleep 7 # <- we have free workers, we may start the second "layer"
20:38:54 1: done 8
20:38:54 2: sleep 8
20:38:57 1: done 11
20:38:57 2: sleep 11
20:38:58 1: done 12
20:38:58 2: sleep 12
20:38:59 1: done 13
20:38:59 2: sleep 13
20:39:00 1: done 9
20:39:00 1: done 14
20:39:00 2: sleep 9
20:39:00 2: sleep 14
20:39:00 2: done 7
20:39:00 2: sleep 6
20:39:02 1: done 10
20:39:02 2: sleep 10
20:39:02 2: done 8
20:39:02 2: sleep 5
20:39:06 2: done 6
20:39:07 2: done 5
20:39:08 2: done 11
20:39:09 2: done 9
20:39:10 2: done 12
20:39:12 2: done 10
20:39:12 2: done 13
20:39:14 2: done 14
It is true that we don't have to wait until the first "layer" finishes; however, with depth-first execution I would expect this:
20:38:46 1: sleep 5
20:38:46 1: sleep 6
20:38:46 1: sleep 13
20:38:46 1: sleep 14
20:38:46 1: sleep 12
20:38:46 1: sleep 11
20:38:46 1: sleep 8
20:38:46 1: sleep 7  # <- based on this I deduce I'm using 8 workers (because 8 tasks started at the same time)
20:38:51 1: done 5
20:38:51 2: sleep 5  # instead of (20:38:51 1: sleep 9), because layer 1 input 5 is complete and we go deeper
This is important for me because I have a chain of map operations where the last one uploads data to S3, freeing worker memory. But if DFE works the way my example suggests, do I still need to hold everything in memory / local storage?
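For context, a minimal sketch of the kind of chain I mean (the load/process/upload tasks below are hypothetical stand-ins, not my real code):

from prefect import Flow, task
from prefect.executors import DaskExecutor

@task
def load(i):
    # stand-in for a download: one large in-memory chunk per input
    return b'x' * (10 * 1024 * 1024)

@task
def process(chunk):
    # stand-in for per-item processing
    return chunk[::-1]

@task
def upload(chunk):
    # stand-in for an S3 put; once this runs, the worker could drop the chunk
    return len(chunk)

with Flow(name='chained_maps', executor=DaskExecutor()) as flow:
    raw = load.map(range(100))
    processed = process.map(raw)
    done = upload.map(processed)

If the whole first layer starts before upload begins draining results, peak memory scales with the input size rather than with the number of workers.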
k
Hey @Lukas N.! This has to do with Dask itself. Depth-first execution is preferred but not required, and it can end up breadth-first sometimes. It can't be forced to depth-first.
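To illustrate with plain dask.distributed (a sketch outside Prefect; the stage functions are made up for this example), the same pattern reduces to futures whose ordering is entirely up to Dask's scheduler:

import time

from dask.distributed import Client

def stage_1(x):
    time.sleep(x)
    return x

def stage_2(x):
    time.sleep(x)
    return x

if __name__ == '__main__':
    client = Client(n_workers=8)
    a = client.map(stage_1, range(5, 15))
    b = client.map(stage_2, a)  # each stage_2 future depends on one stage_1 future
    print(client.gather(b))

Whether a freed worker picks up a ready stage_2 task or a not-yet-started stage_1 task is a scheduler heuristic, which is why you can see breadth-first behavior even though each stage_2 only needs its own stage_1 result.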
l
Thank you for the answer. However, from testing on several different flows, it ends up with BFE in all cases 🤔. I've picked this example only for its simplicity.
k
Yeah, I know. I've tried this myself quite a bit and it waits for the breadth to finish! Unfortunately, this is not something we can help with.
l
Ok, thanks. At least I know what to expect and build around that 🙂
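e.g. by fusing the stages of the toy example into a single mapped task, so each element runs end-to-end on one worker (just a sketch of what I mean by building around it):

import time

from prefect import Flow, task
from prefect.executors import DaskExecutor

@task
def sleep_both(x):
    # layers 1 and 2 fused into one mapped task: the intermediate value
    # never outlives this single task run, regardless of scheduling order
    time.sleep(x)  # what sleep_1 did
    time.sleep(x)  # what sleep_2 did
    return x

with Flow(name='dask_fused', executor=DaskExecutor()) as flow:
    sleep_both.map(range(5, 15))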