# prefect-community
w
Has anyone seen this kind of error? Everything was fine in testing with local storage, and now that we’ve pushed to prod (which uses s3 storage), we are seeing this when we run our flows almost immediately:
Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n AttributeError("Can\'t get attribute \'create_params_file\' on <module \'our_filename.py\' from \'/our_filename.py\'>")')
The signature of `create_params_file`, which is not a task but just a function that gets called, looks like:
```python
def create_params_file(base_filename: str, **kwargs) -> str:
```
k
Is that a method attached to a class or a function?
Is this a separate file from the one your Flow is defined in, like a custom module? Because S3 Storage can’t support that
w
@Kevin Kho it’s a method in the same file as the task
And yeah, `our_filename.py` has just this task and the method in question, which is not a task. It exists outside the flow itself and has worked with other flows.
I mean the task has worked
this method is new
k
Ah I see. This is a bit hard to guess. I can try to reproduce in a bit
Is `our_filename.py` like this?
```python
class SomeClass:
    @staticmethod
    def create_params_file(base_filename: str, **kwargs) -> str:
        return something


@task
def some_other_task():
    return something
```
w
@Kevin Kho inside the task, it does this call:
```python
create_flow_run.run(
    flow_name="a_flow_name",
    project_name=get_project_name(prefect.context.flow_id),
    run_name=run_name,
    parameters={
        "parameters_filename": create_params_file(
            k1=v1, k2=v2, ...
        ),
    },
)
```
Does the fact that it’s inside the `.run()` call have anything to do with it? Like maybe it’ll work if I call it above and then pass the result in? All the code I just posted is in a `@task def my_task` kind of setup
@Kevin Kho Let me post the whole file, with the private stuff removed. Hang on
```python
import prefect
from prefect import task
from prefect.tasks.prefect import create_flow_run


def create_params_file(base_filename: str, **kwargs) -> str:
    """Create a parameter file in a well-known location."""


@task
def dbt(
    k1,
    k2,
    run_name: str = "__internal__",
    **kwargs,  # noqa: U100 used to set dependencies with other tasks
):
    run_name = f"__{prefect.context.flow_name}__.{run_name}"

    create_flow_run.run(
        flow_name="a_flow_name",
        project_name=get_project_name(prefect.context.flow_id),
        run_name=run_name,
        parameters={
            "parameters_filename": create_params_file(
                k1=v1, k2=v2, ...
            ),
        },
    )
```
it’s almost exactly that, just with some details removed
k
And this task is used by another file that contains the Flow, right?
w
Yeah.
exactly
a file with the flow would do something like
```python
from thatfile import dbt

with Flow(...) as flow:
    ...
    dbt(...)
```
which will have the above setup in the DAG
k
Ok I can try this
I can’t replicate on Local Storage (pickled) + Local Agent. Are you using `stored_as_script` for the S3 storage?
Oh I see it’s pickled now from the first log. That’s pretty weird.
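(For reference, the two S3 modes look roughly like this in Prefect 1.x; the bucket name, key, and script path below are placeholders:)
```python
from prefect.storage import S3

# Default (pickled) mode: the flow object is cloudpickled and uploaded.
pickled_storage = S3(bucket="my-flows-bucket")

# Script mode: the .py file itself is uploaded and re-executed on the agent side.
script_storage = S3(
    bucket="my-flows-bucket",
    key="flows/our_filename.py",
    stored_as_script=True,
    local_script_path="our_filename.py",
)
```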
w
You know, I'm not sure. Let me get my computer open again and take a look. I'll come back here in a few. I need to undo what I did in prod just so it's not an ongoing outage haha
😆 1
@Kevin Kho this is what our flows look like still:
```python
with Flow(
    "my_flow",
    run_config=LocalRun(labels=["input"]),
) as flow:
```
k
Really not sure here. Normally this should be a `ModuleNotFoundError`, so I am confused why it’s an `AttributeError`
w
Haha 🙂
I am about to test “locally” with s3 storage to see what happens here.
So I'm circling back with this. I'm still working on getting confirmation, but I think the fact is that our agent box is now long-lived, and the flow itself is being registered and stored on s3, buuuutttt... when the agent box pulls it down and the flow tries to access methods/modules that only exist in the old local files, it does not find them and gives a `FlowStorageError`. I/we had made the assumption that when flows were registered with non-local storage, it would package up any of the methods/imports it referenced. I think we were wrong.
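(For what it’s worth, the `AttributeError` is consistent with how pickling works; here’s a minimal, non-Prefect sketch of the failure mode, reusing the name from above purely for illustration:)
```python
import pickle

# Pickle serializes a module-level function as a reference by name
# (module + attribute), not as its source code.
def create_params_file(base_filename: str, **kwargs) -> str:
    return base_filename

blob = pickle.dumps(create_params_file)

# Unpickling looks the attribute up again by name. If the copy of the module
# available on the agent box doesn't define create_params_file (e.g. an older
# local file), pickle.loads(blob) raises:
#   AttributeError: Can't get attribute 'create_params_file' on <module ...>
restored = pickle.loads(blob)
```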
k
All of the script-based storage only pulls down the file, so the dependencies need to be installed in the execution environment separately if not containerized. Only Docker Storage holds all the dependencies. The blog I sent had Python code to explicitly clone the repo
Nowhere near yet, but changes to cloudpickle may allow bundling up the dependencies in the future.
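(A rough sketch of what bundling things up with Docker Storage could look like; the registry, image name, dependency list, and file paths are placeholders:)
```python
from prefect.storage import Docker

storage = Docker(
    registry_url="registry.example.com/our-team",
    image_name="our-flows",
    python_dependencies=["pandas"],                        # pip-installed into the image
    files={"/repo/our_filename.py": "/our_filename.py"},   # copy shared modules into the image
    env_vars={"PYTHONPATH": "/"},                          # make the copied modules importable
)
```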
w
Yeah, pulling down the files is something we could do. That means hacking PYTHONPATH with each flow run, ugh
we do use k8s pods, but i don't know how to get a pod with the latest code (the agent pod in particular) without restarting it atm. i'm definitely not a super expert devops person with k8s stuff
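(One way that PYTHONPATH hack could look, as a sketch; the directory is a placeholder for wherever the shared code gets pulled down:)
```python
from prefect.run_configs import LocalRun

run_config = LocalRun(
    labels=["input"],
    env={"PYTHONPATH": "/opt/our-shared-code"},  # point the flow-run process at the pulled-down code
)
```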
k
the PYTHONPATH hacking is exactly why downloading the file on our end does work. It’s quite a mess.
Ah I know what you are saying. So there was this guy here who cloned/downloaded with the ENTRYPOINT of the Flow container and pip installed his library so that he could always pull the latest dependency code
w
lol, yeah, except the point of non-local storage is that you can have a "3rd party machine" do the registrations, which upload the flows, and then any N of your agents can pull that code and go to town
i know there's a way to have one pod get the code onto another pod, but there's some security issue with it here, and i'm not the guy that knows how to solve that. and the guys that do know how to solve it take forever 😢
k
Ah yeah in that case you can do the ENTRYPOINT hack (still need to provide a container though) if you don’t want to bundle it up with DockerStorage.
w
I checked out the PR for cloudpickle. It looks like it was merged already?
well we're not on docker in prod. we use kubernetes. are the two compatible with each other?
k
It was but we haven’t done it on our end. It’s in cloudpickle 2.0. That is considered for the future roadmap but no timetable
Yes
`KubernetesRun` takes in an image, which is a Docker image, and spins that up to make a pod
w
okay. for now, i'm gonna have to disable long-lived agents just so we can stop breaking prod 🙂 then i'll be looking at how we might be able to start utilizing that. you're saying if we switch from `LocalRun` to `KubernetesRun`, it'll package up all the code from the agent box?
just one question on that then: i thought agents were supposed to be long-lived and not restarting with every deploy. how do you guys handle that and make sure the code that gets kicked off is up to date?
k
It’s `DockerStorage` that will package stuff up for you. Then `KubernetesRun` pulls down the container for execution
Well the agent is stateless. Any container-based storage (Docker, Kubernetes, ECS) just pulls down the latest container (in general) and then executes the Flow. So you edit your dependencies, create a new image to run the Flow on, and the agent is the one that spins that up
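(Putting those two together might look roughly like this; the registry, image name, and flow body are placeholders:)
```python
from prefect import Flow
from prefect.run_configs import KubernetesRun
from prefect.storage import Docker

with Flow("my_flow") as flow:
    ...

# Docker storage builds and pushes an image with the flow and its dependencies;
# KubernetesRun then spins up a pod from that image for each flow run.
flow.storage = Docker(registry_url="registry.example.com/our-team", image_name="our-flows")
flow.run_config = KubernetesRun(labels=["input"])
```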
w
hmm okay. so we have a long way to go to that then, unfortunately
ohhh
so a big part of our problem is that we are still "local" with `LocalRun`
k
Ah yeah for local run you need to stop and edit the environment
w
yeah. it really sucks, but this explains some stuff
thanks for the chat. i'm going to try and start figuring out DockerStorage and KubernetesRun and see if i can kind of get us going
i may chat here again later. appreciate it again 🙂
k
Of course!
Check this for Prefect’s interface for that.
👀 1