https://prefect.io logo
Title
m

Martha Edwards

03/08/2022, 6:01 PM
Hi! Is it possible in Prefect to create a sub-flow that works as follows: • task A runs and may fail. Retry A until success. • task B runs and may fail (or the task itself succeeds, but it returns a result indicating failure of the sub-flow). In that case, loop back to retry A. This seems like it could be possible to do using looping on a nested flow as described in this stackoverflow post.
:discourse: 1
Here's the code I am trying:
class MultipleTaskLoop(Task):
    def run(self):
        # loop_res = prefect.context.get("task_loop_result", 1)
        # print(loop_res)

        with Flow('hello-world') as flow:
            a = task_a()
            task_b(a)

        subflow_res = flow.run()
        new_res = subflow_res.result[task_b]._result.value
        print(new_res)

        if new_res:
            return new_res
        raise LOOP(result=new_res)


@task(log_stdout=True)
def task_a():
    return "A"


@task(log_stdout=True)
def task_b(a_arg):
    print(a_arg)
    print("B")
    r = randrange(5)
    if r < 3:
        print("False")
        return False
    print("True")
    return True


with Flow("parent-flow") as parent_flow:
    loop_task = MultipleTaskLoop()
    loop_task()
This leads to a KeyError:
Unexpected error while running flow: KeyError('Task slug MultipleTaskLoop-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?')
Traceback (most recent call last):
  File "/Users/martha/Documents/prefect-prototype/venv/lib/python3.9/site-packages/prefect/engine/cloud/flow_runner.py", line 398, in initialize_run
    task = tasks[task_run.task_slug]
KeyError: 'MultipleTaskLoop-1'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/martha/Documents/prefect-prototype/venv/lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 264, in run
    state, task_states, run_context, task_contexts = self.initialize_run(
  File "/Users/martha/Documents/prefect-prototype/venv/lib/python3.9/site-packages/prefect/engine/cloud/flow_runner.py", line 400, in initialize_run
    raise KeyError(
KeyError: 'Task slug MultipleTaskLoop-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?'
I also attempted this using
create_flow_run
and
wait_for_flow_run
, but couldn't get that working either. Any advice or guidance would be much appreciated 🙏
k

Kevin Kho

03/08/2022, 6:04 PM
I have to digest this one second lol
:thank-you: 1
I dont think this is possible in Prefect 1.0 , Orion supports this though. You likely would need to combine Task A and B in the subflow, but then they dont get individual observability. What is the use case you are trying to accomplish?
m

Martha Edwards

03/08/2022, 6:07 PM
Dispatching an asynchronous task to another API and then listening for a success response. No worries, combining them into 1 task would be fine.
k

Kevin Kho

03/08/2022, 6:09 PM
I see yeah you have to combine them, register, and then use the
create_flow_run
task. I don’t think you can have the Flow like that inside a task in production
m

Martha Edwards

03/08/2022, 6:10 PM
That sounds fine. I think if we are combining them then there's no real need for a subflow anyway. The two operations can just live together inside of one top-level task in the main flow.
k

Kevin Kho

03/08/2022, 6:12 PM
You can also
raise FAIL()
to fail a task btw and then set the max_retries pretty high. The code might be easier to work with
m

Martha Edwards

03/08/2022, 6:13 PM
Gotcha, yeah I think that is what we would do
k

Kevin Kho

03/08/2022, 6:17 PM
Actually that might infinite loop. If you want to terminate, use
raise ENDRUN()
👍 1