Matthias

05/09/2020, 1:35 PM
Hi! I have (again 🤨) a weird issue. I have some more sophisticated class-based Tasks (like requests with authentication, which I initialize up front) organized in different files in a Python package structure. For simplicity there are `__init__.py` files everywhere, importing everything in the respective subfolder. When I use an imported `Task` class in my Flows, everything goes well until I run a flow. Then I get `ModuleNotFoundError("No module named 'reporting'")` from Dask, where `reporting` is the main package, but I cannot trace that back to anything. Running it with the `LocalExecutor` works. The debug utility also tells me that the flow is not serializable. As soon as I copy the Task class into the same place where the Flow gets defined, everything works like a charm (the only difference being that the class does not get imported).
Question: Is there any pitfall I need to consider when defining Task classes and importing them?
If I write the code as a `@task` function and import it, everything works fine, too. I am so lost 😄
Overview of my testing: This is `/reporting/etl/extract/jira/projects.py`:
from prefect import Task, task
import requests


class Projects(Task):
    def __init__(self, username, password, url, **kwargs):
        self.username = username
        self.password = password
        self.url = url
        super().__init__(**kwargs)

    def run(self):
        data = requests.get(self.url, auth=(
            self.username, self.password)).json()
        return data


@task
def projects(url, username, password):
    data = requests.get(url, auth=(
        username, password)).json()
    return data
This is the Flow:
from reporting.etl.extract.jira_tempo.projects import Projects, projects
from prefect import Flow, Parameter  
from django.conf import settings  

extract_projects = Projects(
    url=settings.LOAD_JIRAURL + '/rest/api/2/project',
    username=settings.LOAD_JIRAUSER,
    password=settings.LOAD_JIRAPASSWORD
)

with Flow("ETL Projects 1") as flow1:
    extracted_projects = extract_projects()

with Flow("ETL Projects 2") as flow2:
    url = Parameter("url", default=settings.LOAD_JIRAURL +
                    '/rest/api/2/project')
    username = Parameter("username", default=settings.LOAD_JIRAUSER)
    password = Parameter("password", default=settings.LOAD_JIRAPASSWORD)

    extracted_projects = projects(url=url, username=username, password=password)
Results:
from prefect.engine.executors import DaskExecutor, LocalDaskExecutor, LocalExecutor

flow1.run(executor=LocalExecutor())   # SUCCESS
flow2.run(executor=LocalExecutor())   # SUCCESS
flow1.run(executor=LocalDaskExecutor())   # SUCCESS
flow2.run(executor=LocalDaskExecutor())   # SUCCESS
flow1.run(executor=DaskExecutor())  # FAILS with <Failed: "Unexpected error: ModuleNotFoundError("No module named 'reporting'")">
flow2.run(executor=DaskExecutor())  # SUCCESS

David Ojeda

05/09/2020, 3:36 PM
Is `reporting` a pip-installed package, or is it found via the path? I bet it's the latter and your dask scheduler and worker processes do not find it.

Matthias

05/09/2020, 4:03 PM
True, it’s the latter. What’s the right way to make sure they find it, though? I am somewhat lost here.

David Ojeda

05/09/2020, 6:48 PM
Sorry, I am not at my computer all the time, so my responses are a little slow…
Well, the dask scheduler and the dask worker run as separate programs. If you are doing some path manipulation like `sys.path.append('.')` or anything like that, the dask scheduler and worker will not inherit it. You can `print(sys.path)` in a simple task and compare the output to what you get with the local executor. The local executor runs in the same Python instance as the code that calls `flow.run`, so it has the same path and there are no problems there. There is a way to tell dask to run a pre_load function where you can do your path manipulation, but I think it is better to avoid path manipulations; it’s just messy. What I suggest is that you convert `reporting` into a Python package and install it in development mode. You can either add a `setup.py` and run `pip install -e .`, or, as I personally prefer, use a `pyproject.toml` handled by poetry and just run `poetry install`.
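To illustrate the point about separate programs, here is a minimal sketch that uses a plain `subprocess` call as a stand-in for a dask worker (an assumption for illustration; it involves neither Dask nor Prefect). The helper name `child_sys_path` is hypothetical. It shows that a `sys.path.append` done in the current interpreter is not visible to a freshly started Python process.

```python
import json
import subprocess
import sys
import tempfile


def child_sys_path():
    """Return sys.path as seen by a freshly started Python process."""
    result = subprocess.run(
        [sys.executable, "-c", "import json, sys; print(json.dumps(sys.path))"],
        capture_output=True,
        text=True,
        check=True,
    )
    return json.loads(result.stdout)


# Path manipulation in the current process, as a flow script might do.
extra_dir = tempfile.mkdtemp()
sys.path.append(extra_dir)

in_parent = extra_dir in sys.path          # the current interpreter sees it
in_child = extra_dir in child_sys_path()   # a separate process does not

print("visible in this process:", in_parent)
print("visible in a new process:", in_child)
```

The same comparison can be made inside a real task by printing `sys.path` there, as suggested above.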

Matthias

05/10/2020, 12:25 AM
I was way too stupid for this. I added the working directory (which contains the folder reporting) to the PYTHONPATH of the dask worker and now it works apparently. 🤦‍♂️ Thanks @David Ojeda for pointing me in the right direction!
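For reference, a rough sketch of why the `PYTHONPATH` change helps. It uses a plain `subprocess` call in place of a dask worker, and the package name `reporting_demo` and the helper `can_import` are made up for this illustration. A separately started Python process can only import the package once the directory containing it is on that process's `PYTHONPATH`.

```python
import os
import subprocess
import sys
import tempfile
from pathlib import Path


def can_import(module_name, pythonpath=None):
    """Try to import module_name in a fresh Python process; return True on success."""
    env = dict(os.environ)
    if pythonpath is not None:
        existing = env.get("PYTHONPATH")
        # Prepend the extra directory, keeping whatever was already set.
        env["PYTHONPATH"] = (
            pythonpath if not existing else pythonpath + os.pathsep + existing
        )
    result = subprocess.run(
        [sys.executable, "-c", f"import {module_name}"],
        env=env,
        capture_output=True,
    )
    return result.returncode == 0


# Hypothetical project layout: <project_dir>/reporting_demo/__init__.py
project_dir = Path(tempfile.mkdtemp())
package_dir = project_dir / "reporting_demo"
package_dir.mkdir()
(package_dir / "__init__.py").write_text("")

without_path = can_import("reporting_demo")
with_path = can_import("reporting_demo", pythonpath=str(project_dir))

print("importable without PYTHONPATH:", without_path)
print("importable with PYTHONPATH:", with_path)
```

For a real worker, the variable has to be set in the environment in which the worker process is started, not in the script that calls `flow.run`.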

David Ojeda

05/10/2020, 6:37 AM
Cool! I do strongly advise against path manipulations and other mumbo-jumbo… it will come back to bite you one day! Stick to virtual environments and install your packages accordingly… call it Python hygiene…