Hi! I'm trying to run my flow from another file, importing it to inject custom state handlers/do other stuff:
>>> from example_flow import flow
>>> from prefect.engine.executors import DaskExecutor
>>> flow.run()  # this works
[2019-09-06 20:38:53,240] INFO - prefect.FlowRunner | Beginning Flow run for 'example_flow'
[2019-09-06 20:38:53,241] INFO - prefect.FlowRunner | Starting flow run.
[2019-09-06 20:38:53,243] INFO - prefect.TaskRunner | Task 'Task1': Starting task run...
>>> flow.run(executor=DaskExecutor(address="<tcp://>"))  # this doesn't
[2019-09-06 20:39:52,658] INFO - prefect.FlowRunner | Beginning Flow run for 'example_flow'
[2019-09-06 20:39:52,659] INFO - prefect.FlowRunner | Starting flow run.
[2019-09-06 20:39:52,758] ERROR - prefect.FlowRunner | Unexpected error: ModuleNotFoundError("No module named 'example_flow'",)
Hi @Mikhail Akimov! This is most likely caused by the dask worker not having access to the module your flow is defined in. The way Dask works in a truly distributed environment is by serializing your function (in this case the task) and sending it to a worker, where it is then deserialized; if any dependencies the task needs are not also present on the worker, the task will not deserialize. I’m guessing your flow file is not present on your dask workers in an importable way.
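To make the "serialize, send, deserialize" step concrete, here is a minimal stdlib-only sketch. It uses plain pickle rather than the cloudpickle Dask relies on, so treat it as an approximation: a function that lives in an importable module is pickled by reference, meaning the bytes carry only the module name and the function name, not the code. json.dumps is just a convenient stand-in for one of your tasks.

```python
import json
import pickle
import pickletools

# Pickle a function that lives in an importable module.
payload = pickle.dumps(json.dumps)

# No bytecode is stored, only a reference: the module name and the name
# the receiving side must look up again after importing that module.
print(b"json" in payload, b"dumps" in payload)  # True True

# Show the opcodes; the global lookup (module + name) is what gets replayed.
pickletools.dis(payload)

# Unpickling in the same process imports json and returns the same object.
print(pickle.loads(payload) is json.dumps)  # True
```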
No, it's not.
But when I run the same file with python example_flow.py and the same DaskExecutor, it works OK, without the file on any of the workers.
And the tasks are present in the flow object that I imported. As far as I know, it's identical to the flow I get when executing example_flow directly.
in the file you provided, though, you use a local dask executor inside the main block, which will run everything in the same place
Ah, I see the difference.
Changed to
if __name__ == "__main__":
Runs OK.
in the interactive session, you did:
from example_flow import flow
this means that, from dask’s perspective, your tasks live in the example_flow module
maybe try:
from example_flow import *
and see if that fixes it
No, same error.
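Why the star import didn't help, sketched with the stdlib (json.dumps again standing in for a task): importing a name into your session only binds a new name to the existing object. The object's __module__ still points at the module that defined it, and that recorded module is what pickle writes out.

```python
import json
from json import dumps  # `from json import *` binds this same object

# The import only adds a new name here for the existing object...
print(dumps is json.dumps)  # True

# ...and the object keeps the module it was defined in, which is what
# pickle records and what the unpickling side has to import again.
print(dumps.__module__)  # json
```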
Hm. I also see this in the worker log:
worker_1     | distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x04\x95j"\x00\x00\x00\x00\x00\x00\x8c\
and then a huge blob of bytes
so, this is getting into the weeds, but if you look at the __module__ of these objects, you’ll see that dask sees all of your objects as living inside a module named “example_flow” that it can’t find
when you run it from the CLI, the associated module for all these objects is actually __main__, so everything works
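A stdlib-only sketch of that difference, using a hypothetical demo_flow.py written to a temp directory in place of example_flow.py. runpy.run_path(..., run_name="__main__") approximates running python demo_flow.py, and importlib.import_module approximates importing it from a REPL. To our understanding, cloudpickle (which Dask uses) serializes objects defined in __main__ by value, shipping the code itself, while objects from a named module are sent by reference, which is why only the imported version needs the file on the workers.

```python
import importlib
import pathlib
import runpy
import sys
import tempfile

# Hypothetical stand-in for example_flow.py: one module-level function.
SOURCE = "def task1():\n    return 1\n"

with tempfile.TemporaryDirectory() as tmp:
    path = pathlib.Path(tmp) / "demo_flow.py"
    path.write_text(SOURCE)

    # Like `python demo_flow.py`: the file executes as __main__.
    script_globals = runpy.run_path(str(path), run_name="__main__")
    as_script = script_globals["task1"].__module__

    # Like `from demo_flow import task1` in a REPL: the file is a named module.
    sys.path.insert(0, tmp)
    try:
        module = importlib.import_module("demo_flow")
        as_import = module.task1.__module__
    finally:
        sys.path.remove(tmp)
        sys.modules.pop("demo_flow", None)

print(as_script)  # __main__
print(as_import)  # demo_flow
```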
>>> from large_example_flow import flow
>>> flow.__module__
what about the individual tasks?
>>> list(flow.tasks)[0].__module__
ah! I think it’s only
(the instance of
) that will have the file name as its module
Yep, you're right
So it's pickle trying to import the module when unpickling the function
yup, exactly
so if that file is present on the workers in an importable way, this will start working
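A minimal sketch of that failure mode, again with plain pickle and a hypothetical in-memory module named flow_only_here standing in for example_flow: pickling succeeds where the module exists, unpickling raises the same ModuleNotFoundError once the module can't be imported, and it works again once the module is importable.

```python
import pickle
import sys
import types

# Hypothetical module that exists only in this process, like example_flow
# on the machine where you imported the flow.
mod = types.ModuleType("flow_only_here")
exec("def task1():\n    return 1\n", mod.__dict__)
sys.modules["flow_only_here"] = mod

# Serializing works: pickle records only ("flow_only_here", "task1").
payload = pickle.dumps(mod.task1)

# Simulate a worker without the file: forget the module, then unpickle.
del sys.modules["flow_only_here"]
try:
    pickle.loads(payload)
    error = None
except ModuleNotFoundError as exc:
    error = str(exc)
print(error)  # No module named 'flow_only_here'

# Once the module is importable again (here: re-registered), it round-trips.
sys.modules["flow_only_here"] = mod
restored = pickle.loads(payload)
print(restored())  # 1
```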
Well, at my scale, making my flows/tasks importable on the workers would defeat the purpose of a "universal" launcher for flows. It would be too tedious to keep up to date.
But thanks anyway, learned something new about how pickle works 😃
Just as an FYI, in Prefect Cloud we have a concept of an “execution environment”, and one such environment that we used to use a lot spins up a Dask cluster for each run of your workflow and gives each worker the Docker image containing your flow. Regardless of whether you care about Cloud, this pattern makes these dependency situations a little nicer, at the cost of some latency and resource usage for creating a cluster with each flow run.
I studied the code for some of the environments (and storage), and honestly I can't figure out what the abstraction does or why it's needed. Does it have meaning outside of a Cloud context?
Oh, now I found the examples in the unreleased API docs, it became clearer 🙂