# ask-community
m
👋 everyone, is there a way to restart the flow (up to N times) if any task throws a specific exception (e.g. the underlying dask cluster/scheduler dies)? (preferably without storing task results and with checkpointing disabled)
k
Hi @Michal Baumgartner, there isn't a built-in way at the moment, but I have an idea. Do you want to restart the whole thing or just the tasks that failed?
m
The flow would be ideal, no need to restart individual tasks
k
I think I would use a Parameter with default value 0. If the Flow fails, trigger a new Flow run and add one to the parameter value. The state handler can be responsible for triggering the new job if the Parameter value is below a certain number
m
👍 and what would be the best way to trigger a new flow run? Would setting the state to `Scheduled` suffice?
k
import prefect
from prefect import Flow, Parameter

def flow_retry(flow, old_state, new_state):
    # On failure, kick off a fresh run with the counter incremented (max 3 attempts)
    if new_state.is_failed():
        retry_count = prefect.context.parameters["RETRY_COUNT"]
        if retry_count < 3:
            from prefect.tasks.prefect import create_flow_run
            create_flow_run.run(flow_name="..", project_name="...", parameters={"RETRY_COUNT": retry_count + 1})
    return new_state

with Flow("...", state_handlers=[flow_retry]) as flow:
    retry_count = Parameter("RETRY_COUNT", default=0)()
Use the `create_flow_run` task but just call the run method in the state handler. This task gives the most flexibility with starting flow runs.
Oh if it’s the same flow run, you need to set all tasks to scheduled
m
I see, also it looks like `create_flow_run` accepts an idempotency key, so I guess if I reuse the same one from the failed run they will count as one run in the UI?
> you need to set all tasks to scheduled
Just to clarify, this would be needed before returning from the flow's state handler, right?
k
Yes to the idempotency key, but it will just skip the already successful tasks (and the failed ones too, I think). Yes, there is a `set_task_run_states` mutation you can call with `client.graphql`. I personally think it’s a lot easier to just start a new flow run. You can also rename flow runs in the state handler by calling the `RenameFlowRun` task’s run method, so you can keep track of the retry count in the flow run name.
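To make that concrete, here is a rough sketch of the recommended path, assuming Prefect 1.x, placeholder flow/project names, and the keyword names (`flow_run_id`, `flow_run_name`, `run_name`, `idempotency_key`) as I recall them from the 1.x task signatures, so double-check them against your version. The handler renames the failed run so the attempt number is visible in the UI, then starts a fresh run for the next attempt, using a per-attempt idempotency key so a duplicated handler call can't create two runs.

import prefect
from prefect.tasks.prefect import create_flow_run, RenameFlowRun

MAX_RETRIES = 3  # illustrative cap

def flow_retry(flow, old_state, new_state):
    if new_state.is_failed():
        retry_count = prefect.context.parameters["RETRY_COUNT"]
        this_run_id = prefect.context.get("flow_run_id")
        if retry_count < MAX_RETRIES:
            # Rename the current (failed) run so the attempt shows up in the UI
            RenameFlowRun().run(
                flow_run_id=this_run_id,
                flow_run_name=f"my-flow attempt {retry_count}",
            )
            # Start a brand-new run for the next attempt
            create_flow_run.run(
                flow_name="my-flow",        # placeholder
                project_name="my-project",  # placeholder
                parameters={"RETRY_COUNT": retry_count + 1},
                run_name=f"my-flow attempt {retry_count + 1}",
                # per-attempt key so a duplicate handler invocation is a no-op
                idempotency_key=f"{this_run_id}-retry-{retry_count + 1}",
            )
    return new_state

Attach it to the flow the same way as above, via state_handlers=[flow_retry].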
m
I agree, now it seems like it would be a hassle to keep the dashboard green 😄 Thanks, will definitely try the new run + rename methods 👍
k
Actually, I framed the parameter here as a number that counts up. Having thought about it a bit more, I'd suggest a number that counts down to 0 instead, so it's easy to parameterize the max retry count.
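A minimal sketch of that count-down variant, assuming the same Prefect 1.x pieces as above (the flow/project names and the `RETRIES_LEFT` parameter name are just placeholders): the caller sets `RETRIES_LEFT` to the maximum number of attempts, and each failed run schedules the next one with the counter decremented until it reaches 0.

import prefect
from prefect import Flow, Parameter
from prefect.tasks.prefect import create_flow_run

def flow_retry(flow, old_state, new_state):
    if new_state.is_failed():
        retries_left = prefect.context.parameters["RETRIES_LEFT"]
        if retries_left > 0:
            # Hand the remaining retry budget to the next run
            create_flow_run.run(
                flow_name="my-flow",        # placeholder
                project_name="my-project",  # placeholder
                parameters={"RETRIES_LEFT": retries_left - 1},
            )
    return new_state

with Flow("my-flow", state_handlers=[flow_retry]) as flow:
    # Default budget of 3 retries; override per run to change the cap
    retries_left = Parameter("RETRIES_LEFT", default=3)()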