# prefect-community
Hi! I have (again 🤨) a weird issue. I have some more sophisticated class-based Tasks (e.g. requests with authentication, which I initialize up front) organized in different files in a Python-package structure. For simplicity there are `__init__.py` files everywhere, importing everything from the respective sub folder. When I use an imported class in my Flows, everything goes well until I run the flow. Then I get from Dask:

```
ModuleNotFoundError("No module named 'reporting'")
```

`reporting` is the main package, but I cannot trace the error back to anything; running the flow with the `LocalExecutor` works. The debug utility also tells me that the flow is not serializable. As soon as I copy the Task class into the same file where the Flow is defined, everything works like a charm (the only difference being that the class is no longer imported). Question: is there any pitfall I need to consider when defining Task classes and importing them? If I write the code as a `@task`-decorated function and import it, everything works fine, too. I am so lost 😄
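Context the thread assumes but never states: Prefect ships flows to Dask by serializing them, and an *imported* class is generally pickled by reference, i.e. as a module path plus name that the worker must re-import, whereas objects defined in the flow script itself can be serialized by value. A small stdlib sketch of by-reference pickling, using `collections.OrderedDict` as a stand-in for the imported Task class:

```python
import pickle
from collections import OrderedDict

# Pickling a class does not ship its code: the stream only records
# the defining module and the qualified name of the class.
payload = pickle.dumps(OrderedDict)

assert b"collections" in payload   # module name is embedded...
assert b"OrderedDict" in payload   # ...along with the class name
```

Unpickling on a Dask worker therefore triggers `import collections` there, just as unpickling the flow above triggers `import reporting`, which fails when `reporting` is not on the worker's path.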
Overview of my testing. This is the Task definition (`projects.py`):

```python
from prefect import Task, task
import requests

class Projects(Task):
    def __init__(self, username, password, url, **kwargs):
        self.username = username
        self.password = password
        self.url = url
        super().__init__(**kwargs)  # Task subclasses must initialize the base class

    def run(self):
        data = requests.get(self.url, auth=(
            self.username, self.password)).json()
        return data

@task
def projects(url, username, password):
    data = requests.get(url, auth=(
        username, password)).json()
    return data
```
This is the Flow definition:

```python
from reporting.etl.extract.jira_tempo.projects import Projects, projects
from prefect import Flow, Parameter
from django.conf import settings

extract_projects = Projects(
    url=settings.LOAD_JIRAURL + '/rest/api/2/project',
    username=settings.LOAD_JIRAUSER,
    password=settings.LOAD_JIRAPASSWORD)

with Flow("ETL Projects 1") as flow1:
    extracted_projects = extract_projects()

with Flow("ETL Projects 2") as flow2:
    url = Parameter("url", default=settings.LOAD_JIRAURL + '/rest/api/2/project')
    username = Parameter("username", default=settings.LOAD_JIRAUSER)
    password = Parameter("password", default=settings.LOAD_JIRAPASSWORD)

    extracted_projects = projects(url=url, username=username, password=password)
```
And these are the executor runs:

```python
from prefect.engine.executors import LocalExecutor, LocalDaskExecutor, DaskExecutor

flow1.run(executor=LocalExecutor())      # SUCCESS
flow2.run(executor=LocalExecutor())      # SUCCESS
flow1.run(executor=LocalDaskExecutor())  # SUCCESS
flow2.run(executor=LocalDaskExecutor())  # SUCCESS
flow1.run(executor=DaskExecutor())       # FAILS with <Failed: "Unexpected error: ModuleNotFoundError("No module named 'reporting'")">
flow2.run(executor=DaskExecutor())       # SUCCESS
```
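A hypothetical way to narrow down this kind of failure (not from the thread): check whether a module resolves on the current `sys.path` with `importlib.util.find_spec`, and compare the result locally versus inside a Dask task.

```python
import importlib.util

def can_import(module_name):
    """Return True if `module_name` resolves on the current sys.path."""
    return importlib.util.find_spec(module_name) is not None

print(can_import("json"))       # stdlib: True in any interpreter
print(can_import("reporting"))  # True on the driver, but False on a
                                # worker whose sys.path lacks the project
```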
Is `reporting` a pip-installed package, or is it found via the path? I bet it's the latter, and your dask scheduler and worker processes do not find it.
True, it’s the latter. What’s the right way to make sure of that, though? I am somehow lost in this.
Sorry, I am not at my computer all the time, so my responses are a little slow…
Well, the dask scheduler and the dask worker run as separate programs. If you are doing some path manipulation like `sys.path.append(...)` or anything like that, the dask scheduler and worker will not inherit it. You can print `sys.path` in a simple task and compare it to the local executor option. The local executor runs in the same Python instance as the code that calls `flow.run()`, so it has the same path and there is no problem there. There is a way to tell dask to run a preload function where you can do your path manipulation, but I think it is better to avoid path manipulations; it’s just messy. What I suggest is that you convert your `reporting` folder to a Python package and install it in development mode. You can either add a `setup.py` and run `pip install -e .` for that. I personally prefer to use a `pyproject.toml` handled by poetry and just do a `poetry install`.
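A minimal sketch of that suggestion, assuming the project root contains the `reporting/` folder (the name, version, and dependencies below are assumptions based on the imports shown above):

```python
# setup.py -- minimal packaging sketch for the hypothetical `reporting` project
from setuptools import setup, find_packages

setup(
    name="reporting",
    version="0.1.0",
    packages=find_packages(),  # picks up reporting/ and its subpackages
    install_requires=["prefect", "requests"],
)
```

After `pip install -e .` in the environment used by the dask scheduler and workers, `reporting` is importable everywhere without touching `sys.path`.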
I was way too stupid for this. I added the working directory (which contains the `reporting` folder) to the PYTHONPATH of the dask worker, and now it apparently works. 🤦‍♂️ Thanks @David Ojeda for pointing me in the right direction!
Cool! I do strongly advise against path manipulations and other mumbo-jumbo… it will come back to bite you one day! Stick to virtual environments and install your packages accordingly… call it _Python hygiene_…