Giacomo Chiarella
02/18/2025, 3:55 PMMarvin
02/18/2025, 3:55 PMretry_delay_seconds
and retry_condition_fn
, but these settings are fixed once the task starts running.
If you want to dynamically control retries based on task execution, you'll need to implement logic outside the task definition, such as conditionally re-running the task or using flow-level control.
For more details on task retries, you can refer to the Prefect 3.x documentation.Bianca Hoch
02/18/2025, 10:44 PMGiacomo Chiarella
02/19/2025, 8:14 AMif any of the tasks is not in completed state then raise an exception
at the end of the flow. To do this, I gathered all the futures and Iām passing them to a last task which checks all the tasks states and before failing on purpose I send a notification elsewhere. Here is the thing: setting the retries property makes the last task executes more than once always (because it rises an exception which makes the task fails) having multiple same notifications. Instead I want the task retries only if it fails the sending notification part and not when on purpose rises the exception. Is it possible anyhow? If not any suggestion how can I overcome this?Bianca Hoch
02/20/2025, 9:24 PMInstead I want the task retries only if it fails the sending notification part and not when on purpose rises the exception.If you can capture the notification failure, and you're able to differentiate it from another type of exception, setting a
retry_condition_fn
shuld help you out here. Here's a little example for you to follow along with:
#Custom retry handler
def retry_handler(task, task_run, state) -> bool:
"""Custom retry handler that specifies when to retry a task"""
try:
# Attempt to get the result of the task
state.result()
except Exception as e:
#If Exception is a TimeoutError, retry
if isinstance(e, TimeoutError):
return True
#For any other exception, do not retry
return False
#The retry_condition_fn parameter allows us to specify a custom retry condition function
@task(retries=10, retry_condition_fn=retry_handler)
def get_recent_games(team_name, start_date, end_date):
# Generate random number
failure_chance = random.random()
# Simulate different types of failures
if failure_chance < 0.3:
time.sleep(2) #Allow us to see the retries in action
raise Exception("Simulated API failure: MLB Stats API is temporarily unavailable")
elif failure_chance >= 0.4:
time.sleep(2) #Allow us to see the retries in action
raise TimeoutError("Simulated timeout: Request took too long") # Simulate empty response
# If no failure, proceed with actual API call
team = statsapi.lookup_team(team_name)
schedule = statsapi.schedule(team=team[0]["id"], start_date=start_date, end_date=end_date)
for game in schedule:
print(game['game_id'])
return [game['game_id'] for game in schedule]
Giacomo Chiarella
02/21/2025, 8:03 AM