MarcoM
02/13/2024, 2:44 PM
Marvin
02/13/2024, 2:45 PM
python
from prefect import flow, get_run_logger
from prefect.backend import create_flow_run

# Define the state handler
def retry_with_emergency_parameter(flow_run, state):
    logger = get_run_logger()
    if state.is_failed():
        logger.info("Flow failed. Relaunching with emergency parameter...")
        # Trigger a new flow run with 'emergency' parameter set to True
        create_flow_run(
            flow_name="your_flow_name",  # Replace with your actual flow name
            parameters={"emergency": True}
        )

# Define your flow with the state handler
@flow(state_handlers=[retry_with_emergency_parameter])
def your_flow(emergency=False):
    # Your flow logic here
    pass

# Run your flow initially without the emergency parameter
your_flow(emergency=False)
The create_flow_run call may need adjusting for the specifics of your system, such as the correct flow name, IDs, and parameter set.
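As a rough illustration of checking the parameter set before relaunching, here is a plain-Python sketch that needs no Prefect imports. Note that check_parameters is a hypothetical helper rather than a Prefect API, and your_flow is an undecorated stand-in; with a decorated flow you would probably pass the wrapped function (your_flow.fn) instead.

```python
import inspect


def your_flow(emergency=False):
    """Stand-in for the flow function (no Prefect decorator here)."""
    return emergency


def check_parameters(func, parameters):
    """Hypothetical helper: raise TypeError if `parameters` does not fit `func`."""
    # Signature.bind raises TypeError on unknown or missing arguments
    inspect.signature(func).bind(**parameters)
    return parameters


# A matching parameter set passes through unchanged
relaunch_parameters = check_parameters(your_flow, {"emergency": True})
```

A misspelled key such as "emergncy" would raise a TypeError here, before any new run is requested.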
If you're looking for more detailed instructions or documentation, I could assist you better with precise excerpts or examples from Prefect's documentation and GitHub issues. Would you like me to perform a search for you?
MarcoM
02/13/2024, 2:49 PM
Marvin
02/13/2024, 2:49 PM
Using the on_failure parameter when defining your flow is indeed a more streamlined way to handle retries with modified parameters upon failure. My apologies for the oversight; even an android with a brain the size of a planet can miss the obvious sometimes.
Here's an updated approach that uses the on_failure parameter in Prefect 2.x (Orion) to achieve the desired behavior:
python
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment
from prefect.logging.loggers import flow_run_logger

# Define an on_failure hook; Prefect calls it with the flow, the flow run, and the final state
async def on_failure_handler(flow, flow_run, state):
    if state.is_failed():
        # get_run_logger() needs an active run context, which a hook may not have
        logger = flow_run_logger(flow_run, flow)
        logger.info("Flow failed. Triggering a retry with emergency parameter...")
        # Assumes the flow has a deployment; the name below is a placeholder
        # in the form "flow-name/deployment-name"
        await run_deployment(
            name="your-flow/your-deployment",
            parameters={"emergency": True},
            timeout=0,  # Return immediately instead of waiting for the new run
        )

# Define your flow with the on_failure hook (hooks are passed as a list)
@flow(on_failure=[on_failure_handler])
def your_flow(emergency=False):
    logger = get_run_logger()
    if emergency:
        logger.info("Running the flow with 'emergency' parameter set to True.")
    # Your flow logic here
    pass

# Run your flow initially without the emergency parameter
if __name__ == "__main__":
    your_flow(emergency=False)
This script checks whether the flow has failed and, if so, triggers a new flow run with the emergency parameter set to True. Note that this example assumes Prefect 2.x (Orion); if you are using an older version of Prefect, the implementation details will differ.
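One thing to watch for: if the emergency run fails as well, the failure handler would fire again and keep relaunching. A minimal sketch of a guard, in plain Python with no Prefect imports; relaunch_parameters is a hypothetical helper, and it assumes you can read the failed run's parameters as a dict inside the handler.

```python
def relaunch_parameters(failed_run_parameters):
    """Hypothetical helper: parameters for the emergency relaunch, or None to stop."""
    if failed_run_parameters.get("emergency"):
        # The emergency run itself failed; relaunching again would loop forever
        return None
    # Keep the original parameters and only flip the emergency flag
    return {**failed_run_parameters, "emergency": True}


first_failure = relaunch_parameters({"emergency": False})
second_failure = relaunch_parameters({"emergency": True})
```

In the handler you would trigger the new run only when the helper returns a dict, and skip the relaunch when it returns None.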