Matthias
05/09/2020, 1:35 PM
I have `__init__.py` files everywhere, importing everything in the respective sub folder.
When I use an imported `Task` class in my Flows, everything goes well until I run a flow. Then I get `ModuleNotFoundError("No module named 'reporting'")` from Dask, where `reporting` is the main package, but I cannot trace that back to anything. Running it with the `LocalExecutor` works. The debug utility also tells me that the flow is not serializable. As soon as I copy the Task class into the same place where the Flow gets defined, everything works like a charm (the only difference is that the class does not get imported).
Question: Is there any pitfall I need to consider when defining Task classes and importing them? If I define a `@task` function and import it, everything works fine, too. I am so lost 😄
/reporting/etl/extract/jira/projects.py:
from prefect import Task, task
import requests


class Projects(Task):
    def __init__(self, username, password, url, **kwargs):
        self.username = username
        self.password = password
        self.url = url
        super().__init__(**kwargs)

    def run(self):
        data = requests.get(self.url, auth=(self.username, self.password)).json()
        return data


@task
def projects(url, username, password):
    data = requests.get(url, auth=(username, password)).json()
    return data
This is the Flow:
from reporting.etl.extract.jira_tempo.projects import Projects, projects
from prefect import Flow, Parameter
from django.conf import settings

extract_projects = Projects(
    url=settings.LOAD_JIRAURL + '/rest/api/2/project',
    username=settings.LOAD_JIRAUSER,
    password=settings.LOAD_JIRAPASSWORD
)

with Flow("ETL Projects 1") as flow1:
    extracted_projects = extract_projects()

with Flow("ETL Projects 2") as flow2:
    url = Parameter("url", default=settings.LOAD_JIRAURL + '/rest/api/2/project')
    username = Parameter("username", default=settings.LOAD_JIRAUSER)
    password = Parameter("password", default=settings.LOAD_JIRAPASSWORD)
    extracted_projects = projects(url=url, username=username, password=password)
Results:
from prefect.engine.executors import DaskExecutor, LocalExecutor, LocalDaskExecutor
flow1.run(executor=LocalExecutor()) # SUCCESS
flow2.run(executor=LocalExecutor()) # SUCCESS
flow1.run(executor=LocalDaskExecutor()) # SUCCESS
flow2.run(executor=LocalDaskExecutor()) # SUCCESS
flow1.run(executor=DaskExecutor()) # FAILS with <Failed: "Unexpected error: ModuleNotFoundError("No module named 'reporting'")">
flow2.run(executor=DaskExecutor()) # SUCCESS
David Ojeda
05/09/2020, 3:36 PM
Is `reporting` a pip installed package or is it found via the path? I bet it's the latter and your dask scheduler and worker processes do not find it.
Matthias
05/09/2020, 4:03 PM
David Ojeda
05/09/2020, 6:48 PM
If you do `sys.path.append('.')` or anything like that, the dask scheduler and worker will not inherit that. You can `print(sys.path)` in a simple task and compare to the local executor option.
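As a rough sketch of that comparison, the helper below collects what a given Python process sees. The function name `import_environment` is made up for illustration, and the default module name `reporting` is taken from the error message above; only standard-library calls are used.

```python
import importlib.util
import sys


def import_environment(module_name="reporting"):
    """Report where the current Python process would look for `module_name`."""
    # find_spec returns None when a top-level module cannot be located.
    spec = importlib.util.find_spec(module_name)
    return {
        "executable": sys.executable,          # which interpreter is running
        "sys_path": list(sys.path),            # search path of this process
        "importable": spec is not None,        # can this process import it?
        "origin": getattr(spec, "origin", None),  # file the module would load from
    }
```

Wrapping this in a `@task` and running the same flow once with `LocalExecutor` and once with `DaskExecutor` should show whether the worker's `sys.path` differs from the one in the process that calls `flow.run`.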
The local executor, which runs in the same Python instance as your code that does `flow.run`, has the same path, so no problems there.
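One way to see why a separate process needs the package on its own path: Python's pickling stores a class from an importable module by reference (module name plus class name), not by value. The sketch below uses the standard-library `pickle` with `fractions.Fraction` as a stand-in for an imported class. That the serialization used to ship tasks to Dask workers behaves the same way for an imported `Task` subclass is an assumption here, not something confirmed in this thread.

```python
import pickle
from fractions import Fraction  # stand-in for any class imported from a package

payload = pickle.dumps(Fraction(1, 3))

# The payload names the module and the class instead of embedding the class body.
module_named_in_payload = b"fractions" in payload
class_named_in_payload = b"Fraction" in payload

# Unpickling works here only because `fractions` is importable in this process.
restored = pickle.loads(payload)
```

A process that cannot import the named module fails at the `pickle.loads` step with a `ModuleNotFoundError`, which would match the error reported for the imported `Projects` class.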
There is a way to tell dask to run a pre_load function where you can do your path manipulation, but I think that it is better to avoid path manipulations; it’s just messy.
What I suggest is that you convert your `reporting` to a Python package and install it in development mode.
You can either add a `setup.py` and run `pip install -e .` for that.
I personally prefer to use a `pyproject.toml` handled by poetry and just do a `poetry install`.
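To check whether the install took effect, something like the following could be run in the environment the Dask workers use. It is a minimal sketch: the distribution name `reporting` and the version in the commented `setup.py` are assumptions, and `is_installed` is a hypothetical helper name.

```python
from importlib import metadata  # available from Python 3.8

# For reference, a minimal setup.py (hypothetical name and version) could be:
#
#     from setuptools import setup, find_packages
#     setup(name="reporting", version="0.1.0", packages=find_packages())
#
# After `pip install -e .`, the distribution should be visible to every Python
# process that uses the same environment, without any sys.path manipulation.


def is_installed(dist_name: str) -> bool:
    """Return True if a distribution with this name is installed."""
    try:
        metadata.version(dist_name)
        return True
    except metadata.PackageNotFoundError:
        return False
```

If `is_installed("reporting")` is `False` inside a worker, the package is most likely only reachable through the working directory or a manually edited path.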
Matthias
05/10/2020, 12:25 AM
David Ojeda
05/10/2020, 6:37 AM