Matthias
05/09/2020, 1:35 PM
I have __init__.py files everywhere, importing everything in the respective subfolder.
When I use a Task class that I imported in my flows, everything goes well until I run a flow. Then I get ModuleNotFoundError("No module named 'reporting'") from Dask, where reporting is the main package, but I cannot trace that back to anything; running it with the LocalExecutor works. The debug utility also tells me that the flow is not serializable. As soon as I copy the Task class into the same place where the flow gets defined, everything works like a charm (the only difference being that the class does not get imported).
Question: Is there any pitfall I need to consider when defining Task classes and importing them?
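The "debug utility" mentioned above is presumably Prefect 0.x's prefect.utilities.debug.is_serializable, which round-trips an object through cloudpickle and a fresh subprocess, roughly what DaskExecutor does when it ships a flow to a worker. A minimal sketch under that assumption (flow1 refers to the flow defined further down in this thread):

# Minimal sketch, assuming Prefect 0.x; flow1 is the flow built further below.
import cloudpickle
from prefect.utilities.debug import is_serializable

blob = cloudpickle.dumps(flow1)   # pickling succeeds in the defining process
print(is_serializable(flow1))     # the subprocess round-trip reports False when
                                  # the child process cannot import 'reporting'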
Matthias
05/09/2020, 2:18 PM
When I use a plain @task function and import it, everything works fine, too. I am so lost 😄
Matthias
05/09/2020, 2:37 PM
This is /reporting/etl/extract/jira_tempo/projects.py:
from prefect import Task, task
import requests


class Projects(Task):
    def __init__(self, username, password, url, **kwargs):
        self.username = username
        self.password = password
        self.url = url
        super().__init__(**kwargs)

    def run(self):
        data = requests.get(self.url, auth=(self.username, self.password)).json()
        return data


@task
def projects(url, username, password):
    data = requests.get(url, auth=(username, password)).json()
    return data
This is the Flow:
from reporting.etl.extract.jira_tempo.projects import Projects, projects
from prefect import Flow, Parameter
from django.conf import settings

extract_projects = Projects(
    url=settings.LOAD_JIRAURL + '/rest/api/2/project',
    username=settings.LOAD_JIRAUSER,
    password=settings.LOAD_JIRAPASSWORD,
)

with Flow("ETL Projects 1") as flow1:
    extracted_projects = extract_projects()

with Flow("ETL Projects 2") as flow2:
    url = Parameter("url", default=settings.LOAD_JIRAURL + '/rest/api/2/project')
    username = Parameter("username", default=settings.LOAD_JIRAUSER)
    password = Parameter("password", default=settings.LOAD_JIRAPASSWORD)
    extracted_projects = projects(url=url, username=username, password=password)
Results:
from prefect.engine.executors import LocalExecutor, LocalDaskExecutor, DaskExecutor

flow1.run(executor=LocalExecutor())      # SUCCESS
flow2.run(executor=LocalExecutor())      # SUCCESS
flow1.run(executor=LocalDaskExecutor())  # SUCCESS
flow2.run(executor=LocalDaskExecutor())  # SUCCESS
flow1.run(executor=DaskExecutor())       # FAILS with <Failed: "Unexpected error: ModuleNotFoundError("No module named 'reporting'")">
flow2.run(executor=DaskExecutor())       # SUCCESS
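One quick way to confirm where the difference comes from, anticipating the replies below, is a throwaway flow that prints what the executing process can actually import. A minimal sketch; the flow and task names here are illustrative, not taken from the original thread:

# Throwaway diagnostic flow; names are illustrative, not from the thread.
import importlib.util
import sys

from prefect import Flow, task
from prefect.engine.executors import DaskExecutor, LocalExecutor


@task
def show_import_context():
    # Show the path and whether 'reporting' resolves in the process running this task.
    print("sys.path:", sys.path)
    print("'reporting' importable:", importlib.util.find_spec("reporting") is not None)


with Flow("debug-import-path") as debug_flow:
    show_import_context()

debug_flow.run(executor=LocalExecutor())  # executes in the driver process
debug_flow.run(executor=DaskExecutor())   # executes in a Dask worker process

Under LocalExecutor the task sees the driver's sys.path; under DaskExecutor it sees whatever path the worker process was started with, which is where the reporting package goes missing.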
David Ojeda
05/09/2020, 3:36 PM
Is reporting a pip-installed package, or is it found via the path? I bet it's the latter and your dask scheduler and worker processes do not find it.
Matthias
05/09/2020, 4:03 PM

David Ojeda
05/09/2020, 6:48 PM

David Ojeda
05/09/2020, 6:52 PM
If you do sys.path.append('.') or anything like that, the dask scheduler and worker will not inherit it. You can print(sys.path) in a simple task and compare to the local executor option.
The local executor, which runs in the same Python instance as the code that does flow.run, has the same path, so no problems there.
There is a way to tell dask to run a preload function where you can do your path manipulation, but I think it is better to avoid path manipulations; it's just messy.
What I suggest is that you convert your reporting folder into a Python package and install it in development mode.
You can add a setup.py and do pip install -e . for that.
I personally prefer to use a pyproject.toml handled by poetry and just do a poetry install.
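For the setup.py route, a minimal file at the project root (one level above the reporting/ folder) could look like the sketch below; the version and install_requires values are assumptions, not taken from the thread. After pip install -e . the reporting package resolves through site-packages, so a Dask scheduler and its workers started from the same environment can import it without any sys.path tweaks.

# setup.py, minimal sketch; version and install_requires are assumptions.
from setuptools import find_packages, setup

setup(
    name="reporting",
    version="0.1.0",
    packages=find_packages(),
    install_requires=["prefect", "requests"],
)

The poetry route is equivalent: declare the package in pyproject.toml and run poetry install, which installs the project into the active environment in development mode.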
Matthias
05/10/2020, 12:25 AM

David Ojeda
05/10/2020, 6:37 AM