Andreas Nord
09/20/2023, 1:30 PM
from prefect import task, flow, get_run_logger
from dbt.cli.main import dbtRunner, dbtRunnerResult
@task
def dbt_run(project_dir: str, target: str):
    """Run ``dbt run`` through dbt's programmatic runner.

    The same directory is passed as both ``--project-dir`` and
    ``--profiles-dir``.

    Args:
        project_dir: Directory handed to dbt as the project and profiles dir.
        target: Value handed to dbt's ``--target`` option.

    Raises:
        RuntimeError: If the dbt invocation does not report success. The
            exception recorded by dbt (if any) is attached as the cause.
    """
    runner = dbtRunner()
    result: dbtRunnerResult = runner.invoke(
        [
            "run",
            "--project-dir", project_dir,
            "--profiles-dir", project_dir,
            "--target", target,
        ]
    )
    # invoke() hands back a result object; without this check a failed
    # dbt run would leave the task looking successful.
    if not result.success:
        raise RuntimeError(
            f"dbt run failed for target {target!r}"
        ) from result.exception
Sean Williams
09/20/2023, 3:10 PM
@flow
def trigger_dbt_flow() -> str:
    """Run ``pwd``, ``dbt debug`` and ``dbt run`` via ``DbtCoreOperation``.

    Returns:
        Whatever ``DbtCoreOperation.run()`` returns.
        NOTE(review): the annotation says ``str``; confirm it matches the
        actual return type of ``run()``.
    """
    operation = DbtCoreOperation(
        commands=["pwd", "dbt debug", "dbt run"],
        project_dir="your_project",
        profiles_dir="~/.dbt",
    )
    return operation.run()
will produce these logs in Prefect: