Pedro Machado
01/28/2022, 4:28 PMcreate_flow_run
and wait_for_flow_run
and I am using this task to start the child flow(s) from the parent. This task is set to retry. Today, I noticed that if the child flow fails, this task won't start a new flow run when it retries. It seems to check for the same flow_run_id
. I suspect it's somehow related to an idempotency key that is set by default.
Would it be better to use an idempotency key I explicitly set that will be different across retries? Is there a better way to set this up?
The goals are: 1) ability to retry the child flow when it fails and 2) ability to restart the parent flow from failed if needed. Thanks!Anna Geller
create_flow_run
task succeeded (child flow run successfully triggered) but the wait_for_flow_run
failed, then restart won’t help here, unless you mark the upstream create_flow_run
task run as failed first.
The idempotency key is relevant when you are mapping over flows - are you doing that? Regardless, there is definitely no harm in adding this if you want. By default the idempotency key is the task run ID of the create_flow_run task and you can change that to a randomly generated UUID as follows:
with Flow("flowname") as flow:
child_run_id = create_flow_run(
flow_name"xxx",
idempotency_key=str(uuid.uuid4()),
)
Pedro Machado
01/28/2022, 5:47 PMAnna Geller
StartFlowRun
task with
wait=True
- this should be safer and allows to have it in a single task. Here is an example:
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun
from prefect.run_configs import KubernetesRun
child_flow_run = StartFlowRun(
flow_name="x",
project_name="y",
parameters=dict(param="value"),
run_name="custom run name",
run_config=KubernetesRun(),
wait=True,
)
with Flow("flow_sample") as flow:
flow_run_id = child_flow_run()
Pedro Machado
01/28/2022, 8:23 PMStartFlowRun
stream logs from the child flow?
2. If I use StartFlowRun
, would retries work if the child flow fails? I am OK starting a new child flow run in case of failure.
3. will this pass the parent's flow labels to the child flow run?Anna Geller
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun
from prefect.run_configs import KubernetesRun
LABEL = "yourlabel"
child_flow_run = StartFlowRun(
flow_name="x",
project_name="y",
parameters=dict(param="value"),
run_name="custom run name",
run_config=KubernetesRun(label=LABEL),
wait=True,
)
with Flow("flow_sample", run_config=KubernetesRun(label=LABEL)) as flow:
flow_run_id = child_flow_run()
Pedro Machado
01/29/2022, 2:13 AM