Kamil Okáč

01/07/2020, 2:01 PM
I have trouble working with tasks declared in external files (using the dask executor). Is this how it's supposed to work? main.py:
from prefect import Flow
from prefect.engine.executors.dask import DaskExecutor
import mytask

mt = mytask.MyTask()
with Flow("Flow") as flow:
    t1 = mt(1)

executor = DaskExecutor(address='tcp://....:8786')
flow.run(executor=executor)
mytask.py:
from prefect import Task

class MyTask(Task):
    def run(self, x):
        return x
This leads to an error on the worker: "`ModuleNotFoundError: No module named 'mytask'`" If I use the @task decorator instead of subclassing, there's no problem.

emre

01/07/2020, 2:09 PM
In this case, your dask workers need access to your custom module `mytask`. This usually isn't an issue if you don't specify an address and prefect automatically launches a dask cluster, which I don't believe to be the case here. https://docs.prefect.io/cloud/recipes/deployment.html states that:
cloudpickle is an excellent alternative to the standard library's Pickle protocol for converting Python objects to a serialized byte representation. Note that cloudpickle typically stores imported objects as importable references. So, for example, if you used a function foo that you imported as from my_file import foo, cloudpickle (and consequently Prefect) will assume this same import can take place inside the Docker container. For this reason, it is considered best practice in Prefect to ensure all utility scripts and custom Python code be accessible on your Docker image's system PATH.
Your flow object is serialized with `cloudpickle`, which is then sent to your dask cluster. Therefore the code in your module `mytask` is not serialized into the flow; it is stored as a reference. If you make sure that `mytask` is on the `PYTHONPATH` of each of your dask workers, defining tasks in modules should work.
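(For what it's worth, here is a minimal sketch of one way to verify that, assuming you have dask.distributed installed and use the same scheduler address as above; the check_mytask helper is purely illustrative and not part of Prefect:)
from dask.distributed import Client

def check_mytask():
    # Try importing mytask on a worker and report where it was found;
    # raises ModuleNotFoundError if it is not on that worker's path.
    import mytask
    return mytask.__file__

client = Client('tcp://....:8786')
# Client.run executes the function once on every connected worker
# and returns a dict keyed by worker address.
print(client.run(check_mytask))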

Jeremiah

01/07/2020, 2:24 PM
^ what @emre said. Dask is trying to rehydrate code that isn’t available to the python process where the dask worker lives.
You can get around this (if you need to) by defining `MyTask` in the same file as your `flow.run` statement, because Cloudpickle will do its best to serialize globals in their entirety. However, if Cloudpickle detects that you imported `MyTask`, it will simply serialize a reference to the import, which fails if the import isn't available. Note that even though you can get around this as I've described, best practice is to use imports.
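(For illustration, a minimal sketch of that workaround, reusing the original main.py with the same placeholder scheduler address:)
from prefect import Flow, Task
from prefect.engine.executors.dask import DaskExecutor

# Defining the Task subclass in the same file as flow.run means
# cloudpickle serializes the class definition itself rather than an
# import reference, so the workers don't need the mytask module.
class MyTask(Task):
    def run(self, x):
        return x

mt = MyTask()
with Flow("Flow") as flow:
    t1 = mt(1)

executor = DaskExecutor(address='tcp://....:8786')
flow.run(executor=executor)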

Kamil Okáč

01/07/2020, 4:49 PM
Thanks!