Krzysztof Ciesielski
05/18/2022, 1:10 PM
max_retries + retry_delay task parameters. I wanted to wrap some of my tasks with retries, and I thought it would be enough to add these parameters to the @task annotation. However, when a task raises an exception, the Prefect Cloud UI shows its state as "Retrying Task (after attempt 1 of 61)", but after a short while the entire flow gets cancelled.
Example:
@task(log_stdout=True, max_retries=60, retry_delay=timedelta(seconds=10))
def perform_something(param1: Optional[str],
                      param2: any,
                      year_month_str: str,
                      environment_label: str) -> datetime:
Kevin Kho
Krzysztof Ciesielski
05/19/2022, 5:55 AM
Kevin Kho
from prefect import Flow, task
from datetime import timedelta

@task(log_stdout=True, max_retries=60, retry_delay=timedelta(seconds=10))
def abc():
    raise ValueError()
    return 1

with Flow("retry test") as flow:
    abc()

flow.register("databricks")
Do you have any advice on how to reproduce?
Krzysztof Ciesielski
05/20/2022, 9:13 AM
Kevin Kho
Krzysztof Ciesielski
06/09/2022, 10:29 AM
for task in flow_object.tasks:
    task.state_handlers.append(u.failed_state_handler)

def failed_state_handler(task, old_state: State, new_state: State):
    if new_state.is_failed():
        flow_run_id = prefect.context.flow_run_id
        CancelFlowRun(flow_run_id=flow_run_id).run()
    return new_state
This was written quite a while ago by someone else, and was left unnoticed 🙂 Removing this custom handler solves our issue with retries 🦜
Kevin Kho
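A note on the handler discussed above, offered as a sketch rather than something stated in the thread: the likely reason it broke retries is that it cancels the flow run on every Failed state, including a failure that would otherwise be turned into a retry. If cancelling on a final failure is still wanted, one option might be to cancel only once the retries are used up. The helper name should_cancel_flow is hypothetical, and the wiring shown in the comments assumes that Prefect 1.x exposes the attempt number as task_run_count in prefect.context and the retry limit as task.max_retries.

```python
def should_cancel_flow(is_failed: bool, run_count: int, max_retries: int) -> bool:
    """Return True only for a failure that is not expected to be retried.

    Assumption: run_count is 1 on the first attempt, and a retry is
    scheduled as long as run_count <= max_retries.
    """
    if not is_failed:
        return False
    return run_count > max_retries


# Hypothetical wiring into a Prefect 1.x state handler (shown as comments
# so this sketch runs without Prefect installed):
#
#   def failed_state_handler(task, old_state, new_state):
#       run_count = prefect.context.get("task_run_count", 1)
#       if should_cancel_flow(new_state.is_failed(), run_count, task.max_retries):
#           CancelFlowRun(flow_run_id=prefect.context.flow_run_id).run()
#       return new_state


if __name__ == "__main__":
    # With max_retries=60, the first failure should leave the flow alone.
    print(should_cancel_flow(True, run_count=1, max_retries=60))
    # Only a failure on attempt 61 would trigger the cancellation.
    print(should_cancel_flow(True, run_count=61, max_retries=60))
```

Keeping the decision in a plain function makes it easy to check without a running flow; whether cancellation on the final failure is needed at all depends on the flow, and simply removing the handler, as was done in the thread, is the simpler fix.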