Fabien Punin
02/04/2025, 1:54 PM
Marvin
02/04/2025, 1:54 PM
`on_failure` callback using `with_options`, you can follow these steps:
1. Define a Flow: Use the `@flow` decorator to define your flow function.
2. Inject Callbacks: Use the `with_options` method to modify your flow's behavior, such as adding an `on_failure` callback.
Here's a simple example:
```python
from prefect import flow, Flow

@flow
def my_flow():
    # Access flow object using Flow.current
    flow_obj = Flow.current
    # Inject an on_failure callback
    flow_obj.with_options(on_failure=[my_failure_callback])

def my_failure_callback(flow_run):
    print(f"Flow {flow_run.name} failed!")
```
Key Points:
- Use `Flow.current` to access the current flow object.
- Use `with_options` to specify callbacks like `on_failure`.
For more details, you can refer to the Prefect documentation on managing states and the source code for flow options.
Fabien Punin
02/04/2025, 1:56 PM
Marvin
02/04/2025, 1:56 PM
`Flow.current` in Prefect 3.x, which suggests it might not be the correct way to access the current flow object within a flow function. My previous suggestion might have been misleading.
To achieve your goal of injecting an `on_failure` callback from within the flow itself, you should define the callback when you create the flow using the `@flow` decorator directly, rather than attempting to access a flow object from within the flow function. Here's how you can define a flow with an `on_failure` callback:
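```python
from prefect import flow

# Flow state hooks are called with the flow, the flow run, and the final state
def my_failure_callback(flow, flow_run, state):
    print(f"Flow run {flow_run.name} failed!")

# on_failure expects a list of hooks, not a single callable
@flow(on_failure=[my_failure_callback])
def my_flow():
    # Your flow logic here
    pass
```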
This method directly associates the callback with the flow during its definition. For more detailed information, you might want to explore the Prefect documentation on flow options.
Fabien Punin
02/04/2025, 1:57 PM
Marvin
02/04/2025, 1:57 PM
`on_failure` callback, you can use a closure or a mutable object that the callback can access. Here's a pattern using a closure:
```python
from prefect import flow

# Mutable container shared by the flow body and the failure hook
data_container = {}

def create_failure_callback(container):
    # Hooks receive the flow, the flow run, and the final state
    def failure_callback(flow, flow_run, state):
        print(f"Flow run {flow_run.name} failed with data: {container.get('data')}")
    return failure_callback

# with_options returns a new flow, so calling it inside the running flow
# would not affect the current run; register the hook on the decorator instead
@flow(on_failure=[create_failure_callback(data_container)])
def my_flow():
    # Compute some data during flow execution
    computed_data = {"result": "important data"}
    # Store the data in the shared container so the hook can read it
    data_container["data"] = computed_data
    # Your flow logic here
    pass
```
Explanation:
- Mutable Container: Use a dictionary or another mutable object to store computed data.
- Closure: Define the `failure_callback` within another function (`create_failure_callback`) to capture the mutable container.
This way, the callback has access to the data computed during the flow execution. For more information on managing states and hooks, you can refer to the Prefect documentation on managing states.
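The closure-plus-container idea can be checked without running Prefect at all. The sketch below is plain Python and only illustrates the mechanics. `make_failure_hook`, `shared`, and `fake_run` are hypothetical names, and the hook is called by hand where the flow engine would normally call it. Its three parameters mirror the `(flow, flow_run, state)` arguments that Prefect passes to flow state hooks.

```python
from types import SimpleNamespace


def make_failure_hook(container):
    """Return a hook that reads `container` when it is called, not when it is created."""
    def failure_hook(flow, flow_run, state):
        # Same parameter order as a Prefect flow state hook
        return f"Flow run {flow_run.name} failed with data: {container.get('data')}"
    return failure_hook


shared = {}                       # hypothetical container, created before the flow body runs
hook = make_failure_hook(shared)  # the closure captures the dict object itself

# Simulate the flow body storing data, then the engine invoking the hook
shared["data"] = {"result": "important data"}
fake_run = SimpleNamespace(name="demo-run")  # stand-in for a real flow run object
message = hook(None, fake_run, None)
print(message)
```

Because the hook holds a reference to the dictionary rather than a copy, anything written into it before the failure is visible when the hook fires. One caveat of this sketch: a single shared dictionary is also shared by any concurrent runs in the same process, so keying entries by flow run ID may be safer there.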