Tate Wilks
05/30/2024, 7:12 PM
Marvin
05/30/2024, 7:12 PM
`on_failure` hook in Prefect. Here's a basic outline of how it can be done:
1. Define the Hook Function: Create a function that will be used as the `on_failure` hook. This function can call another flow.
2. Attach the Hook to the Flow: Use the `on_failure` parameter to attach the hook function to your flow.
Below is an example code snippet to demonstrate this:
```python
from prefect import flow, task


def start_another_flow(flow, flow_run, state):
    # Assuming another_flow is defined in the same script or imported.
    # return_state=True returns the final state instead of raising on failure.
    result_state = another_flow(a_flow_parameter=123, return_state=True)
    if result_state.is_failed():
        raise RuntimeError("The flow triggered by the on_failure hook also failed.")


@flow(on_failure=[start_another_flow])
def main_flow():
    fail_task()


@task
def fail_task():
    raise ValueError("This task will fail.")


@flow
def another_flow(a_flow_parameter: int):
    print(f"Flow triggered with parameter: {a_flow_parameter}")


if __name__ == "__main__":
    main_flow()
```
In this example:
- The `main_flow` will fail due to `fail_task` raising an exception.
- The `start_another_flow` function (hook) will be triggered when `main_flow` transitions to the failed state.
- Inside the `start_another_flow` hook, `another_flow` is called with a parameter.
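To see the mechanics without a Prefect installation, here is a toy model in plain Python of how a list of failure hooks gets invoked. It is only a sketch: `ToyFlow` and `ToyState` are hypothetical stand-ins, not Prefect classes, and this is not how Prefect implements hooks internally. The one thing it mirrors is that each hook is an ordinary callable that receives `(flow, flow_run, state)`.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class ToyState:
    """Hypothetical stand-in for a run state: a name plus a message."""
    name: str
    message: str = ""

    def is_failed(self) -> bool:
        return self.name == "Failed"


@dataclass
class ToyFlow:
    """Hypothetical stand-in for a flow: a function plus on_failure hooks."""
    fn: Callable[[], object]
    on_failure: List[Callable] = field(default_factory=list)

    def run(self) -> ToyState:
        try:
            self.fn()
            state = ToyState("Completed")
        except Exception as exc:
            state = ToyState("Failed", message=str(exc))
        if state.is_failed():
            # Hooks are plain callables invoked with (flow, flow_run, state).
            for hook in self.on_failure:
                hook(self, {"name": "toy-run"}, state)
        return state


triggered = []


def start_another_flow(flow, flow_run, state):
    # In the real example, this is where another flow would be called.
    triggered.append(f"recovering from: {state.message}")


def failing_body():
    raise ValueError("This task will fail.")


final_state = ToyFlow(failing_body, on_failure=[start_another_flow]).run()
print(final_state.name, triggered)
```

The hook only runs on the failed path; a body that returns normally leaves `triggered` empty.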
For more details on state change hooks and additional examples, you can refer to the State Change Hooks documentation.
If you encounter any issues, you might want to look into specific state transitions and context management within the hook functions. You can follow or contribute to ongoing discussions on GitHub, such as this issue on hooks and context management.
Good luck, not that it matters much in this vast, indifferent universe.
Tate Wilks
05/30/2024, 7:17 PM
Marvin
05/30/2024, 7:18 PM
`on_failure` are designed to be ordinary Python functions, which unfortunately means you cannot directly call Prefect tasks within these hooks. However, there are a couple of workarounds:
1. Using Prefect Client: You can use the Prefect Client within the hook to schedule a flow run programmatically.
2. Calling Subflows: Call another flow function directly from within the hook function.
### Example Using Prefect Client
Below is an example illustrating how to use the Prefect Client to run another flow when a failure occurs:
```python
from prefect import flow, get_client, task


# Flow state hooks receive three arguments: flow, flow_run, and state.
async def trigger_another_flow(flow, flow_run, state):
    async with get_client() as client:
        # Replace the placeholder with the ID of an existing deployment.
        deployment_id = "<YOUR_DEPLOYMENT_ID>"
        await client.create_flow_run_from_deployment(deployment_id=deployment_id)


@flow(on_failure=[trigger_another_flow])
def main_flow():
    fail_task()


@task
def fail_task():
    raise ValueError("This task will fail.")


if __name__ == "__main__":
    main_flow()
```
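Prefect calls flow state hooks with three positional arguments, `(flow, flow_run, state)`, so a hook declared with a different parameter list fails when it is invoked. A small standard-library check can catch this before the hook is attached. This is only a sketch: `accepts_hook_args`, `good_hook`, and `bad_hook` are hypothetical names for illustration and not part of Prefect.

```python
import inspect


def accepts_hook_args(hook) -> bool:
    """Return True if `hook` can be called with three positional arguments."""
    try:
        # bind() raises TypeError when the arguments do not fit the signature.
        inspect.signature(hook).bind(None, None, None)
    except TypeError:
        return False
    return True


async def good_hook(flow, flow_run, state):
    pass


def bad_hook(flow_run, state):
    pass


print(accepts_hook_args(good_hook), accepts_hook_args(bad_hook))
```

The check works for both sync and async hooks because it inspects the signature without calling the function.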
### Example Using Subflows
Alternatively, you can call another flow directly inside the hook. Hooks are ordinary Python functions, so the flow is called like any other function:
```python
from prefect import flow, task


# Flow state hooks receive three arguments: flow, flow_run, and state.
def run_another_flow(flow, flow_run, state):
    another_flow()


@flow(on_failure=[run_another_flow])
def main_flow():
    fail_task()


@task
def fail_task():
    raise ValueError("This task will fail.")


@flow
def another_flow():
    print("Another flow is running.")


if __name__ == "__main__":
    main_flow()
```
### Documentation References
For more detailed information, you can refer to the State Change Hooks documentation.
In reality, does any of this matter? Perhaps not, but at least it keeps you busy.