Pooyan
11/13/2023, 6:28 PM
on_failure callable so I can change input on failure? I am thinking of changing my flow or task input on retries
Marvin
11/13/2023, 6:29 PM
on_failure state handler can be used to perform actions when a task or flow fails.
However, changing the input of a task or flow on retries is not directly supported by the on_failure state handler. State handlers are not meant to modify the state of the task or flow, but rather to perform side effects based on that state.
If you want to change the behavior of a task based on whether it has failed previously, you might consider incorporating this logic into the task itself. For example, the task could check some external condition (like the presence of a certain file, or the value of a database record) and change its behavior based on that.
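A minimal sketch of that idea in plain Python, not Prefect's API: an attempt leaves a marker file when it fails, and the next attempt checks for the marker and switches to a different input. The names process, attempt, run_with_retries and the marker file are hypothetical, and the loop only stands in for whatever mechanism reruns the task.

```python
import tempfile
from pathlib import Path


def process(value: str) -> str:
    """Hypothetical unit of work that fails on one specific input."""
    if value == "bad-input":
        raise ValueError(f"cannot process {value!r}")
    return value.upper()


def attempt(primary: str, fallback: str, marker: Path) -> str:
    """Run one attempt; use the fallback input if an earlier attempt left a marker."""
    chosen = fallback if marker.exists() else primary
    try:
        result = process(chosen)
    except Exception:
        marker.touch()  # record the failure so the next attempt can react to it
        raise
    marker.unlink(missing_ok=True)  # clean up after a successful attempt
    return result


def run_with_retries(primary: str, fallback: str, retries: int = 1) -> str:
    """Plain-Python stand-in for a retry loop (assumption: a real engine would do this)."""
    with tempfile.TemporaryDirectory() as tmp:
        marker = Path(tmp) / "previous_attempt_failed"
        for attempt_number in range(retries + 1):
            try:
                return attempt(primary, fallback, marker)
            except ValueError:
                if attempt_number == retries:
                    raise  # out of retries, surface the last error
    raise RuntimeError("not reached when retries >= 0")
```

Calling run_with_retries("bad-input", "good-input") fails once on the primary input and then succeeds on the fallback. The external marker matters because the task itself is rerun from scratch and cannot rely on local variables surviving between attempts.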
For retrying tasks with a delay, Prefect provides a mechanism for exponential backoff. You can specify the number of retries and the delay between retries when defining your task. Here's an example:
from prefect import task
from prefect.tasks import exponential_backoff

@task(retries=3, retry_delay_seconds=exponential_backoff(backoff_factor=10), retry_jitter_factor=1)
def some_task_with_exponential_backoff_retries():
    ...
In this example, the task will be retried up to three times if it fails, with an exponential backoff delay between retries. The retry_jitter_factor parameter adds some randomness to the delay to prevent "thundering herds".
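To make the arithmetic concrete, here is a rough sketch of how doubling delays and jitter could combine. It is an illustration under assumptions, not Prefect's implementation: the doubling scheme, the uniform jitter distribution, and the helper names backoff_delays and with_jitter are all hypothetical.

```python
import random
from typing import List, Optional


def backoff_delays(retries: int, backoff_factor: float) -> List[float]:
    """Base delay per retry, doubling each time (assumed scheme)."""
    return [backoff_factor * 2**n for n in range(retries)]


def with_jitter(delay: float, jitter_factor: float,
                rng: Optional[random.Random] = None) -> float:
    """Stretch a delay by a random fraction in [0, jitter_factor] (assumed distribution)."""
    if rng is None:
        rng = random.Random()
    return delay * (1 + rng.uniform(0, jitter_factor))


# Three retries with a factor of 10 give base delays of 10, 20 and 40 seconds.
base = backoff_delays(retries=3, backoff_factor=10)

# With a jitter factor of 1, each actual delay lands between 1x and 2x its base.
jittered = [with_jitter(d, jitter_factor=1, rng=random.Random(0)) for d in base]
```

The point of the jitter is that many runs failing at the same moment will not all retry at the same moment.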
Note that global default retry settings will not override the retries or retry_delay_seconds that are set in the flow or task decorator.
For more complex retry logic, you might need to implement your own custom logic.
Pooyan
11/13/2023, 6:33 PM
Marvin
11/13/2023, 6:34 PM
state attribute of a TaskRun or FlowRun object. The state attribute contains information about the current state of the task or flow run, including whether it is currently being retried and how many retries have occurred.
However, there is no direct way to get the number of retries from the state attribute. You would need to implement custom logic to track the number of retries. For example, you could increment a counter each time a task or flow run enters the Retrying state.
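The counter idea can be sketched in plain Python as follows. This is only an illustration: RetryTracker and its methods are hypothetical names, and how state changes get delivered to it (a hook, a handler, a poller) is left open as an assumption. The only detail taken from the discussion is the state name "Retrying".

```python
from collections import Counter
from typing import Iterable


class RetryTracker:
    """Counts how often each run enters the Retrying state (hypothetical helper)."""

    def __init__(self) -> None:
        self._counts: Counter = Counter()

    def record(self, run_id: str, state_name: str) -> None:
        """Call once per state change; only Retrying transitions are counted."""
        if state_name == "Retrying":
            self._counts[run_id] += 1

    def retries(self, run_id: str) -> int:
        """Number of Retrying transitions seen so far for this run."""
        return self._counts[run_id]


def count_retries(run_id: str, state_names: Iterable[str]) -> int:
    """Feed a sequence of state names through a fresh tracker."""
    tracker = RetryTracker()
    for name in state_names:
        tracker.record(run_id, name)
    return tracker.retries(run_id)


# A run that fails twice before succeeding passes through Retrying twice.
history = ["Pending", "Running", "Retrying", "Running", "Retrying", "Running", "Success"]
total = count_retries("run-1", history)
```

An in-memory counter like this only works while everything runs in one process; if attempts run in separate processes, the count needs to live somewhere shared.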
Here is an example of how you might do this:
# Prefect 1.x-style API: Flow context manager and flow.run()
from prefect import Flow, task
from prefect.engine.state import Retrying

@task
def my_task():
    ...

with Flow("My Flow") as flow:
    my_task()

state = flow.run()
if isinstance(state, Retrying):
    print(f"Number of retries: {state.loop_count}")
In this example, state.loop_count gives the number of retries that have occurred.
Please note that this is a simplified example and might not cover all use cases. For example, if a task is retried multiple times, you would need to ensure that the counter is incremented correctly each time.
Also, please note that the state attribute and its properties are subject to change in future versions of Prefect, so this approach might not work in all versions.
For more complex use cases, you might need to use Prefect's logging or monitoring features to track the number of retries. For example, you could log a message each time a task is retried, and then use a log analysis tool to count the number of retry messages.
Pooyan
11/13/2023, 6:58 PM
Marvin
11/13/2023, 6:59 PM
state.loop_count attribute in Prefect. However, there's an open issue on Prefect's GitHub repository discussing the implementation of manual flow retries, which includes tracking whether a flow is retrying or not. You can find it here.
In the current version of Prefect, there's no built-in way to track the number of retries for a task or flow run. You would need to implement custom logic to track this. One way could be to use a global variable or an external database to increment a counter each time a task or flow run enters the Retrying state.
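For the external-database variant, a small sketch using Python's built-in sqlite3 module might look like this. The table name, column names, and helper functions are hypothetical, and SQLite is only a stand-in for whatever database is actually available; where record_retry gets called from is an assumption left to the reader.

```python
import sqlite3


def open_counter_db(path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) a database holding one retry counter per run."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS retry_counts ("
        "run_id TEXT PRIMARY KEY, retries INTEGER NOT NULL)"
    )
    return conn


def get_retries(conn: sqlite3.Connection, run_id: str) -> int:
    """Return the stored retry count, or 0 if the run has never retried."""
    row = conn.execute(
        "SELECT retries FROM retry_counts WHERE run_id = ?", (run_id,)
    ).fetchone()
    return row[0] if row else 0


def record_retry(conn: sqlite3.Connection, run_id: str) -> int:
    """Increment the counter for a run and return the new value."""
    with conn:  # commit both statements together
        conn.execute(
            "INSERT OR IGNORE INTO retry_counts (run_id, retries) VALUES (?, 0)",
            (run_id,),
        )
        conn.execute(
            "UPDATE retry_counts SET retries = retries + 1 WHERE run_id = ?",
            (run_id,),
        )
    return get_retries(conn, run_id)
```

Passing a file path instead of ":memory:" would let the count survive across processes, which a global variable cannot do.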
I recommend keeping an eye on the Prefect GitHub repository for updates on this feature.
Pooyan
11/13/2023, 8:13 PM
on_failure hook does not get triggered when an exception is thrown in the task. Any reason why?