Olivér Atanaszov

02/18/2022, 4:06 PM
Hi, I'm having the probably infamous issue with task slugs:
[2022-02-17 12:29:19+0000] INFO - prefect.wait_for_flow_run | Flow 'affable-piculet-flow-build-dataset': Entered state <Failed>: Unexpected error while running flow: KeyError('Task slug do_someting-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?')
when trying to run a flow of flows like this:
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run, get_task_run_result

from flows.a import flow as flow_a   # flow's name is "flow-a"
from flows.b import flow as flow_b   # flow's name is "flow-b"

kwargs = {
    "project_name": "foo",
    "labels": ["test"],
}

with Flow("flow-of-flows", run_config=run_config, storage=storage, result=result) as flow:

   run_id_flow_a = create_flow_run(flow_name="flow-a", **kwargs)
   wait_for_flow_a = wait_for_flow_run(run_id_flow_a, raise_final_state=True)
   output_a = get_task_run_result(run_id_flow_a, task_slug="do_something-1")
   run_id_flow_b = create_flow_run(flow_name="flow-b", parameters={"input": output_a}, **kwargs)

@George Coyne

Kevin Kho

02/18/2022, 4:13 PM
Hi @Olivér Atanaszov, I think we’ll need to see `flow_a`’s code for more clues. Any reason you’re tagging George? Did you discuss this with him previously?

George Coyne

02/18/2022, 5:43 PM
This was a DM originally. Where we left off was some ideas on my side:
So the error seems to be from the underlying API that get_task_run_result is calling.
Couple things here. I personally like to avoid flow.register() unless I am using an ancillary function that registers flows. I use the CLI to register generally.
That way I can run my code without worrying about commenting out registration lines.
Next: Passing in task results as parameters is dicey because of the typing. Parameters are essentially JSON and support all json types. Task results are serialized objects though.
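The typing point can be checked directly: Prefect parameters must be JSON-serializable, while task results can be arbitrary Python objects. A minimal stdlib-only sketch (the `Artifact` class is a hypothetical stand-in for a serialized task result, not something from the thread):

```python
import json

# Plain JSON types round-trip fine as flow parameters.
params = {"input": "s3://bucket/path/to/dataset", "n_rows": 100}
assert json.loads(json.dumps(params)) == params

# An arbitrary Python object (stand-in for a task result) does not.
class Artifact:
    def __init__(self, data):
        self.data = data

try:
    json.dumps({"input": Artifact([1, 2, 3])})
except TypeError as exc:
    print(f"not a valid parameter value: {exc}")
```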
Even so, your task slug seems to be not found, are you sure you are passing the correct task slug?
My preference is to pass metadata between flows, if anything. So like: create flow runs with parameters passed in, use those parameters to write to object storage, and then use them again to inform the locations in object storage for downstream flows.
Hope that is helpful!
So my move is like:
name_of_thing = "foo_bar"
run_a = create_flow_run(flow_name="flow-a", parameters={"input_a": name_of_thing})
run_b = create_flow_run(flow_name="flow-b", parameters={"input_b": name_of_thing})
And then within each flow handle the interpolation to read from/write to the appropriate spot
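The "handle the interpolation" step might look like this sketch, where both flows derive the same object-storage location from the shared parameter (the bucket layout and `storage_key` helper are illustrative assumptions, not from the thread):

```python
# Hypothetical convention: every flow derives object-storage keys
# from the shared name passed in as a parameter.
def storage_key(name_of_thing: str, stage: str) -> str:
    return f"s3://my-bucket/{name_of_thing}/{stage}/data.parquet"

# flow-a writes its output here...
upstream_location = storage_key("foo_bar", "flow-a")

# ...and flow-b, given the same parameter, reads from the same place,
# so no task result ever has to cross a flow boundary.
downstream_location = storage_key("foo_bar", "flow-a")

assert upstream_location == downstream_location
print(upstream_location)  # s3://my-bucket/foo_bar/flow-a/data.parquet
```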