# ask-marvin
v
@Marvin I have a Prefect flow that receives an argument my_arg = True. I want to deploy this flow in such a way that if the flow execution fails, Prefect tries it again, but this time passing my_arg = False. Is that possible? If so, how can I do that?
m
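Yes, it's possible to re-run a Prefect flow with different arguments after a failure, but you'll need some custom logic: Prefect doesn't natively change flow arguments between retries. One way is an `on_failure` hook, which Prefect calls once the flow run has ended in a `Failed` state. Here's a basic outline:
1. Define your flow: create a flow that accepts `my_arg` as a parameter.
2. Set up a failure hook: flow-level hooks are called with three arguments, `(flow, flow_run, state)`. Inside the hook, check `flow_run.parameters` so that only a run with `my_arg=True` triggers a follow-up run; otherwise a failing follow-up run would trigger yet another one.
3. Trigger the flow again: inside the hook, call the flow again with `my_arg=False`.
Here's a simplified example:
```python
from prefect import flow, task


@task
def my_task(my_arg):
    if my_arg:
        raise ValueError("Intentional failure")
    return "Success"


def rerun_with_my_arg_false(flow, flow_run, state):
    # Flow-level hooks receive (flow, flow_run, state); on_failure hooks
    # only run after the flow run has ended in a Failed state.
    # Only re-run if this run used my_arg=True, so the hook cannot loop.
    if flow_run.parameters.get("my_arg", True):
        my_flow(my_arg=False)


@flow(on_failure=[rerun_with_my_arg_false])
def my_flow(my_arg=True):
    my_task(my_arg)


# Run the flow
my_flow(my_arg=True)
```
In this example, if `my_task` fails with `my_arg=True`, the `on_failure` hook runs `my_flow` again with `my_arg=False`. Because that follow-up run has `my_arg=False`, its own failure would not trigger another run. Note that the original call still raises its exception after the hook has finished. When the flow runs from a deployment, you may prefer to have the hook start a new run of that deployment (for example with `prefect.deployments.run_deployment`) rather than calling the flow in the same process. For more details, see the Prefect documentation on flow retries and state change hooks.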
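The decision of whether to start a follow-up run can live in a plain function, which makes it easy to test without a Prefect server. The sketch below is a framework-free simulation, not Prefect's API: `next_parameters` and `run_with_fallback` are hypothetical helpers, and the `while` loop stands in for Prefect calling an `on_failure` hook after a failed run.

```python
def next_parameters(parameters):
    """Return the parameters for a follow-up run, or None to stop.

    Only a run with my_arg=True earns exactly one follow-up run
    with my_arg=False, so failures cannot loop forever.
    """
    if parameters.get("my_arg", True):
        return {**parameters, "my_arg": False}
    return None


def run_with_fallback(fn, parameters):
    """Call fn(**parameters); on failure, retry with next_parameters().

    Returns (result, attempts), where attempts lists the parameters used.
    Re-raises the last exception when no follow-up run is allowed.
    """
    attempts = []
    current = parameters
    while True:
        attempts.append(current)
        try:
            return fn(**current), attempts
        except Exception:
            current = next_parameters(current)
            if current is None:
                raise  # re-raise the exception currently being handled


def my_task(my_arg):
    if my_arg:
        raise ValueError("Intentional failure")
    return "Success"


result, attempts = run_with_fallback(my_task, {"my_arg": True})
print(result, [a["my_arg"] for a in attempts])  # Success [True, False]
```

Keeping the rule in `next_parameters` means the hook itself only has to call it with `flow_run.parameters` and act on the result.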
v
@Marvin, I also need to limit retries to 2. So in total, if the execution fails 3 times (the first try + 2 retries), then the flow should just break.
m
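To cap the number of retries, use the `retries` parameter on the flow. Two things to keep in mind when combining it with the argument switch:
- Retries re-run the flow with the same parameters it was originally called with.
- `on_failure` hooks only fire once the flow run is finally `Failed`, i.e. after all retries have been used up, not on the first failure.
So instead of combining `retries` with the hook from before, let the flow pick the value of `my_arg` based on which attempt it is. `prefect.runtime.flow_run.run_count` reports the current attempt (1 on the first run, then 2 and 3 on the retries):
```python
from prefect import flow, task
from prefect.runtime import flow_run


@task
def my_task(my_arg):
    if my_arg:
        raise ValueError("Intentional failure")
    return "Success"


@flow(retries=2)
def my_flow(my_arg=True):
    # Retries re-use the original parameters, so switch the value here:
    # the first attempt uses my_arg as passed, every retry uses False.
    effective_arg = my_arg if flow_run.run_count <= 1 else False
    my_task(effective_arg)


# Run the flow
my_flow(my_arg=True)
```
In this setup:
- `retries=2` allows at most three attempts in total (initial run + 2 retries); if the third attempt also fails, the flow run ends as `Failed` and stops.
- The first attempt uses the `my_arg` you passed in, and both retries use `my_arg=False`.
You can also set `retry_delay_seconds` on the flow to wait between attempts. For more details, see the Prefect documentation on retries and on `prefect.runtime`.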
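To check the attempt arithmetic without running Prefect, the retry behavior can be simulated in plain Python. This is a sketch under stated assumptions: `effective_arg` mirrors the expression in the flow above, `simulate_retries` is a hypothetical stand-in for Prefect's retry loop, and `run_count` is assumed to start at 1 on the first attempt.

```python
def effective_arg(my_arg, run_count):
    """Value of my_arg for a given attempt (run_count starts at 1)."""
    return my_arg if run_count <= 1 else False


def simulate_retries(fn, my_arg, retries=2):
    """Mimic a flow with `retries`: at most 1 + retries attempts.

    Returns (result, used_args); re-raises after the last failed attempt.
    """
    used_args = []
    for run_count in range(1, retries + 2):
        arg = effective_arg(my_arg, run_count)
        used_args.append(arg)
        try:
            return fn(arg), used_args
        except Exception:
            if run_count == retries + 1:
                raise  # retries exhausted: the run ends as failed


def my_task(my_arg):
    if my_arg:
        raise ValueError("Intentional failure")
    return "Success"


result, used = simulate_retries(my_task, True)
print(result, used)  # Success [True, False]
```

A task that always fails is called exactly three times (`True`, `False`, `False`) before the exception propagates, matching "first try + 2 retries".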
v
@Marvin is it possible to run a bash script if the flow fails?
m
Yes, you can run a bash script when a flow fails by using the `on_failure` hook. The hook is a regular Python function that Prefect calls with `(flow, flow_run, state)` when the flow run enters a `Failed` state, so it can run a shell command with `subprocess`. Here's a basic example:
```python
import subprocess

from prefect import flow, task


@task
def my_task(my_arg):
    if my_arg:
        raise ValueError("Intentional failure")
    return "Success"


def run_bash_script(flow, flow_run, state):
    # Called by Prefect with (flow, flow_run, state) when the flow run fails.
    # Replace 'your_script.sh' with your actual script path
    subprocess.run(["bash", "your_script.sh"], check=True)


@flow(on_failure=[run_bash_script])
def my_flow(my_arg=True):
    my_task(my_arg)


# Run the flow
my_flow(my_arg=True)
```
In this example, if the flow run fails, Prefect calls `run_bash_script`, which executes the script with Python's `subprocess.run`. Replace `'your_script.sh'` with the path to your actual script; a relative path is resolved from the working directory of the process, so an absolute path is safer in a deployment. With `check=True`, a non-zero exit code raises `subprocess.CalledProcessError`. For more information, see the Prefect documentation on state change hooks.
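Before wiring a script into the hook, it can help to see how `subprocess.run` behaves with `check=True`. This standalone sketch assumes `bash` is on the PATH and writes two throwaway scripts to a temporary directory: one prints its first argument, the other exits with status 3, which surfaces as `subprocess.CalledProcessError`. `run_script` is a hypothetical helper, not part of Prefect.

```python
import os
import subprocess
import tempfile


def run_script(path, *args):
    """Run a bash script and return its stdout.

    check=True turns a non-zero exit status into CalledProcessError.
    """
    completed = subprocess.run(
        ["bash", path, *args], check=True, capture_output=True, text=True
    )
    return completed.stdout


with tempfile.TemporaryDirectory() as tmp:
    ok_script = os.path.join(tmp, "ok.sh")
    bad_script = os.path.join(tmp, "bad.sh")
    with open(ok_script, "w") as f:
        f.write('echo "cleanup for $1"\n')
    with open(bad_script, "w") as f:
        f.write("exit 3\n")

    output = run_script(ok_script, "my-flow")
    try:
        run_script(bad_script)
        failed_code = None
    except subprocess.CalledProcessError as exc:
        failed_code = exc.returncode

print(output.strip(), failed_code)  # cleanup for my-flow 3
```

Capturing stdout like this also lets the hook log what the script printed.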
v
@Marvin what if run_bash_script also depends on my_arg?
m
If `run_bash_script` needs `my_arg`, you don't have to hard-code it: the hook receives the `flow_run` object, and `flow_run.parameters` holds the parameters that run was started with. Here's how you can modify the previous example:
```python
import subprocess

from prefect import flow, task


@task
def my_task(my_arg):
    if my_arg:
        raise ValueError("Intentional failure")
    return "Success"


def run_bash_script(flow, flow_run, state):
    # flow_run.parameters holds the parameters the failed run was called with
    my_arg = flow_run.parameters.get("my_arg", True)
    script_name = "script_true.sh" if my_arg else "script_false.sh"
    subprocess.run(["bash", script_name], check=True)


@flow(on_failure=[run_bash_script])
def my_flow(my_arg=True):
    my_task(my_arg)


# Run the flow
my_flow(my_arg=True)
```
In this setup:
- `run_bash_script` reads `my_arg` from `flow_run.parameters`, so it always sees the value the failed run actually used.
- You can use that value to decide which script to run or to change the script's behavior.
Make sure the scripts (`script_true.sh` and `script_false.sh`) exist at the paths you use. Since they are invoked as `bash script_name`, they only need to be readable, not executable. Adjust the logic inside `run_bash_script` to fit your specific requirements.
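A hook that only reads `flow_run.parameters` can be unit-tested without Prefect by passing a stand-in object. In this sketch, `types.SimpleNamespace` plays the role of Prefect's `FlowRun` (only its `.parameters` attribute is assumed), and the extra `runner` argument is a hypothetical testing convenience so the test records the command instead of starting bash; Prefect itself would only pass the first three arguments.

```python
import subprocess
from types import SimpleNamespace


def run_bash_script(flow, flow_run, state, runner=subprocess.run):
    # Same hook shape as above; `runner` defaults to subprocess.run and is
    # only overridden in tests.
    my_arg = flow_run.parameters.get("my_arg", True)
    script_name = "script_true.sh" if my_arg else "script_false.sh"
    runner(["bash", script_name], check=True)


calls = []


def fake_runner(cmd, check):
    # Record the command instead of executing it
    calls.append(cmd)


fake_run = SimpleNamespace(parameters={"my_arg": False})
run_bash_script(None, fake_run, None, runner=fake_runner)
print(calls)  # [['bash', 'script_false.sh']]
```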
v
@Marvin can the run_bash_script function have arguments that are not passed to the flow?
m
Yes, `run_bash_script` can take arguments that are not flow parameters. Prefect always calls a hook with `(flow, flow_run, state)`, so to supply extra arguments, wrap `run_bash_script` in a small lambda (or `functools.partial`) that passes them along. Here's how you can do it:
```python
import subprocess

from prefect import flow, task


@task
def my_task(my_arg):
    if my_arg:
        raise ValueError("Intentional failure")
    return "Success"


def run_bash_script(my_arg, additional_arg):
    # Use both my_arg and additional_arg in your script logic
    script_name = "script_true.sh" if my_arg else "script_false.sh"
    print(f"Running {script_name} with additional_arg: {additional_arg}")
    subprocess.run(["bash", script_name], check=True)


@flow(
    on_failure=[
        # Adapt Prefect's (flow, flow_run, state) hook signature to run_bash_script
        lambda flow, flow_run, state: run_bash_script(
            my_arg=flow_run.parameters.get("my_arg", True),
            additional_arg="extra",
        )
    ]
)
def my_flow(my_arg=True):
    my_task(my_arg)


# Run the flow
my_flow(my_arg=True)
```
In this example:
- `run_bash_script` takes an additional argument, `additional_arg`, that is not one of the flow's parameters.
- The lambda in `on_failure` adapts Prefect's hook signature: it takes `my_arg` from `flow_run.parameters` and passes `additional_arg` explicitly.
This lets you give your failure handling any number of extra arguments without changing the flow's signature.
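As an alternative to a lambda, `functools.partial` can bind the extra argument ahead of time while leaving the `(flow, flow_run, state)` slots for Prefect. In this sketch, `build_script_command` is a hypothetical, side-effect-free variant that returns the command instead of running it (so it can be checked), passes `additional_arg` to the script as its first argument (`$1`), and uses `SimpleNamespace` as a stand-in for the `FlowRun` object.

```python
from functools import partial
from types import SimpleNamespace


def build_script_command(flow, flow_run, state, *, additional_arg):
    """Return the command the failure hook would run (no side effects)."""
    my_arg = flow_run.parameters.get("my_arg", True)
    script_name = "script_true.sh" if my_arg else "script_false.sh"
    # additional_arg is handed to the script as $1
    return ["bash", script_name, additional_arg]


# Bind the extra keyword argument; Prefect would supply the other three.
hook = partial(build_script_command, additional_arg="extra")

cmd = hook(None, SimpleNamespace(parameters={"my_arg": True}), None)
print(cmd)  # ['bash', 'script_true.sh', 'extra']
```

In the real hook you would pass the returned list to `subprocess.run(..., check=True)`; making `additional_arg` keyword-only keeps it from colliding with the three positional arguments Prefect provides.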
v
@Marvin is it possible to define the value of additional_arg in the prefect.yaml file?
m
Yes, as long as `additional_arg` is a flow parameter. The `prefect.yaml` file configures deployments, and the values under a deployment's `parameters` key are passed to the flow when that deployment runs. Here's a basic structure of how you might define a custom parameter in `prefect.yaml`:
```yaml
version: 2

deployments:
  - name: my-flow-deployment
    flow_name: my_flow
    parameters:
      my_arg: true
      additional_arg: "your_value"
```
In this configuration:
- `my_arg` and `additional_arg` are defined under the `parameters` section.
- Prefect passes these to the flow function, so the flow must accept every parameter listed there.
To use `additional_arg`, add it to the flow's signature and read it in the hook from `flow_run.parameters` instead of hard-coding it:
```python
import subprocess

from prefect import flow, task


@task
def my_task(my_arg, additional_arg):
    if my_arg:
        raise ValueError(f"Intentional failure with additional_arg: {additional_arg}")
    return "Success"


def run_bash_script(my_arg, additional_arg):
    # Use additional_arg in your script logic
    script_name = "script_true.sh" if my_arg else "script_false.sh"
    print(f"Running {script_name} with additional_arg: {additional_arg}")
    subprocess.run(["bash", script_name], check=True)


@flow(
    on_failure=[
        # Read both values from the parameters of the failed run
        lambda flow, flow_run, state: run_bash_script(
            my_arg=flow_run.parameters.get("my_arg", True),
            additional_arg=flow_run.parameters.get("additional_arg"),
        )
    ]
)
def my_flow(my_arg=True, additional_arg="default_value"):
    my_task(my_arg, additional_arg)


# Run the flow
my_flow()
```
With this setup, you can change `additional_arg` in `prefect.yaml` (or when triggering a run) without touching the code; the default in the signature only applies when no value is supplied. For more details, see the Prefect documentation on `prefect.yaml`.
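Because every entry under `parameters` is passed to the flow function, a typo in `prefect.yaml` only shows up when a run starts. A quick local check is to bind the parameters against the function's signature with `inspect.signature(...).bind`. In this sketch, `my_flow_fn` is a plain-function stand-in with the same signature as the flow (for a real Prefect flow, the undecorated function is available as `my_flow.fn`), and `check_parameters` is a hypothetical helper.

```python
import inspect


def my_flow_fn(my_arg=True, additional_arg="default_value"):
    """Plain-function stand-in with the same signature as my_flow."""


def check_parameters(fn, parameters):
    """Return None if fn accepts `parameters`, else the binding error message."""
    try:
        inspect.signature(fn).bind(**parameters)
    except TypeError as exc:
        return str(exc)
    return None


print(check_parameters(my_flow_fn, {"my_arg": True, "additional_arg": "your_value"}))  # None
print(check_parameters(my_flow_fn, {"my_arg": True, "other_arg": 1}))
```

The second call reports the unexpected `other_arg` keyword, the same kind of mismatch that would break a deployment run.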
v
@Marvin, yeah, but in your example my_flow receives additional_arg as well, when only the run_bash_script function needs it
m
If `additional_arg` is only needed by the `run_bash_script` function and not by the flow itself, keep it out of the deployment's `parameters`, since Prefect passes everything listed there to the flow function. Deployments in `prefect.yaml` also have no `metadata` section. One setting that does reach the flow run is the work pool's job variables: for work pool types that support an `env` job variable (for example process, Docker, or Kubernetes pools), those entries are set as environment variables in the flow run's process, where the hook can read them. Here's how you can do it:
1. Define `additional_arg` in your `prefect.yaml`:
```yaml
version: 2

deployments:
  - name: my-flow-deployment
    flow_name: my_flow
    parameters:
      my_arg: true
    work_pool:
      name: my-work-pool  # placeholder: use your work pool's name
      job_variables:
        env:
          ADDITIONAL_ARG: "your_value"
```
2. Use the `on_failure` hook to pass `additional_arg` to `run_bash_script`:
```python
import subprocess

from prefect import flow, task


@task
def my_task(my_arg):
    if my_arg:
        raise ValueError("Intentional failure")
    return "Success"


def run_bash_script(my_arg, additional_arg):
    # Use additional_arg in your script logic
    script_name = "script_true.sh" if my_arg else "script_false.sh"
    print(f"Running {script_name} with additional_arg: {additional_arg}")
    subprocess.run(["bash", script_name], check=True)


@flow(
    on_failure=[
        # Hard-coded for simplicity; in a deployment this could come from
        # os.environ["ADDITIONAL_ARG"] instead
        lambda flow, flow_run, state: run_bash_script(
            my_arg=flow_run.parameters.get("my_arg", True),
            additional_arg="your_value",
        )
    ]
)
def my_flow(my_arg=True):
    my_task(my_arg)


# Run the flow
my_flow()
```
In this setup:
- `additional_arg` is stored as the `ADDITIONAL_ARG` environment variable under the deployment's `work_pool.job_variables.env` in `prefect.yaml`, not as a flow parameter.
- Inside the hook, you can read it with `os.environ["ADDITIONAL_ARG"]` and pass it to `run_bash_script`. This example hard-codes the value for simplicity.
v
@Marvin, what if I don't want additional_arg to have a default value? Here you're setting additional_arg = "your_value". I need this value to come straight from the prefect.yaml file
m
To source `additional_arg` from `prefect.yaml` without any default in your code, set it as an environment variable of the deployment and read it at runtime inside the hook. `FlowRunContext` has no `metadata` attribute, so the value can't be looked up there. Here's how you can achieve this:
1. Define `additional_arg` in your `prefect.yaml` as an environment variable of the work pool job (this requires a work pool type that supports the `env` job variable, such as process, Docker, or Kubernetes):
```yaml
version: 2

deployments:
  - name: my-flow-deployment
    flow_name: my_flow
    parameters:
      my_arg: true
    work_pool:
      name: my-work-pool  # placeholder: use your work pool's name
      job_variables:
        env:
          ADDITIONAL_ARG: "your_value"
```
2. Read `additional_arg` from the environment inside the hook:
```python
import os
import subprocess

from prefect import flow, task


@task
def my_task(my_arg):
    if my_arg:
        raise ValueError("Intentional failure")
    return "Success"


def run_bash_script(flow, flow_run, state):
    # ADDITIONAL_ARG is set from job_variables.env in prefect.yaml;
    # deliberately no default in the code.
    additional_arg = os.environ.get("ADDITIONAL_ARG")
    if additional_arg is None:
        raise ValueError("ADDITIONAL_ARG is not set; define it under job_variables.env in prefect.yaml")

    my_arg = flow_run.parameters.get("my_arg", True)
    script_name = "script_true.sh" if my_arg else "script_false.sh"
    print(f"Running {script_name} with additional_arg: {additional_arg}")
    subprocess.run(["bash", script_name], check=True)


@flow(on_failure=[run_bash_script])
def my_flow(my_arg=True):
    my_task(my_arg)


# Run the flow
my_flow()
```
In this setup:
- `additional_arg` comes only from `prefect.yaml`: the worker sets `ADDITIONAL_ARG` in the flow run's environment, and the hook reads it with `os.environ`.
- If the variable is missing, the hook raises an error instead of silently falling back to a default, so a misconfigured deployment is easy to spot in the flow run logs.
When you run `my_flow()` locally instead of through the deployment, export `ADDITIONAL_ARG` in your shell first, since the value would otherwise only be set by the worker from the `prefect.yaml` file.
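If several hooks or scripts need values from the environment, the lookup-and-fail logic can be factored into one small helper. `require_env` below is a hypothetical helper, not a Prefect function; as a design choice it also treats an empty string as missing, which catches a key that exists in `prefect.yaml` but was left blank.

```python
import os


def require_env(name):
    """Return os.environ[name], or raise a clear error if it is missing or empty."""
    value = os.environ.get(name)
    if not value:
        raise ValueError(
            f"{name} is not set; define it under job_variables.env in prefect.yaml"
        )
    return value


# Stand-in for the worker setting the variable from prefect.yaml
os.environ["ADDITIONAL_ARG"] = "your_value"
print(require_env("ADDITIONAL_ARG"))  # your_value
```

Inside the hook, `additional_arg = require_env("ADDITIONAL_ARG")` would replace the explicit `None` check.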