Tom A
04/18/2024, 1:21 PM
Marvin
04/18/2024, 1:21 PM
`on_failure` hook, you have a couple of options:
1. Define your hook to have additional default arguments: You can give your hook function extra arguments with default values.
```python
from prefect import flow, task

data = {}

def my_hook(flow, flow_run, state, my_arg="custom_value"):
    # Prefect passes flow, flow_run, and state; my_arg falls back to its default.
    data.update(my_arg=my_arg, state=state)
```
2. Define your hook to accept arbitrary keyword arguments: You can also have your hook function accept `**kwargs` and pass any additional keyword arguments when attaching the hook to your task using `partial`.
```python
from functools import partial
from prefect import flow, task

data = {}

def my_hook(task, task_run, state, **kwargs):
    data.update(state=state, **kwargs)

@task
def bad_task():
    raise ValueError("meh")

@flow
def ok_with_failure_flow(x: str = "foo", y: int = 42):
    # Attach the hook with x and y pre-filled via partial.
    bad_task_with_a_hook = bad_task.with_options(
        on_failure=[partial(my_hook, x=x, y=y)]
    )
    # return_state=True returns the failed state instead of raising.
    return "bar", bad_task_with_a_hook(return_state=True)

_, task_run_state = ok_with_failure_flow()
# The hook recorded the pre-filled kwargs plus the failed task run state.
assert data == {"x": "foo", "y": 42, "state": task_run_state}
```
By using `partial`, you are pre-filling the `x` and `y` arguments to `my_hook` when it is called upon task failure.
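To see what `partial` does here without running a flow, the following is a minimal sketch in plain Python. The strings passed at the bottom are hypothetical stand-ins for the task, task run, and state objects Prefect would supply; only `functools.partial` itself is real API.

```python
from functools import partial

data = {}

def my_hook(task, task_run, state, **kwargs):
    # Same shape as the hook above: three positional arguments plus extras.
    data.update(state=state, **kwargs)

# Pre-fill the keyword arguments, as the flow above does.
hook = partial(my_hook, x="foo", y=42)

# Stand-in values (assumption): Prefect would pass real objects here.
hook("fake-task", "fake-task-run", "fake-state")

print(data)
```

The caller only supplies the three positional arguments; `x` and `y` arrive through the keywords stored on the partial object.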
For more details, refer to the Prefect States documentation.
Tom A
04/18/2024, 1:25 PM
`execute_dbt` that accepts a dictionary called `commands`, how can I write a flow that calls `my_function` as a hook on completion or failure, passing in the `commands` dictionary passed to `execute_dbt`?
Marvin
04/18/2024, 1:25 PM
`execute_dbt` which accepts a dictionary called `commands`, you can write a flow that calls `my_function` as a hook on completion or failure, passing in the `commands` dictionary to it. Here's how you might do it using a hook that accepts additional keyword arguments:
```python
from prefect import flow, task
from functools import partial

data = {}

@task
def execute_dbt(commands: dict):
    # Your task logic here
    pass  # Placeholder for actual implementation

def my_function(task, task_run, state, commands):
    # Your hook logic here, 'commands' is the dictionary passed from the task
    data.update(commands=commands)

@flow
def my_dbt_flow(commands):
    execute_dbt_with_hooks = execute_dbt.with_options(
        on_failure=[partial(my_function, commands=commands)],
        on_completion=[partial(my_function, commands=commands)]
    )
    result = execute_dbt_with_hooks(commands)
    return result

# example usage with a dictionary of commands
dbt_commands = {'model': 'my_dbt_model'}
my_dbt_flow(commands=dbt_commands)
```
In this example, `execute_dbt_with_hooks` is a version of your `execute_dbt` task equipped with `on_failure` and `on_completion` hooks that call `my_function` and pass the `commands` dictionary along. The `partial` function is used to create a new function where `commands=commands` is predefined. The original task, `execute_dbt`, takes `commands` as an argument, so you pass it to the flow when calling `my_dbt_flow`.
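Since the same `my_function` is attached to both hooks, it may help to tell the two cases apart inside the hook. Here is a minimal sketch, assuming the hook inspects the state it receives. `FakeState` is a hypothetical stand-in, not a Prefect class; it only mimics the `is_failed()` method that Prefect state objects provide. The two calls at the bottom imitate Prefect invoking the hook and are for illustration only.

```python
from dataclasses import dataclass
from functools import partial


@dataclass
class FakeState:
    """Hypothetical stand-in for a Prefect state; only mimics is_failed()."""
    failed: bool

    def is_failed(self) -> bool:
        return self.failed


events = []

def my_function(task, task_run, state, commands):
    # Branch on the state so one hook can serve both on_failure and on_completion.
    outcome = "failed" if state.is_failed() else "completed"
    events.append((outcome, commands["model"]))

hook = partial(my_function, commands={"model": "my_dbt_model"})

# Imitate the two situations in which the hook could be called.
hook(None, None, FakeState(failed=False))
hook(None, None, FakeState(failed=True))

print(events)
```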
Remember to adjust the actual task logic and hook function (`my_function`) according to your specific requirements.
Marvin
04/18/2024, 1:57 PM
here is my flow code, still when task run_dbt_command fails, function my_hook does not run.
```
import sys
import os
from functools import partial

# Add parent directory to Python path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from prefect import flow, task
from prefect_azure.blob_storage import AzureBlobStorageContainer
from prefect_dbt.cli import DbtCliProfile, DbtCoreOperation
from uuid import uuid4
from tasks.generate_dbt_artifact import generate_dbt_log_artifact

def my_hook(task, task_run, state, command):
    print(f'\n\n\n{command}\n\n\n')

@task
def run_dbt_command(command):
    # Load CLI Profile from Block
    dbt_cli_profile = DbtCliProfile.load("internal-dwh-dbt-profile")
    # Prepare Command
    log_path = f'dbt_json_log_{command["key"]}'
    processed_command = [command["command"] + f' --log-format-file json --log-path {log_path}']
    print(processed_command)
    # Execute DBT Operation
    with DbtCoreOperation(
        commands=processed_command,
        working_dir=os.path.join(os.getcwd(), 'dbt'),
        project_dir=os.path.join(os.getcwd(), 'dbt'),
        profiles_dir=os.path.join(os.getcwd(), 'dbt', 'profiles'),
        dbt_cli_profile=db
```
Marvin
04/18/2024, 2:00 PM
here is my flow code, still when task run_dbt_command fails, function my_hook does not run.
```
import sys
import os
from functools import partial

# Add parent directory to Python path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from prefect import flow, task
from prefect_dbt.cli import DbtCliProfile, DbtCoreOperation
from uuid import uuid4
from tasks.generate_dbt_artifact import generate_dbt_log_artifact

def my_hook(task, task_run, state, command):
    print(f'\n\n\n{command}\n\n\n')

@task
def run_dbt_command(command):
    # Load CLI Profile from Block
    dbt_cli_profile = DbtCliProfile.load("internal-dwh-dbt-profile")
    # Prepare Command
    log_path = f'dbt_json_log_{command["key"]}'
    processed_command = [command["command"] + f' --log-format-file json --log-path {log_path}']
    print(processed_command)
    # Execute DBT Operation
    dbt_execute(proccessed_command)

@flow
def testing_flow():
    commands = [{'command':'dbt build --select fail_model', 'key':'test'}]
    for command in commands:
        execute_dbt_with_hooks = run_dbt_command(command).with_options(
            on_failure=[partial(my_hook, command=command)],
            on_completion
```
Tom A
04/18/2024, 2:03 PM
```
@flow
def testing_flow():
    commands = [{'command':'dbt build --select fail_model', 'key':'test'}, {'command':'dbt build --select fail_model', 'key':'test2'} ]
    for command in commands:
        execute_dbt_with_hooks = run_dbt_command(command).with_options(
            on_failure=[partial(my_hook, command=command)],
            on_completion=[partial(my_hook, command=command)]
        )
        result = execute_dbt_with_hooks(command)
        return result

if __name__ == "__main__":
    testing_flow()
```
Marvin
04/18/2024, 2:04 PM
`for` loop is set up to return the result immediately after the first iteration, which means the flow will only attempt to run the first command, and it will conclude before any subsequent commands are executed or any hooks are triggered on those subsequent commands.
Let me correct the flow to ensure that all hooks are properly attached and that all commands run as expected:
```python
from prefect import flow, task
from functools import partial

@task
def run_dbt_command(command: dict):
    # Your task logic here.
    if command['key'] == 'test2':
        raise ValueError("Simulated task failure.")
    return f"Command {command['key']} executed successfully."

def my_hook(task, task_run, state, command):
    # Processing when hooks are triggered.
    print(f"Hook triggered for a task on {state}. Command: {command}")

@flow
def testing_flow():
    commands = [
        {'command':'dbt build --select fail_model', 'key':'test'},
        {'command':'dbt build --select fail_model', 'key':'test2'}
    ]
    results = []  # Collect results of all dbt commands.
    for command in commands:
        # Define the "with_options" outside of the execution context.
        execute_dbt_with_hooks = run_dbt_command.with_options(
            on_failure=[partial(my_hook, command=command)],
            on_completion=[partial(my_hook, command=command)]
        )
        # Execute the task and append the result.
        results.append(execute_dbt_with_hooks(command=command))
    # Return all results
    return results

if __name__ == "__main__":
    testing_flow()
```
Notable changes:
- Moved `.with_options()` off the task call: the original called it on the return value of `run_dbt_command(command)`; it is now called on the task itself, so the hooks are configured before execution.
- Removed `return` from inside the loop so it doesn't exit after the first execution, allowing the flow to process all commands.
- Added a `results` list to store the results of executed dbt commands.
With the updated flow, when `run_dbt_command` fails, it should now properly trigger the `on_failure` hook, which will call `my_hook` with the respective command's data.
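One reason `partial` suits this loop: it binds each `command` at the moment the hook is created. A lambda that refers to the loop variable looks it up only when it is called, so every such hook would see the last command. The sketch below is plain Python with no Prefect involved; the `None` arguments are stand-ins for the task, task run, and state, and this `my_hook` returns the key only so the difference is easy to inspect.

```python
from functools import partial

def my_hook(task, task_run, state, command):
    # Return the key so the bound command can be checked afterwards.
    return command["key"]

commands = [{"key": "test"}, {"key": "test2"}]

partial_hooks = []
lambda_hooks = []
for command in commands:
    # partial captures the current value of command right now.
    partial_hooks.append(partial(my_hook, command=command))
    # The lambda resolves command later, when it is finally called.
    lambda_hooks.append(
        lambda task, task_run, state: my_hook(task, task_run, state, command=command)
    )

partial_keys = [hook(None, None, None) for hook in partial_hooks]
lambda_keys = [hook(None, None, None) for hook in lambda_hooks]

print(partial_keys)  # ['test', 'test2']
print(lambda_keys)   # ['test2', 'test2']
```

If you do prefer a lambda, binding the value as a default argument (`lambda task, task_run, state, command=command: ...`) gives the same early binding as `partial`.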