# ask-community
c
Hi everyone! Sorry to bother you with this, but I'm having a really hard time trying to run my unpickled flow from an S3 storage, and I think I don't fully understand how the pickling/unpickling really works behind the scenes. More details in the thread:
To reduce it to the simplest form, let's say that I have the following files structure:
```
flows/
├── __init__.py
└── logging_flow.py
```
Suppose that `logging_flow.py` is defined as follows:
```python
from prefect import Flow, task
from prefect.tasks.shell import ShellTask
from prefect.storage import S3

shell_task = ShellTask()

with Flow("logging-flow", storage=S3(bucket="some-bucket")) as flow:
    shell_task(command="echo This works!")
```
I would then register the flow to the server (in this case it's Prefect Cloud, but it doesn't matter) and then an agent would deploy an ECS container with, for instance, the base image `prefect:0.14.15`, from which the flow will be run, and this works perfectly fine. The problem comes when I want to do some Task class inheritance. So if I modify my `logging_flow.py` like this:
```python
from prefect import Flow, task
from prefect.tasks.shell import ShellTask
from prefect.storage import S3

class ParentShellTask(ShellTask):
    def run(self, **kwargs):
        print("I will ruin your flow")
        super(ParentShellTask, self).run(**kwargs)

shell_task = ParentShellTask()

with Flow("logging-flow", storage=S3(bucket="some-bucket")) as flow:
    shell_task(command="echo This won't work")
```
If I then repeat the register-and-run steps, the unpickling phase raises the following exception:
```
An error occurred while unpickling the flow:
  ModuleNotFoundError("No module named 'flows'")
This may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.
Traceback (most recent call last):
  File "/home/prefect/.local/lib/python3.7/site-packages/prefect/utilities/storage.py", line 207, in flow_from_bytes_pickle
    flow = cloudpickle.loads(flow_bytes)
ModuleNotFoundError: No module named 'flows'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/bin/prefect", line 8, in <module>
    sys.exit(cli())
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/prefect/.local/lib/python3.7/site-packages/prefect/cli/execute.py", line 90, in flow_run
    raise exc
  File "/home/prefect/.local/lib/python3.7/site-packages/prefect/cli/execute.py", line 67, in flow_run
    flow = storage.get_flow(flow_data.name)
  File "/home/prefect/.local/lib/python3.7/site-packages/prefect/storage/s3.py", line 103, in get_flow
    return flow_from_bytes_pickle(output)
  File "/home/prefect/.local/lib/python3.7/site-packages/prefect/utilities/storage.py", line 234, in flow_from_bytes_pickle
    raise StorageError("\n".join(parts)) from exc
prefect.utilities.exceptions.StorageError: An error occurred while unpickling the flow:
  ModuleNotFoundError("No module named 'flows'")
This may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.
```
So the only thing that changes is that I'm now using a Task subclass that inherits from `ShellTask`, and what I believe is happening is that this subclass is not natively contained in the Prefect library the way `ShellTask` is, so it raises an error because the image run in the container does not contain this code. Note that if I create a custom task using decorators, for instance:
```python
from prefect import Flow, task
from prefect.storage import S3
import subprocess

@task
def simple_task():
    subprocess.run(["echo", "ThisWorks!"])

with Flow("logging-flow", storage=S3(bucket="some-bucket")) as flow:
    simple_task()
```
The flow is successfully executed as well. So what am I missing here? Thank you very much for the help guys, and sorry if it's a bit of a naive question :)
k
Hello @Carlos Gutierrez! All questions are welcome of course! We're here to help. I recreated your ParentShellTask and it works on my setup. Do you have a folder named `flows`? And what agent are you running on?
Sorry, I'm blind, I see your `flows` folder now. This error is not about the inherited class. Just register your flow by running `flow.register()` inside `logging_flow.py` and then try running that (just register 1 script). This should work. Once this does, we can discuss the source of the error.
Can you show me how you registered?
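For example, a minimal sketch of what I mean (the project name here is just a placeholder):

```python
# logging_flow.py -- register from the same script that defines the flow
from prefect import Flow
from prefect.tasks.shell import ShellTask
from prefect.storage import S3

shell_task = ShellTask()

with Flow("logging-flow", storage=S3(bucket="some-bucket")) as flow:
    shell_task(command="echo This works!")

if __name__ == "__main__":
    flow.register(project_name="Demo")  # placeholder project name
```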
c
Thank you very much @Kevin Kho for the quick response! I did some tests after your comment and discovered something. First of all, let me extend my example so that it is a little closer to the real scenario. The tree looks more like this:
```
flows/
├── config.py
├── __init__.py
├── logging_flow.py
├── __main__.py
└── tasks
    └── __init__.py
```
The file `config.py` and the `tasks` module contain helper code for the flow, but let's forget about them for a moment, as I managed to reproduce the exact error by commenting out those imports and reducing the code to the minimum just inside `logging_flow.py`, so the tree actually looks like this:
```
flows/
├── __init__.py
├── logging_flow.py
└── __main__.py
```
What the `__main__.py` file contains is code to manage the flow registration process from outside the flow (which I didn't imagine would matter, but after your comment and running some tests it seems like it does). The code is the following:
```python
import click
from prefect.storage import Local

from .logging_flow import flow        # the flow defined in logging_flow.py
from .config import supported_labels  # assumed: the allowed env labels live in config.py (not shown)


@click.group()
def cli():
    pass


@cli.command("run")
def run_flow():
    flow.run()


@cli.command("register")
@click.option("--local", is_flag=True)
@click.option(
    "--env",
    type=click.Choice(supported_labels, case_sensitive=False),
)
def register(local, env):
    if local:
        flow.storage = Local()

    labels = [str.lower(env)] if env else []
    flow.name += f" ({env})" if env else ""
    flow.register(project_name="Demo", labels=labels)


@cli.command("visualize")
def visualize():
    flow.visualize()


if __name__ == "__main__":
    cli()
```
As you can see, here we use the `click` library to define some CLI commands that make tasks like running, registering or visualizing the flow a little more convenient. With this, to register the flow we execute `python -m flows register`, and it registers it to S3 unless the `--local` flag is passed to the command. At the beginning this didn't seem like an issue, because for the case of running a simple `ShellTask` inside the flow it wouldn't complain. However, I tried placing a `flow.register(project_name="Demo")` inside `logging_flow.py` and running `python -m flows.logging_flow` for the conflicting case, and it works like a charm! Why would registering the flow from outside the flow file have this weird behavior of not working the moment I define this `ParentShellTask`? Also, I did some other tests and realized that if I move the `ParentShellTask` to the `tasks` module that I showed at the beginning and import it in the `logging_flow.py` file, the same error happens even if I register the flow within the flow file. BUT, this doesn't happen if I define a simple task inside the `tasks` module using the decorator, as I described in the first comment of the thread, and the flow works just fine, like this:
```python
# logging_flow.py

from prefect import Flow
from prefect.storage import S3
from .tasks import simple_task  # simple_task comes from the tasks module shown in the extended tree above


with Flow("logging-flow", storage=S3(bucket="some-bucket")) as flow:
    simple_task()  # this will work

flow.register(project_name="Demo")
```
(Note that this also works if I register the flow from outside the `logging_flow.py` file as described above.) Any ideas why this could be happening? When I tried your approach of registering the flow within the file I thought that might be the issue, but importing a task-decorated function from outside works fine while importing a Task subclass does not :(
Oh and I forgot to answer, I'm using an ECS agent that deploys the flow on Fargate. Although for testing purposes I'm using an equivalent docker-compose local service that basically deploys a container based on the same image used by the agent, and sets up environment variables as set on Fargate, like this:
```yaml
prefect_run:
    image: prefect:0.14.15
    command: bash -c "prefect execute flow-run"
    environment:
      - PREFECT__BACKEND=cloud
      - PREFECT__CLOUD__AGENT__LABELS=['local']
      - PREFECT__CLOUD__API=https://api.prefect.io
      - PREFECT__CLOUD__AUTH_TOKEN=the-cloud-api-token
      - PREFECT__CLOUD__USE_LOCAL_SECRETS=false
      - PREFECT__CONTEXT__FLOW_ID=the-flow-id
      - PREFECT__CONTEXT__FLOW_RUN_ID=the-flow-run-id
      - PREFECT__CONTEXT__IMAGE=prefect:0.14.15
      - PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS=prefect.engine.cloud.CloudFlowRunner
      - PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS=prefect.engine.cloud.CloudTaskRunner
      - PREFECT__LOGGING__LEVEL=DEBUG
      - PREFECT__LOGGING__LOG_TO_CLOUD=true
      - S3_FLOWS_BUCKET=some-bucket
      - AWS_ACCESS_KEY_ID=my-access-key
      - AWS_SECRET_ACCESS_KEY=my-secret-access-key
      - AWS_DEFAULT_REGION=us-east-1
```
Then I would change the `PREFECT__CONTEXT__FLOW_ID` and `PREFECT__CONTEXT__FLOW_RUN_ID` values to those obtained in the Cloud so that I can execute the specific flow run. Note that this is just done to speed up the testing process, as deploying to Fargate is a little more time-consuming, but I obtain the same errors as if I deployed to Fargate (already checked that).
Ok @Kevin Kho I discovered something a little more precise regarding the source of the error. I realised that the `ModuleNotFoundError` is raised only when any of the code executed during the flow run uses a class imported from outside the flow file, but it does not happen if it imports just functions and literals from other modules. For example:
```python
# config.py  (failing version)

class MyConfig:
    my_var = "some variable"

my_config = MyConfig()

#######################

# tasks/__init__.py

from ..config import my_config  # or MyConfig
from prefect import task

@task
def simple_task():
    print(f"My config var is: {my_config.my_var}")  # or MyConfig().my_var
```
```python
# config.py (working version)

my_var = "some variable"

#######################

# tasks/__init__.py

from ..config import my_var
from prefect import task

@task
def simple_task():
    print(f"My config var is: {my_var}")
```
```python
# logging_flow.py

from prefect import Flow
from prefect.storage import S3
from .tasks import simple_task

with Flow("logging-flow", storage=S3(bucket="some-bucket")) as flow:
    simple_task()

flow.register(project_name="Demo")
```
k
Hey @Carlos Gutierrez, Just letting you know I’m gonna come back to this in a bit. Just answering a couple of other questions first
👌 2
Okay I read through this. By default Prefect only stores the script that the flow is registered from. What you need to do is store a module. The standard way to do this is to copy the package into a Docker container and then install it in the container.
The reason you get the `ModuleNotFoundError` with those imports is that the imported files don't exist on the Agent. Only the flow that was registered does.
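One way to do that without maintaining a Dockerfile by hand is Prefect's `Docker` storage, which can copy files into the image for you. A rough sketch (the registry, image name and paths are made up, and this would replace the S3 storage, so treat it as one option rather than the only pattern):

```python
from prefect.storage import Docker

flow.storage = Docker(
    registry_url="registry.example.com",              # made-up registry
    image_name="logging-flow",
    files={"/abs/path/to/flows": "/pipeline/flows"},  # copy the local package into the image
    env_vars={"PYTHONPATH": "/pipeline"},             # make the copied package importable at run time
    python_dependencies=["boto3"],                    # any extra pip deps the tasks need
)
```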
c
I see, but why does it complain when importing classes, while I have no problem when importing functions from outside the flow script?
I mean, the file containing the functions or task functions doesn't exist on the Agent either, but the flow is executed correctly
k
I know what you are asking. Let me ask someone who knows more because this has to do with the serialization that happens when you register the flow.
c
Thank you very much for your help Kevin 😄
k
In the meantime, do you need help getting the module together though? I have resources for that
z
Hey @Carlos Gutierrez -- I believe you're encountering some edge-case behavior with the pickler we use (cloudpickle). It will pickle functions alongside your flow but it will not pickle arbitrary objects and will instead store a reference to them. (All functions are serializable but only some classes are)
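To make that concrete, here's a rough sketch you could run locally from the repo root (reusing `my_config` from the failing example above) to see the stored reference:

```python
import cloudpickle
import pickletools

from flows.config import my_config  # instance of MyConfig from the failing example

payload = cloudpickle.dumps(my_config)

# The disassembly shows the class stored by reference -- the module path
# "flows.config" and the name "MyConfig" -- plus the instance state, so
# unpickling requires `import flows.config` to succeed, which is exactly
# the ModuleNotFoundError you see in the run container.
pickletools.dis(payload)
```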
c
I managed to get my flow working without class inheritance and just using @task functions, but if you have any docs or resources I could take a look at I would appreciate it
Hey @Zanie, thank you for joining in
z
Generally, pickling causes weird behavior and we're kind of interested in moving away from it as the default. I'd recommend reading through pickle vs script-based storage at https://docs.prefect.io/orchestration/flow_config/storage.html#pickle-vs-script-based-storage and considering a switch to script-based storage, which will always require your module to be available (which I would recommend as best practice anyway).
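As a rough sketch of what that switch might look like for the earlier flow (the `key` and `local_script_path` values here are made up):

```python
from prefect import Flow
from prefect.tasks.shell import ShellTask
from prefect.storage import S3

shell_task = ShellTask()

with Flow("logging-flow") as flow:
    shell_task(command="echo stored as a script")

# Store the flow *file* in S3 instead of a pickle. At run time Prefect
# re-imports the script, so every module it imports still has to be
# installed/available in the run image.
flow.storage = S3(
    bucket="some-bucket",
    stored_as_script=True,
    local_script_path="flows/logging_flow.py",  # path on the registering machine (assumed)
    key="flows/logging_flow.py",                # object key in the bucket (assumed)
)
```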
I'd take a look at this example deployment pattern as our recommendation for how to set up shared tasks https://github.com/PrefectHQ/prefect/discussions/4042#discussioncomment-328271
🙂 we have lots of reading for you I guess haha
👍 1
😂 3
c
I took a look at script-based storage but didn't manage to make it work. But I guess the summary is that if I were to use classes/objects I need to build any flow-related code into the image deployed by the Agent, right?
z
Yep!
c
Thanks a lot for all the docs, I will take a look at them in detail
z
(and by build I just mean that the python module needs to be installed/available in your agent)
c
Thank you very much guys! hope I can get a working solution for this
s
@Carlos Gutierrez Apologies for jumping on your thread but I also had some potentially relevant questions around unpickling. Are external modules (and their specific versions) utilized by a Flow required to be present in the container where the Agent is run? In my situation this does not seem to be the case, as our Agent runs on a container using `prefecthq/prefect:0.14.13-python3.8`. When using Dask, are external modules used by a Flow required to be present on the containers where the scheduler and workers are run? Can you describe in a bit more detail how Flows are deserialized in relation to Dask workers? @Zanie As you assisted me with here https://prefect-community.slack.com/archives/CL09KU1K7/p1618183077406400 our hope was to defer installing volatile dependencies until worker runtime, but it seems that the version of the external module used during Flow storage needs to be available on the container to support unpickling.
As an example, the image used by my Dask workers had an older version of a module, and when running a Flow that had been serialized with a newer version of that module we received this error:
```
Failed to load and execute Flow's environment: StorageError('An error occurred while unpickling the flow:\n AttributeError("Can\'t get attribute \'SingleTaskWrapper\' on <module \'rechunker.executors.prefect\' from \'/usr/local/lib/python3.8/site-packages/rechunker/executors/prefect.py\'>")\nThis may be due to one of the following version mismatches between the flow build and execution environments:\n - python: (flow built with \'3.8.7\', currently running with \'3.8.8\')')
```
This was fixed by updating the module version in the worker image but I wanted to better understand how unpickling/deserialization works at the Dask level so that I construct things the optimal way.
@Zanie Any suggestions around the best approach for managing dependencies in relation to Flow serialization here?
k
Hey @Sean Harkins, we use cloudpickle to serialize Tasks and distribute them to Dask workers. I believe Dask in general runs into a lot of errors when there are version mismatches between client and workers (even without Prefect in the mix). In general, yes, modules for a Flow would need to be available on the workers (especially if they're used in a Task that gets distributed). Some users run into situations where `LocalDaskExecutor` works and `DaskExecutor` doesn't for their flow because of Tasks being unserializable. This is because of the need to pickle tasks to distribute them to workers.
The best practice for managing dependencies would be to pin versions in your images. I read that thread with Michael and I understand the need for flexibility in development work. You might be able to get away with deferring some of the import statements inside the Tasks rather than in the outer script (of course, this can be risky). I can’t guarantee this would work though, just a thought.
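As a sketch of what deferring an import might look like (the `volatile_lib` name is made up, and as noted above this trades an import-time failure for a run-time one):

```python
from prefect import task

@task
def rechunk_data(path):
    # Deferring the import means the library only needs to exist on the Dask
    # worker that actually runs this task, not on the machine that registers
    # the flow. Any version mismatch will only surface at run time.
    import volatile_lib  # hypothetical, rapidly-changing dependency

    return volatile_lib.process(path)
```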