<@ULVA73B9P> how can I pass variables from a task ...
# ask-marvin
t
@Marvin how can I pass variables from a task to a on_failure hook?
m
To pass variables from a task to an `on_failure` hook, you have a couple of options:
1. Define your hook to have additional default arguments: you can define your hook function to include additional arguments that you want to pass by default.
python
from prefect import flow, task

data = {}
def my_hook(task, task_run, state, my_arg="custom_value"):
    data.update(my_arg=my_arg, state=state)
2. Define your hook to accept arbitrary keyword arguments: you can also have your hook function accept `**kwargs` and pass any additional keyword arguments when attaching the hook to your task using `partial`.
python
from functools import partial
from prefect import flow, task

data = {}
def my_hook(task, task_run, state, **kwargs):
    data.update(state=state, **kwargs)

@task
def bad_task():
    raise ValueError("meh")

@flow
def ok_with_failure_flow(x: str = "foo", y: int = 42):
    bad_task_with_a_hook = bad_task.with_options(
        on_failure=[partial(my_hook, x=x, y=y)]
    )
    return "bar", bad_task_with_a_hook(return_state=True)

_, task_run_state = ok_with_failure_flow()
assert data == {...}  # Actual comparison here
By using `partial`, you are pre-filling the `x` and `y` arguments to `my_hook` when it is called upon task failure. For more details, refer to the Prefect States documentation.
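To see what `partial` contributes on its own, here is a minimal sketch in plain Python with no Prefect involved. It assumes the hook is invoked with the task, the task run, and the state as its three arguments; the three strings passed below are placeholders for those objects, not real Prefect values.

```python
from functools import partial

data = {}

def my_hook(task, task_run, state, **kwargs):
    # Record the state plus whatever was pre-bound with partial.
    data.update(state=state, **kwargs)

# Pre-bind x and y. The resulting callable still accepts the
# three hook arguments.
bound_hook = partial(my_hook, x="foo", y=42)

# Simulate the call made when the task fails. The strings are
# placeholders standing in for the real task, task run, and state.
bound_hook("task", "task_run", "Failed")

print(data)
```

The pre-bound keyword arguments end up in `data` next to the state, which mirrors what the assertion in the example above is meant to check.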
t
@Marvin Let's say I have a task called `execute_dbt` that accepts a dictionary called `commands`. How can I write a flow that calls `my_function` as a hook on completion or failure, passing in the `commands` dictionary passed to `execute_dbt`?
m
I'm sorry, but it appears that the code example provided is not directly related to your request about using hooks in a Prefect flow. Let me construct a conceptual example tailored to your question. Assuming you have a task `execute_dbt` which accepts a dictionary called `commands`, you can write a flow that calls `my_function` as a hook on completion or failure, passing in the `commands` dictionary to it. Here's how you might do it using a hook that accepts additional keyword arguments:
python
from prefect import flow, task
from functools import partial

data = {}

@task
def execute_dbt(commands: dict):
    # Your task logic here
    pass  # Placeholder for actual implementation

def my_function(task, task_run, state, commands):
    # Your hook logic here, 'commands' is the dictionary passed from the task
    data.update(commands=commands)

@flow
def my_dbt_flow(commands):
    execute_dbt_with_hooks = execute_dbt.with_options(
        on_failure=[partial(my_function, commands=commands)],
        on_completion=[partial(my_function, commands=commands)]
    )
    result = execute_dbt_with_hooks(commands)
    return result

# example usage with a dictionary of commands
dbt_commands = {'model': 'my_dbt_model'}
my_dbt_flow(commands=dbt_commands)
In this example, `execute_dbt_with_hooks` is a version of your `execute_dbt` task equipped with `on_failure` and `on_completion` hooks that call `my_function` and pass the `commands` dictionary along. The `partial` function is used to create a new function where `commands=commands` is predefined. The original task, `execute_dbt`, takes `commands` as an argument, so you pass it to the flow when calling `my_dbt_flow`. Remember to adjust the actual task logic and hook function (`my_function`) according to your specific requirements.
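Because the same `my_function` is registered for both `on_failure` and `on_completion`, it may help to branch on the state inside the hook. The sketch below is plain Python with stand-ins: `fake_failed` and `fake_completed` are hypothetical objects that only imitate the `is_failed()` method of a Prefect state, and the `events` list is just for illustration.

```python
from functools import partial
from types import SimpleNamespace

events = []

def my_function(task, task_run, state, commands):
    # Decide which outcome triggered the hook by asking the state.
    outcome = "failed" if state.is_failed() else "completed"
    events.append((outcome, commands))

hook = partial(my_function, commands={"model": "my_dbt_model"})

# Hypothetical stand-ins for Prefect state objects (assumption:
# only is_failed() is needed by the hook above).
fake_failed = SimpleNamespace(is_failed=lambda: True)
fake_completed = SimpleNamespace(is_failed=lambda: False)

# Simulate one failure call and one completion call.
hook(None, None, fake_failed)
hook(None, None, fake_completed)

print(events)
```

In a real flow the state argument would be supplied by Prefect, so only the body of `my_function` carries over.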
Your message was too long by 126 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
here is my flow code, still when task run_dbt_command fails, function my_hook does not run.
```
import sys
import os
from functools import partial

# Add parent directory to Python path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from prefect import flow, task
from prefect_azure.blob_storage import AzureBlobStorageContainer
from prefect_dbt.cli import DbtCliProfile, DbtCoreOperation
from uuid import uuid4
from tasks.generate_dbt_artifact import generate_dbt_log_artifact

def my_hook(task, task_run, state, command):
    print(f'\n\n\n{command}\n\n\n')

@task
def run_dbt_command(command):
    # Load CLI Profile from Block
    dbt_cli_profile = DbtCliProfile.load("internal-dwh-dbt-profile")

    # Prepare Command
    log_path = f'dbt_json_log_{command["key"]}'
    processed_command = [command["command"] + f' --log-format-file json --log-path {log_path}']
    print(processed_command)

    # Execute DBT Operation
    with DbtCoreOperation(
        commands=processed_command,
        working_dir=os.path.join(os.getcwd(), 'dbt'),
        project_dir=os.path.join(os.getcwd(), 'dbt'),
        profiles_dir=os.path.join(os.getcwd(), 'dbt', 'profiles'),
        dbt_cli_profile=db
```
Your message was too long by 38 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
here is my flow code, still when task run_dbt_command fails, function my_hook does not run.
```
import sys
import os
from functools import partial

# Add parent directory to Python path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from prefect import flow, task
from prefect_dbt.cli import DbtCliProfile, DbtCoreOperation
from uuid import uuid4
from tasks.generate_dbt_artifact import generate_dbt_log_artifact

def my_hook(task, task_run, state, command):
    print(f'\n\n\n{command}\n\n\n')

@task
def run_dbt_command(command):
    # Load CLI Profile from Block
    dbt_cli_profile = DbtCliProfile.load("internal-dwh-dbt-profile")

    # Prepare Command
    log_path = f'dbt_json_log_{command["key"]}'
    processed_command = [command["command"] + f' --log-format-file json --log-path {log_path}']
    print(processed_command)

    # Execute DBT Operation
    dbt_execute(proccessed_command)

@flow
def testing_flow():
    commands = [{'command':'dbt build --select fail_model', 'key':'test'}]
    for command in commands:
        execute_dbt_with_hooks = run_dbt_command(command).with_options(
            on_failure=[partial(my_hook, command=command)],
            on_completion
```
t
@Marvin Here is the main flow that I use. When run_dbt_command fails, it does not trigger the hook my_hook.
@flow
def testing_flow():
    commands = [{'command':'dbt build --select fail_model', 'key':'test'}, {'command':'dbt build --select fail_model', 'key':'test2'} ]
    for command in commands:
        execute_dbt_with_hooks = run_dbt_command(command).with_options(
            on_failure=[partial(my_hook, command=command)],
            on_completion=[partial(my_hook, command=command)]
        )
        result = execute_dbt_with_hooks(command)
        return result

if __name__ == "__main__":
    testing_flow()
m
It seems there might be a misunderstanding in the way tasks are being used within the flow. First, `run_dbt_command(command).with_options(...)` calls the task before any hooks are attached, so a failure there never reaches `my_hook`; `.with_options()` has to be called on the task itself, and the configured task it returns is what you then call. Second, the `for` loop returns the result immediately after the first iteration, which means the flow only attempts the first command and concludes before any subsequent commands are executed or any hooks are triggered for them. Let me correct the flow to ensure that all hooks are properly attached and that all commands run as expected:
python
from prefect import flow, task
from functools import partial

@task
def run_dbt_command(command: dict):
    # Your task logic here.
    if command['key'] == 'test2':
        raise ValueError("Simulated task failure.")
    return f"Command {command['key']} executed successfully."

def my_hook(task, task_run, state, command):
    # Processing when hooks are triggered.
    print(f"Hook triggered for a task on {state}. Command: {command}")

@flow
def testing_flow():
    commands = [
        {'command':'dbt build --select fail_model', 'key':'test'},
        {'command':'dbt build --select fail_model', 'key':'test2'}
    ]
    results = []  # Collect results of all dbt commands.
    for command in commands:
        # Define the "with_options" outside of the execution context.
        execute_dbt_with_hooks = run_dbt_command.with_options(
            on_failure=[partial(my_hook, command=command)],
            on_completion=[partial(my_hook, command=command)]
        )
        # Execute the task and append the result.
        results.append(execute_dbt_with_hooks(command=command))
    # Return all results
    return results

if __name__ == "__main__":
    testing_flow()
Notable changes:
- Removed `.with_options()` from inside the execution call; instead, it's used to configure the task before execution.
- Removed `return` from inside the loop so it doesn't exit after the first execution, allowing the flow to process all commands.
- Added a `results` list to store the results of executed dbt commands.
With the updated flow, when `run_dbt_command` fails, it should now properly trigger the `on_failure` hook, which will call `my_hook` with the respective command's data.
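The effect of the call order can be shown without Prefect. The sketch below uses `FakeTask`, a hypothetical stand-in written only for this illustration; it is not Prefect's task class and only imitates the idea that `with_options` returns a configured copy whose hooks run when the call fails.

```python
class FakeTask:
    """Hypothetical stand-in for a task object; not the real Prefect class."""

    def __init__(self, fn, on_failure=None):
        self.fn = fn
        self.on_failure = on_failure or []

    def with_options(self, on_failure=None):
        # Return a new task object that carries the hooks.
        return FakeTask(self.fn, on_failure=on_failure)

    def __call__(self, *args, **kwargs):
        try:
            return self.fn(*args, **kwargs)
        except Exception as exc:
            # Run every attached hook, then re-raise the failure.
            for hook in self.on_failure:
                hook(self, None, exc)
            raise

calls = []

def my_hook(task, task_run, state):
    calls.append("hook ran")

def run_dbt_command(command):
    raise ValueError("Simulated task failure.")

fake_task = FakeTask(run_dbt_command)

# Wrong order: the task runs (and fails) before any hook is attached.
try:
    fake_task({"key": "test"}).with_options(on_failure=[my_hook])
except ValueError:
    pass
calls_after_wrong_order = list(calls)

# Right order: attach the hooks first, then call the configured task.
try:
    fake_task.with_options(on_failure=[my_hook])({"key": "test"})
except ValueError:
    pass
calls_after_right_order = list(calls)

print(calls_after_wrong_order, calls_after_right_order)
```

In the wrong order the failure is raised before `.with_options()` is ever reached, so no hook is registered for that run; in the right order the hook is already attached when the failure happens.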