
Pedro Machado

01/28/2022, 4:28 PM
Hi everyone. What is the best pattern for implementing retries and the ability to restart when using a flow of flows? I created a function task that combines `create_flow_run` and `wait_for_flow_run`, and I am using this task to start the child flow(s) from the parent. This task is set to retry. Today, I noticed that if the child flow fails, this task won't start a new flow run when it retries. It seems to check for the same `flow_run_id`. I suspect it's somehow related to an idempotency key that is set by default. Would it be better to explicitly set an idempotency key that is different across retries? Is there a better way to set this up? The goals are: 1) the ability to retry the child flow when it fails and 2) the ability to restart the parent flow from failed if needed. Thanks!
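The symptom described here matches how idempotency keys usually behave: when the server sees a key it already knows, it returns the existing flow run instead of creating a new one. If the default key is the task run ID, which stays the same across retries of one task run, every retry gets the original (failed) run back. The plain-Python simulation below illustrates that; `FakeFlowRunAPI` and the `-2` key suffix are hypothetical, not Prefect's client or key format.

```python
import uuid


class FakeFlowRunAPI:
    """Hypothetical stand-in for a backend that deduplicates flow-run creation."""

    def __init__(self):
        self._runs_by_key = {}

    def create_flow_run(self, flow_name, idempotency_key):
        # A key seen before returns the existing run id; no new run is created.
        if idempotency_key in self._runs_by_key:
            return self._runs_by_key[idempotency_key]
        run_id = f"{flow_name}-{uuid.uuid4()}"
        self._runs_by_key[idempotency_key] = run_id
        return run_id


api = FakeFlowRunAPI()
task_run_id = "task-run-123"  # hypothetical: unchanged across retries of one task run

first_try = api.create_flow_run("child", idempotency_key=task_run_id)
retry = api.create_flow_run("child", idempotency_key=task_run_id)
print(first_try == retry)  # True: the retry receives the old run id

# Changing the key for the retry forces a brand-new child run.
attempt_2 = api.create_flow_run("child", idempotency_key=f"{task_run_id}-2")
print(attempt_2 == first_try)  # False
```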

Anna Geller

01/28/2022, 4:37 PM
We need to differentiate between retries and restarts. Retries are a task-level concept, and you configure them on the task decorator. Restart is a flow-level concept: it lets you restart a flow run from a failed task run. If your child flow run failed to start, a restart will help. But if your `create_flow_run` task succeeded (the child flow run was successfully triggered) and the `wait_for_flow_run` task failed, a restart won’t help unless you first mark the upstream `create_flow_run` task run as failed. The idempotency key is relevant when you are mapping over flows; are you doing that? Regardless, there is no harm in adding one if you want. By default the idempotency key is the task run ID of the `create_flow_run` task, and you can change it to a randomly generated UUID as follows:
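```python
import uuid
from prefect import Flow
from prefect.tasks.prefect import create_flow_run

with Flow("flowname") as flow:
    child_run_id = create_flow_run(
        flow_name="xxx",
        idempotency_key=str(uuid.uuid4()),  # evaluated when this block runs, not per retry
    )
```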
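One caveat about that snippet, as far as plain Python evaluation goes: `str(uuid.uuid4())` runs once, when the `with Flow(...)` block is executed, so it does not yield a new key each time the task retries. A key that changes per retry has to be computed at run time. The sketch below contrasts the two. It assumes the task run ID and the attempt number are available inside the task (in Prefect 1.x these should be `prefect.context.get("task_run_id")` and `prefect.context.get("task_run_count")`, but verify for your version); `per_attempt_idempotency_key` and `simulate_retries` are hypothetical helpers.

```python
import uuid
from typing import Callable, List


def per_attempt_idempotency_key(task_run_id: str, task_run_count: int) -> str:
    """Hypothetical helper: stable within one attempt, different on every retry."""
    return f"{task_run_id}-attempt-{task_run_count}"


def simulate_retries(make_key: Callable[[int], str], attempts: int = 3) -> List[str]:
    """Call make_key once per attempt, the way a task body re-runs on each retry."""
    return [make_key(attempt) for attempt in range(1, attempts + 1)]


build_time_key = str(uuid.uuid4())  # evaluated once, like the value in the with-block
constant_keys = simulate_retries(lambda attempt: build_time_key)
runtime_keys = simulate_retries(
    lambda attempt: per_attempt_idempotency_key("task-run-123", attempt)
)

print(len(set(constant_keys)))  # 1: every retry sends the same key
print(len(set(runtime_keys)))  # 3: one key per attempt
```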

Pedro Machado

01/28/2022, 5:47 PM
Untitled.py
Hi Anna. I combined those two tasks into a single task, thinking that I could retry the flow run creation + wait. Here is what I have. ☝️
I am not mapping over flows.
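A combined create-and-wait step can be sketched without Prefect as a loop that derives a fresh idempotency key for every attempt, so each retry starts a new child run instead of receiving the failed one back. `start` and `wait` are hypothetical callables standing in for `create_flow_run` and `wait_for_flow_run`, the `-attempt-N` key format is an assumption, and the explicit loop stands in for task-level retries purely for illustration; it is not the code in Untitled.py.

```python
from typing import Callable


class ChildFlowFailed(Exception):
    """Hypothetical error raised when every child run attempt fails."""


def start_and_wait(
    start: Callable[[str], str],
    wait: Callable[[str], str],
    task_run_id: str,
    max_attempts: int = 3,
) -> str:
    """Start a child run and wait for it, starting a new child run per attempt."""
    for attempt in range(1, max_attempts + 1):
        key = f"{task_run_id}-attempt-{attempt}"  # new key, so no deduplication
        run_id = start(key)
        if wait(run_id) == "Success":
            return run_id
    raise ChildFlowFailed(f"child flow failed after {max_attempts} attempts")


# Usage with fakes: the first child run fails, the second succeeds.
started = []


def fake_start(key: str) -> str:
    run_id = f"run-for-{key}"
    started.append(run_id)
    return run_id


outcomes = iter(["Failed", "Success"])
result = start_and_wait(fake_start, lambda run_id: next(outcomes), "task-run-123")
print(result)  # run-for-task-run-123-attempt-2
print(len(started))  # 2
```

In a real flow, Prefect's own task retries would replace the loop; the part that carries over is computing the key per attempt.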

Anna Geller

01/28/2022, 7:56 PM
If you want to combine `create_flow_run` and `wait_for_flow_run` this way, you can simply use the `StartFlowRun` task with `wait=True`. This should be safer and lets you keep everything in a single task. Here is an example:
```python
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun
from prefect.run_configs import KubernetesRun

child_flow_run = StartFlowRun(
    flow_name="x",
    project_name="y",
    parameters=dict(param="value"),
    run_name="custom run name",
    run_config=KubernetesRun(),
    wait=True,
)

with Flow("flow_sample") as flow:
    flow_run_id = child_flow_run()
```

Pedro Machado

01/28/2022, 8:23 PM
A few questions:
1. Will `StartFlowRun` stream logs from the child flow?
2. If I use `StartFlowRun`, would retries work if the child flow fails? I am OK starting a new child flow run in case of failure.
3. Will this pass the parent's flow labels to the child flow run?

Anna Geller

01/28/2022, 8:32 PM
1. No, unfortunately.
2. I haven’t tested that, but I would assume so, since it waits until the child flow run completes and raises if the child flow run fails.
3. Do you mean agent labels? You could define the label as a global variable and use it in both the parent flow’s run config and the child flow run config:
```python
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun
from prefect.run_configs import KubernetesRun

LABEL = "yourlabel"
child_flow_run = StartFlowRun(
    flow_name="x",
    project_name="y",
    parameters=dict(param="value"),
    run_name="custom run name",
    run_config=KubernetesRun(labels=[LABEL]),  # KubernetesRun takes a list of labels
    wait=True,
)

with Flow("flow_sample", run_config=KubernetesRun(labels=[LABEL])) as flow:
    flow_run_id = child_flow_run()
```
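The answer to question 2 rests on the waiting task raising when the child run fails: an exception inside a task is what produces a failed task run that the task's retry settings can act on. A generic poll-and-raise pattern looks roughly like this. It is a sketch, not Prefect's actual `wait=True` implementation; `get_state`, the state strings, and `ChildFlowRunFailed` are hypothetical.

```python
import time
from typing import Callable

# Hypothetical state names; real Prefect state classes differ.
TERMINAL_STATES = {"Success", "Failed", "Cancelled"}


class ChildFlowRunFailed(Exception):
    """Raised when the awaited child run ends in a non-successful terminal state."""


def wait_for_child(
    get_state: Callable[[str], str],
    run_id: str,
    poll_interval: float = 5.0,
    timeout: float = 3600.0,
) -> str:
    """Poll get_state(run_id) until a terminal state; raise unless it is Success."""
    deadline = time.monotonic() + timeout
    while True:
        state = get_state(run_id)
        if state in TERMINAL_STATES:
            if state != "Success":
                raise ChildFlowRunFailed(f"{run_id} finished in state {state}")
            return state
        if time.monotonic() > deadline:
            raise TimeoutError(f"{run_id} still in state {state} after {timeout}s")
        time.sleep(poll_interval)


# Usage with a fake state source that finishes on the third poll.
states = iter(["Scheduled", "Running", "Success"])
final = wait_for_child(lambda _run_id: next(states), "child-run-1", poll_interval=0)
print(final)  # Success
```

Whether a retry after such a failure starts a new child run still depends on the idempotency key used to create it, as discussed earlier in the thread.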

Pedro Machado

01/29/2022, 2:13 AM
Thanks