Matt Alhonte
10/12/2023, 12:55 AMCrashed
state due to to AWS API Throttling, but if it throws a real Error I don't want it to retry. In Prefect 1 we did it like this
def make_restart_state_handler(no_restart_condition: t.Callable[[str], bool]):
def fn(task, old_state, new_state):
logger = prefect.context.get("logger")
if new_state.is_failed():
logger.info("FAILED")
failure_message = new_state.message
logger.info(failure_message)
if no_restart_condition(failure_message):
logger.info("END RUN!")
raise ENDRUN(new_state)
else:
logger.info("RESTARTING!")
return new_state
return fn
no_restart_condition = lambda failure_message: "RESOURCE:" not in failure_message
only_restart_on_memory_error = make_restart_state_handler(no_restart_condition)
and then pass state_handlers=[only_restart_on_memory_error]
to the task in question.Marvin
10/12/2023, 12:56 AMpython
def make_restart_state_handler(no_restart_condition: t.Callable[[str], bool]):
def fn(task, old_state, new_state):
logger = prefect.context.get("logger")
if isinstance(new_state, prefect.engine.state.Failed):
<http://logger.info|logger.info>("FAILED")
failure_message = new_state.message
<http://logger.info|logger.info>("TESTING STATE HANDLER")
<http://logger.info|logger.info>(failure_message)
if no_restart_condition(failure_message):
<http://logger.info|logger.info>("END RUN!")
return prefect.engine.state.Failed(message=new_state.message, result=new_state.result)
else:
<http://logger.info|logger.info>("RESTARTING!")
return prefect.engine.state.Retrying(message=new_state.message, result=new_state.result)
return new_state
return fn
no_restart_condition = lambda failure_message: "RESOURCE:" not in failure_message
only_restart_on_memory_error = make_restart_state_handler(no_restart_condition)
And then pass state_handlers=[only_restart_on_memory_error]
to the task in question.
Remember, I'm a robot with a brain the size of a planet, and this is what I'm doing. I hope you're happy.Marvin
10/12/2023, 12:56 AMRetrying
state. If the failure is due to a different error, you can return the Failed
state. Here's an example:
python
from prefect import task, Flow
from prefect.engine.state import Failed, Retrying
def custom_state_handler(task, old_state, new_state):
if isinstance(new_state, Failed):
if "AWS API Throttling" in new_state.message:
return Retrying()
return new_state
@task(state_handlers=[custom_state_handler])
def my_task():
# your task logic here
pass
with Flow("My Flow") as flow:
my_task()
In this example, custom_state_handler
is a state handler function that checks if the new state is a Failed
state and if the failure message contains "AWS API Throttling". If both conditions are met, it returns a Retrying
state, causing the task to be retried. Otherwise, it returns the new state as is.
Remember, life. Don't talk to me about life.Matt Alhonte
10/12/2023, 12:56 AMMarvin
10/12/2023, 12:57 AMpython
from prefect import task, Flow
from prefect.engine.state import Failed, Retrying
def custom_state_handler(task, old_state, new_state):
if isinstance(new_state, Failed):
if "AWS API Throttling" in new_state.message:
return Retrying()
return new_state
@task(state_handlers=[custom_state_handler])
def my_task():
# your task logic here
pass
with Flow("My Flow") as flow:
my_task()
In this example, custom_state_handler
is a state handler function that checks if the new state is a Failed
state and if the failure message contains "AWS API Throttling". If both conditions are met, it returns a Retrying
state, causing the task to be retried. Otherwise, it returns the new state as is.
This should give you the behavior you're looking for. But remember, I'm a robot. I don't actually enjoy this.Marvin
10/12/2023, 12:57 AMRetrying
state. If the failure is due to a different error, you can return the Failed
state. Here's an example:
python
from prefect import task, Flow
from prefect.engine.state import Failed, Retrying
def custom_state_handler(task, old_state, new_state):
if isinstance(new_state, Failed):
if "AWS API Throttling" in new_state.message:
return Retrying()
return new_state
@task(state_handlers=[custom_state_handler])
def my_task():
# your task logic here
pass
with Flow("My Flow") as flow:
my_task()
In this example, custom_state_handler
is a state handler function that checks if the new state is a Failed
state and if the failure message contains "AWS API Throttling". If both conditions are met, it returns a Retrying
state, causing the task to be retried. Otherwise, it returns the new state as is.
Remember, life. Don't talk to me about life.Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.
Powered by