<@ULVA73B9P> how to get task input in on_failure h...
# ask-marvin
i
@Marvin how to get task input in on_failure hook?
m
To get the input of a task in the
on_failure
hook, you need to utilize the state change hooks provided by Prefect. The approach differs slightly between Prefect 2.x and 3.x, but the general steps are similar. ### Prefect 2.x 1. Define the Hook Function: In your hook function, you can access the task run and its state. 2. Customize the Task: Use the
with_options
method to attach the hook to the task. Here is an example for Prefect 2.x:
Copy code
python
from prefect import flow, task
from prefect.engine.state import Failed
from functools import partial

# Define the hook function
def on_failure_hook(task, task_run, state, **kwargs):
    if isinstance(state, Failed):
        print(f"Task {task_run.name} failed with inputs: {kwargs}")

# Define the task with the on_failure option
@task
def my_task(x, y):
    if y == 0:
        raise ValueError("Division by zero")
    return x / y

# Attach the hook to the task
my_task_with_hook = my_task.with_options(on_failure=[partial(on_failure_hook, x=1, y=0)])

@flow
def my_flow():
    my_task_with_hook()

if __name__ == "__main__":
    my_flow()
### Prefect 3.x 1. Define the Hook Function: Similar to Prefect 2.x, define your custom hook function and use
state.data
to access the inputs. 2. Attach the Hook: Use the
with_options
method to attach the hook. Here's an example for Prefect 3.x:
Copy code
python
from prefect import flow, task
from prefect.exception import Failed
from functools import partial

# Define the hook function
def on_failure_hook(task, task_run, state, **kwargs):
    if isinstance(state, Failed):
        print(f"Task {task_run.name} failed with inputs: {kwargs}")

# Define the task with the on_failure option
@task
def my_task(x, y):
    if y == 0:
        raise ValueError("Division by zero")
    return x / y

# Attach the hook to the task
my_task_with_hook = my_task.with_options(on_failure=[partial(on_failure_hook, x=1, y=0)])

@flow
def my_flow():
    my_task_with_hook()

if __name__ == "__main__":
    my_flow()
In both cases, the
on_failure_hook
will be called when the task fails, and it will print the inputs provided to the task. References: - Prefect 2.x State Change Hooks Documentation - Prefect 3.x Manage States Documentation
i
@Marvin what if I call task using .map function?
m
When using the
.map
function in Prefect tasks, setting up the
on_failure
hook requires special handling due to the distributed nature of mapped tasks. Here's how you can achieve it for both Prefect 2.x and 3.x. ### Prefect 2.x 1. Define Hook Function: Define a custom hook to handle the failure of each task. 2. Attach Hook with Options: Use the
with_options
method to attach the hook to the mapped task. Example:
Copy code
python
from prefect import flow, task
from prefect.engine.state import Failed
from functools import partial

# Define the hook function
def on_failure_hook(task, task_run, state, **kwargs):
    if isinstance(state, Failed):
        print(f"Task {task_run.name} failed with inputs: {kwargs}")

# Define the task with the on_failure option
@task
def my_task(x, y):
    if y == 0:
        raise ValueError("Division by zero")
    return x / y

# Attach the hook to the task
my_mapped_task = my_task.with_options(on_failure=[partial(on_failure_hook, y=[0])])

@flow
def my_flow():
    results = my_mapped_task.map(x=[1, 2, 3], y=[0, 1, 0])

if __name__ == "__main__":
    my_flow()
### Prefect 3.x 1. Define Hook Function: Similar hook function to handle failures. 2. Attach Hook with Options: Use the
with_options
method for the mapped task. Example:
Copy code
python
from prefect import flow, task
from prefect.exceptions import Failed
from functools import partial

# Define the hook function
def on_failure_hook(task, task_run, state, **kwargs):
    if isinstance(state, Failed):
        print(f"Task {task_run.name} failed with inputs: {kwargs}")

# Define the task with the on_failure option
@task
def my_task(x, y):
    if y == 0:
        raise ValueError("Division by zero")
    return x / y

# Attach the hook to the task
my_mapped_task = my_task.with_options(on_failure=[partial(on_failure_hook, y=[0])])

@flow
def my_flow():
    results = my_mapped_task.map(x=[1, 2, 3], y=[0, 1, 0])

if __name__ == "__main__":
    my_flow()
