Thread
#prefect-community
    Krzysztof Ciesielski
    4 months ago
    Hello, I'm trying to wrap my head around the max_retries + retry_delay task parameters. I wanted to wrap some of my tasks with retries, and I thought it would be enough to add these parameters to the @task decorator. However, when a task raises an exception, the Prefect Cloud UI shows its state as "Retrying Task (after attempt 1 of 61)", but after a short while the entire flow gets cancelled. Example:
    @task(log_stdout=True, max_retries=60, retry_delay=timedelta(seconds=10))
    def perform_something(param1: Optional[str],
                          param2: Any,
                          year_month_str: str,
                          environment_label: str) -> datetime:
    Kevin Kho
    4 months ago
    Is there any log when the entire flow gets cancelled?
    Krzysztof Ciesielski
    4 months ago
    @Kevin Kho yes:
    Kevin Kho
    4 months ago
    This is very weird. I'll try to replicate it.
    I could not replicate it with this:
    from prefect import Flow, task
    from datetime import timedelta
    
    @task(log_stdout=True, max_retries=60, retry_delay=timedelta(seconds=10))
    def abc():
        raise ValueError()
        return 1
    
    with Flow("retry test") as flow:
        abc()
    
    flow.register("databricks")
    Do you have any advice on how to reproduce it?
    Krzysztof Ciesielski
    4 months ago
    Thanks! I'll try to run this example with our setup. By the way, I have one hypothesis: the task's input parameters are outputs from other tasks, for which we don't specify a result type. We can't just use PrefectResult because some of the data isn't serializable, which is why we skipped it. Everything works fine anyway, but maybe restarting tasks strictly requires the inputs to be cacheable?
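    For context, this is roughly what specifying a result on the upstream tasks would look like. Just a sketch using LocalResult; the directory and task name are placeholders, not our actual setup:
    from prefect import task
    from prefect.engine.results import LocalResult

    # Persist this task's output so downstream retries/restarts can re-read it;
    # the dir below is only a placeholder path.
    @task(result=LocalResult(dir="/tmp/prefect-results"))
    def produce_inputs():
        return {"param2": "some payload"}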
    Kevin Kho
    4 months ago
    Will try this in a bit
    Krzysztof Ciesielski
    3 months ago
    @Kevin Kho just an FYI: I found the culprit. It was a tiny piece of code in the place where the flows are registered, adding custom failure handlers to every task:
    for task in flow_object.tasks:
        task.state_handlers.append(u.failed_state_handler)
    
    def failed_state_handler(task, old_state: State, new_state: State):
        if new_state.is_failed():
            flow_run_id = prefect.context.flow_run_id
            CancelFlowRun(flow_run_id=flow_run_id).run()
        return new_state
    This was written quite a while ago by someone else and had gone unnoticed 🙂 Removing this custom handler solves our issue with retries :party-parrot:
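    If we ever need that behaviour again, a retry-aware version could cancel only once retries are exhausted. Just a sketch, assuming Prefect 1.x exposes task_run_count in prefect.context and that CancelFlowRun is importable from prefect.tasks.prefect:
    import prefect
    from prefect.engine.state import State
    from prefect.tasks.prefect import CancelFlowRun

    def failed_state_handler(task, old_state: State, new_state: State) -> State:
        # A task that will still retry appears to pass through a Failed state
        # before Retrying, which is presumably what made the original handler
        # cancel the whole flow. Only cancel when no retries are left.
        if new_state.is_failed():
            run_count = prefect.context.get("task_run_count", 1)
            if run_count > task.max_retries:
                CancelFlowRun(flow_run_id=prefect.context.flow_run_id).run()
        return new_state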
    Kevin Kho
    3 months ago
    Oh, my bad, I didn't follow up. If I'm slow, just ping me next time. Glad you figured it out!