<@ULVA73B9P> can a task change its retries propert...
# ask-community
g
@Marvin can a task change its retries property during execution?
m
In Prefect 3.x, the number of retries for a task is determined at the time the task is defined, not during its execution. You can specify custom retry behavior using options like retry_delay_seconds and retry_condition_fn, but these settings are fixed once the task starts running. If you want to dynamically control retries based on task execution, you'll need to implement logic outside the task definition, such as conditionally re-running the task or using flow-level control. For more details on task retries, you can refer to the Prefect 3.x documentation.
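One way to read "logic outside the task definition" is to choose the retry count in the flow, before the task run starts. The sketch below assumes Prefect's `with_options` method on task objects; `pick_retries` and `call_with_retries` are hypothetical helper names, not part of Prefect.

```python
def pick_retries(is_critical: bool) -> int:
    """Hypothetical policy: critical work gets more attempts."""
    return 5 if is_critical else 1


def call_with_retries(task_obj, retries: int, *args, **kwargs):
    """Call a task with a retry count chosen at call time.

    `task_obj` is assumed to be a Prefect task (a function decorated
    with @task). with_options returns a copy carrying the overridden
    settings, so the original task definition is left unchanged.
    """
    return task_obj.with_options(retries=retries)(*args, **kwargs)
```

Inside a flow this could be called as `call_with_retries(my_task, pick_retries(True), some_arg)`, where `my_task` and `some_arg` are placeholders. The count is still fixed for that run once it starts.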
b
Hey Giacomo! What about the retries property would you like to change? Whether or not retries are attempted? Or something else? 👀
g
Hi @Bianca Hoch! Here is the need. To make a flow fail, I need to manually check each task's state and say "if any of the tasks is not in a completed state, then raise an exception" at the end of the flow. To do this, I gathered all the futures and I'm passing them to a last task, which checks all the task states and, before failing on purpose, sends a notification elsewhere. Here is the thing: setting the retries property makes the last task always execute more than once (because it raises an exception, which makes the task fail), so I get multiple identical notifications. Instead, I want the task to retry only if it fails while sending the notification, and not when it raises the exception on purpose. Is this possible somehow? If not, any suggestions on how I can overcome this?
b
Instead, I want the task to retry only if it fails while sending the notification, and not when it raises the exception on purpose.
If you can capture the notification failure, and you're able to differentiate it from another type of exception, setting a retry_condition_fn should help you out here. Here's a little example for you to follow along with:
import random
import time

import statsapi
from prefect import task


# Custom retry handler
def retry_handler(task, task_run, state) -> bool:
    """Custom retry handler that specifies when to retry a task"""
    try:
        # Attempt to get the result of the task
        state.result()
    except TimeoutError:
        # Retry only when the task failed with a TimeoutError
        return True
    except Exception:
        # For any other exception, do not retry
        return False
    # No exception was raised, so there is nothing to retry
    return False

#The retry_condition_fn parameter allows us to specify a custom retry condition function
@task(retries=10, retry_condition_fn=retry_handler)
def get_recent_games(team_name, start_date, end_date):
    # Generate random number
    failure_chance = random.random()
    
    # Simulate different types of failures (only values in [0.3, 0.4) succeed)
    if failure_chance < 0.3:
        time.sleep(2)  # Allow us to see the retries in action
        # Generic exception: retry_handler returns False, so no retry
        raise Exception("Simulated API failure: MLB Stats API is temporarily unavailable")
    elif failure_chance >= 0.4:
        time.sleep(2)  # Allow us to see the retries in action
        # TimeoutError: retry_handler returns True, so the task is retried
        raise TimeoutError("Simulated timeout: Request took too long")
    
    # If no failure, proceed with actual API call
    team = statsapi.lookup_team(team_name)
    schedule = statsapi.schedule(team=team[0]["id"], start_date=start_date, end_date=end_date)
    for game in schedule:
        print(game['game_id'])
    return [game['game_id'] for game in schedule]
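Applied to the notification case described above, the same idea might look like the sketch below. Everything named here is an assumption for illustration: `NotificationError`, `IntentionalFlowFailure`, `send_notification`, `check_and_notify`, and `make_final_task` are hypothetical, and the flow is assumed to have already collected the names of the tasks that did not complete. Prefect is imported lazily inside `make_final_task` only so the retry condition can be tried out on its own.

```python
class NotificationError(Exception):
    """Hypothetical: the notification could not be sent."""


class IntentionalFlowFailure(Exception):
    """Hypothetical: raised on purpose after the notification went out."""


def retry_only_on_notification_error(task, task_run, state) -> bool:
    """Retry condition: retry only when sending the notification failed."""
    try:
        state.result()
    except NotificationError:
        return True
    except Exception:
        return False
    return False


def send_notification(message: str) -> None:
    """Hypothetical placeholder; replace with the real notification call."""
    print(f"NOTIFY: {message}")


def check_and_notify(failed_task_names: list[str]) -> None:
    """Notify about failed tasks, then fail on purpose."""
    if not failed_task_names:
        return
    message = "Tasks not completed: " + ", ".join(failed_task_names)
    try:
        send_notification(message)
    except Exception as exc:
        # Mark notification problems so the retry condition can spot them
        raise NotificationError("notification could not be sent") from exc
    # The notification succeeded; this failure must not trigger a retry
    raise IntentionalFlowFailure(message)


def make_final_task(retries: int = 3):
    """Wrap check_and_notify as a Prefect task (Prefect imported lazily)."""
    from prefect import task

    return task(
        retries=retries,
        retry_condition_fn=retry_only_on_notification_error,
    )(check_and_notify)
```

With this shape, the deliberate `IntentionalFlowFailure` ends the task after a single notification, while a `NotificationError` is the only failure that uses up the configured retries.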
g
Thank you Bianca! I think that will solve it 🙏