Michael Smith
03/29/2022, 11:15 AMAnna Geller
03/29/2022, 11:28 AM@task(retries=2, retry_delay_seconds=60)
For non-retriable errors (no retries):
@task
Michael Smith
03/29/2022, 11:37 AMimport requests
from prefect import flow, task
@task(retries=3,retry_delay_seconds=5)
def call_api(url):
try:
response = requests.get(url)
print(response.status_code)
return response.json()
except NonRetryableError:
TaskRunContext.set_state(TaskRun.FAILED)
@flow
def task_retry_bypass(url):
fact_json = call_api(url)
Anna Geller
03/29/2022, 11:42 AMretries
mean a maximum number of retries:
1. If the task doesn't need a retry and succeeds after its first execution, it's done.
2. If it succeeds after the first retry (even though you configured 3 of them), the same - the task ends successfully after this first retry.
3. If it still fails after 3 retries, then it will be marked as Failed.@flow
by acting on the task.wait().result()
which would be an exception in such case - LMK if you need an examplefrom prefect import task, flow, get_run_logger
from prefect.orion.schemas.states import Failed
import requests
@task(retries=3, retry_delay_seconds=5)
def call_api(url):
response = requests.get(url)
status_code = response.status_code
logger = get_run_logger()
<http://logger.info|logger.info>(status_code)
if status_code != 200:
return Failed(message="Stopping the task run immediately!")
@flow
def task_retry_bypass(url):
call_api(url)
if __name__ == "__main__":
task_retry_bypass()
Failed
based on the API response - raising any exception will automatically cause the task run to be considered Failed without having to manually set that using:
TaskRunContext.set_state(TaskRun.FAILED)
Michael Smith
03/29/2022, 11:54 AMAnna Geller
03/29/2022, 11:56 AMTaskRunContext.set_state(TaskRun.FAILED)
you do:
return prefect.orion.schemas.states.Failed("your_msg")
Michael Smith
03/29/2022, 11:58 AMAnna Geller
03/29/2022, 12:04 PMfrom prefect import task, flow
@task(retries=3, retry_delay_seconds=5)
def call_api(url):
response = requests.get(url)
status_code = response.status_code
logger = get_run_logger()
<http://logger.info|logger.info>(status_code)
return response.json()
@flow
def task_retry_bypass():
res = call_api().wait().result()
if res["sth"] == "sth":
raise ValueError("Non retriable error!")
if __name__ == "__main__":
task_retry_bypass()
Michael Smith
03/29/2022, 12:13 PMfrom prefect import flow, task
import prefect
@task(retries=3,retry_delay_seconds=5)
def call_api(url):
return prefect.orion.schemas.states.StateType.FAILED
@flow
def task_retry_bypass(url):
fact_json = call_api(url)
task_retry_bypass("foo")
~
We'd like to make this as simple as possible (to reduce learning curve and help with developer experience), so I think we can extend the above and use an inner decorated function to hide all the detailsAnna Geller
03/29/2022, 12:16 PMMichael Smith
03/29/2022, 1:06 PMreturn prefect.orion.schemas.states.StateType.FAILED
Then the retry is skipped BUT the Flow Run ends up in a Completed state - tracing through this occurs because the StateType.FAILED is not a State (so Prefect defaults to Completed for "python object returns")
and if we return
return prefect.orion.schemas.states.Failed()
Then client.py: propose_state appears to do a call to set_task_run_state, but the proposed state is rejected and we go into a retry loopAnna Geller
03/29/2022, 1:41 PMMarvin
03/29/2022, 1:44 PMMaikel Penz
04/04/2022, 2:11 AM