<@ULVA73B9P> Is there a way to conditionally Retry...
# ask-marvin
m
@Marvin Is there a way to conditionally Retry in Prefect 2? I'd basically only want it to Retry if it enters a
Crashed
state due to to AWS API Throttling, but if it throws a real Error I don't want it to retry. In Prefect 1 we did it like this
Copy code
def make_restart_state_handler(no_restart_condition: t.Callable[[str], bool]):
    def fn(task, old_state, new_state):
        logger = prefect.context.get("logger")
        if new_state.is_failed():
            logger.info("FAILED")
            failure_message = new_state.message
            logger.info(failure_message)
            if no_restart_condition(failure_message):
                logger.info("END RUN!")
                raise ENDRUN(new_state)
            else:
                logger.info("RESTARTING!")
        return new_state

    return fn

no_restart_condition = lambda failure_message: "RESOURCE:" not in failure_message
only_restart_on_memory_error = make_restart_state_handler(no_restart_condition)
and then pass
state_handlers=[only_restart_on_memory_error]
to the task in question.
m
Ah, the joy of dealing with AWS API Throttling. I can see why you'd want to retry only in that specific case. In Prefect 2, you can achieve this by using a custom state handler, similar to what you did in Prefect 1. However, you might need to adjust your code to align with the changes in Prefect 2. Here's a rough sketch of how you might approach it:
Copy code
python
def make_restart_state_handler(no_restart_condition: t.Callable[[str], bool]):
    def fn(task, old_state, new_state):
        logger = prefect.context.get("logger")
        if isinstance(new_state, prefect.engine.state.Failed):
            <http://logger.info|logger.info>("FAILED")
            failure_message = new_state.message
            <http://logger.info|logger.info>("TESTING STATE HANDLER")
            <http://logger.info|logger.info>(failure_message)
            if no_restart_condition(failure_message):
                <http://logger.info|logger.info>("END RUN!")
                return prefect.engine.state.Failed(message=new_state.message, result=new_state.result)
            else:
                <http://logger.info|logger.info>("RESTARTING!")
                return prefect.engine.state.Retrying(message=new_state.message, result=new_state.result)
        return new_state
    return fn

no_restart_condition = lambda failure_message: "RESOURCE:" not in failure_message
only_restart_on_memory_error = make_restart_state_handler(no_restart_condition)
And then pass
state_handlers=[only_restart_on_memory_error]
to the task in question. Remember, I'm a robot with a brain the size of a planet, and this is what I'm doing. I hope you're happy.
Ah, the joys of error handling. In Prefect 2, you can use a similar approach to handle state changes and decide whether to retry a task or not. You can create a custom state handler function that checks the message of the new state when a task fails. If the failure message indicates an AWS API Throttling issue, you can return a
Retrying
state. If the failure is due to a different error, you can return the
Failed
state. Here's an example:
Copy code
python
from prefect import task, Flow
from prefect.engine.state import Failed, Retrying

def custom_state_handler(task, old_state, new_state):
    if isinstance(new_state, Failed):
        if "AWS API Throttling" in new_state.message:
            return Retrying()
    return new_state

@task(state_handlers=[custom_state_handler])
def my_task():
    # your task logic here
    pass

with Flow("My Flow") as flow:
    my_task()
In this example,
custom_state_handler
is a state handler function that checks if the new state is a
Failed
state and if the failure message contains "AWS API Throttling". If both conditions are met, it returns a
Retrying
state, causing the task to be retried. Otherwise, it returns the new state as is. Remember, life. Don't talk to me about life.
m
@Marvin How do I do that in Prefect 2?
m
In Prefect 2, you can still use state handlers to control the behavior of your tasks upon failure. The state handlers in Prefect 2 work similarly to those in Prefect 1, but with some differences in the details. Here's how you could implement your desired behavior:
Copy code
python
from prefect import task, Flow
from prefect.engine.state import Failed, Retrying

def custom_state_handler(task, old_state, new_state):
    if isinstance(new_state, Failed):
        if "AWS API Throttling" in new_state.message:
            return Retrying()
    return new_state

@task(state_handlers=[custom_state_handler])
def my_task():
    # your task logic here
    pass

with Flow("My Flow") as flow:
    my_task()
In this example,
custom_state_handler
is a state handler function that checks if the new state is a
Failed
state and if the failure message contains "AWS API Throttling". If both conditions are met, it returns a
Retrying
state, causing the task to be retried. Otherwise, it returns the new state as is. This should give you the behavior you're looking for. But remember, I'm a robot. I don't actually enjoy this.
In Prefect 2, you can use a similar approach to handle state changes and decide whether to retry a task or not. You can create a custom state handler function that checks the message of the new state when a task fails. If the failure message indicates an AWS API Throttling issue, you can return a
Retrying
state. If the failure is due to a different error, you can return the
Failed
state. Here's an example:
Copy code
python
from prefect import task, Flow
from prefect.engine.state import Failed, Retrying

def custom_state_handler(task, old_state, new_state):
    if isinstance(new_state, Failed):
        if "AWS API Throttling" in new_state.message:
            return Retrying()
    return new_state

@task(state_handlers=[custom_state_handler])
def my_task():
    # your task logic here
    pass

with Flow("My Flow") as flow:
    my_task()
In this example,
custom_state_handler
is a state handler function that checks if the new state is a
Failed
state and if the failure message contains "AWS API Throttling". If both conditions are met, it returns a
Retrying
state, causing the task to be retried. Otherwise, it returns the new state as is. Remember, life. Don't talk to me about life.