<@ULVA73B9P>, how can I define what should be done...
# ask-marvin
p
@Marvin, how can I define what should be done when a task fails and before it is retried?
m
In Prefect 3.x, you can define custom behavior for when a task fails and before it is retried using hooks and retry conditions. Here's a concise guide: 1. Retry Conditions: You can specify a retry condition function using
retry_condition_fn
, which is a callable that returns a boolean. If it returns
True
, the task will be retried; otherwise, it won't. This allows you to define custom logic for when retries should occur. 2. Retry Delays: You can customize the delay between retries using
retry_delay_seconds
, which accepts a list of integers to specify different delays for each retry attempt. 3. Failure Hooks: Prefect allows you to define hooks that execute when a task fails. You can set
on_failure_hooks
to a list of callables that will be executed if the task enters a failed state. This is where you can define any cleanup or logging that should occur before a retry. Here's a simple example of how you might define a task with these settings:
Copy code
python
from prefect import task

def custom_retry_condition(exception):
    # Define your custom logic here
    return isinstance(exception, SomeSpecificException)

def failure_hook(task_run):
    print(f"Task {task_run.name} failed!")

@task(retries=3, retry_delay_seconds=[1, 10, 100], retry_condition_fn=custom_retry_condition, on_failure_hooks=[failure_hook])
def my_task():
    # Task logic here
    pass
For more detailed information, you can refer to the Prefect 3.x task documentation.
p
@Marvin, how can I pass arguments to the failure hook?
m
In Prefect 3.x, failure hooks can receive specific arguments that are automatically passed to them when they are called. These arguments typically include the task or flow, the task or flow run, and the state. Here's how you can define a failure hook and use these arguments:
Copy code
python
from prefect import task

def failure_hook(task, task_run, state):
    print(f"Task {task.name} failed with state: {state}")

@task(on_failure_hooks=[failure_hook])
def my_task():
    # Task logic here
    pass
In this example, when the task fails, the
failure_hook
function is called with the task, task run, and state as arguments. You can then use these arguments within your hook to perform any necessary actions or logging. For further details, refer to the Prefect 3.x documentation on managing states and hooks.
p
@Marvin, how can I pass to a failure hook, arguments passed to the task itself?
m
To pass arguments from a task to a failure hook in Prefect 3.x, you can use the
with_options
method to pass arbitrary keyword arguments (
**kwargs
) to your hooks. Here's how you can achieve this:
Copy code
python
from prefect import task

def failure_hook(task, task_run, state, **kwargs):
    print(f"Task {task.name} failed with state: {state}")
    print(f"Task arguments: {kwargs}")

@task
def my_task(arg1, arg2):
    # Task logic here
    # To simulate failure
    raise ValueError("Intentional failure for testing")

# Define the task with options to pass arguments to the hook
my_task_with_hook = my_task.with_options(
    on_failure_hooks=[failure_hook],
    kwargs={"arg1": "value1", "arg2": "value2"}
)

# Run the task
my_task_with_hook()
In this example,
my_task_with_hook
is a version of
my_task
that includes failure hooks. The
kwargs
parameter in
with_options
allows you to specify the task arguments that you want to pass to the failure hook. When the task fails, the
failure_hook
will receive these arguments and can use them accordingly. For more information, you can refer to the Prefect 3.x documentation on state change hooks.
n
marvin's a little off here, there's an example but you just
partial
the kwargs to the hook if needed, so
Copy code
my_task_with_hook = my_task.with_options(
    on_failure_hooks=[partial(failure_hook, **dict(arg1="value1", arg2="value2"))],
)
p
Thanks @Nate!