In both examples, the custom
on_failure_hook
function will be invoked for each task instance that fails within the mapped tasks. References: - Prefect 2.x State Change Hooks Documentation - Prefect 3.x State Management Documentation
i
@Marvin y=[0] passes [0], not the item itself
m
You're right. When using
.map
with options, each invocation of the mapped task needs to pass the specific input for that particular task run. Let's adjust this to correctly handle individual inputs for the mapped tasks. ### Updated Approach To handle this properly: - Use
partial
in combination with
map
, ensuring the correct values are used for each invocation. ### Prefect 2.x
Copy code
python
from prefect import flow, task
from prefect.engine.state import Failed
from functools import partial

# Define the hook function
def on_failure_hook(task, task_run, state, **kwargs):
    if isinstance(state, Failed):
        print(f"Task {task_run.name} failed with inputs: {kwargs}")

# Define the task
@task
def my_task(x, y):
    if y == 0:
        raise ValueError("Division by zero")
    return x / y

# Wrapper to attach the correct hooks for mapped tasks
def create_task_with_hook(x, y):
    return my_task.with_options(
        on_failure=[partial(on_failure_hook, x=x, y=y)]
    )(x, y)

@flow
def my_flow():
    results = list(map(lambda params: create_task_with_hook(*params), zip([1, 2, 3], [0, 1, 0])))

if __name__ == "__main__":
    my_flow()
### Prefect 3.x
Copy code
python
from prefect import flow, task
from prefect.exceptions import Failed
from functools import partial

# Define the hook function
def on_failure_hook(task, task_run, state, **kwargs):
    if isinstance(state, Failed):
        print(f"Task {task_run.name} failed with inputs: {kwargs}")

# Define the task
@task
def my_task(x, y):
    if y == 0:
        raise ValueError("Division by zero")
    return x / y

# Wrapper to attach the correct hooks for mapped tasks
def create_task_with_hook(x, y):
    return my_task.with_options(
        on_failure=[partial(on_failure_hook, x=x, y=y)]
    )(x, y)

@flow
def my_flow():
    results = list(map(lambda params: create_task_with_hook(*params), zip([1, 2, 3], [0, 1, 0])))

if __name__ == "__main__":
    my_flow()
In the updated examples, the
create_task_with_hook
function properly attaches the
on_failure_hook
with the correct individual parameters for each mapped task. References: - Prefect 2.x State Change Hooks Documentation - Prefect 3.x State Management Documentation
n
you can use partial here, but it might be easier to use the run context
Copy code
In [1]: from prefect.context import get_run_context

In [2]: from prefect import flow, task

In [3]: def print_params(task, task_run, state):
   ...:     print(get_run_context().parameters)
   ...:

In [4]: @task(on_failure=[print_params])
   ...: def t(x): raise RuntimeError()

In [5]: @flow(log_prints=True)
   ...: def f():
   ...:     state = t(42, return_state=True)
   ...:     return "its fine"
   ...:

In [6]: f()
15:10:48.418 | INFO    | prefect.engine - Created flow run 'watchful-caiman' for flow 'f'
15:10:48.668 | INFO    | Task run 't-874' - Created task run 't-874' for task 't'
15:10:48.678 | ERROR   | Task run 't-874' - Task run failed with exception: RuntimeError() - Retries are exhausted
Traceback (most recent call last):
  File "/Users/nate/github.com/prefecthq/prefect/src/prefect/task_engine.py", line 763, in run_context
    yield self
  File "/Users/nate/github.com/prefecthq/prefect/src/prefect/task_engine.py", line 1323, in run_task_sync
    engine.call_task_fn(txn)
  File "/Users/nate/github.com/prefecthq/prefect/src/prefect/task_engine.py", line 786, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nate/github.com/prefecthq/prefect/src/prefect/utilities/callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "<ipython-input-4-ec29f317cd1e>", line 2, in t
    def t(x): raise RuntimeError()
              ^^^^^^^^^^^^^^^^^^^^
RuntimeError
15:10:48.684 | INFO    | Task run 't-874' - Running hook 'print_params' in response to entering state 'Failed'
15:10:48.685 | INFO    | Task run 't-874' - {'x': 42}
15:10:48.685 | INFO    | Task run 't-874' - Hook 'print_params' finished running successfully
15:10:48.686 | ERROR   | Task run 't-874' - Finished in state Failed('Task run encountered an exception RuntimeError: ')
15:10:48.849 | INFO    | Flow run 'watchful-caiman' - Finished in state Completed()
Out[6]: 'its fine'