Lukas N.
05/06/2021, 7:16 PM
from datetime import datetime
import time

from prefect import Flow, task
from prefect.executors import DaskExecutor

def n():
    # current wall-clock time, for the log lines below
    return datetime.now().strftime("%H:%M:%S")

@task
def sleep_1(x):
    print(f'{n()} 1: sleep {x}')
    time.sleep(x)
    print(f'{n()} 1: done {x}')
    return x

@task
def sleep_2(x):
    print(f'{n()} 2: sleep {x}')
    time.sleep(x)
    print(f'{n()} 2: done {x}')
    return x

with Flow(
    name='dask_bfe', executor=DaskExecutor()
) as flow:
    a = sleep_1.map(range(5, 15))  # first "layer"
    b = sleep_2.map(a)             # second "layer", fed by the first

if __name__ == '__main__':
    flow.run()
When I run it I get this output (the order is random, so you may get a different result, but the execution of the layers stays the same). By "layers" I mean layer 1 = sleep_1, layer 2 = sleep_2:
20:38:46 1: sleep 5
20:38:46 1: sleep 6
20:38:46 1: sleep 13
20:38:46 1: sleep 14
20:38:46 1: sleep 12
20:38:46 1: sleep 11
20:38:46 1: sleep 8
20:38:46 1: sleep 7 # <- based on this I deduce I'm using 8 workers (8 tasks started at the same time; see the side note after this log)
20:38:51 1: done 5
20:38:51 1: sleep 9
20:38:52 1: done 6
20:38:52 1: sleep 10 # <- last task on the first "layer" is starting
20:38:53 1: done 7
20:38:53 2: sleep 7 # <- we have free workers, we may start the second "layer"
20:38:54 1: done 8
20:38:54 2: sleep 8
20:38:57 1: done 11
20:38:57 2: sleep 11
20:38:58 1: done 12
20:38:58 2: sleep 12
20:38:59 1: done 13
20:38:59 2: sleep 13
20:39:00 1: done 9
20:39:00 1: done 14
20:39:00 2: sleep 9
20:39:00 2: sleep 14
20:39:00 2: done 7
20:39:00 2: sleep 6
20:39:02 1: done 10
20:39:02 2: sleep 10
20:39:02 2: done 8
20:39:02 2: sleep 5
20:39:06 2: done 6
20:39:07 2: done 5
20:39:08 2: done 11
20:39:09 2: done 9
20:39:10 2: done 12
20:39:12 2: done 10
20:39:12 2: done 13
20:39:14 2: done 14
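(Side note on the worker count: rather than deducing it from the log, it can be pinned up front. A minimal sketch, assuming Prefect's default local Dask cluster, where cluster_kwargs is forwarded to the underlying distributed.LocalCluster:)

from prefect.executors import DaskExecutor

# 8 single-threaded workers -> exactly 8 mapped tasks can start
# at once (assumes the default local cluster is being used)
executor = DaskExecutor(
    cluster_kwargs={"n_workers": 8, "threads_per_worker": 1}
)

With threads_per_worker=1 each task run occupies its own worker slot, so the first 8 mapped inputs start together, as in the log above.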
It is true that we don't have to wait until the first "layer" finishes; however, with depth-first execution I would expect this:
20:38:46 1: sleep 5
20:38:46 1: sleep 6
20:38:46 1: sleep 13
20:38:46 1: sleep 14
20:38:46 1: sleep 12
20:38:46 1: sleep 11
20:38:46 1: sleep 8
20:38:46 1: sleep 7 # <- based on this I deduce I'm using 8 workers (because 8 tasks started at the same time)
20:38:51 1: done 5
20:38:51 2: sleep 5 # instead of (20:38:51 1: sleep 9), because layer 1 input 5 is complete and we go deeper
This is important for me because I have a chain of map operations where the last one uploads data to S3, freeing worker memory. But if DFE works the way my example suggests, do I still need to hold everything in memory / local storage?
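Roughly, the chain looks like this sketch (transform, the bucket name, and the key scheme are placeholders, not my real code):

import boto3
from prefect import Flow, task

@task
def transform(x):
    # stand-in for the real per-item work
    return str(x).encode()

@task
def upload(i, payload):
    # last "layer": ship the result to S3 so the worker can drop it
    # (bucket and key are placeholders)
    boto3.client('s3').put_object(
        Bucket='my-bucket', Key=f'results/{i}', Body=payload
    )

with Flow('chain') as flow:
    items = list(range(100))
    data = transform.map(items)
    upload.map(items, data)

The hope is that once upload finishes for a given element, the worker can release that element's payload instead of holding the whole mapped layer in memory until the layer boundary.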