Michal Baumgartner
12/29/2021, 2:29 PM
Kevin Kho
Michal Baumgartner
12/29/2021, 3:04 PM
Kevin Kho
Michal Baumgartner
12/29/2021, 3:11 PM
Scheduled suffice?
Kevin Kho
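import prefect
from prefect import Flow, Parameter
from prefect.tasks.prefect import create_flow_run

def flow_retry(flow, old_state, new_state):
    # On failure, start a fresh flow run with an incremented retry counter.
    if new_state.is_failed():
        retry_count = prefect.context.parameters["RETRY_COUNT"]
        if retry_count < 3:
            create_flow_run.run(flow_name="..", project_name="...", parameters={"RETRY_COUNT": retry_count + 1})
    return new_state

with Flow(..., state_handlers=[flow_retry]):
    # Calling the Parameter registers it with the flow even if no task consumes it.
    retry_count = Parameter("RETRY_COUNT", 0)()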
Use the create_flow_run task, but just call its run method in the state handler. This task gives the most flexibility for starting flow runs.
Kevin Kho
Michal Baumgartner
12/29/2021, 3:18 PM
create_flow_run accepts an idempotency key, so I guess if I reuse the same one from the failed run, they will count as one run in the UI?
"you need to set all tasks to scheduled"
Just to clarify, this would be needed before returning from the flow's state handler, right?
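A note on the idempotency key question: as far as the Prefect 1.x docs describe it, a second request with the same idempotency key returns the ID of the run that was already created instead of starting a new one, so reusing the failed run's key would probably not trigger a retry at all. This is worth checking against the installed version. A minimal sketch of deriving a separate key per attempt, in plain Python with no Prefect imports; the helper name next_attempt and the key format are hypothetical, not part of Prefect:

```python
from typing import Optional

MAX_RETRIES = 3  # same limit as the `retry_count < 3` check in the handler


def next_attempt(flow_run_id: str, parameters: dict,
                 max_retries: int = MAX_RETRIES) -> Optional[dict]:
    """Return keyword arguments for the follow-up run, or None once retries are used up."""
    retry_count = parameters.get("RETRY_COUNT", 0)
    if retry_count >= max_retries:
        return None
    attempt = retry_count + 1
    return {
        # Carry all existing parameters forward and bump the counter.
        "parameters": {**parameters, "RETRY_COUNT": attempt},
        # Hypothetical key scheme: failed run's id plus the attempt number,
        # so every retry gets its own key.
        "idempotency_key": f"{flow_run_id}-retry-{attempt}",
    }
```

Inside the state handler, the returned dict could be unpacked into create_flow_run.run(flow_name=..., project_name=..., **kwargs), with the failed run's ID read from prefect.context (assumed here to be available as flow_run_id).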
Kevin Kho
set_task_run_states mutation you can call with client.graphql. I personally think it’s a lot easier to just start a new flow run. You can also rename flow runs in the state handler by calling the RenameFlowRun task’s run method, so you can keep track of the retry count in the flow run name.
Michal Baumgartner
12/29/2021, 3:27 PM
Kevin Kho