Carlos Gutierrez
04/12/2021, 9:32 PM
```
flows/
├── __init__.py
└── logging_flow.py
```
Suppose that `logging_flow.py` is defined as follows:
```python
from prefect import Flow, task
from prefect.tasks.shell import ShellTask
from prefect.storage import S3

shell_task = ShellTask()

with Flow("logging-flow", storage=S3(bucket="some-bucket")) as flow:
    shell_task(command="echo This works!")
```
I then register the flow with the server (Prefect Cloud in this case, but it doesn't matter), and an agent deploys an ECS container (based, for instance, on the image `prefect:0.14.15`) from which the flow is run. This works perfectly fine.
The problem comes when I want to use Task class inheritance. If I modify my `logging_flow.py` like this:
```python
from prefect import Flow, task
from prefect.tasks.shell import ShellTask
from prefect.storage import S3

class ParentShellTask(ShellTask):
    def run(self, **kwargs):
        print("I will ruin your flow")
        return super(ParentShellTask, self).run(**kwargs)

shell_task = ParentShellTask()

with Flow("logging-flow", storage=S3(bucket="some-bucket")) as flow:
    shell_task(command="echo This won't work")
```
and repeat the register-and-run steps, then the unpickling phase raises the following exception:
```
An error occurred while unpickling the flow:
ModuleNotFoundError("No module named 'flows'")
This may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.
Traceback (most recent call last):
  File "/home/prefect/.local/lib/python3.7/site-packages/prefect/utilities/storage.py", line 207, in flow_from_bytes_pickle
    flow = cloudpickle.loads(flow_bytes)
ModuleNotFoundError: No module named 'flows'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/bin/prefect", line 8, in <module>
    sys.exit(cli())
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/prefect/.local/lib/python3.7/site-packages/prefect/cli/execute.py", line 90, in flow_run
    raise exc
  File "/home/prefect/.local/lib/python3.7/site-packages/prefect/cli/execute.py", line 67, in flow_run
    flow = storage.get_flow(flow_data.name)
  File "/home/prefect/.local/lib/python3.7/site-packages/prefect/storage/s3.py", line 103, in get_flow
    return flow_from_bytes_pickle(output)
  File "/home/prefect/.local/lib/python3.7/site-packages/prefect/utilities/storage.py", line 234, in flow_from_bytes_pickle
    raise StorageError("\n".join(parts)) from exc
prefect.utilities.exceptions.StorageError: An error occurred while unpickling the flow:
ModuleNotFoundError("No module named 'flows'")
This may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.
```
So the only change is that I'm now using a Task subclass that inherits from `ShellTask`. What I believe is happening is that, unlike `ShellTask`, this subclass is not part of the Prefect library itself, so an error is raised because the image running in the container does not contain this code.
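For what it's worth, the unpickling side of this error can be reproduced with the standard library alone: pickle stores classes *by reference* (module path plus attribute name), and loading the payload replays the import. The hand-crafted bytes below are only an illustration of that mechanism (mirroring the thread's `flows` package), not what Prefect actually stores:

```python
import pickle
import collections

# Classes are pickled by reference: the payload records the defining
# module's dotted path and the attribute name, not the class body.
payload = pickle.dumps(collections.OrderedDict)
assert b"collections" in payload  # only the dotted path is stored

# Unpickling replays the import. Protocol-0 bytes referencing a class
# in a package the environment lacks fail with the same error as above:
bad = b"cflows.tasks\nParentShellTask\n."  # GLOBAL opcode + STOP
try:
    pickle.loads(bad)
except ModuleNotFoundError as e:
    print(e)  # No module named 'flows'
```

So any environment that merely *loads* the pickle must be able to import every module the pickle references.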
Note that if I create a custom task using the decorator, for instance:
```python
from prefect import Flow, task
from prefect.storage import S3
import subprocess

@task
def simple_task():
    subprocess.run(["echo", "ThisWorks!"])

with Flow("logging-flow", storage=S3(bucket="some-bucket")) as flow:
    simple_task()
```
the flow executes successfully as well. So what am I missing here?
Thank you very much for the help guys, and sorry if it's a bit of a naive question :)

Kevin Kho
`flows`? And what agent are you running on?

Kevin Kho
Try putting `flow.register` inside `logging_flow.py` and then running that (just register one script). This should work. Once this does, we can discuss the source of the error.

Kevin Kho
Carlos Gutierrez
04/13/2021, 7:31 AM
```
flows/
├── config.py
├── __init__.py
├── logging_flow.py
├── __main__.py
└── tasks
    └── __init__.py
```
The `config.py` file and the `tasks` module contain helper code for the flow, but let's forget about them for a moment: I managed to reproduce the exact error after commenting out those imports and reducing the code to the minimum inside `logging_flow.py`, so the tree actually looks like this:
```
flows/
├── __init__.py
├── logging_flow.py
└── __main__.py
```
The `__main__.py` file contains code to manage the flow registration process from outside the flow file (which I didn't imagine would matter, but after your comment and running some tests it seems it does). The code is the following:
```python
import click

from .logging_flow import flow
from prefect.storage import Local

@click.group()
def cli():
    pass

@cli.command("run")
def run_flow():
    flow.run()

@cli.command("register")
@click.option("--local", is_flag=True)
@click.option(
    "--env",
    # supported_labels is defined elsewhere in our code (not shown)
    type=click.Choice(supported_labels, case_sensitive=False),
)
def register(local, env):
    if local:
        flow.storage = Local()
    labels = [str.lower(env)] if env else []
    flow.name += f" ({env})" if env else ""
    flow.register(project_name="Demo", labels=labels)

@cli.command("visualize")
def visualize():
    flow.visualize()

if __name__ == "__main__":
    cli()
```
As you can see, we use the `click` library to define CLI commands that make tasks like running, registering, or visualizing the flow a little more convenient. With this, to register the flow we execute `python -m flows register`, which registers it to S3 unless the `--local` flag is passed. At first this didn't seem like an issue, because with a simple `ShellTask` inside the flow there were no complaints. However, for the conflicting case I tried placing `flow.register(project_name="Demo")` inside `logging_flow.py` and running `python -m flows.logging_flow`, and it works like a charm!
Why would registering the flow from outside the flow file cause this weird behavior of failing the moment I define `ParentShellTask`?
Also, I ran some other tests and realized that if I move `ParentShellTask` into the `tasks` module shown at the beginning and import it in `logging_flow.py`, the same error happens even if I register the flow from within the flow file. BUT this doesn't happen if I define a simple decorator-based task inside the `tasks` module, as described in my first comment; the flow then works just fine, like this:
```python
# logging_flow.py
from prefect import Flow
from prefect.storage import S3
# imports simple_task from the tasks module described in the
# extended-scenario tree at the beginning of this comment
from .tasks import simple_task

with Flow("logging-flow", storage=S3(bucket="some-bucket")) as flow:
    simple_task()  # this will work

flow.register(project_name="Demo", labels=labels)  # labels defined elsewhere
```
(Note that this also works if I register the flow from outside the `logging_flow.py` file, as described above.)
Any ideas why this could be happening? When I tried your approach of registering the flow within the file I thought that might be the issue, but importing a task-decorated function from another module works fine while importing a Task subclass does not :(

Carlos Gutierrez
04/13/2021, 9:30 AM
```yaml
prefect_run:
  image: prefect:0.14.15
  command: bash -c "prefect execute flow-run"
  environment:
    - PREFECT__BACKEND=cloud
    - PREFECT__CLOUD__AGENT__LABELS=['local']
    - PREFECT__CLOUD__API=https://api.prefect.io
    - PREFECT__CLOUD__AUTH_TOKEN=the-cloud-api-token
    - PREFECT__CLOUD__USE_LOCAL_SECRETS=false
    - PREFECT__CONTEXT__FLOW_ID=the-flow-id
    - PREFECT__CONTEXT__FLOW_RUN_ID=the-flow-run-id
    - PREFECT__CONTEXT__IMAGE=prefect:0.14.15
    - PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS=prefect.engine.cloud.CloudFlowRunner
    - PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS=prefect.engine.cloud.CloudTaskRunner
    - PREFECT__LOGGING__LEVEL=DEBUG
    - PREFECT__LOGGING__LOG_TO_CLOUD=true
    - S3_FLOWS_BUCKET=some-bucket
    - AWS_ACCESS_KEY_ID=my-access-key
    - AWS_SECRET_ACCESS_KEY=my-secret-access-key
    - AWS_DEFAULT_REGION=us-east-1
```
Then I change `PREFECT__CONTEXT__FLOW_ID` and `PREFECT__CONTEXT__FLOW_RUN_ID` to the values obtained from Cloud so that I can execute the specific flow run. Note that this is just done to speed up testing, as deploying to Fargate is a little more time-consuming, but I get the same errors as when deploying to Fargate (already checked that).

Carlos Gutierrez
04/13/2021, 11:17 AM
The `ModuleNotFoundError` is raised only when any of the code executed during the flow run uses a class imported from outside the flow file; it does not happen if the flow imports only functions and literals from other modules. For example:
```python
# config.py (failing version)
class MyConfig:
    my_var = "some variable"

my_config = MyConfig()

#######################

# tasks/__init__.py
from ..config import my_config  # or MyConfig
from prefect import task

@task
def simple_task():
    print(f"My config var is: {my_config.my_var}")  # or MyConfig().my_var
```

```python
# config.py (working version)
my_var = "some variable"

#######################

# tasks/__init__.py
from ..config import my_var
from prefect import task

@task
def simple_task():
    print(f"My config var is: {my_var}")
```

```python
# logging_flow.py
from prefect import Flow
from prefect.storage import S3
from .tasks import simple_task

with Flow("logging-flow", storage=S3(bucket="some-bucket")) as flow:
    simple_task()

flow.register(project_name="Demo", labels=labels)  # labels defined elsewhere
```
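This observation matches plain pickle semantics: a string literal is serialized *by value*, while an instance like `my_config` is serialized as a reference to its class (the defining module's dotted path) plus the instance state. A stdlib-only sketch of the difference:

```python
import pickle

# Working version: built-in values (str, int, dict, ...) are pickled by
# value, so the unpickling environment needs nothing extra installed.
my_var = "some variable"
assert pickle.loads(pickle.dumps(my_var)) == "some variable"

# Failing version: an instance pickles as (class reference + state).
# The class reference is a dotted module path that the worker must be
# able to import at load time.
class MyConfig:
    my_var = "some variable"

blob = pickle.dumps(MyConfig())
assert MyConfig.__module__.encode() in blob  # the module path is embedded
```

So a flow that captures `my_var` is self-contained, while one that captures `my_config` drags a reference to `flows.config` into the pickle.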
Kevin Kho
Kevin Kho
Kevin Kho
Carlos Gutierrez
04/13/2021, 3:40 PM
Carlos Gutierrez
04/13/2021, 3:41 PM
Kevin Kho
Carlos Gutierrez
04/13/2021, 3:44 PM
Kevin Kho
Zanie
Carlos Gutierrez
04/13/2021, 3:45 PM
Carlos Gutierrez
04/13/2021, 3:46 PM
Kevin Kho
Zanie
Zanie
Zanie
Carlos Gutierrez
04/13/2021, 3:51 PM
Zanie
Carlos Gutierrez
04/13/2021, 3:53 PM
Zanie
Carlos Gutierrez
04/13/2021, 3:59 PM
Sean Harkins
04/15/2021, 2:46 PM
`prefecthq/prefect:0.14.13-python3.8`.
When using Dask, are external modules used by a Flow required to be present on the containers where the scheduler and workers run? Can you describe in a bit more detail how Flows are deserialized in relation to Dask workers?
@Zanie As you assisted me with here https://prefect-community.slack.com/archives/CL09KU1K7/p1618183077406400 our hope was to defer installing volatile dependencies until worker runtime, but it seems the version of the external module used during Flow storage needs to be available on the container to support unpickling.

Sean Harkins
04/15/2021, 2:50 PM
```
Failed to load and execute Flow's environment: StorageError('An error occurred while unpickling the flow:\n AttributeError("Can\'t get attribute \'SingleTaskWrapper\' on <module \'rechunker.executors.prefect\' from \'/usr/local/lib/python3.8/site-packages/rechunker/executors/prefect.py\'>")\nThis may be due to one of the following version mismatches between the flow build and execution environments:\n - python: (flow built with \'3.8.7\', currently running with \'3.8.8\')')
```
This was fixed by updating the module version in the worker image, but I wanted to better understand how unpickling/deserialization works at the Dask level so that I can construct things the optimal way.

Sean Harkins
04/17/2021, 1:49 AM

Kevin Kho
`LocalDaskExecutor` works while `DaskExecutor` doesn't for their flow because the Tasks are unserializable: work must be pickled in order to be distributed to Dask workers.

Kevin Kho