Olivér Atanaszov
02/18/2022, 4:06 PM
[2022-02-17 122919+0000] INFO - prefect.wait_for_flow_run | Flow 'affable-piculet-flow-build-dataset': Entered state <Failed>: Unexpected error while running flow: KeyError('Task slug do_someting-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?')
when trying to run a flow of flows like this:
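from prefect import Flow
from prefect.tasks.prefect import create_flow_run, get_task_run_result, wait_for_flow_run

from flows.a import flow as flow_a  # flow's name is "flow-a"
from flows.b import flow as flow_b  # flow's name is "flow-b"

kwargs = {
    "project_name": "foo",
    "labels": ["test"],
}

# Register the child flows first so the parent can look them up by name.
flow_a.register(**kwargs)
flow_b.register(**kwargs)

# run_config, storage and result are defined elsewhere.
with Flow("flow-of-flows", run_config=run_config, storage=storage, result=result) as flow:
    run_id_flow_a = create_flow_run(flow_name="flow-a", **kwargs)
    wait_for_flow_a = wait_for_flow_run(run_id_flow_a, raise_final_state=True)
    # Fetch the result of the task with this slug from the flow-a run.
    output_a = get_task_run_result(run_id_flow_a, task_slug="do_something-1")
    run_id_flow_b = create_flow_run(flow_name="flow-b", parameters={"input": output_a}, **kwargs)
    # Start flow-b only after flow-a has finished.
    run_id_flow_b.set_upstream(wait_for_flow_a)

flow.register(**kwargs)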
@George Coyne @Kevin Kho
George Coyne
02/18/2022, 5:43 PM
So the error seems to come from the underlying API that get_task_run_result is calling.
Couple things here. I personally like to avoid flow.register() unless I am using an ancillary function that registers flows. I use the CLI to register generally.
That way I can run my code without worrying about commenting out registration lines.
Next: passing task results in as parameters is dicey because of the typing. Parameters are essentially JSON and support all JSON types. Task results are serialized objects though.
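To make the typing concern concrete, here is a minimal sketch using only the standard library. `is_json_safe` is a hypothetical helper, not part of Prefect; it just checks whether a value would survive being sent as a JSON parameter without changing shape.

```python
import json


def is_json_safe(value) -> bool:
    """Return True if `value` survives a JSON round trip unchanged.

    Hypothetical helper for illustration only; not a Prefect API.
    """
    try:
        return json.loads(json.dumps(value)) == value
    except (TypeError, ValueError):
        # Sets, custom classes, etc. cannot be encoded as JSON at all.
        return False


if __name__ == "__main__":
    print(is_json_safe({"name": "foo_bar", "rows": 3}))  # plain JSON types
    print(is_json_safe({1, 2, 3}))  # a set cannot be JSON-encoded
    print(is_json_safe((1, 2)))  # a tuple comes back as a list
```

Values that fail this kind of check are the ones that tend to cause trouble when a task result is handed to another flow as a parameter.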
Even so, your task slug was not found. Are you sure you are passing the correct task slug?
My preference is to pass metadata between flows, if anything. So: create flow runs with parameters passed in, use those parameters to write to object storage, and then use them again to tell downstream flows where in object storage to read from.
Hope that is helpful!
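from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
name_of_thing = "foo_bar"
with Flow("flow-of-flows") as flow:
    runa = create_flow_run(flow_name="flow-a", project_name="foo", parameters={"inputa": name_of_thing})
    wait = wait_for_flow_run(runa, raise_final_state=True)
    # Start flow-b only after flow-a has finished.
    runb = create_flow_run(flow_name="flow-b", project_name="foo", parameters={"inputb": name_of_thing}, upstream_tasks=[wait])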
And then within each flow, handle the interpolation to read from/write to the appropriate spot.
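A rough sketch of what that interpolation could look like, assuming a local directory stands in for object storage. The function names and the `flow-a-output/<name>.json` layout are made up for illustration; swap in whatever storage client and naming convention you actually use.

```python
import json
import tempfile
from pathlib import Path


def output_location(base_dir: Path, name_of_thing: str) -> Path:
    """Hypothetical convention: one JSON file per name under a shared root."""
    return base_dir / "flow-a-output" / f"{name_of_thing}.json"


def write_output(base_dir: Path, name_of_thing: str, data: dict) -> Path:
    """What the upstream flow would do: write its result to the agreed spot."""
    path = output_location(base_dir, name_of_thing)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(data))
    return path


def read_input(base_dir: Path, name_of_thing: str) -> dict:
    """What the downstream flow would do: rebuild the same location and read it."""
    return json.loads(output_location(base_dir, name_of_thing).read_text())


if __name__ == "__main__":
    # A temporary directory stands in for the object store in this sketch.
    with tempfile.TemporaryDirectory() as tmp:
        write_output(Path(tmp), "foo_bar", {"rows": 3})
        print(read_input(Path(tmp), "foo_bar"))
```

Only the short string `name_of_thing` crosses the flow boundary as a parameter, so it stays plain JSON; the actual data never has to be passed between flows.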