# prefect-community
k
Hello, I'm trying to wrap my head around the `max_retries` + `retry_delay` task parameters. I wanted to wrap some of my tasks with retries, and I thought it would be enough to add these parameters to the `@task` decorator. However, when a task raises an exception, the Prefect Cloud UI shows its state as "Retrying Task (after attempt 1 of 61)", but after a short while the entire flow gets cancelled. Example:
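from datetime import datetime, timedelta
from typing import Any, Optional
from prefect import task

@task(log_stdout=True, max_retries=60, retry_delay=timedelta(seconds=10))
def perform_something(param1: Optional[str],
                      param2: Any,
                      year_month_str: str,
                      environment_label: str) -> datetime:
    ...  # task body not shown in the original message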
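The "attempt 1 of 61" in the UI message is consistent with `max_retries=60` meaning one initial run plus up to 60 retries. A small sketch of that arithmetic follows; `retry_budget` is a hypothetical helper, not part of Prefect, and it ignores the time the task itself takes to run:

```python
from datetime import timedelta
from typing import Tuple


def retry_budget(max_retries: int, retry_delay: timedelta) -> Tuple[int, timedelta]:
    """Return (total attempts, total time spent waiting between attempts)."""
    total_attempts = max_retries + 1        # the first run plus each retry
    total_wait = retry_delay * max_retries  # one delay before every retry
    return total_attempts, total_wait


attempts, wait = retry_budget(max_retries=60, retry_delay=timedelta(seconds=10))
print(attempts, wait)  # 61 0:10:00
```

With these settings a task that keeps failing would wait roughly ten minutes in total before giving up, so a flow that is cancelled "after a short while" is not explained by the retry budget running out.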
k
Is there any log when the entire flow gets cancelled?
k
@Kevin Kho yes:
k
This is very weird. I’ll try to replicate
I could not replicate it with this:
from prefect import Flow, task
from datetime import timedelta

@task(log_stdout=True, max_retries=60, retry_delay=timedelta(seconds=10))
def abc():
    raise ValueError()
    return 1

with Flow("retry test") as flow:
    abc()

flow.register("databricks")
Do you have any advice on how to reproduce it?
k
Thanks! I'll try to run this example with our setup. By the way, I have one hypothesis: task input parameters are outputs from other tasks, where we don't specify result type. We can't just use PrefectResult because some data isn't serializable and that's why we skipped this. All works well anyway, but maybe for restarting tasks it's strictly required to have the inputs cacheable?
k
Will try this in a bit
k
@Kevin Kho just an FYI: I found the culprit. It was a tiny piece of code located where the flows are registered. It was adding custom failure handlers:
for task in flow_object.tasks:
    task.state_handlers.append(u.failed_state_handler)

def failed_state_handler(task, old_state: State, new_state: State):
    if new_state.is_failed():
        flow_run_id = prefect.context.flow_run_id
        CancelFlowRun(flow_run_id=flow_run_id).run()
    return new_state
This was written quite a while ago by someone else and went unnoticed 🙂 Removing this custom handler solves our issue with retries 🦜
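One plausible reading of why this handler broke retries: state handlers run on every state change, and a task that raises appears to pass through a Failed state before it is moved to Retrying, so the handler cancelled the flow on the very first failure. If cancel-on-failure is still wanted, the check could be limited to failures with no retries left. Below is a minimal sketch of that condition, using a stub state so it runs without Prefect; `StubState`, `should_cancel_flow`, and the way the run count is supplied are assumptions for illustration, not Prefect API:

```python
from dataclasses import dataclass


@dataclass
class StubState:
    """Stand-in for a Prefect state; only the is_failed() check is modelled."""
    failed: bool

    def is_failed(self) -> bool:
        return self.failed


def should_cancel_flow(new_state, run_count: int, max_retries: int) -> bool:
    """Return True only when the task failed and has no retries left."""
    return new_state.is_failed() and run_count > max_retries


# First failure of a task with max_retries=60: a retry will follow, so do not cancel.
print(should_cancel_flow(StubState(failed=True), run_count=1, max_retries=60))   # False
# Failure on the 61st run: retries are exhausted, so cancelling is reasonable.
print(should_cancel_flow(StubState(failed=True), run_count=61, max_retries=60))  # True
```

In a real handler the run count would have to come from the task run context and `max_retries` from the task object; neither lookup is shown in this thread, so treat them as details to verify against the Prefect version in use.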
k
Oh, my bad, I didn’t follow up. If I’m slow, just ping me next time. Glad you figured it out!