# prefect-community
m
Hi! I'm trying to run my flow from another file, importing it to inject custom state handlers/do other stuff:
>>> from example_flow import flow
>>> from prefect.engine.executors import DaskExecutor
>>> flow.run()  # this works
[2019-09-06 20:38:53,240] INFO - prefect.FlowRunner | Beginning Flow run for 'example_flow'
[2019-09-06 20:38:53,241] INFO - prefect.FlowRunner | Starting flow run.
[2019-09-06 20:38:53,243] INFO - prefect.TaskRunner | Task 'Task1': Starting task run...
.........
>>> flow.run(executor=DaskExecutor(address="tcp://127.0.0.1:8786"))  # this doesn't
[2019-09-06 20:39:52,658] INFO - prefect.FlowRunner | Beginning Flow run for 'example_flow'
[2019-09-06 20:39:52,659] INFO - prefect.FlowRunner | Starting flow run.
[2019-09-06 20:39:52,758] ERROR - prefect.FlowRunner | Unexpected error: ModuleNotFoundError("No module named 'example_flow'",)
c
Hi @Mikhail Akimov! This is most likely caused by the dask worker not having access to `example_flow`. The way Dask works in a truly distributed environment is by serializing your function (in this case the task) and sending it to a worker where it is then deserialized; if any dependencies the task needs are not also present on the worker, the task will not deserialize. I’m guessing your `example_flow` file is not present on your dask workers in an importable way
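A minimal sketch of why the worker needs the module, using only the standard library `pickle` rather than Dask's own serialization stack (an assumption made to keep the example self-contained). It shows that a module-level function is pickled by reference: the payload carries the module name and the function name, not the function's code. `json.dumps` is just a convenient stand-in for a task function.

```python
import json
import pickle

# Pickle a module-level function. Standard pickle stores only a reference
# (module name + qualified name), not the function body.
payload = pickle.dumps(json.dumps)

# Both names appear literally in the serialized bytes.
has_module_name = b"json" in payload
has_function_name = b"dumps" in payload

# Unpickling re-imports the module and looks the name up again,
# so the result is the very same function object.
restored = pickle.loads(payload)
same_object = restored is json.dumps
```

Because deserialization is an import plus an attribute lookup, it can only succeed in a process where that module is importable.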
m
No, it's not. But when I run the same file with `python example_flow.py` and the same DaskExecutor, it works OK, without the file on any of the workers.
And the tasks are present in the flow object that I imported. As far as I know, it's identical to the flow I get when executing example_flow directly.
c
in the file you provided you use a local dask executor though inside the `__main__` block, which will run everything in the same place
AH i see the difference
m
Changed to
if __name__ == "__main__":
    flow.run(
        executor=prefect.engine.executors.DaskExecutor(executor="tcp://127.0.0.1:8786")
    )
Runs OK.
c
in the interactive session, you did:
from example_flow import flow
this means that, from dask’s perspective, your tasks live at `example_flow.task3`, etc.
maybe try:
from example_flow import *
and see if that fixes it
m
No, same error.
Hm. I also see this in worker log:
worker_1     | distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x04\x95j"\x00\x00\x00\x00\x00\x00\x8c\
and then a huge blob of bytes
c
so, this is getting into the weeds, but if you look at:
flow.__module__
task3.__module__
etc., you’ll see `example_flow`, so dask sees all of your objects as living inside a module named “example_flow” that it can’t find
when you run it from the CLI, the associated module for all these objects is actually `__main__`, so everything works
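The difference described above can be sketched with the standard library alone. The file name `demo_flow.py` and the function `task3` are hypothetical stand-ins for the real flow file; the same source is loaded once as an imported module and once as a script, and `__module__` is compared.

```python
import importlib
import pathlib
import runpy
import sys
import tempfile

# Hypothetical stand-in for a flow file that defines one task function.
source = "def task3():\n    return 3\n"

with tempfile.TemporaryDirectory() as tmp:
    path = pathlib.Path(tmp) / "demo_flow.py"
    path.write_text(source)

    # Case 1: import the file as a module (like `from example_flow import flow`).
    sys.path.insert(0, tmp)
    try:
        imported = importlib.import_module("demo_flow")
    finally:
        sys.path.remove(tmp)
        sys.modules.pop("demo_flow", None)
    imported_module_name = imported.task3.__module__

    # Case 2: execute the file as a script (like `python example_flow.py`).
    script_globals = runpy.run_path(str(path), run_name="__main__")
    script_module_name = script_globals["task3"].__module__

print(imported_module_name)  # demo_flow
print(script_module_name)    # __main__
```

Dask falls back to cloudpickle for functions, and cloudpickle serializes objects defined in `__main__` by value, which is consistent with the script version working without the file on the workers.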
m
>>> from large_example_flow import flow
>>> flow.__module__
'prefect.core.flow'
c
what about the individual tasks?
m
>>> list(flow.tasks)[0].__module__
'prefect.tasks.core.function'
c
ah! I think it’s only `x` (the instance of `Task1`) that will have the file name as its module
m
Yepp, you're right
So it's pickle trying to import the module when unpickling the function
c
yup, exactly
so if that file is present on the workers in an importable way, this will start working
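A rough reproduction of the worker-side failure, again using plain `pickle` in a single process rather than a real Dask cluster (an assumption for the sake of a runnable example). The module name `demo_flow` and the function `task1` are hypothetical. The function is pickled while its module is importable, then the module is made unimportable to imitate a worker that lacks the file.

```python
import importlib
import pathlib
import pickle
import sys
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    # Hypothetical flow file containing one task function.
    (pathlib.Path(tmp) / "demo_flow.py").write_text("def task1():\n    return 1\n")

    # "Client" side: the module is importable, so pickling succeeds.
    sys.path.insert(0, tmp)
    module = importlib.import_module("demo_flow")
    payload = pickle.dumps(module.task1)

    # Imitate a worker that does not have the file: forget the module
    # and remove its directory from the import path.
    sys.path.remove(tmp)
    del sys.modules["demo_flow"]

importlib.invalidate_caches()

# "Worker" side: unpickling tries to import demo_flow and fails.
try:
    pickle.loads(payload)
    error = None
except ModuleNotFoundError as exc:
    error = exc

print(error)  # No module named 'demo_flow'
```

Putting the file back on the import path before calling `pickle.loads` makes the same payload load successfully, which mirrors the fix suggested above.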
m
Well, at my scale, making my flows/tasks importable on workers would negate the usefulness of a "universal" launcher for flows. Too tedious to keep up to date
But thanks anyway, learned something new about how pickle works 😃
c
Just as an FYI for you, in Prefect Cloud we have a concept of “execution environment”, and one such environment that we used to use a lot is our `DaskKubernetesEnvironment`, which spins up a Dask Cluster for each run of your workflow, and gives each worker the docker image containing your Flow. Regardless of whether you care about Cloud, this pattern makes these dependency situations a little nicer at the cost of some latency + resource usage for creating a cluster with each flow run
m
I studied the code for some of the environments (and storage) and honestly I can't figure out what the abstraction does or why it's needed. Does it have meaning outside of the Cloud context?
Oh, now that I've found the examples in the unreleased API docs, it's clearer 🙂