Adrien Besnard
09/05/2025, 7:56 AM
Marvin
09/05/2025, 7:56 AM
Marvin
09/05/2025, 8:00 AM
from prefect import flow
from prefect_shell import shell_run_command
import json
@flow
def run_dbt_operation():
    """Run a dbt macro via `dbt run-operation` and return its stdout.

    dbt prints the macro's return value to stdout, so capturing the
    shell output is the most direct way to get the raw macro result.

    Returns:
        The command's stdout as a single string (``return_all=False``).
    """
    import shlex  # local import: used only to quote the --args payload

    project_dir = "/path/to/your/dbt/project"
    macro_name = "my_macro"
    args = {"some_param": "value"}  # whatever your macro expects

    # shlex.quote makes the JSON payload safe for the shell: hand-wrapping
    # it in single quotes breaks (or injects) as soon as an argument value
    # itself contains a single quote or other shell metacharacters.
    cmd = f"dbt run-operation {macro_name} --args {shlex.quote(json.dumps(args))}"

    stdout = shell_run_command(
        command=cmd,
        cwd=project_dir,
        return_all=False,  # set True to get a list of lines
    )
    # stdout contains the macro's output (as printed by dbt)
    print("dbt run-operation output:", stdout)
    return stdout
Option B — use the prefect-dbt helper
- Uses the collection’s task to run dbt
- Good if you want summaries/logs via Prefect artifacts; for raw macro output, shell is usually more direct
from prefect import flow
from prefect_dbt.cli.commands import trigger_dbt_cli_command, consolidate_run_results
import json
@flow
def run_dbt_operation():
    """Run a dbt macro through the prefect-dbt CLI helper task.

    Streams dbt logs to Prefect while the command runs, then returns a
    structured summary dict (empty when the CLI task yields no result).
    """
    project_dir = "/path/to/your/dbt/project"
    macro_name = "my_macro"
    macro_args = {"some_param": "value"}

    dbt_command = f"dbt run-operation {macro_name}"
    cli_result = trigger_dbt_cli_command(
        command=dbt_command,
        project_dir=project_dir,
        extra_command_args=["--args", json.dumps(macro_args)],
        stream_output=True,  # logs stream to Prefect
    )

    # Optional: condense into a structured summary (counts, timings, etc.)
    if cli_result:
        return consolidate_run_results(cli_result)
    return {}
Notes
- Make sure dbt and your adapter are installed (e.g., `dbt-core` plus
  `dbt-postgres`/`dbt-snowflake`/`dbt-bigquery`) and that your `profiles.yml`
  is available to dbt.
- For capturing the literal text output of your macro, Option A is the most reliable, since dbt run-operation
prints the return value to stdout.
Docs
- prefect-dbt collection
- prefect-shell collection
If you share your macro name and args (and where your project lives), I can tailor the snippet exactly to your setup.