# prefect-community
Hi again, we have a requirement to handle two classes of errors: retryable errors and non-retryable errors. I think I saw some examples of how this might be achieved somewhere, but I can't spot them for Prefect 2. In the non-retryable scenario we'd want the task state to go immediately to Failed (bypassing any retry settings configured on the task decorator).
In Prefect 2.0 you can do the following. For retriable errors:
@task(retries=2, retry_delay_seconds=60)
For non-retriable errors (no retries), use a plain @task without any retry settings.
But what if I have a single task that can throw either type of error? Is there a context object that can be manipulated? I was thinking of something similar to this:
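import requests
from prefect import flow, task

class NonRetryableError(Exception):
    """Placeholder exception type for errors that should never be retried."""

@task(retries=2, retry_delay_seconds=60)
def call_api(url):
    try:
        response = requests.get(url)
        return response.json()
    except NonRetryableError:
        # Hypothetical: somehow tell Prefect to skip the configured retries here
        raise

@flow
def task_retry_bypass(url):
    fact_json = call_api(url)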
Generally speaking, there is no need to use try-except in a Prefect flow. That's why you use Prefect: so that you don't need to worry about all the different types of exceptions that might need to be caught. The retries setting means a maximum number of retries:
1. If the task doesn't need a retry and succeeds after its first execution, it's done.
2. If it succeeds after the first retry (even though you configured 3 of them), the same applies: the task ends successfully after this first retry.
3. If it still fails after 3 retries, then it will be marked as Failed.
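The three cases above can be sketched in plain Python. This is only an illustration of the "maximum number of retries" semantics, not Prefect's implementation; `run_with_retries` and `make_flaky` are hypothetical helper names.

```python
def run_with_retries(fn, retries):
    """Call fn up to 1 + retries times; return (final_state, attempts_used)."""
    attempts = 0
    for _ in range(1 + retries):
        attempts += 1
        try:
            fn()
            return "Completed", attempts
        except Exception:
            # Any exception counts as a failed attempt; try again if allowed.
            continue
    return "Failed", attempts


def make_flaky(failures_before_success):
    """Build a callable that raises the first N times it is called, then succeeds."""
    calls = {"n": 0}

    def flaky():
        calls["n"] += 1
        if calls["n"] <= failures_before_success:
            raise RuntimeError("transient failure")

    return flaky


print(run_with_retries(make_flaky(0), retries=3))  # ('Completed', 1)
print(run_with_retries(make_flaky(1), retries=3))  # ('Completed', 2)
print(run_with_retries(make_flaky(9), retries=3))  # ('Failed', 4)
```

With `retries=3`, the callable runs at most four times: the first execution plus three retries.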
But if you want to do something when the task throws a specific exception, you could do it within a flow by acting on the task's result, which would be an exception in that case. LMK if you need an example.
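from prefect import task, flow, get_run_logger
from prefect.orion.schemas.states import Failed
import requests


@task(retries=3, retry_delay_seconds=5)
def call_api(url):
    response = requests.get(url)
    status_code = response.status_code
    logger = get_run_logger()
    logger.info("Status code: %s", status_code)
    if status_code != 200:
        # Return a Failed state explicitly instead of raising an exception
        return Failed(message="Stopping the task run immediately!")
    return response.json()


@flow
def task_retry_bypass(url):
    call_api(url)


if __name__ == "__main__":
    task_retry_bypass("https://example.com/api")  # placeholder URL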
This way you explicitly decide when the task run is supposed to be considered Failed, based on the API response. Raising any exception will automatically cause the task run to be considered Failed, without having to set that manually by returning a Failed state.
Is overriding the task run state an option? It would allow us to benefit from the task-level retry configuration, but also skip it when needed. A common scenario would be distinguishing between transient system failures (e.g. a service being unavailable) and permanent errors arising from unexpected application state (e.g. once an object has been deleted, there's no point retrying any attempt to modify it).
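A minimal sketch of that transient-versus-permanent split, in plain Python. The exception names, the `raise_for_status_code` helper, and the set of status codes treated as transient are all assumptions for illustration; the right classification depends on the service being called.

```python
class RetryableError(Exception):
    """Transient failure: trying again later may succeed."""


class NonRetryableError(Exception):
    """Permanent failure: retrying cannot help."""


# Assumed classification of HTTP status codes; adjust for the actual API.
TRANSIENT_STATUS_CODES = {429, 502, 503, 504}


def raise_for_status_code(status_code):
    """Raise the matching error class for any non-2xx HTTP status code."""
    if 200 <= status_code < 300:
        return
    if status_code in TRANSIENT_STATUS_CODES:
        raise RetryableError(f"transient HTTP {status_code}")
    raise NonRetryableError(f"permanent HTTP {status_code}")
```

Here a 503 (service unavailable) would raise `RetryableError`, while a 404 or 410 for a deleted object would raise `NonRetryableError`.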
Good question. You can return any State in your task to influence that: https://orion-docs.prefect.io/concepts/states/
Basically, instead of raising an exception, you do:
return prefect.orion.schemas.states.Failed(message="your_msg")
Thanks Anna, will give that a try
👍 1
You could even do it on a flow level - having normal retries on a task level, but if the task result returns a specific value, you raise an exception on a flow level and end the flow run immediately - just mentioning this because it's also a valid option:
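import requests
from prefect import task, flow, get_run_logger


@task(retries=3, retry_delay_seconds=5)
def call_api(url):
    response = requests.get(url)
    status_code = response.status_code
    logger = get_run_logger()
    logger.info("Status code: %s", status_code)
    return response.json()


@flow
def task_retry_bypass(url):
    # Resolve the task future, then inspect the payload at the flow level
    res = call_api(url).wait().result()
    if res["sth"] == "sth":
        raise ValueError("Non retriable error!")


if __name__ == "__main__":
    task_retry_bypass("https://example.com/api")  # placeholder URL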
This one worked (no retries)
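from prefect import flow, task
import prefect

# Retry settings assumed here; they match the earlier example in this thread
@task(retries=2, retry_delay_seconds=60)
def call_api(url):
    return prefect.orion.schemas.states.StateType.FAILED

@flow
def task_retry_bypass(url):
    fact_json = call_api(url)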

We'd like to make this as simple as possible (to reduce learning curve and help with developer experience), so I think we can extend the above and use an inner decorated function to hide all the details
Sure, you can customize it as you wish, it's pretty much just Python so the sky is the limit! 🙂
I hit a problem with this... if the return is
return prefect.orion.schemas.states.StateType.FAILED
then the retry is skipped, BUT the flow run ends up in a Completed state. Tracing through, this occurs because StateType.FAILED is not a State (so Prefect defaults to Completed for "python object returns"). And if we return
return prefect.orion.schemas.states.Failed()
then propose_state in client.py appears to call set_task_run_state, but the proposed state is rejected and we go into a retry loop.
This is getting a little too involved. Most of our workflows tend to be idempotent system operations, so we may be able to handle this in a different way (checking the result payload in the flow, as you indicated). It would be nice if Prefect had additional exception filters it could apply for retries (similar to the tenacity library).
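For reference, the kind of exception filter being asked for can be sketched as a plain Python decorator. This is neither Prefect's nor tenacity's API; `retry_unless`, `NonRetryableError`, and the two example functions are hypothetical names used only to show the idea.

```python
import functools
import time


class NonRetryableError(Exception):
    """Permanent failure that should bypass any configured retries."""


def retry_unless(non_retryable, retries, delay_seconds=0.0):
    """Retry fn up to `retries` times, except when it raises `non_retryable`."""

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1 + retries):
                try:
                    return fn(*args, **kwargs)
                except non_retryable:
                    # Filtered exception: fail immediately, no retry.
                    raise
                except Exception:
                    if attempt == retries:
                        # Retries exhausted: surface the last error.
                        raise
                    time.sleep(delay_seconds)

        return wrapper

    return decorator


attempts = {"transient": 0, "permanent": 0}


@retry_unless(NonRetryableError, retries=3)
def transient_then_ok():
    attempts["transient"] += 1
    if attempts["transient"] < 3:
        raise ConnectionError("service unavailable")
    return "ok"


@retry_unless(NonRetryableError, retries=3)
def permanent():
    attempts["permanent"] += 1
    raise NonRetryableError("object was deleted")
```

Calling `transient_then_ok()` succeeds on its third attempt, while `permanent()` raises after a single attempt regardless of the retry setting.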
Thanks for the suggestion, I'll pass it on to the product team
@Marvin open “Orion: feature request to add Exception filters applicable for retries”
I’m facing a similar scenario. When a task isn't worth retrying, I’d like to bypass the retry configuration.
👍 1