Vinicius Silva
01/14/2025, 10:15 PM
Marvin
01/14/2025, 10:15 PM
on_failure hook to implement this behavior.
Here's a basic outline of how you might achieve this:
1. Define your flow: Create a flow that accepts my_arg as a parameter.
2. Set up a failure hook: Use the on_failure hook to define a function that will be called when the flow fails. This function can then trigger the flow again with my_arg=False.
3. Trigger the flow again: Inside the failure hook, you can call the flow again with the modified argument.
Here's a simplified example:
```python
from prefect import flow, task


@task
def my_task(my_arg):
    if my_arg:
        raise ValueError("Intentional failure")
    return "Success"


def rerun_with_false(flow, flow_run, state):
    # Prefect calls flow state hooks with (flow, flow_run, state);
    # on_failure hooks only run once the flow run has entered a Failed state.
    flow(my_arg=False)


@flow(on_failure=[rerun_with_false])
def my_flow(my_arg=True):
    my_task(my_arg)


# Run the flow
my_flow(my_arg=True)
```
In this example, if my_task fails with my_arg=True, the on_failure hook will trigger my_flow again with my_arg=False.
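The fallback logic can be exercised without a Prefect server. The sketch below assumes that Prefect passes `flow`, `flow_run`, and `state` to a flow failure hook and that `flow_run.parameters` holds the arguments of the failed run; `fake_flow` and the `SimpleNamespace` objects are hypothetical stand-ins, not Prefect objects. It also adds a guard so the hook reruns the flow only when the failed run used my_arg=True.

```python
from types import SimpleNamespace


def rerun_with_false(flow, flow_run, state):
    """Failure hook: re-invoke the flow with my_arg=False, at most once."""
    # Guard against looping: only rerun if the failed run used my_arg=True.
    if flow_run.parameters.get("my_arg", True):
        return flow(my_arg=False)
    return None


# Hypothetical stand-ins for what Prefect would pass to the hook.
calls = []


def fake_flow(**kwargs):
    calls.append(kwargs)
    return "Success"


failed_run = SimpleNamespace(parameters={"my_arg": True})
fallback_run = SimpleNamespace(parameters={"my_arg": False})

first = rerun_with_false(fake_flow, failed_run, state=None)
second = rerun_with_false(fake_flow, fallback_run, state=None)
```

The guard matters if the fallback run can fail too: without it, every failure would start another run.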
For more details, you can explore the Prefect documentation on flow retries and hooks.
Vinicius Silva
01/14/2025, 10:18 PM
Marvin
01/14/2025, 10:19 PM
retries parameter when defining your flow. Here’s how you can integrate this with the retry mechanism that changes the argument on failure:
```python
from prefect import flow, task


@task
def my_task(my_arg):
    if my_arg:
        raise ValueError("Intentional failure")
    return "Success"


def rerun_with_false(flow, flow_run, state):
    # Prefect calls flow state hooks with (flow, flow_run, state).
    # This runs only after the retries are used up and the run is Failed.
    flow(my_arg=False)


@flow(retries=2, on_failure=[rerun_with_false])
def my_flow(my_arg=True):
    my_task(my_arg)


# Run the flow
my_flow(my_arg=True)
```
In this setup:
- The retries parameter is set to 2, allowing the flow to attempt execution a total of three times (initial run + 2 retries).
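- The on_failure hook changes the argument to my_arg=False, but it runs only after all retries are exhausted and the flow enters a Failed state. The retries themselves reuse the original my_arg=True.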
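The ordering of retries and the failure hook can be illustrated with a toy model. This is a plain-Python sketch, not Prefect's engine: `run_with_retries` is a hypothetical helper that calls a function up to retries + 1 times and invokes the hook only when every attempt has failed.

```python
def run_with_retries(fn, retries, on_failure):
    """Toy model of a retried flow: call fn up to retries + 1 times."""
    attempts = 0
    last_error = None
    for _ in range(retries + 1):
        attempts += 1
        try:
            return attempts, fn()
        except Exception as exc:
            last_error = exc
    # Only reached when every attempt failed: the terminal failure.
    on_failure(last_error)
    return attempts, None


hook_calls = []


def always_fails():
    raise ValueError("Intentional failure")


attempts, result = run_with_retries(always_fails, retries=2, on_failure=hook_calls.append)
```

With retries=2 the function is attempted three times and the hook is called once, after the last attempt.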
For more details, you can refer to the Prefect documentation on retries and failure hooks.
Vinicius Silva
01/14/2025, 10:47 PM
Marvin
01/14/2025, 10:48 PM
on_failure hook. This hook allows you to execute custom logic when the flow enters a failed state, and you can use it to run a shell command.
Here's a basic example of how you might set this up:
```python
import subprocess

from prefect import flow, task


@task
def my_task(my_arg):
    if my_arg:
        raise ValueError("Intentional failure")
    return "Success"


def run_bash_script(flow, flow_run, state):
    # Prefect calls flow state hooks with (flow, flow_run, state).
    # Replace 'your_script.sh' with your actual script path.
    # check=True raises CalledProcessError if the script exits non-zero.
    subprocess.run(["bash", "your_script.sh"], check=True)


@flow(on_failure=[run_bash_script])
def my_flow(my_arg=True):
    my_task(my_arg)


# Run the flow
my_flow(my_arg=True)
```
In this example, if the flow fails, the run_bash_script function is called, which executes a bash script using Python's subprocess.run. Make sure to replace 'your_script.sh' with the path to your actual script.
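Because check=True makes subprocess.run raise on a non-zero exit status, a failing script would raise inside the hook. A minimal sketch of one way to handle that, assuming a hypothetical helper `run_command`; the Python interpreter stands in for `bash your_script.sh` so the example runs on any machine:

```python
import subprocess
import sys


def run_command(cmd):
    """Run cmd; return (ok, output) instead of raising on a non-zero exit."""
    try:
        done = subprocess.run(cmd, check=True, capture_output=True, text=True)
        return True, done.stdout.strip()
    except subprocess.CalledProcessError as exc:
        # check=True turns a non-zero exit status into this exception.
        return False, f"exit code {exc.returncode}"


# Stand-ins for a succeeding and a failing script.
ok_result = run_command([sys.executable, "-c", "print('cleanup done')"])
bad_result = run_command([sys.executable, "-c", "import sys; sys.exit(3)"])
```

Whether to swallow the error like this or let it propagate depends on whether a failed cleanup script should be visible in the hook's logs.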
For more information, you can refer to the Prefect documentation on managing states and hooks.
Vinicius Silva
01/14/2025, 11:08 PM
Marvin
01/14/2025, 11:08 PM
run_bash_script needs to depend on my_arg, you can pass my_arg as an argument to the function within the on_failure hook. Here's how you can modify the previous example to achieve this:
```python
import subprocess

from prefect import flow, task


@task
def my_task(my_arg):
    if my_arg:
        raise ValueError("Intentional failure")
    return "Success"


def run_bash_script(my_arg):
    # You can use my_arg in your script logic
    script_name = "script_true.sh" if my_arg else "script_false.sh"
    subprocess.run(["bash", script_name], check=True)


def on_flow_failure(flow, flow_run, state):
    # Prefect calls flow state hooks with (flow, flow_run, state).
    # Read my_arg from the failed run's parameters instead of hardcoding it.
    run_bash_script(my_arg=flow_run.parameters.get("my_arg", True))


@flow(on_failure=[on_flow_failure])
def my_flow(my_arg=True):
    my_task(my_arg)


# Run the flow
my_flow(my_arg=True)
```
In this setup:
- The run_bash_script function receives my_arg as a parameter.
- You can use this parameter to determine which script to run or modify the script's behavior accordingly.
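As an alternative to choosing between two scripts, a single script can receive the value as a command-line argument. The sketch below only builds the argument list that would be handed to subprocess.run; the `--my-arg` flag and the `build_command` helper are hypothetical, and the script would need to parse that flag itself.

```python
def build_command(my_arg, script="your_script.sh"):
    """Build the argv list for subprocess.run, passing my_arg to the script."""
    # Shell scripts receive strings, so encode the boolean explicitly.
    flag_value = "true" if my_arg else "false"
    return ["bash", script, "--my-arg", flag_value]


cmd_true = build_command(True)
cmd_false = build_command(False, script="cleanup.sh")
```

Passing the arguments as a list, without shell=True, avoids quoting problems if a value ever contains spaces.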
Ensure that the scripts (script_true.sh and script_false.sh) exist and are readable; because they are run through bash, they do not need the executable bit. Adjust the logic inside run_bash_script to fit your specific requirements.
Vinicius Silva
01/14/2025, 11:34 PM
Marvin
01/14/2025, 11:34 PM
run_bash_script function can have additional arguments that are not passed directly to the flow. You can define these arguments within the function itself or pass them explicitly in the on_failure hook.
Here's how you can do it:
```python
import subprocess

from prefect import flow, task


@task
def my_task(my_arg):
    if my_arg:
        raise ValueError("Intentional failure")
    return "Success"


def run_bash_script(my_arg, additional_arg):
    # Use both my_arg and additional_arg in your script logic
    script_name = "script_true.sh" if my_arg else "script_false.sh"
    print(f"Running {script_name} with additional_arg: {additional_arg}")
    subprocess.run(["bash", script_name], check=True)


def on_flow_failure(flow, flow_run, state):
    # Prefect calls flow state hooks with (flow, flow_run, state).
    # additional_arg is supplied here, not through the flow's parameters.
    my_arg = flow_run.parameters.get("my_arg", True)
    run_bash_script(my_arg=my_arg, additional_arg="extra")


@flow(on_failure=[on_flow_failure])
def my_flow(my_arg=True):
    my_task(my_arg)


# Run the flow
my_flow(my_arg=True)
```
In this example:
- run_bash_script takes an additional argument additional_arg that is not part of the flow's parameters.
- You can pass this extra argument directly in the on_failure hook as shown.
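If several flows need the same handler with different extra values, a small factory can bind the extra argument once. This is a sketch assuming the hook is called with `flow`, `flow_run`, and `state`; `make_failure_hook` and `fake_runner` are hypothetical names, and the `SimpleNamespace` object stands in for a real flow run.

```python
from types import SimpleNamespace


def make_failure_hook(additional_arg, runner):
    """Return a hook that forwards my_arg plus a fixed additional_arg."""

    def on_flow_failure(flow, flow_run, state):
        my_arg = flow_run.parameters.get("my_arg", True)
        return runner(my_arg=my_arg, additional_arg=additional_arg)

    return on_flow_failure


# Hypothetical runner that records its arguments instead of calling bash.
seen = []


def fake_runner(my_arg, additional_arg):
    seen.append((my_arg, additional_arg))


hook = make_failure_hook("extra", fake_runner)
hook(flow=None, flow_run=SimpleNamespace(parameters={"my_arg": False}), state=None)
```

The returned function keeps a normal name and signature, so it can be listed in on_failure like any other hook.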
This approach allows you to add any number of additional arguments to your failure handling logic without modifying the flow's signature.
Vinicius Silva
01/14/2025, 11:35 PM
Marvin
01/14/2025, 11:36 PM
additional_arg in a prefect.yaml file and use them in your flow. The prefect.yaml file is used to configure deployments, including setting parameters that can be passed to your flows.
Here's a basic structure of how you might define a custom parameter in `prefect.yaml`:
```yaml
version: 2
deployments:
  - name: my-flow-deployment
    flow_name: my_flow
    parameters:
      my_arg: true
      additional_arg: "your_value"
```
In this configuration:
- my_arg and additional_arg are defined under the parameters section.
- You can then access these parameters in your flow or tasks.
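The relationship between deployment parameters and the defaults in the flow signature can be pictured as a dictionary merge in which the deployment values win. The sketch below is a simplified model, not Prefect's own resolution code; `resolve_parameters` is a hypothetical helper and `my_flow` here is a plain function.

```python
import inspect


def resolve_parameters(fn, deployment_parameters):
    """Merge signature defaults with deployment values; the deployment wins."""
    defaults = {
        name: param.default
        for name, param in inspect.signature(fn).parameters.items()
        if param.default is not inspect.Parameter.empty
    }
    return {**defaults, **deployment_parameters}


def my_flow(my_arg=True, additional_arg="default_value"):
    return my_arg, additional_arg


resolved = resolve_parameters(my_flow, {"additional_arg": "your_value"})
```

A parameter without a default in the signature does not appear in `defaults`, so it has to be supplied by the deployment.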
To utilize additional_arg in your flow, you would modify your flow and tasks to accept it as a parameter:
```python
import subprocess

from prefect import flow, task


@task
def my_task(my_arg, additional_arg):
    if my_arg:
        raise ValueError(f"Intentional failure with additional_arg: {additional_arg}")
    return "Success"


def run_bash_script(my_arg, additional_arg):
    # Use additional_arg in your script logic
    script_name = "script_true.sh" if my_arg else "script_false.sh"
    print(f"Running {script_name} with additional_arg: {additional_arg}")
    subprocess.run(["bash", script_name], check=True)


def on_flow_failure(flow, flow_run, state):
    # Prefect calls flow state hooks with (flow, flow_run, state).
    # Both values come from the failed run's parameters.
    params = flow_run.parameters
    run_bash_script(params.get("my_arg", True), params.get("additional_arg"))


@flow(on_failure=[on_flow_failure])
def my_flow(my_arg=True, additional_arg="default_value"):
    my_task(my_arg, additional_arg)


# Run the flow
my_flow()
```
This setup allows you to manage and modify parameters directly from the prefect.yaml file, providing flexibility in configuring your flow deployments. For more details, you can refer to the Prefect documentation on YAML configuration.
Vinicius Silva
01/14/2025, 11:37 PM
Marvin
01/14/2025, 11:37 PM
additional_arg is only needed by the run_bash_script function and not by the flow itself, you can still define it in the prefect.yaml and pass it directly within the on_failure hook without including it as a parameter in the flow signature. Here's how you can do it:
1. Define additional_arg in your `prefect.yaml`:
```yaml
version: 2
deployments:
  - name: my-flow-deployment
    flow_name: my_flow
    parameters:
      my_arg: true
    metadata:
      additional_arg: "your_value"
```
2. Use the on_failure hook to pass additional_arg to `run_bash_script`:
```python
import subprocess

from prefect import flow, task


@task
def my_task(my_arg):
    if my_arg:
        raise ValueError("Intentional failure")
    return "Success"


def run_bash_script(my_arg, additional_arg):
    # Use additional_arg in your script logic
    script_name = "script_true.sh" if my_arg else "script_false.sh"
    print(f"Running {script_name} with additional_arg: {additional_arg}")
    subprocess.run(["bash", script_name], check=True)


def on_flow_failure(flow, flow_run, state):
    # Prefect calls flow state hooks with (flow, flow_run, state).
    # additional_arg is entered by hand here; keep it in sync with prefect.yaml.
    my_arg = flow_run.parameters.get("my_arg", True)
    run_bash_script(my_arg=my_arg, additional_arg="your_value")


@flow(on_failure=[on_flow_failure])
def my_flow(my_arg=True):
    my_task(my_arg)


# Run the flow
my_flow()
```
In this setup:
- additional_arg is stored under a metadata key in the prefect.yaml. As far as I know, Prefect does not read that key or pass it to hooks on its own, so treat it as documentation only.
- The on_failure hook therefore passes the value to run_bash_script by hand. To avoid keeping the two in sync manually, you would need additional logic to load the value, depending on how your environment is set up.
Vinicius Silva
01/14/2025, 11:55 PM
Marvin
01/14/2025, 11:55 PM
additional_arg is sourced directly from the prefect.yaml file without setting a default value in your code, you'll need to retrieve it from the flow's context or configuration dynamically. Here's how you can achieve this:
1. Define additional_arg in your `prefect.yaml`. One way that does not touch the flow signature is an environment variable on the deployment's work pool job. The pool name my-work-pool and the variable name ADDITIONAL_ARG are placeholders:
```yaml
version: 2
deployments:
  - name: my-flow-deployment
    flow_name: my_flow
    parameters:
      my_arg: true
    work_pool:
      name: my-work-pool
      job_variables:
        env:
          ADDITIONAL_ARG: "your_value"
```
2. Read additional_arg from the environment inside the failure hook:
```python
import os
import subprocess

from prefect import flow, task


@task
def my_task(my_arg):
    if my_arg:
        raise ValueError("Intentional failure")
    return "Success"


def run_bash_script(my_arg):
    # No default in code: the value must come from the deployment configuration
    additional_arg = os.environ.get("ADDITIONAL_ARG")
    if additional_arg is None:
        raise ValueError("ADDITIONAL_ARG is not set for this deployment")
    script_name = "script_true.sh" if my_arg else "script_false.sh"
    print(f"Running {script_name} with additional_arg: {additional_arg}")
    subprocess.run(["bash", script_name], check=True)


def on_flow_failure(flow, flow_run, state):
    # Prefect calls flow state hooks with (flow, flow_run, state).
    run_bash_script(my_arg=flow_run.parameters.get("my_arg", True))


@flow(on_failure=[on_flow_failure])
def my_flow(my_arg=True):
    my_task(my_arg)


# Run the flow
my_flow()
```
In this setup:
- additional_arg is read from the ADDITIONAL_ARG environment variable, which the worker sets from job_variables when it starts a run of this deployment.
- The code checks whether additional_arg is None and raises an error if it is not set, so the value has to come from the prefect.yaml file. Calling my_flow() directly, outside a deployment, raises that error in the hook unless you export the variable yourself.