<@ULVA73B9P> how do I pass input to my `on_failure...
# ask-marvin
p
@Marvin how do I pass input to my `on_failure` callable so I can change input on failure? I am thinking of changing my flow or task input on retries.
m
In Prefect, you can use state handlers to manage the behavior of your tasks or flows when they enter certain states. The `on_failure` state handler can be used to perform actions when a task or flow fails. However, changing the input of a task or flow on retries is not directly supported by the `on_failure` state handler. State handlers are not meant to modify the state of the task or flow, but rather to perform side effects based on that state.

If you want to change the behavior of a task based on whether it has failed previously, you might consider incorporating this logic into the task itself. For example, the task could check some external condition (like the presence of a certain file, or the value of a database record) and change its behavior based on that.

For retrying tasks with a delay, Prefect provides a mechanism for exponential backoff. You can specify the number of retries and the delay between retries when defining your task. Here's an example:
```python
from prefect import task
from prefect.tasks import exponential_backoff

@task(retries=3, retry_delay_seconds=exponential_backoff(backoff_factor=10), retry_jitter_factor=1)
def some_task_with_exponential_backoff_retries():
    ...
```
In this example, the task will be retried three times if it fails, with an exponential backoff delay between retries. The `retry_jitter_factor` parameter adds some randomness to the delay to prevent "thundering herds". The jitter is applied on top of the `retries` and `retry_delay_seconds` set in the decorator; it does not replace them. For more complex retry logic, you might need to implement your own custom logic.
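To make the backoff arithmetic concrete, here is a minimal plain-Python sketch of how such a delay schedule could be computed. It is an illustration only, not Prefect's implementation: the helper names `backoff_delays` and `add_jitter` are hypothetical, and both the doubling rule and the uniform jitter are assumptions.

```python
import random


def backoff_delays(retries: int, backoff_factor: float) -> list:
    """Delay before each retry, assuming the delay doubles on every attempt."""
    return [backoff_factor * 2 ** attempt for attempt in range(retries)]


def add_jitter(delay: float, jitter_factor: float, rng: random.Random) -> float:
    """Stretch a delay by a random extra of at most delay * jitter_factor."""
    return delay + rng.uniform(0, delay * jitter_factor)


delays = backoff_delays(retries=3, backoff_factor=10)
print(delays)  # [10, 20, 40]

# Seeded generator so the sketch is reproducible; each jittered delay stays
# between the base delay and twice the base delay when jitter_factor is 1.
rng = random.Random(0)
jittered = [add_jitter(d, 1, rng) for d in delays]
print(jittered)
```

Under these assumptions, spreading each delay over a range keeps many failed tasks from retrying at the same instant.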
p
@Marvin how can I then track number of retries in a task or flow run?
m
In Prefect, the number of retries for a task or flow run can be tracked using the `state` attribute of a `TaskRun` or `FlowRun` object. The `state` attribute contains information about the current state of the task or flow run, including whether it is currently being retried. However, there is no direct way to get the number of retries from the `state` attribute. You would need to implement custom logic to track the number of retries. For example, you could increment a counter each time a task or flow run enters the `Retrying` state. Here is an example of how you might do this:
```python
from prefect import Flow, task
from prefect.engine.state import Retrying

@task
def my_task():
    ...

with Flow("My Flow") as flow:
    my_task()

state = flow.run()
if isinstance(state, Retrying):
    print(f"Number of retries: {state.loop_count}")
```
In this example, `state.loop_count` gives the number of retries that have occurred. Please note that this is a simplified example and might not cover all use cases. For example, if a task is retried multiple times, you would need to ensure that the counter is incremented correctly each time. Also, please note that the `state` attribute and its properties are subject to change in future versions of Prefect, so this approach might not work in all versions.

For more complex use cases, you might need to use Prefect's logging or monitoring features to track the number of retries. For example, you could log a message each time a task is retried, and then use a log analysis tool to count the number of retry messages.
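The log-counting idea can be sketched with Python's standard `logging` module alone. This is an assumption-laden illustration rather than a Prefect feature: the `RetryCounter` handler, the `retry_demo` logger name, and the message wording are all hypothetical.

```python
import logging


class RetryCounter(logging.Handler):
    """Counts log records whose message mentions a retry."""

    def __init__(self) -> None:
        super().__init__()
        self.count = 0

    def emit(self, record: logging.LogRecord) -> None:
        # Case-insensitive match on the formatted message text.
        if "retry" in record.getMessage().lower():
            self.count += 1


logger = logging.getLogger("retry_demo")  # hypothetical logger name
logger.setLevel(logging.INFO)
logger.propagate = False  # keep the demo messages out of the root logger
counter = RetryCounter()
logger.addHandler(counter)

# Hypothetical messages a task might log on each attempt.
logger.info("Retry 1 of my_task")
logger.info("my_task finished")
logger.info("Retry 2 of my_task")

print(counter.count)  # 2
```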
p
@Marvin I don't think there is any such thing as `state.loop_count`
m
I apologize for the confusion earlier. You're correct that there's no direct `state.loop_count` attribute in Prefect. However, there's an open issue on Prefect's GitHub repository discussing the implementation of manual flow retries, which includes tracking whether a flow is retrying or not. You can find it here.

In the current version of Prefect, there's no built-in way to track the number of retries for a task or flow run. You would need to implement custom logic to track this. One way could be to use a global variable or an external database to increment a counter each time a task or flow run enters the `Retrying` state. I recommend keeping an eye on the Prefect GitHub repository for updates on this feature.
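As a rough sketch of what such custom logic could look like, the plain-Python loop below keeps its own attempt counter and hands it to the function, so the function can pick a different input on each retry. It does not use Prefect at all: `run_with_retries`, `fetch`, and the `batch_size` rule are hypothetical names and values chosen for illustration.

```python
from typing import Callable, Tuple, TypeVar

T = TypeVar("T")


def run_with_retries(fn: Callable[[int], T], max_retries: int) -> Tuple[T, int]:
    """Call fn(attempt) until it succeeds; return (result, retries_used)."""
    for attempt in range(max_retries + 1):
        try:
            return fn(attempt), attempt
        except Exception:
            if attempt == max_retries:
                raise  # out of retries, surface the last error
    raise AssertionError("unreachable when max_retries >= 0")


def fetch(attempt: int) -> str:
    # Hypothetical work: halve the batch size on every retry (100, 50, 25, ...).
    batch_size = 100 // (2 ** attempt)
    if batch_size > 25:
        raise ValueError(f"batch_size {batch_size} too large")
    return f"ok with batch_size={batch_size}"


result, retries_used = run_with_retries(fetch, max_retries=3)
print(result, retries_used)  # ok with batch_size=25 2
```

Keeping the counter in the retry loop avoids a global variable; an external store would only be needed if the count had to survive across processes.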
p
@Marvin I noticed the `on_failure` hook does not get triggered when an exception is thrown in the task. Any reason why?