Samuel Hinton
12/24/2024, 3:36 AMBianca Hoch
12/24/2024, 3:55 PMretry_condition_fn
with a custom retry handler to achieve this. Here's a small example for you:
from prefect import task, flow
def no_retry(task, task_run, exception):
return False # Always return False to prevent retries
@task(retries=2, retry_delay_seconds=5, retry_condition_fn=no_retry)
def my_task():
raise ValueError("This task will fail without retrying")
@flow
def my_flow():
my_task()
if __name__ == "__main__":
my_flow()
Bianca Hoch
12/24/2024, 3:57 PMimport httpx
from prefect import flow, task
def retry_handler(task, task_run, state) -> bool:
"""This is a custom retry handler to handle when we want to retry a task"""
try:
# Attempt to get the result of the task
state.result()
except httpx.HTTPStatusError as exc:
# Retry on any HTTP status code that is not 401 or 404
do_not_retry_on_these_codes = [401, 404]
return exc.response.status_code not in do_not_retry_on_these_codes
except httpx.ConnectError:
# Do not retry
return False
# For any other exception, retry
return True
@task(retries=1, retry_condition_fn=retry_handler)
def my_api_call_task(url):
response = httpx.get(url)
response.raise_for_status()
return response.json()
@flow
def get_data_flow(url):
my_api_call_task(url=url)
if __name__ == "__main__":
get_data_flow(url="<https://httpbin.org/status/503>")
Samuel Hinton
12/24/2024, 9:01 PM