Pascal Jahan Elahi (Pawsey)
01/24/2025, 2:27 AM
Marvin
01/24/2025, 2:27 AM
1. Retry Conditions: You can set retry_condition_fn, which is a callable that receives the task, the task run, and the state and returns a boolean. If it returns True, the task will be retried; otherwise, it won't. This allows you to define custom logic for when retries should occur.
2. Retry Delays: You can customize the delay between retries using retry_delay_seconds, which accepts a single number or a list of numbers to specify different delays for each retry attempt.
3. Failure Hooks: Prefect allows you to define hooks that execute when a task fails. You can set on_failure to a list of callables that will be executed when the task enters a failed state, that is, after any retries are exhausted. This is where you can define any cleanup or logging that should occur on failure.
Here's a simple example of how you might define a task with these settings:
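```python
from prefect import task

class SomeSpecificException(Exception):
    """Placeholder for the error that should trigger a retry."""

def custom_retry_condition(task, task_run, state) -> bool:
    # Prefect passes the task, the task run, and the failed state.
    try:
        state.result()  # re-raises the exception the task failed with
    except SomeSpecificException:
        return True
    except Exception:
        return False
    return False

def failure_hook(task, task_run, state):
    print(f"Task {task_run.name} failed!")

@task(retries=3, retry_delay_seconds=[1, 10, 100],
      retry_condition_fn=custom_retry_condition, on_failure=[failure_hook])
def my_task():
    # Task logic here
    pass
```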
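To make the interplay of retries, retry_delay_seconds, the retry condition, and failure hooks concrete, here is a rough, Prefect-free sketch. It is a toy model and not Prefect's implementation: run_with_retries and its sleep parameter are hypothetical names, the condition receives the exception directly (Prefect passes the task, task run, and state instead), and reusing the last delay when the list is shorter than the retry count is an assumption of this sketch.

```python
import time
from typing import Callable, Sequence


def run_with_retries(
    fn: Callable[[], object],
    retries: int,
    retry_delay_seconds: Sequence[float],
    retry_condition_fn: Callable[[BaseException], bool],
    on_failure: Sequence[Callable[[BaseException], None]] = (),
    sleep: Callable[[float], None] = time.sleep,
):
    """Toy model of retry handling; NOT Prefect's implementation."""
    attempt = 0
    while True:
        try:
            return fn()
        except Exception as exc:
            can_retry = attempt < retries and retry_condition_fn(exc)
            if not can_retry:
                # Terminal failure: hooks run once, then the error propagates.
                for hook in on_failure:
                    hook(exc)
                raise
            # Assumption: reuse the last delay if the list runs out.
            index = min(attempt, len(retry_delay_seconds) - 1)
            sleep(retry_delay_seconds[index])
            attempt += 1


calls, slept, failures = [], [], []


def flaky():
    # Fails twice with a retryable error, then succeeds.
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("transient")
    return "ok"


result = run_with_retries(
    flaky,
    retries=3,
    retry_delay_seconds=[1, 10, 100],
    retry_condition_fn=lambda exc: isinstance(exc, ConnectionError),
    on_failure=[failures.append],
    sleep=slept.append,  # record delays instead of actually sleeping
)
```

In this run the function succeeds on the third call, so only the first two delays are used and the failure hook never fires.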
For more detailed information, you can refer to the Prefect 3.x task documentation.
Pascal Jahan Elahi (Pawsey)
01/24/2025, 2:31 AM
Marvin
01/24/2025, 2:31 AM
```python
from prefect import task

def failure_hook(task, task_run, state):
    print(f"Task {task.name} failed with state: {state}")

@task(on_failure=[failure_hook])
def my_task():
    # Task logic here
    pass
```
In this example, when the task fails, the failure_hook function is called with the task, task run, and state as arguments. You can then use these arguments within your hook to perform any necessary actions or logging.
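As a small illustration of using those three arguments for logging, here is a sketch that runs without Prefect. The SimpleNamespace objects are stand-ins with hypothetical values, and the name and message attributes read from them are assumptions about what the real objects expose.

```python
import logging
from types import SimpleNamespace

logger = logging.getLogger("failure_hooks")


def failure_hook(task, task_run, state):
    # Record which task and run failed, plus the state's name and message.
    logger.error(
        "task=%s run=%s state=%s message=%s",
        task.name, task_run.name, state.name, state.message,
    )


# Stand-ins for the objects a hook would receive (hypothetical values).
failure_hook(
    SimpleNamespace(name="my_task"),
    SimpleNamespace(name="my_task-0"),
    SimpleNamespace(name="Failed", message="Intentional failure for testing"),
)
```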
For further details, refer to the Prefect 3.x documentation on managing states and hooks.
Pascal Jahan Elahi (Pawsey)
01/24/2025, 2:32 AM
Marvin
01/24/2025, 2:32 AM
with_options has no parameter for forwarding arbitrary keyword arguments (**kwargs) to your hooks, but you can bind them to the hook with functools.partial. Here's how you can achieve this:
```python
from functools import partial

from prefect import task

def failure_hook(task, task_run, state, **kwargs):
    print(f"Task {task.name} failed with state: {state}")
    print(f"Task arguments: {kwargs}")

@task
def my_task(arg1, arg2):
    # Task logic here
    # To simulate failure
    raise ValueError("Intentional failure for testing")

# Bind the extra keyword arguments to the hook, then attach it to the task
my_task_with_hook = my_task.with_options(
    on_failure=[partial(failure_hook, arg1="value1", arg2="value2")],
)

# Run the task; the hook fires when it fails
my_task_with_hook("value1", "value2")
```
In this example, my_task_with_hook is a version of my_task that includes failure hooks. partial binds the keyword arguments that you want to pass to the failure hook. When the task fails, failure_hook receives these arguments alongside the task, task run, and state, and can use them accordingly.
For more information, you can refer to the Prefect 3.x documentation on state change hooks.
Nate
01/24/2025, 2:42 AM
partial the kwargs to the hook if needed, so
```python
from functools import partial

my_task_with_hook = my_task.with_options(
    on_failure=[partial(failure_hook, **dict(arg1="value1", arg2="value2"))],
)
```
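To see why partial works here, a minimal sketch with plain strings standing in for the Prefect objects: the bound hook still accepts the three positional arguments, and the extras arrive as keyword arguments. The values passed in are placeholders for illustration only.

```python
from functools import partial


def failure_hook(task, task_run, state, **kwargs):
    # Same shape as the hook above: three positional arguments plus extras.
    return f"{task} failed with state {state}; extras: {kwargs}"


# Bind the extras up front; the result still accepts (task, task_run, state).
bound_hook = partial(failure_hook, arg1="value1", arg2="value2")

# Stand-in strings instead of real Prefect objects, just to show the call shape.
message = bound_hook("my_task", "my_task-run-1", "Failed")
print(message)
```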
Nate
01/24/2025, 2:42 AM
Pascal Jahan Elahi (Pawsey)
01/24/2025, 3:35 AM