Tom A
04/18/2024, 1:21 PMMarvin
04/18/2024, 1:21 PMon_failurepython
from prefect import flow, task
data = {}
def my_hook(flow, flow_run, state, my_arg="custom_value"):
    data.update(my_arg=my_arg, state=state)**kwargspartialpython
from functools import partial
from prefect import flow, task
data = {}
def my_hook(task, task_run, state, **kwargs):
    data.update(state=state, **kwargs)
@task
def bad_task():
    raise ValueError("meh")
@flow
def ok_with_failure_flow(x: str = "foo", y: int = 42):
    bad_task_with_a_hook = bad_task.with_options(
        on_failure=[partial(my_hook, x=x, y=y)]
    )
    return "bar", bad_task_with_a_hook(return_state=True)
_, task_run_state = ok_with_failure_flow()
assert data == {...}  # Actual comparison herepartialxymy_hookTom A
04/18/2024, 1:25 PMexecute_dbtmy_functionexecute_dbtMarvin
04/18/2024, 1:25 PMexecute_dbtcommandsmy_functioncommandspython
from prefect import flow, task
from functools import partial
data = {}
@task
def execute_dbt(commands: dict):
    # Your task logic here
    pass  # Placeholder for actual implementation
def my_function(task, task_run, state, commands):
    # Your hook logic here, 'commands' is the dictionary passed from the task
    data.update(commands=commands)
@flow
def my_dbt_flow(commands):
    execute_dbt_with_hooks = execute_dbt.with_options(
        on_failure=[partial(my_function, commands=commands)],
        on_completion=[partial(my_function, commands=commands)]
    )
    result = execute_dbt_with_hooks(commands)
    return result
# example usage with a dictionary of commands
dbt_commands = {'model': 'my_dbt_model'}
my_dbt_flow(commands=dbt_commands)execute_dbt_with_hooksexecute_dbton_failureon_completionmy_functioncommandspartialcommands=commandsexecute_dbtcommandsmy_dbt_flowmy_functionMarvin
04/18/2024, 1:57 PMhere is my flow code, still when task run_dbt_command fails, function my_hook does not run.```import sys import os from functools import partial # Add parent directory to Python path sys.path.append(os.path.abspath(os.path.join(os.path.dirname(file), '..'))) from prefect import flow, task from prefect_azure.blob_storage import AzureBlobStorageContainer from prefect_dbt.cli import DbtCliProfile, DbtCoreOperation from uuid import uuid4 from tasks.generate_dbt_artifact import generate_dbt_log_artifact def my_hook(task, task_run, state, command): print(f'\n\n\n{command}\n\n\n') @task def run_dbt_command(command): # Load CLI Profile from Block dbt_cli_profile = DbtCliProfile.load("internal-dwh-dbt-profile") # Prepare Command log_path = f'dbt_json_log_{command["key"]}' processed_command = [command["command"] + f' --log-format-file json --log-path {log_path}'] print(processed_command) # Execute DBT Operation with DbtCoreOperation( commands=processed_command, working_dir=os.path.join(os.getcwd(), 'dbt'), project_dir=os.path.join(os.getcwd(), 'dbt'), profiles_dir=os.path.join(os.getcwd(), 'dbt', 'profiles'), dbt_cli_profile=db
Marvin
04/18/2024, 2:00 PMhere is my flow code, still when task run_dbt_command fails, function my_hook does not run.```import sys import os from functools import partial # Add parent directory to Python path sys.path.append(os.path.abspath(os.path.join(os.path.dirname(file), '..'))) from prefect import flow, task from prefect_dbt.cli import DbtCliProfile, DbtCoreOperation from uuid import uuid4 from tasks.generate_dbt_artifact import generate_dbt_log_artifact def my_hook(task, task_run, state, command): print(f'\n\n\n{command}\n\n\n') @task def run_dbt_command(command): # Load CLI Profile from Block dbt_cli_profile = DbtCliProfile.load("internal-dwh-dbt-profile") # Prepare Command log_path = f'dbt_json_log_{command["key"]}' processed_command = [command["command"] + f' --log-format-file json --log-path {log_path}'] print(processed_command) # Execute DBT Operation dbt_execute(proccessed_command) @flow def testing_flow(): commands = [{'command':'dbt build --select fail_model', 'key':'test'}] for command in commands: execute_dbt_with_hooks = run_dbt_command(command).with_options( on_failure=[partial(my_hook, command=command)], on_completion
Tom A
04/18/2024, 2:03 PM@flow
def testing_flow():
    commands = [{'command':'dbt build --select fail_model', 'key':'test'}, {'command':'dbt build --select fail_model', 'key':'test2'} ]
    for command in commands:
        execute_dbt_with_hooks = run_dbt_command(command).with_options(
            on_failure=[partial(my_hook, command=command)],
            on_completion=[partial(my_hook, command=command)]
        )
        result = execute_dbt_with_hooks(command)
        return result
if __name__ == "__main__":
    testing_flow()Marvin
04/18/2024, 2:04 PMforpython
from prefect import flow, task
from functools import partial
@task
def run_dbt_command(command: dict):
    # Your task logic here.
    if command['key'] == 'test2':
        raise ValueError("Simulated task failure.")
    return f"Command {command['key']} executed successfully."
def my_hook(task, task_run, state, command):
    # Processing when hooks are triggered.
    print(f"Hook triggered for a task on {state}. Command: {command}")
@flow
def testing_flow():
    commands = [
        {'command':'dbt build --select fail_model', 'key':'test'},
        {'command':'dbt build --select fail_model', 'key':'test2'}
    ]
    results = []  # Collect results of all dbt commands.
    for command in commands:
        # Define the "with_options" outside of the execution context.
        execute_dbt_with_hooks = run_dbt_command.with_options(
            on_failure=[partial(my_hook, command=command)],
            on_completion=[partial(my_hook, command=command)]
        )
        # Execute the task and append the result.
        results.append(execute_dbt_with_hooks(command=command))
    # Return all results
    return results
if __name__ == "__main__":
    testing_flow().with_options()returnresultsrun_dbt_commandon_failuremy_hook