William Grim
02/18/2022, 8:39 PM
Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n AttributeError("Can\'t get attribute \'create_params_file\' on <module \'our_filename.py\' from \'/our_filename.py\'>")')
The signature of create_params_file, which is not a task but a method that can be called, looks like:
def create_params_file(base_filename: str, **kwargs) -> str:
Kevin Kho
William Grim
02/18/2022, 8:48 PM
our_filename.py has just this task and the method in question, which is not a task. It exists outside the flow itself and has worked with other flows.
Kevin Kho
our_filename.py like this?
class SomeClass:
    @staticmethod
    def create_params_file(base_filename: str, **kwargs) -> str:
        return something

@task
def some_other_task():
    return something
William Grim
02/18/2022, 8:53 PM
create_flow_run.run(
    flow_name="a_flow_name",
    project_name=get_project_name(prefect.context.flow_id),
    run_name=run_name,
    parameters={
        "parameters_filename": create_params_file(
            k1=v1, k2=v2, ...
        ),
    },
)
Would .run() have anything to do with it? Like maybe it’ll work if I put it above the method and then pass the result in? All the code I just posted is in a @task def my_task kind of setup:
import prefect
from prefect import task
from prefect.tasks.prefect import create_flow_run
def create_params_file(base_filename: str, **kwargs) -> str:
    """Create a parameter file in a well-known location."""

@task
def dbt(
    k1,
    k2,
    run_name: str = "__internal__",
    **kwargs,  # noqa: U100 used to set dependencies with other tasks
):
    run_name = f"__{prefect.context.flow_name}__.{run_name}"
    create_flow_run.run(
        flow_name="a_flow_name",
        project_name=get_project_name(prefect.context.flow_id),
        run_name=run_name,
        parameters={
            "parameters_filename": create_params_file(
                k1=v1, k2=v2, ...
            ),
        },
    )
Kevin Kho
William Grim
02/18/2022, 8:57 PM
from thatfile import dbt
with Flow(...) as flow:
    ...
    dbt(...)
which will have the above setup in the DAG
Kevin Kho
stored_as_script for the S3 storage?
William Grim
02/18/2022, 9:16 PM
with Flow(
    "my_flow",
    run_config=LocalRun(labels=["input"]),
) as flow:
Kevin Kho
ModuleNotFoundError but I am confused why it’s an AttributeError
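One possible explanation for the AttributeError, offered as a sketch rather than a confirmed diagnosis: pickle-based flow storage records a module-level function such as create_params_file by reference (module name plus function name), not by value. If the execution environment has a module with the same name but without that function, unpickling fails with AttributeError; ModuleNotFoundError only shows up when the module is missing entirely. The example below reproduces both cases with the standard-library pickle module. The module name stale_helper_demo and the helper make_module are hypothetical, and Prefect itself uses cloudpickle, which is assumed here to behave the same way for functions that live in an importable module.

```python
import pickle
import sys
import types


def make_module(name, source):
    """Build a throwaway module from source text and register it in sys.modules."""
    module = types.ModuleType(name)
    exec(source, module.__dict__)
    sys.modules[name] = module
    return module


MODULE_NAME = "stale_helper_demo"  # hypothetical module name for this demo

# "Registration" side: the module defines the helper function.
registered = make_module(
    MODULE_NAME,
    "def create_params_file(base_filename, **kwargs):\n"
    "    return base_filename\n",
)
payload = pickle.dumps(registered.create_params_file)

# The payload holds the module and function names, not the function body.
names_in_payload = (
    MODULE_NAME.encode() in payload and b"create_params_file" in payload
)

# "Execution" side, case 1: a module with the same name exists,
# but it is a copy that does not define the helper.
make_module(MODULE_NAME, "x = 1\n")
stale_error = None
try:
    pickle.loads(payload)
except AttributeError as exc:
    stale_error = type(exc).__name__

# "Execution" side, case 2: the module is not available at all.
del sys.modules[MODULE_NAME]
missing_error = None
try:
    pickle.loads(payload)
except ModuleNotFoundError as exc:
    missing_error = type(exc).__name__

print(names_in_payload, stale_error, missing_error)
```

If this is what happened here, the agent box would have had an our_filename.py that did not define create_params_file at the time the flow was loaded.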
William Grim
02/18/2022, 10:38 PM
FlowStorageError. I/we had made the assumption that when flows were registered with non-local storage, it would package up any of the methods/imports it referenced. I think we were wrong.
Kevin Kho
William Grim
02/23/2022, 12:31 AM
Kevin Kho
William Grim
02/23/2022, 12:35 AM
Kevin Kho
William Grim
02/23/2022, 12:37 AM
Kevin Kho
KubernetesRun takes in an image, which is a Docker image, and spins that up to make a pod.
William Grim
02/23/2022, 12:42 AM
LocalRun to KubernetesRun, it'll package up all the code from the agent box?
Kevin Kho
DockerStorage that will package stuff up for you. Then KubernetesRun pulls down the container for execution.
William Grim
02/23/2022, 12:44 AM
LocalRun
Kevin Kho
William Grim
02/23/2022, 12:47 AM
Kevin Kho