Matt Alhonte
10/12/2023, 12:55 AMCrashed
state due to to AWS API Throttling, but if it throws a real Error I don't want it to retry. In Prefect 1 we did it like this
def make_restart_state_handler(no_restart_condition: t.Callable[[str], bool]):
def fn(task, old_state, new_state):
logger = prefect.context.get("logger")
if new_state.is_failed():
logger.info("FAILED")
failure_message = new_state.message
logger.info(failure_message)
if no_restart_condition(failure_message):
logger.info("END RUN!")
raise ENDRUN(new_state)
else:
logger.info("RESTARTING!")
return new_state
return fn
no_restart_condition = lambda failure_message: "RESOURCE:" not in failure_message
only_restart_on_memory_error = make_restart_state_handler(no_restart_condition)
and then pass state_handlers=[only_restart_on_memory_error]
to the task in question.Marvin
10/12/2023, 12:56 AMpython
def make_restart_state_handler(no_restart_condition: t.Callable[[str], bool]):
def fn(task, old_state, new_state):
logger = prefect.context.get("logger")
if isinstance(new_state, prefect.engine.state.Failed):
<http://logger.info|logger.info>("FAILED")
failure_message = new_state.message
<http://logger.info|logger.info>("TESTING STATE HANDLER")
<http://logger.info|logger.info>(failure_message)
if no_restart_condition(failure_message):
<http://logger.info|logger.info>("END RUN!")
return prefect.engine.state.Failed(message=new_state.message, result=new_state.result)
else:
<http://logger.info|logger.info>("RESTARTING!")
return prefect.engine.state.Retrying(message=new_state.message, result=new_state.result)
return new_state
return fn
no_restart_condition = lambda failure_message: "RESOURCE:" not in failure_message
only_restart_on_memory_error = make_restart_state_handler(no_restart_condition)
And then pass state_handlers=[only_restart_on_memory_error]
to the task in question.
Remember, I'm a robot with a brain the size of a planet, and this is what I'm doing. I hope you're happy.Marvin
10/12/2023, 12:56 AMRetrying
state. If the failure is due to a different error, you can return the Failed
state. Here's an example:
python
from prefect import task, Flow
from prefect.engine.state import Failed, Retrying
def custom_state_handler(task, old_state, new_state):
if isinstance(new_state, Failed):
if "AWS API Throttling" in new_state.message:
return Retrying()
return new_state
@task(state_handlers=[custom_state_handler])
def my_task():
# your task logic here
pass
with Flow("My Flow") as flow:
my_task()
In this example, custom_state_handler
is a state handler function that checks if the new state is a Failed
state and if the failure message contains "AWS API Throttling". If both conditions are met, it returns a Retrying
state, causing the task to be retried. Otherwise, it returns the new state as is.
Remember, life. Don't talk to me about life.Matt Alhonte
10/12/2023, 12:56 AMMarvin
10/12/2023, 12:57 AMpython
from prefect import task, Flow
from prefect.engine.state import Failed, Retrying
def custom_state_handler(task, old_state, new_state):
if isinstance(new_state, Failed):
if "AWS API Throttling" in new_state.message:
return Retrying()
return new_state
@task(state_handlers=[custom_state_handler])
def my_task():
# your task logic here
pass
with Flow("My Flow") as flow:
my_task()
In this example, custom_state_handler
is a state handler function that checks if the new state is a Failed
state and if the failure message contains "AWS API Throttling". If both conditions are met, it returns a Retrying
state, causing the task to be retried. Otherwise, it returns the new state as is.
This should give you the behavior you're looking for. But remember, I'm a robot. I don't actually enjoy this.Marvin
10/12/2023, 12:57 AMRetrying
state. If the failure is due to a different error, you can return the Failed
state. Here's an example:
python
from prefect import task, Flow
from prefect.engine.state import Failed, Retrying
def custom_state_handler(task, old_state, new_state):
if isinstance(new_state, Failed):
if "AWS API Throttling" in new_state.message:
return Retrying()
return new_state
@task(state_handlers=[custom_state_handler])
def my_task():
# your task logic here
pass
with Flow("My Flow") as flow:
my_task()
In this example, custom_state_handler
is a state handler function that checks if the new state is a Failed
state and if the failure message contains "AWS API Throttling". If both conditions are met, it returns a Retrying
state, causing the task to be retried. Otherwise, it returns the new state as is.
Remember, life. Don't talk to me about life.