Adam
07/09/2025, 2:54 PMMarvin
07/09/2025, 2:54 PMMarvin
07/09/2025, 2:55 PMpython
from prefect import flow, task
from typing import List, Optional
@flow
def run_dbt_commands(
commands: List[str],
project_dir: str = "./dbt",
profiles_dir: Optional[str] = None,
target: Optional[str] = None
):
for command in commands:
# You might want to wrap this in a task
result = subprocess.run(
f"dbt {command}",
shell=True,
cwd=project_dir,
env={
**os.environ,
**({"DBT_PROFILES_DIR": profiles_dir} if profiles_dir else {}),
**({"DBT_TARGET": target} if target else {})
}
)
if result.returncode != 0:
raise Exception(f"dbt command failed: {command}")
You can then create multiple deployments using this same flow with different parameters:
python
from prefect.deployments import Deployment
# Deployment for daily model refresh
daily_models = Deployment.build_from_flow(
flow=run_dbt_commands,
name="daily-refresh",
parameters={
"commands": ["run --select daily_models", "test --select daily_models"],
"project_dir": "/path/to/dbt",
"target": "prod"
}
)
# Deployment for weekly full refresh
weekly_full = Deployment.build_from_flow(
flow=run_dbt_commands,
name="weekly-full",
parameters={
"commands": ["run --full-refresh", "test"],
"project_dir": "/path/to/dbt",
"target": "prod"
}
)
This approach has several benefits:
1. DRY (Don't Repeat Yourself): You maintain a single flow definition
2. Flexibility: You can easily create new deployments with different parameters
3. Maintainability: Changes to the core dbt execution logic only need to be made in one place
4. Testing: Easier to test since you have a single flow implementation
You might also consider creating separate tasks for different dbt commands (run, test, seed, etc.) and composing them in your flow, but the parameterized approach above is often sufficient for many use cases.
For more complex scenarios, you might want to look into using the prefect-dbt
collection, which provides more sophisticated dbt integrations, including built-in retries and better error handling.
Note: The deployment examples above use Prefect 2.x syntax. In Prefect 3.x, you would use:
python
from prefect import serve
serve(
run_dbt_commands,
name="daily-refresh",
parameters={
"commands": ["run --select daily_models", "test --select daily_models"],
"project_dir": "/path/to/dbt",
"target": "prod"
}
)
Let me know if you'd like to see more specific examples or have questions about handling particular dbt scenarios!