Thomas Wiecki
04/28/2020, 3:18 PM
Zachary Hughes
04/28/2020, 3:29 PM
Thomas Wiecki
04/28/2020, 3:31 PM
Zachary Hughes
04/28/2020, 3:38 PM
Thomas Wiecki
04/28/2020, 3:40 PM
Zachary Hughes
04/28/2020, 3:41 PM
Thomas Wiecki
04/28/2020, 3:43 PM
Zachary Hughes
04/28/2020, 3:43 PM
Thomas Wiecki
04/28/2020, 3:46 PM
Jim Crist-Harif
04/28/2020, 3:51 PM
This is due to how .map is currently implemented: the current implementation relies on tasks being able to submit additional tasks. The mapping execution mechanism is being reworked to support DFE (depth-first execution, https://github.com/PrefectHQ/prefect/issues/2041), and one consequence of that rework is that tasks will no longer submit additional tasks.
The reason this fails with multiprocessing is that Dask's local scheduler (which can use either threads or processes) expects all work to be submitted up front, and then computes the graph in one pass.
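To make the "all work up front" point concrete, here is a minimal plain-Dask sketch (not Prefect code; the names flip, double, build_graph and run_local are hypothetical). The complete graph is built with dask.delayed before compute() is called, and the local threaded scheduler then executes that fixed graph in a single pass. The scheduler has no mechanism for a running task to add nodes to the graph it is executing.

```python
import dask
from dask import delayed


def flip(p):
    # Hypothetical stand-in for a real task: just returns its input
    return p


def double(x):
    return 2 * x


def build_graph(probs):
    # Every node of the graph is created here, before any execution starts
    return [delayed(double)(delayed(flip)(p)) for p in probs]


def run_local(probs):
    # The local scheduler receives the complete graph and computes it in one pass;
    # dask.compute returns a tuple with one result per argument
    (results,) = dask.compute(build_graph(probs), scheduler="threads")
    return results


if __name__ == "__main__":
    print(run_local([0.5, 0.25]))
```

Swapping scheduler="threads" for scheduler="processes" runs the same prebuilt graph in worker processes; in both cases the graph is fixed before execution begins.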
We recommend using the distributed scheduler (which works fine locally), since it natively supports tasks submitting tasks. This will be less necessary after the mapping refactor, but the distributed scheduler is generally nicer and more performant than the multiprocessing scheduler. (Speaking as Dask devs, we generally recommend that users avoid the multiprocessing scheduler and use either threads or distributed instead.)
Thomas Wiecki
04/28/2020, 4:13 PM
Jim Crist-Harif
04/28/2020, 4:13 PM
Thomas Wiecki
04/28/2020, 4:16 PM
Jim Crist-Harif
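04/28/2020, 4:23 PM
from prefect import Flow
from prefect.engine.executors import DaskExecutor

with Flow("my-flow") as flow:
    ...  # create your flow here

# No scheduler address given, so a temporary local Dask cluster is started (4 workers, 1 thread each)
executor = DaskExecutor(n_workers=4, threads_per_worker=1)
flow.run(executor=executor)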
This starts a local Dask cluster that lives for the duration of the flow.run call and is cleaned up afterwards.
Thomas Wiecki
04/28/2020, 4:26 PM
Jim Crist-Harif
04/28/2020, 4:27 PM
… processes)
Thomas Wiecki
04/28/2020, 4:29 PM
# Map over two coins
# (flip_coin and infer_p are @task functions defined elsewhere)
with Flow('CoinFlow') as flow:
    flips = flip_coin.map([.5, .25])
    infer_p.map(flips)
flow.run(executor=DaskExecutor(n_workers=4, threads_per_worker=1))
Jim Crist-Harif
04/28/2020, 4:30 PM
DaskExecutor uses a distributed scheduler internally, while LocalDaskExecutor uses one of the local schedulers. .map doesn't work with LocalDaskExecutor when using processes, but it does with threads.
Thomas Wiecki
04/28/2020, 4:30 PMJim Crist-Harif
04/28/2020, 4:30 PMThomas Wiecki
04/28/2020, 4:30 PMJim Crist-Harif
04/28/2020, 4:31 PMThomas Wiecki
04/28/2020, 4:33 PMJim Crist-Harif
04/28/2020, 4:33 PM
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor

@task
def flip_coin(x):
    return x

@task
def infer_p(x):
    return x + 1

with Flow('CoinFlow') as flow:
    flips = flip_coin.map([.5, .25])
    infer_p.map(flips)

flow.run(executor=DaskExecutor(n_workers=4, threads_per_worker=1))
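DaskExecutor runs on dask.distributed, where a task that is already running can submit further tasks. A minimal plain-Dask sketch of that pattern follows. It is not Prefect's mapping code; the names square, dynamic_map and run are hypothetical, and it assumes dask.distributed is installed. It uses worker_client(), which gives a running task a client it can submit work through.

```python
from dask.distributed import Client, LocalCluster, worker_client


def square(x):
    return x * x


def dynamic_map(items):
    # Runs on a worker. worker_client() secedes this task from the worker's
    # thread pool, so the subtasks it submits can run even with 1 thread per worker.
    with worker_client() as client:
        futures = client.map(square, items)  # new tasks created at run time
        return client.gather(futures)


def run(items):
    # processes=False keeps the sketch lightweight (in-process workers)
    with LocalCluster(n_workers=2, threads_per_worker=1, processes=False) as cluster:
        with Client(cluster) as client:
            return client.submit(dynamic_map, items).result()


if __name__ == "__main__":
    print(run([1, 2, 3]))
```

The number of square tasks is only known once dynamic_map runs, which is exactly the kind of work a scheduler that needs the whole graph up front cannot accept.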
Thomas Wiecki
04/28/2020, 4:33 PM
Jim Crist-Harif
04/28/2020, 4:33 PM
Thomas Wiecki
04/28/2020, 4:34 PM
Jim Crist-Harif
04/28/2020, 4:38 PM
… flow.map doesn't work with processes, this is during execution and is an error raised by Theano.
Thomas Wiecki
04/28/2020, 4:47 PM
Jim Crist-Harif
04/28/2020, 4:48 PM
Thomas Wiecki
04/28/2020, 5:03 PM
Jim Crist-Harif
04/28/2020, 5:04 PM
Thomas Wiecki
04/28/2020, 5:04 PM