Michal Baumgartner
12/29/2021, 2:29 PM
Kevin Kho
Michal Baumgartner
12/29/2021, 3:04 PM
Kevin Kho
Michal Baumgartner
12/29/2021, 3:11 PM
Scheduled suffice?
Kevin Kho
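import prefect
from prefect import Flow, Parameter

def flow_retry(flow, old_state, new_state):
    # On failure, start a fresh flow run with an incremented retry counter
    if new_state.is_failed():
        retry_count = prefect.context.parameters["RETRY_COUNT"]
        if retry_count < 3:
            from prefect.tasks.prefect import create_flow_run
            create_flow_run.run(flow_name="..", project_name="...", parameters={"RETRY_COUNT": retry_count + 1})
    return new_state

with Flow(..., state_handlers=[flow_retry]):
    # Calling the Parameter registers it on the flow even if no task consumes it
    retry_count = Parameter("RETRY_COUNT", 0)()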
Use the create_flow_run task but just call the run method in the state handler. This task gives the most flexibility when starting flow runs.
Kevin Kho
Michal Baumgartner
12/29/2021, 3:18 PM
create_flow_run accepts an idempotency key, so I guess if I reuse the same one from the failed run, they will count as one run in the UI?
"you need to set all tasks to scheduled"
Just to clarify, this would be needed before returning from the flow's state handler, right?
Kevin Kho
set_task_run_states mutation you can call with client.graphql. I personally think it’s a lot easier to just start a new flow run. You can also rename flow runs in the state handler by calling the RenameFlowRun task’s run method so you can keep track of the retry count in the flow run name.
Michal Baumgartner
12/29/2021, 3:27 PM
Kevin Kho
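The renaming suggestion above could look roughly like the sketch below. It assumes Prefect 1.x, the version used in this thread, and reads RETRY_COUNT from the run's parameters as in the earlier handler. The helper retry_run_name, its naming scheme, and the handler name rename_with_retry_count are hypothetical and not from the thread. Prefect is imported inside the handler so the naming helper can be checked without a Prefect installation.

```python
def retry_run_name(base_name, retry_count):
    """Build a flow run name that carries the retry count (hypothetical scheme)."""
    return f"{base_name} (retry {retry_count})"


def rename_with_retry_count(flow, old_state, new_state):
    """Flow state handler: rename the current run once it starts running."""
    if new_state.is_running():
        # Imported lazily, assuming Prefect 1.x is available at run time
        import prefect
        from prefect.tasks.prefect import RenameFlowRun

        # Fall back to 0 when the parameter is absent (an assumption)
        parameters = prefect.context.get("parameters", {})
        retry_count = parameters.get("RETRY_COUNT", 0)

        # With no flow_run_id given, the task falls back to the current
        # run's id from prefect.context
        RenameFlowRun().run(flow_run_name=retry_run_name(flow.name, retry_count))
    return new_state
```

It would be attached next to the retry handler, for example state_handlers=[flow_retry, rename_with_retry_count], so each follow-up run shows its retry number in the UI.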