Vinicius Silva
01/14/2025, 10:15 PM
Marvin
01/14/2025, 10:15 PM
`on_failure` hook to implement this behavior.
Here's a basic outline of how you might achieve this:
1. Define your flow: Create a flow that accepts `my_arg` as a parameter.
2. Set up a failure hook: Use the `on_failure` hook to define a function that will be called when the flow fails. This function can then trigger the flow again with `my_arg=False`.
3. Trigger the flow again: Inside the failure hook, you can call the flow again with the modified argument.
Here's a simplified example:
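```python
from prefect import flow, task


@task
def my_task(my_arg):
    if my_arg:
        raise ValueError("Intentional failure")
    return "Success"


def rerun_with_false(flow, flow_run, state):
    # Flow hooks are called as hook(flow, flow_run, state); on_failure hooks
    # run only when the flow run ends in a Failed state.
    # Calling the flow here starts a separate run with the changed argument.
    flow(my_arg=False)


@flow(on_failure=[rerun_with_false])
def my_flow(my_arg=True):
    my_task(my_arg)


# Run the flow (the first run still ends as Failed and raises after the hook fires)
my_flow(my_arg=True)
```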
In this example, if `my_task` fails with `my_arg=True`, the `on_failure` hook will trigger `my_flow` again with `my_arg=False`.
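A hook is an ordinary function, so its logic can be checked without a Prefect server. The sketch below assumes the `(flow, flow_run, state)` signature that Prefect uses for flow hooks and that the run's parameters are exposed as `flow_run.parameters`; `fake_flow` and `fake_run` are hypothetical stand-ins for the real objects. The guard on `my_arg` keeps the hook from calling the flow again when it already ran with `my_arg=False`.

```python
from types import SimpleNamespace


def rerun_with_false(flow, flow_run, state):
    """Failure hook sketch: call the flow again with my_arg=False, at most once."""
    if flow_run.parameters.get("my_arg", True):
        return flow(my_arg=False)
    return None


# Hypothetical stand-ins so the hook can be exercised without Prefect.
calls = []


def fake_flow(my_arg=True):
    calls.append(my_arg)
    return "Success"


fake_run = SimpleNamespace(parameters={"my_arg": True})
result = rerun_with_false(fake_flow, fake_run, state=None)
```

With a real flow, the same function would be attached as `@flow(on_failure=[rerun_with_false])`.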
For more details, you can explore the Prefect documentation on flow retries and hooks.
Vinicius Silva
01/14/2025, 10:18 PM
Marvin
01/14/2025, 10:19 PM
`retries` parameter when defining your flow. Here's how you can integrate this with the retry mechanism that changes the argument on failure:
```python
from prefect import flow, task


@task
def my_task(my_arg):
    if my_arg:
        raise ValueError("Intentional failure")
    return "Success"


def rerun_with_false(flow, flow_run, state):
    # Flow hooks are called as hook(flow, flow_run, state).
    # With retries set, this hook fires only after the last retry has failed.
    flow(my_arg=False)


@flow(retries=2, on_failure=[rerun_with_false])
def my_flow(my_arg=True):
    my_task(my_arg)


# Run the flow
my_flow(my_arg=True)
```
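In this setup:
- The `retries` parameter is set to 2, allowing the flow to attempt execution a total of three times (initial run + 2 retries). Each retry reuses the original parameters, so `my_arg` stays `True`.
- The `on_failure` hook runs only once the flow run reaches a final failed state, that is, after the retries are exhausted. At that point it calls the flow again with `my_arg=False`.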
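The ordering of retries and the failure hook can be illustrated with a plain-Python toy model. This is not Prefect's engine; `run_with_retries` and `my_flow_body` are hypothetical names, and the model only assumes what is described above: retries reuse the same parameters, and the hook is called once after the final attempt fails.

```python
def run_with_retries(fn, params, retries, on_failure):
    """Toy model: retry fn with the same params, then call on_failure once."""
    attempts = 0
    last_error = None
    for _ in range(retries + 1):  # initial run + retries
        attempts += 1
        try:
            return fn(**params), attempts
        except Exception as exc:
            last_error = exc
    # Only reached when every attempt failed.
    on_failure(params, last_error)
    return None, attempts


def my_flow_body(my_arg=True):
    if my_arg:
        raise ValueError("Intentional failure")
    return "Success"


hook_calls = []
result, attempts = run_with_retries(
    my_flow_body,
    {"my_arg": True},
    retries=2,
    on_failure=lambda params, exc: hook_calls.append(dict(params)),
)
```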
For more details, you can refer to the Prefect documentation on retries and failure hooks.
Vinicius Silva
01/14/2025, 10:47 PM
Marvin
01/14/2025, 10:48 PM
`on_failure` hook. This hook allows you to execute custom logic when the flow enters a failed state, and you can use it to run a shell command.
Here's a basic example of how you might set this up:
```python
import subprocess

from prefect import flow, task


@task
def my_task(my_arg):
    if my_arg:
        raise ValueError("Intentional failure")
    return "Success"


def run_bash_script(flow, flow_run, state):
    # Flow hooks are called as hook(flow, flow_run, state).
    # Replace 'your_script.sh' with your actual script path
    subprocess.run(["bash", "your_script.sh"], check=True)


@flow(on_failure=[run_bash_script])
def my_flow(my_arg=True):
    my_task(my_arg)


# Run the flow
my_flow(my_arg=True)
```
In this example, if the flow fails, the `run_bash_script` function is called, which executes a bash script using Python's `subprocess.run`. Make sure to replace `'your_script.sh'` with the path to your actual script.
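With `check=True`, `subprocess.run` raises `subprocess.CalledProcessError` when the command exits with a non-zero status, so a failing script raises inside the hook. A minimal sketch of handling that case follows; `run_command` is a hypothetical helper, and the example invokes the Python interpreter instead of `bash` only so that it runs on any platform.

```python
import subprocess
import sys


def run_command(cmd):
    """Run cmd and return (exit_code, output) instead of raising on failure."""
    try:
        completed = subprocess.run(cmd, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as err:
        # check=True turns a non-zero exit status into this exception.
        return err.returncode, err.stderr
    return completed.returncode, completed.stdout


ok = run_command([sys.executable, "-c", "print('cleanup done')"])
bad = run_command([sys.executable, "-c", "import sys; sys.exit(3)"])
```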
For more information, you can refer to the Prefect documentation on managing states and hooks.
Vinicius Silva
01/14/2025, 11:08 PM
Marvin
01/14/2025, 11:08 PM
`run_bash_script` needs to depend on `my_arg`, you can pass `my_arg` as an argument to the function within the `on_failure` hook. Here's how you can modify the previous example to achieve this:
```python
import subprocess

from prefect import flow, task


@task
def my_task(my_arg):
    if my_arg:
        raise ValueError("Intentional failure")
    return "Success"


def run_bash_script(my_arg):
    # You can use my_arg in your script logic
    script_name = "script_true.sh" if my_arg else "script_false.sh"
    subprocess.run(["bash", script_name], check=True)


def on_flow_failure(flow, flow_run, state):
    # Flow hooks are called as hook(flow, flow_run, state); the run's
    # parameters are available on flow_run.parameters.
    run_bash_script(my_arg=flow_run.parameters.get("my_arg", True))


@flow(on_failure=[on_flow_failure])
def my_flow(my_arg=True):
    my_task(my_arg)


# Run the flow
my_flow(my_arg=True)
```
In this setup:
- The `run_bash_script` function receives `my_arg` as a parameter.
- You can use this parameter to determine which script to run or modify the script's behavior accordingly.
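Keeping the script choice separate from the call that runs it makes the choice easy to check on its own. A small sketch, assuming the flow run's parameters arrive as a dictionary; `build_command` is a hypothetical helper, and the default of `True` mirrors the flow's default for `my_arg`.

```python
def build_command(parameters):
    """Pick the script from the run's parameters and return the argv list."""
    my_arg = parameters.get("my_arg", True)
    script_name = "script_true.sh" if my_arg else "script_false.sh"
    return ["bash", script_name]


cmd_true = build_command({"my_arg": True})
cmd_false = build_command({"my_arg": False})
cmd_default = build_command({})
```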
Ensure that the scripts (`script_true.sh` and `script_false.sh`) exist and are executable. Adjust the logic inside `run_bash_script` to fit your specific requirements.
Vinicius Silva
01/14/2025, 11:34 PM
Marvin
01/14/2025, 11:34 PM
`run_bash_script` function can have additional arguments that are not passed directly to the flow. You can define these arguments within the function itself or pass them explicitly in the `on_failure` hook.
Here's how you can do it:
```python
import subprocess

from prefect import flow, task


@task
def my_task(my_arg):
    if my_arg:
        raise ValueError("Intentional failure")
    return "Success"


def run_bash_script(my_arg, additional_arg):
    # Use both my_arg and additional_arg in your script logic
    script_name = "script_true.sh" if my_arg else "script_false.sh"
    print(f"Running {script_name} with additional_arg: {additional_arg}")
    subprocess.run(["bash", script_name], check=True)


def on_flow_failure(flow, flow_run, state):
    # Flow hooks are called as hook(flow, flow_run, state).
    my_arg = flow_run.parameters.get("my_arg", True)
    run_bash_script(my_arg=my_arg, additional_arg="extra")


@flow(on_failure=[on_flow_failure])
def my_flow(my_arg=True):
    my_task(my_arg)


# Run the flow
my_flow(my_arg=True)
```
In this example:
- `run_bash_script` takes an additional argument `additional_arg` that is not part of the flow's parameters.
- You can pass this extra argument directly in the `on_failure` hook as shown.
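One way to bind extra arguments to a hook is a small factory that returns the hook function. The sketch below is an illustration under assumptions: `make_failure_hook` is a hypothetical name, the `(flow, flow_run, state)` signature is the one Prefect uses for flow hooks, and passing `additional_arg` to the script as a command-line argument is a choice made for this example. The `runner` parameter exists only so the hook can be exercised without running a real script.

```python
import subprocess
from types import SimpleNamespace


def make_failure_hook(additional_arg, runner=subprocess.run):
    """Return a failure hook with additional_arg bound in a closure."""

    def on_flow_failure(flow, flow_run, state):
        my_arg = flow_run.parameters.get("my_arg", True)
        script_name = "script_true.sh" if my_arg else "script_false.sh"
        # additional_arg is forwarded as a command-line argument (sketch-only choice).
        runner(["bash", script_name, str(additional_arg)], check=True)

    return on_flow_failure


# Exercise the hook with a recording runner instead of a real subprocess.
recorded = []
hook = make_failure_hook("extra", runner=lambda cmd, check: recorded.append(cmd))
hook(flow=None, flow_run=SimpleNamespace(parameters={"my_arg": False}), state=None)
```

With a real flow, the hook would be attached as `@flow(on_failure=[make_failure_hook("extra")])`.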
This approach allows you to add any number of additional arguments to your failure handling logic without modifying the flow's signature.
Vinicius Silva
01/14/2025, 11:35 PM
Marvin
01/14/2025, 11:36 PM
`additional_arg` in a `prefect.yaml` file and use them in your flow. The `prefect.yaml` file is used to configure deployments, including setting parameters that can be passed to your flows.
Here's a basic structure of how you might define a custom parameter in `prefect.yaml`:
```yaml
version: 2
deployments:
  - name: my-flow-deployment
    flow_name: my_flow
    parameters:
      my_arg: true
      additional_arg: "your_value"
```
In this configuration:
- `my_arg` and `additional_arg` are defined under the `parameters` section.
- You can then access these parameters in your flow or tasks.
To utilize `additional_arg` in your flow, you would modify your flow and tasks to accept it as a parameter:
```python
import subprocess

from prefect import flow, task


@task
def my_task(my_arg, additional_arg):
    if my_arg:
        raise ValueError(f"Intentional failure with additional_arg: {additional_arg}")
    return "Success"


def run_bash_script(my_arg, additional_arg):
    # Use additional_arg in your script logic
    script_name = "script_true.sh" if my_arg else "script_false.sh"
    print(f"Running {script_name} with additional_arg: {additional_arg}")
    subprocess.run(["bash", script_name], check=True)


def on_flow_failure(flow, flow_run, state):
    # Flow hooks are called as hook(flow, flow_run, state); read the run's parameters here.
    params = flow_run.parameters
    run_bash_script(params.get("my_arg", True), params.get("additional_arg", "default_value"))


@flow(on_failure=[on_flow_failure])
def my_flow(my_arg=True, additional_arg="default_value"):
    my_task(my_arg, additional_arg)


# Run the flow
my_flow()
```
This setup allows you to manage and modify parameters directly from the `prefect.yaml` file, providing flexibility in configuring your flow deployments. For more details, you can refer to the Prefect documentation on YAML configuration.
Vinicius Silva
01/14/2025, 11:37 PM
Marvin
01/14/2025, 11:37 PM
`additional_arg` is only needed by the `run_bash_script` function and not by the flow itself, you can still define it in the `prefect.yaml` and pass it directly within the `on_failure` hook without including it as a parameter in the flow signature. Here's how you can do it:
1. Define `additional_arg` in your `prefect.yaml`. Deployments in `prefect.yaml` have no `metadata` section that Prefect exposes to the flow run, so one option is to set the value as an environment variable through the work pool's job variables (this assumes a work pool type that supports the `env` job variable, such as process, Docker, or Kubernetes):
```yaml
version: 2
deployments:
  - name: my-flow-deployment
    flow_name: my_flow
    parameters:
      my_arg: true
    work_pool:
      name: my-work-pool  # placeholder work pool name
      job_variables:
        env:
          ADDITIONAL_ARG: "your_value"
```
2. Use the `on_failure` hook to pass `additional_arg` to `run_bash_script`:
```python
import os
import subprocess

from prefect import flow, task


@task
def my_task(my_arg):
    if my_arg:
        raise ValueError("Intentional failure")
    return "Success"


def run_bash_script(my_arg, additional_arg):
    # Use additional_arg in your script logic
    script_name = "script_true.sh" if my_arg else "script_false.sh"
    print(f"Running {script_name} with additional_arg: {additional_arg}")
    subprocess.run(["bash", script_name], check=True)


def on_flow_failure(flow, flow_run, state):
    # Flow hooks are called as hook(flow, flow_run, state).
    run_bash_script(
        my_arg=flow_run.parameters.get("my_arg", True),
        additional_arg=os.environ.get("ADDITIONAL_ARG", "your_value"),
    )


@flow(on_failure=[on_flow_failure])
def my_flow(my_arg=True):
    my_task(my_arg)


# Run the flow
my_flow()
```
In this setup:
- `additional_arg` is stored as the `ADDITIONAL_ARG` environment variable in the `prefect.yaml`.
- The hook reads it from the environment and passes it to `run_bash_script`. When you call `my_flow()` directly, as in this example, the deployment configuration is not applied, so the code falls back to `"your_value"`.
Vinicius Silva
01/14/2025, 11:55 PM
Marvin
01/14/2025, 11:55 PM
`additional_arg` is sourced directly from the `prefect.yaml` file without setting a default value in your code, you'll need to retrieve it from the flow's context or configuration dynamically. Here's how you can achieve this:
1. Define `additional_arg` in your `prefect.yaml`. Prefect does not expose a `metadata` section of `prefect.yaml` on `FlowRunContext`, so this example sets the value as an environment variable through the work pool's job variables (assuming a work pool type that supports the `env` job variable):
```yaml
version: 2
deployments:
  - name: my-flow-deployment
    flow_name: my_flow
    parameters:
      my_arg: true
    work_pool:
      name: my-work-pool  # placeholder work pool name
      job_variables:
        env:
          ADDITIONAL_ARG: "your_value"
```
2. Access `additional_arg` from the environment inside the failure hook:
```python
import os
import subprocess

from prefect import flow, task


@task
def my_task(my_arg):
    if my_arg:
        raise ValueError("Intentional failure")
    return "Success"


def run_bash_script(my_arg):
    # Read additional_arg from the environment configured in prefect.yaml
    additional_arg = os.environ.get("ADDITIONAL_ARG")
    if additional_arg is None:
        raise ValueError("ADDITIONAL_ARG is not set in the environment")
    script_name = "script_true.sh" if my_arg else "script_false.sh"
    print(f"Running {script_name} with additional_arg: {additional_arg}")
    subprocess.run(["bash", script_name], check=True)


def on_flow_failure(flow, flow_run, state):
    # Flow hooks are called as hook(flow, flow_run, state).
    run_bash_script(my_arg=flow_run.parameters.get("my_arg", True))


@flow(on_failure=[on_flow_failure])
def my_flow(my_arg=True):
    my_task(my_arg)


# Run the flow
my_flow()
```
In this setup:
- `additional_arg` is read from the `ADDITIONAL_ARG` environment variable, which the worker sets from the `job_variables` in your `prefect.yaml` when it starts a run of the deployment.
- The code checks if `additional_arg` is `None` and raises an error if it's not set, ensuring that the value must come from the `prefect.yaml` file. A direct call such as `my_flow()` does not go through the deployment, so the variable has to be set in that environment as well.
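One way to enforce a required configuration value with no default in code is to read it from an environment variable and fail loudly when it is missing. The sketch below assumes the value is delivered as an environment variable; the variable name `ADDITIONAL_ARG` and the helper `require_env` are hypothetical names chosen for illustration.

```python
import os


def require_env(name):
    """Return the value of an environment variable or raise if it is unset or empty."""
    value = os.environ.get(name)
    if not value:
        raise ValueError(f"{name} is not set")
    return value


# Demonstrate both outcomes: the variable present, then absent.
os.environ["ADDITIONAL_ARG"] = "your_value"
found = require_env("ADDITIONAL_ARG")

del os.environ["ADDITIONAL_ARG"]
try:
    require_env("ADDITIONAL_ARG")
    missing_error = None
except ValueError as err:
    missing_error = str(err)
```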