Mikhail Akimov
09/06/2019, 8:42 PM
>>> from example_flow import flow
>>> from prefect.engine.executors import DaskExecutor
>>> flow.run() # this works
[2019-09-06 20:38:53,240] INFO - prefect.FlowRunner | Beginning Flow run for 'example_flow'
[2019-09-06 20:38:53,241] INFO - prefect.FlowRunner | Starting flow run.
[2019-09-06 20:38:53,243] INFO - prefect.TaskRunner | Task 'Task1': Starting task run...
.........
>>> flow.run(executor=DaskExecutor(address="tcp://127.0.0.1:8786")) # this doesn't
[2019-09-06 20:39:52,658] INFO - prefect.FlowRunner | Beginning Flow run for 'example_flow'
[2019-09-06 20:39:52,659] INFO - prefect.FlowRunner | Starting flow run.
[2019-09-06 20:39:52,758] ERROR - prefect.FlowRunner | Unexpected error: ModuleNotFoundError("No module named 'example_flow'",)
Chris White
09/06/2019, 8:56 PM
That error is about your example_flow module. The way Dask works in a truly distributed environment is by serializing your function (in this case the task) and sending it to a worker, where it is then deserialized; if any dependencies the task needs are not also present on the worker, the task will not deserialize. I'm guessing your example_flow file is not present on your dask workers in an importable way.
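(A minimal sketch of the serialization behavior described above; the function name below is illustrative, not from the thread.)

import pickle
import pickletools

def task_fn():  # stand-in for a task function; defined in a script it lives in __main__
    return 1

payload = pickle.dumps(task_fn)
pickletools.dis(payload)  # the payload stores only a module + name reference (here '__main__' 'task_fn'), not the code itself

When the same object is instead defined in and imported from example_flow, the stored reference is example_flow plus the object's name, so a worker that cannot import example_flow fails inside pickle.loads with exactly the ModuleNotFoundError from the log above. (As an aside not raised in the thread, distributed.Client.upload_file("example_flow.py") is one way to push the file to every worker of a plain Dask cluster.)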
Mikhail Akimov
09/06/2019, 8:57 PM
But if I run it with python example_flow.py and the same DaskExecutor, it works OK, without the file on any of the workers.
Chris White
09/06/2019, 8:59 PM
I'm guessing you're calling flow.run() from within the __main__ block, which will run everything in the same place.
Mikhail Akimov
09/06/2019, 9:00 PM
if __name__ == "__main__":
    flow.run(
        executor=prefect.engine.executors.DaskExecutor(address="tcp://127.0.0.1:8786")
    )
Chris White
09/06/2019, 9:00 PM
Because you do from example_flow import flow, this means that, from dask's perspective, your tasks live at example_flow.task3, etc. Try from example_flow import * and see if that fixes it.
Mikhail Akimov
09/06/2019, 9:03 PM
worker_1 | distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x04\x95j"\x00\x00\x00\x00\x00\x00\x8c\
Chris White
09/06/2019, 9:04 PM
If you check flow.__module__, task3.__module__, etc., you'll see example_flow, so dask sees all of your objects as living inside a module named "example_flow" that it can't find. When you run the file directly, the module is __main__, so everything works.
Mikhail Akimov
09/06/2019, 9:08 PM
>>> from large_example_flow import flow
>>> flow.__module__
'prefect.core.flow'
Chris White
09/06/2019, 9:09 PM
Mikhail Akimov
09/06/2019, 9:09 PM
>>> list(flow.tasks)[0].__module__
'prefect.tasks.core.function'
Chris White
09/06/2019, 9:14 PM
It's x (the instance of Task1) that will have the file name as its module.
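(A small sketch of the distinction being drawn here; treating Task1 as a Task subclass is an assumption based on the thread, and the import path is the 0.6-era Prefect API.)

from prefect import Task, task

class Task1(Task):  # a class-based task defined in the user's own file
    def run(self):
        return 1

@task
def plain_task():  # a function wrapped by the @task decorator
    return 2

x = Task1()
print(x.__module__)           # the defining file's module, e.g. 'example_flow' (or '__main__' if this runs as a script)
print(plain_task.__module__)  # 'prefect.tasks.core.function', as in the REPL output above

Printing task.__module__ for every task in flow.tasks is a quick way to spot which objects still point at the file's module name.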
Mikhail Akimov
09/06/2019, 9:15 PM
Chris White
09/06/2019, 9:16 PM
Mikhail Akimov
09/06/2019, 9:20 PM
Chris White
09/06/2019, 9:28 PM
For Cloud we have DaskKubernetesEnvironment, which spins up a Dask Cluster for each run of your workflow and gives each worker the docker image containing your Flow. Regardless of whether you care about Cloud, this pattern makes these dependency situations a little nicer, at the cost of some latency + resource usage for creating a cluster with each flow run.
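(A rough sketch of the pattern described above; the import paths, storage class, and parameters are best-effort guesses for that era of Prefect and may not match your version exactly.)

from prefect.environments import DaskKubernetesEnvironment
from prefect.environments.storage import Docker

# Bake the flow's code into a Docker image so every Dask worker can import it,
# then let each flow run spin up (and tear down) its own Dask cluster on Kubernetes.
flow.storage = Docker(registry_url="registry.example.com/flows")  # hypothetical registry
flow.environment = DaskKubernetesEnvironment(min_workers=1, max_workers=3)
flow.register()

The trade-off Chris mentions is visible here: every run pays the cluster start-up cost, but the image guarantees the flow's module is importable on every worker.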
Mikhail Akimov
09/09/2019, 8:50 AM