
Mikhail Akimov

09/06/2019, 8:42 PM
Hi! I'm trying to run my flow from another file, importing it to inject custom state handlers/do other stuff:
>>> from example_flow import flow
>>> from prefect.engine.executors import DaskExecutor
>>> flow.run()  # this works
[2019-09-06 20:38:53,240] INFO - prefect.FlowRunner | Beginning Flow run for 'example_flow'
[2019-09-06 20:38:53,241] INFO - prefect.FlowRunner | Starting flow run.
[2019-09-06 20:38:53,243] INFO - prefect.TaskRunner | Task 'Task1': Starting task run...
.........
>>> flow.run(executor=DaskExecutor(address="tcp://127.0.0.1:8786"))  # this doesn't
[2019-09-06 20:39:52,658] INFO - prefect.FlowRunner | Beginning Flow run for 'example_flow'
[2019-09-06 20:39:52,659] INFO - prefect.FlowRunner | Starting flow run.
[2019-09-06 20:39:52,758] ERROR - prefect.FlowRunner | Unexpected error: ModuleNotFoundError("No module named 'example_flow'",)

Chris White

09/06/2019, 8:56 PM
Hi @Mikhail Akimov! This is most likely caused by the dask worker not having access to example_flow. The way Dask works in a truly distributed environment is by serializing your function (in this case the task) and sending it to a worker where it is then deserialized; if any dependencies the task needs are not also present on the worker, the task will not deserialize. I'm guessing your example_flow file is not present on your dask workers in an importable way.
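A minimal sketch of the failure mode being described here, assuming an example_flow.py that exists on the client but not on the workers (task1 is a made-up name):

# example_flow.py is assumed to define, say:
#     def task1():
#         return 1
import pickle
from example_flow import task1   # works on the client, where example_flow.py is on the path

payload = pickle.dumps(task1)    # stores a *reference* ("example_flow", "task1"),
                                 # not the function's source code

# On a Dask worker whose import path has no example_flow.py, deserializing
# re-imports the module and fails:
#     pickle.loads(payload)
#     ModuleNotFoundError: No module named 'example_flow'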

Mikhail Akimov

09/06/2019, 8:57 PM
No, it's not. But when I run the same file with python example_flow.py and the same DaskExecutor, it works OK, without the file on any of the workers.
And the tasks are present in the flow object that I imported. As far as I can tell, it's identical to the flow I get when executing example_flow directly.

Chris White

09/06/2019, 8:59 PM
in the file you provided you use a local dask executor though, inside the __main__ block, which will run everything in the same place
AH I see the difference

Mikhail Akimov

09/06/2019, 9:00 PM
Changed to
if __name__ == "__main__":
    flow.run(
        executor=prefect.engine.executors.DaskExecutor(address="tcp://127.0.0.1:8786")
    )
Runs OK.
👍 1
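For context, a sketch of the file layout that makes this work (Prefect 0.x API as used in the thread; the task bodies here are illustrative): everything is defined at module level, and the DaskExecutor run sits under the __main__ guard, so when the file is run directly every object belongs to __main__, which dask/cloudpickle ships to the workers by value.

# example_flow.py -- sketch only; assumes Prefect 0.x as above, task logic is made up
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor

@task
def extract():
    return [1, 2, 3]

@task
def total(numbers):
    return sum(numbers)

with Flow("example_flow") as flow:
    total(extract())

if __name__ == "__main__":
    # Run as `python example_flow.py`: the task functions live in __main__,
    # which cloudpickle serializes by value, so the workers never need this file.
    flow.run(executor=DaskExecutor(address="tcp://127.0.0.1:8786"))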

Chris White

09/06/2019, 9:00 PM
in the interactive session, you did:
from example_flow import flow
this means that, from dask's perspective, your tasks live at example_flow.task3, etc.
maybe try:
from example_flow import *
and see if that fixes it

Mikhail Akimov

09/06/2019, 9:03 PM
No, same error.
Hm. I also see this in the worker log:
worker_1     | distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x04\x95j"\x00\x00\x00\x00\x00\x00\x8c\
and then a huge blob of bytes

Chris White

09/06/2019, 9:04 PM
so, this is getting into the weeds, but if you look at flow.__module__, task3.__module__, etc., you'll see example_flow, so dask sees all of your objects as living inside a module named "example_flow" that it can't find
when you run it from the CLI, the associated module for all these objects is actually __main__, so everything works

Mikhail Akimov

09/06/2019, 9:08 PM
>>> from large_example_flow import flow
>>> flow.__module__
'prefect.core.flow'

Chris White

09/06/2019, 9:09 PM
what about the individual tasks?

Mikhail Akimov

09/06/2019, 9:09 PM
>>> list(flow.tasks)[0].__module__
'prefect.tasks.core.function'
🧐 1

Chris White

09/06/2019, 9:14 PM
ah! I think it's only x (the instance of Task1) that will have the file name as its module

Mikhail Akimov

09/06/2019, 9:15 PM
Yep, you're right.
So it's pickle trying to import the module when unpickling the function.
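A quick way to see this, using the names from the thread (x being the Task1 instance defined in example_flow.py; both names are assumed from the conversation):

from example_flow import flow, x      # x = Task1() as discussed above (assumed names)

print(flow.__module__)                # 'prefect.core.flow'  -> importable on the workers
print(type(x).__module__)             # 'example_flow'       -> not importable there
# Unpickling x on a worker therefore triggers `import example_flow`, which is
# exactly the ModuleNotFoundError shown in the worker log above.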

Chris White

09/06/2019, 9:16 PM
yup, exactly
so if that file is present on the workers in an importable way, this will start working
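One lightweight way to make the file importable on the workers without rebuilding their environment is dask.distributed's upload_file (a sketch, using the scheduler address from earlier in the thread; note it only reaches workers connected at the time of the call):

from distributed import Client

client = Client("tcp://127.0.0.1:8786")
# Ships example_flow.py to every currently connected worker and puts it on
# their import path, so unpickling the tasks can `import example_flow` there.
client.upload_file("example_flow.py")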

Mikhail Akimov

09/06/2019, 9:20 PM
Well, at my scale, making my flows/tasks importable on the workers would defeat the purpose of a "universal" launcher for flows. Too tedious to keep up to date.
But thanks anyway, learned something new about how pickle works 😃
💯 1

Chris White

09/06/2019, 9:28 PM
Just as an FYI for you: in Prefect Cloud we have a concept of an "execution environment", and one such environment that we used to use a lot is our DaskKubernetesEnvironment, which spins up a Dask cluster for each run of your workflow and gives each worker the Docker image containing your Flow. Regardless of whether you care about Cloud, this pattern makes these dependency situations a little nicer, at the cost of some latency and resource usage for creating a cluster with each flow run.
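A rough sketch of that pattern, assuming the Prefect 0.x Cloud-era APIs (the registry and image names are placeholders, and the exact constructor arguments may differ between releases):

# Sketch only -- Prefect 0.x era; import paths and argument names may vary by version
from prefect.environments import DaskKubernetesEnvironment
from prefect.environments.storage import Docker

# Each flow run gets its own Dask cluster on Kubernetes, and every worker pod
# runs the Docker image that contains the flow's code, so its module is importable.
flow.environment = DaskKubernetesEnvironment(min_workers=1, max_workers=4)
flow.storage = Docker(registry_url="registry.example.com", image_name="example-flow")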

Mikhail Akimov

09/09/2019, 8:50 AM
I studied the code for some of the environments (and storage), and honestly I can't figure out what the abstraction does or why it's needed. Does it have meaning outside of the Cloud context?
Oh, now I've found the examples in the unreleased API docs, and it's become clearer 🙂