<@ULVA73B9P> I need to use arguments to the flow f...
# ask-marvin
s
@Marvin I need to use arguments to the flow function in on_failure callable in a task. Is it possible to access flow context from on_failure callable?
m
In Prefect 3.x, it is possible to access the flow context from an `on_failure` callable in a task. The `on_failure` hook can be defined to accept the flow, flow run, and state as arguments. This allows you to interact with the flow's context or any arguments you may have passed to it. Here's a brief example of how you might define such a callable:
```python
from prefect import task, flow

def on_failure(flow, flow_run, state):
    # Access flow arguments or context here
    # Example: print flow run parameters
    print(flow_run.parameters)

# Hooks are passed as a list of callables
@task(on_failure=[on_failure])
def my_task():
    raise ValueError("Task failed!")

@flow
def my_flow(param1, param2):
    my_task()

my_flow(param1="value1", param2="value2")
```
In this example, the `on_failure` function is set to handle failures in `my_task`, and it can access the parameters passed to the flow. For more details, you might want to explore the Prefect source code or documentation on flow_engine.py and the Prefect API documentation.
s
@Marvin It seems that on_failure callable for tasks has "task" and "task_run" arguments, not "flow"
m
You're correct. In Prefect 3.x, the `on_failure` callable for tasks is called with `task`, `task_run`, and `state` arguments, not with the flow context. This means the flow's arguments are not handed to a task's `on_failure` callable directly. However, you can work around this limitation by passing the necessary context or arguments as part of the task's parameters or by storing them in a shared state that the hook can access when the task fails. Here's an example of how you might structure your code:
```python
from prefect import task, flow, get_run_logger

def on_failure(task, task_run, state):
    logger = get_run_logger()
    # Log task details or handle failure
    logger.error(f"Task {task.name} failed with state: {state}")

# Hooks are passed as a list of callables
@task(on_failure=[on_failure])
def my_task(param1, param2):
    # Task logic here
    raise ValueError("Task failed!")

@flow
def my_flow(param1, param2):
    my_task(param1, param2)

my_flow(param1="value1", param2="value2")
```
In this setup, `param1` and `param2` are passed to `my_task`, and you can handle the failure with the `on_failure` callable using the task and task run context. For more details, you can refer to the Prefect task documentation and the Prefect API documentation.
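One way to get flow arguments into a task's failure hook without a global is to bind them to the hook with `functools.partial` inside the flow. The sketch below is illustrative only: `notify_failure` and its `flow_params` keyword are hypothetical names, and stand-in objects replace Prefect's real `task`, `task_run`, and `state` so the snippet runs without Prefect installed. It assumes Prefect calls task hooks as `hook(task, task_run, state)`.

```python
from functools import partial
from types import SimpleNamespace


def notify_failure(task, task_run, state, flow_params=None):
    """Failure hook that also receives flow arguments bound via partial."""
    # flow_params is a hypothetical extra keyword, not something Prefect supplies.
    message = f"Task {task.name} failed; flow params: {flow_params}"
    print(message)
    # The return value is only used for this demonstration.
    return message


# Inside a flow you would bind the flow's own arguments, for example:
#     hook = partial(notify_failure, flow_params={"param1": param1, "param2": param2})
#     my_task.with_options(on_failure=[hook])(param1, param2)
hook = partial(notify_failure, flow_params={"param1": "value1", "param2": "value2"})

# Stand-ins for the objects Prefect would pass to the hook.
fake_task = SimpleNamespace(name="my_task")
fake_task_run = SimpleNamespace(name="my_task-0")
fake_state = SimpleNamespace(type="FAILED")

result = hook(fake_task, fake_task_run, fake_state)
```

Because the arguments are bound per call inside the flow, each flow run gets its own hook and nothing has to live at module level.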
s
@Marvin can you tell me about "storing them in a shared state". I don't want to make a variable global
m
In Prefect 3.x, you can manage shared state between tasks without resorting to global variables by using Prefect's result persistence (the `persist_result` and `result_storage` options) or by leveraging task outputs as inputs to other tasks. This allows you to pass data between tasks in a controlled manner. Here's a brief overview of how you might achieve this:
1. Task Outputs as Inputs: You can design your flow such that the output of one task serves as the input to another. This is a straightforward way to share data between tasks.
   ```python
   from prefect import task, flow

   @task
   def task_a():
       return "Shared data"

   @task
   def task_b(data):
       print(f"Received data: {data}")

   @flow
   def my_flow():
       data = task_a()
       task_b(data)

   my_flow()
   ```
2. Using Prefect's result persistence: Prefect can store task results in various storage backends, configured through options such as `persist_result` and `result_storage`. This can be useful if you need to persist data between runs or across different tasks without directly passing them as arguments.
3. Custom Cache Keys: Prefect supports caching, and you can use custom cache keys to manage shared state. This approach requires more setup but can be useful for complex workflows.
For more detailed information on these strategies, you can explore the Prefect task caching documentation.
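To make the custom cache key idea more concrete, here is a minimal sketch of a key function. It assumes the `cache_key_fn(context, parameters)` signature that Prefect tasks accept, and it would be attached with something like `@task(cache_key_fn=params_cache_key)`. The function name and the choice to hash every parameter are illustrative assumptions; the snippet itself needs only the standard library.

```python
import hashlib
import json


def params_cache_key(context, parameters):
    """Build a deterministic cache key from a task's call parameters."""
    # context is unused here; Prefect would pass the task run context.
    # Sorting keys makes the key independent of argument order.
    payload = json.dumps(parameters, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


key_a = params_cache_key(None, {"param1": "value1", "param2": "value2"})
key_b = params_cache_key(None, {"param2": "value2", "param1": "value1"})
key_c = params_cache_key(None, {"param1": "other", "param2": "value2"})
```

Calls with the same parameters map to the same key regardless of order, while any changed value produces a different key.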