Hi again, we have a requirement to handle two clas...
# prefect-community
m
Hi again, we have a requirement to handle two classes of errors: retryable errors and non-retryable errors. I think I saw some examples how this might be achieved somewhere but cant spot them for Prefect 2. In the non-retryable scenario we'd want the task state to go immediately to Failed (bypassing any retry settings configured on the task decorator)
a
In Prefect 2.0 you can do: For retriable errors:
Copy code
@task(retries=2, retry_delay_seconds=60)
For non-retriable errors (no retries):
Copy code
@task
m
But if I have a single task that can throw either type of error? Is there a context object that can be manipulated, I was thinking something similar to this?
Copy code
import requests
from prefect import flow, task

@task(retries=3,retry_delay_seconds=5)
def call_api(url):
    try: 
        response = requests.get(url)
        print(response.status_code)
        return response.json()
    except NonRetryableError:
        TaskRunContext.set_state(TaskRun.FAILED)

@flow
def task_retry_bypass(url):
    fact_json = call_api(url)
a
Generally speaking, there is no need to use try-except in a Prefect flow. That's why you use Prefect so that you don't need to worry about all the different types of exceptions that might need to be caught. The
retries
mean a maximum number of retries: 1. If the task doesn't need a retry and succeeds after its first execution, it's done. 2. If it succeeds after the first retry (even though you configured 3 of them), the same - the task ends successfully after this first retry. 3. If it still fails after 3 retries, then it will be marked as Failed.
But if you want e.g. to do something if the task throws a specific exception, you could do it within a
@flow
by acting on the
task.wait().result()
which would be an exception in such case - LMK if you need an example
Copy code
from prefect import task, flow, get_run_logger
from prefect.orion.schemas.states import Failed
import requests


@task(retries=3, retry_delay_seconds=5)
def call_api(url):
    response = requests.get(url)
    status_code = response.status_code
    logger = get_run_logger()
    <http://logger.info|logger.info>(status_code)
    if status_code != 200:
        return Failed(message="Stopping the task run immediately!")


@flow
def task_retry_bypass(url):
    call_api(url)


if __name__ == "__main__":
    task_retry_bypass()
this way you explicitly decide when the task run is supposed to be considered as
Failed
based on the API response - raising any exception will automatically cause the task run to be considered Failed without having to manually set that using:
Copy code
TaskRunContext.set_state(TaskRun.FAILED)
m
Is overriding the task run state an option, it would allow us to benefit from the task level retry configuration - but also skip that when needed. A common scenario would be distinguishing between transient system failures (e.g. a service unavailable) vs permanent errors arising from unexpected application state (e.g. once an object has been deleted there's no point retrying any attempt to modify it)
a
Good question. You can return any State in your task to influence that: https://orion-docs.prefect.io/concepts/states/
Basically, instead of:
Copy code
TaskRunContext.set_state(TaskRun.FAILED)
you do:
Copy code
return prefect.orion.schemas.states.Failed("your_msg")
m
Thanks Anna, will give that a try
đź‘Ť 1
a
You could even do it on a flow level - having normal retries on a task level, but if the task result returns a specific value, you raise an exception on a flow level and end the flow run immediately - just mentioning this because it's also a valid option:
Copy code
from prefect import task, flow


@task(retries=3, retry_delay_seconds=5)
def call_api(url):
    response = requests.get(url)
    status_code = response.status_code
    logger = get_run_logger()
    <http://logger.info|logger.info>(status_code)
    return response.json()


@flow
def task_retry_bypass():
    res = call_api().wait().result()
    if res["sth"] == "sth":
        raise ValueError("Non retriable error!")


if __name__ == "__main__":
    task_retry_bypass()
m
This one worked (no retries)
Copy code
from prefect import flow, task
import prefect

@task(retries=3,retry_delay_seconds=5)
def call_api(url):
    return prefect.orion.schemas.states.StateType.FAILED

@flow
def task_retry_bypass(url):
    fact_json = call_api(url)

task_retry_bypass("foo")
~
We'd like to make this as simple as possible (to reduce learning curve and help with developer experience), so I think we can extend the above and use an inner decorated function to hide all the details
a
Sure, you can customize it as you wish, it's pretty much just Python so the sky is the limit! 🙂
m
I hit a probem with this...if the return is
Copy code
return prefect.orion.schemas.states.StateType.FAILED
Then the retry is skipped BUT the Flow Run ends up in a Completed state - tracing through this occurs because the StateType.FAILED is not a State (so Prefect defaults to Completed for "python object returns") and if we return
Copy code
return prefect.orion.schemas.states.Failed()
Then client.py: propose_state appears to do a call to set_task_run_state, but the proposed state is rejected and we go into a retry loop
This is getting a little too involved, most of our workflows tend to be idempotent system operations, so we may be able to handle this in a different way (checking result payload in the flow as you indicated). Would be nice if Prefect had additional Exception filters it could apply for retry (similar to the tenacity library)
a
Thanks for the suggestion, I'll pass it on to the product team
@Marvin open “Orion: feature request to add Exception filters applicable for retries”
m
m
I’m facing a similar scenario.. When not worth retrying a task I’d like to bypass the retry configuration
đź‘Ť 1