Martha Edwards
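03/08/2022, 6:01 PM
# Imports assumed for Prefect 1.x; they were not shown in the original message.
from random import randrange

from prefect import Flow, Task, task
from prefect.engine.signals import LOOP


class MultipleTaskLoop(Task):
    def run(self):
        # loop_res = prefect.context.get("task_loop_result", 1)
        # print(loop_res)
        # Build and run a nested flow inside this task's run method.
        with Flow('hello-world') as flow:
            a = task_a()
            task_b(a)
        subflow_res = flow.run()
        new_res = subflow_res.result[task_b]._result.value
        print(new_res)
        # Return once task_b yields True; otherwise loop again.
        if new_res:
            return new_res
        raise LOOP(result=new_res)


@task(log_stdout=True)
def task_a():
    return "A"


@task(log_stdout=True)
def task_b(a_arg):
    print(a_arg)
    print("B")
    # Randomly fail about 60% of the time (draws of 0, 1, or 2).
    r = randrange(5)
    if r < 3:
        print("False")
        return False
    print("True")
    return True


with Flow("parent-flow") as parent_flow:
    loop_task = MultipleTaskLoop()
    loop_task()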
Unexpected error while running flow: KeyError('Task slug MultipleTaskLoop-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?')
Traceback (most recent call last):
  File "/Users/martha/Documents/prefect-prototype/venv/lib/python3.9/site-packages/prefect/engine/cloud/flow_runner.py", line 398, in initialize_run
    task = tasks[task_run.task_slug]
KeyError: 'MultipleTaskLoop-1'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/martha/Documents/prefect-prototype/venv/lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 264, in run
    state, task_states, run_context, task_contexts = self.initialize_run(
  File "/Users/martha/Documents/prefect-prototype/venv/lib/python3.9/site-packages/prefect/engine/cloud/flow_runner.py", line 400, in initialize_run
    raise KeyError(
KeyError: 'Task slug MultipleTaskLoop-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?'
I also tried create_flow_run and wait_for_flow_run, but couldn't get that working either. Any advice or guidance would be much appreciated 🙏
Kevin Kho
Martha Edwards
03/08/2022, 6:07 PM
Kevin Kho
create_flow_run task. I don’t think you can have the Flow like that inside a task in production
Martha Edwards
03/08/2022, 6:10 PM
Kevin Kho
raise FAIL() to fail a task btw and then set the max_retries pretty high. The code might be easier to work with
Martha Edwards
03/08/2022, 6:13 PM
Kevin Kho
raise ENDRUN()
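
For readers following along, the "raise FAIL() and set max_retries pretty high" suggestion can be pictured with a framework-free sketch. This is plain Python, not Prefect code: the Fail class and run_with_retries helper are hypothetical stand-ins invented for illustration, and task_b is reshaped to take a seeded random generator so the run is repeatable.

```python
import random


class Fail(Exception):
    """Hypothetical stand-in for a failure signal; not Prefect's FAIL class."""


def task_b(rng):
    """Like task_b in the thread, but raises instead of returning False."""
    if rng.randrange(5) < 3:
        raise Fail("task_b did not succeed")
    return True


def run_with_retries(fn, max_retries, *args):
    """Call fn until it stops raising Fail, allowing up to max_retries re-runs.

    Returns (result, number_of_attempts). Re-raises Fail once retries run out.
    """
    for attempt in range(max_retries + 1):
        try:
            return fn(*args), attempt + 1
        except Fail:
            if attempt == max_retries:
                raise


rng = random.Random(0)  # seeded so the sketch is repeatable
result, attempts = run_with_retries(task_b, 50, rng)
print(result, attempts)
```

In Prefect 1.x the retry loop itself would be handled by the engine, with max_retries and retry_delay set on the task and the FAIL signal imported from prefect.engine.signals, so the task body would only need to raise on the unsuccessful branch.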