Preston Sharpe
09/25/2025, 3:17 PM{{env_var('database')}}
. I've confirmed that the environment variables are accessible in the flow file itself (via load_dotenv()
and printing them. I'm still unclear as to how to get those to pass to the dbt process. Happy to share snippets of code if that helps youMarvin
09/25/2025, 3:17 PMMarvin
09/25/2025, 3:26 PMdbt_build
/ run_dbt_build
), there isn’t a special parameter to pass env vars; the dbt subprocess inherits os.environ
from your flow process.
That means:
- If you run the flow locally, calling load_dotenv()
before run_dbt_build(...)
is usually enough.
- If you “invoke” a flow/deployment so it runs on a worker (Docker, Kubernetes, Process/Universal), the env vars must exist in that worker’s runtime environment (or be injected via work pool job variables, Dockerfile, Kubernetes env, etc.). Loading a .env
file locally will not affect a remotely executed flow.
Concrete options
1) Local run (single process)
- Load your .env
before calling dbt:
from prefect import flow
from prefect_dbt.cli.commands import run_dbt_build
from dotenv import load_dotenv
@flow
def dbt_flow():
load_dotenv(override=True) # ensures os.environ is populated
return run_dbt_build(
project_dir="/path/to/project",
profiles_dir="/path/to/profiles",
stream_output=True,
)
if __name__ == "__main__":
dbt_flow()
2) Deployment on a worker (recommended approaches)
- Put the env vars in the worker’s environment:
- Process/Universal worker: export them before starting the worker
- Docker worker: add ENV
lines in your Dockerfile or pass env via job variables
- Kubernetes worker: set env or secrets on the pod
- Or pass them via deployment job variables when you deploy from source:
flow = flow.from_source(
source="<https://github.com/your-org/your-repo>",
entrypoint="flows/dbt_flow.py:dbt_flow",
)
flow.deploy(
name="dbt-build",
work_pool_name="your-pool",
job_variables={
"env": {
"database": "my_db",
# Optionally, if needed by dbt:
# "DBT_PROFILES_DIR": "/opt/dbt/profiles"
}
}
)
3) Pull from Prefect Variables or Blocks, then set os.environ before running dbt
from prefect.variables import get
import os
os.environ["database"] = get("database") # make it visible to dbt
A few gotchas to check
- The key you use in env_var('...')
must exist in the dbt process’s environment at runtime. If you use {{ env_var('database') }}
, make sure the variable name matches exactly (case sensitive on most systems).
- Ensure you call load_dotenv()
before invoking run_dbt_build
.
- If you’re running remotely, confirm your worker/container actually has those env vars (print them in the flow just before calling dbt to verify).
- You can also set DBT_PROFILES_DIR
to point dbt to your profiles directory if that’s easier than passing profiles_dir
.
Docs for reference
- Prefect + dbt integration: Prefect dbt integration
- Prefect Shell (alternative if you want explicit control over env via ShellOperation(env=...)
): Prefect Shell integration
- Prefect settings/profiles (for environment handling patterns): Prefect settings and profiles
If you can share:
- How you’re running this (local vs deployment), which worker type, and how you’re invoking the flow
- A minimal snippet showing where you call load_dotenv()
and run_dbt_build(...)
- The dbt error output (e.g., “env var not found”)
…I can pinpoint the exact fix.Preston Sharpe
09/25/2025, 3:39 PMKevin Grismore
09/25/2025, 3:45 PMPreston Sharpe
09/25/2025, 3:52 PM# from dbt_common.events.base_types import EventLevel
from prefect import task, flow
from prefect_dbt import PrefectDbtRunner, PrefectDbtSettings
from pathlib import Path
import os
from dotenv import load_dotenv
@task(name="Find dbt directory", log_prints=True)
def find_dbt():
parent_path = Path(__file__).parent.parent.parent
print(parent_path)
print(os.listdir(parent_path))
if "dbt" in os.listdir(parent_path):
dbt_path = os.path.join(parent_path, "dbt")
return dbt_path
else:
return None
@task(name="dbt build")
def run_dbt_build_task(path_to_dbt: str = ""):
# fail quietly? May not want to do
if not path_to_dbt:
return None
PrefectDbtRunner(
settings=PrefectDbtSettings(
project_dir=path_to_dbt,
profiles_dir=path_to_dbt,
# log_level=EventLevel.ERROR, # explicitly choose a higher log level for dbt
)
).invoke(["build"])
@flow(log_prints=True)
def run_dbt_build_flow():
# load environment to be discovered for dbt
load_dotenv(override=True)
# setup our dbt path and then invoke the build with that pass
path_to_dbt = find_dbt()
if path_to_dbt:
run_dbt_build_task(path_to_dbt=path_to_dbt)
else:
print("Did not invoke dbt as no dbt folder was found.")
if __name__ == "__main__":
run_dbt_build_flow()
Preston Sharpe
09/25/2025, 3:53 PMPreston Sharpe
09/25/2025, 4:39 PM- name: Test Run DBT Build
flow_name: Run DBT build
entrypoint: flows/dbt_flow.py:run_dbt_build_flow
work_pool:
name: local-machine-work-pool
job_variables:
env:
database: "{{ $database }}"
server: "{{ $server }}"
user: "{{ $username }}"
password: "{{ $password }}"
schedule: null
Just wondering how to do this when invoking runs locally for quicker developmentKevin Grismore
09/25/2025, 4:48 PMKevin Grismore
09/25/2025, 4:49 PMPreston Sharpe
09/25/2025, 5:45 PM