Ricardo Ibarra
08/21/2025, 12:05 AM
run = await trigger_dbt_cli_command(
    command=model,
    dbt_cli_profile=self.dbt_cli_profile,
    overwrite_profiles=True,
    project_dir=str(self.project_dir)
)
and then I use this method to execute the dbt build commands, like this:
@task(name="Run dbt Transformations", retries=5, retry_delay_seconds=[5, 10, 60, 300, 900])
async def run_dbt_transformations(self,
                                  models: List = [],
                                  vars: Optional[str] = None) -> bool:
    """Run DBT transformations using dbt block, sequentially with dependency."""
    logger = get_run_logger()
    context = TaskRunContext.get()
    logger.info(f"Context: {context}")
    try:
        # Verify we're in the right directory
        dbt_project_file = self.project_dir / "dbt_project.yml"
        if not dbt_project_file.exists():
            raise FileNotFoundError(
                f"dbt project file not found at {dbt_project_file}. "
                f"Current directory contents: {list(self.project_dir.glob('*'))}"
            )
        logger.info(f"Using dbt project directory: {self.project_dir}")
        for model in models:
            logger.info(f"Running dbt model: {model}, vars: {vars}")
            extra_commands = ["--select", model]
            if vars and model.endswith("unioned"):
                vars_dict = {'counties': vars}
                vars_yaml = yaml.dump(vars_dict, default_flow_style=True).strip()
                extra_commands.append(f'--vars={vars_yaml}')
            logger.info(f"extra commands: {extra_commands}")
            # uncomment tag
            run = await trigger_dbt_cli_command(
                command="dbt build",
                # dbt_cli_profile=self.dbt_cli_profile,
                # overwrite_profiles=True,
                project_dir=str(self.project_dir),
                extra_command_args=extra_commands
            )
            logger.info(str(run))
            if 'dbtRunnerResult(success=True' not in str(run):
                logger.info(str(run))
                raise Exception(f"dbt model {model} failed, aborting further dbt runs.")
            logger.info(f"Successfully ran dbt model: {model}, result: {run}")
        logger.info("All dbt models completed successfully")
        return True
    except Exception as e:
        logger.error(f"Error running dbt transformations: {str(e)}")
        raise Exception(f"Error running dbt transformations: {str(e)}")
But the models are almost always returning this error:
Traceback (most recent call last):
File "/usr/local/lib/python3.12/site-packages/dbt/cli/requires.py", line 178, in wrapper
result, success = func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/dbt/cli/requires.py", line 128, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/dbt/cli/requires.py", line 272, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/dbt/cli/requires.py", line 303, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/dbt/cli/requires.py", line 373, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/dbt/cli/requires.py", line 350, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/dbt/cli/requires.py", line 389, in wrapper
setup_manifest(ctx, write=write, write_perf_info=write_perf_info)
File "/usr/local/lib/python3.12/site-packages/dbt/cli/requires.py", line 416, in setup_manifest
ctx.obj["manifest"] = parse_manifest(
^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/dbt/parser/manifest.py", line 2123, in parse_manifest
manifest = ManifestLoader.get_full_manifest(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/dbt/parser/manifest.py", line 320, in get_full_manifest
manifest = loader.load()
^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/dbt/parser/manifest.py", line 389, in load
self.load_and_parse_macros(project_parser_files)
File "/usr/local/lib/python3.12/site-packages/dbt/parser/manifest.py", line 707, in load_and_parse_macros
self.macro_depends_on()
File "/usr/local/lib/python3.12/site-packages/dbt/parser/manifest.py", line 796, in macro_depends_on
adapter = get_adapter(self.root_project)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/dbt/adapters/factory.py", line 207, in get_adapter
return FACTORY.lookup_adapter(config.credentials.type)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/dbt/adapters/factory.py", line 132, in lookup_adapter
return self.adapters[adapter_name]
~~~~~~~~~~~~~^^^^^^^^^^^^^^
KeyError: 'snowflake'
How can I solve it?

Marvin
08/21/2025, 12:13 AM
Short answer: the dbt-snowflake adapter must be installed in the environment where trigger_dbt_cli_command executes.
Why you’re seeing KeyError: 'snowflake'
- dbt loads adapters via Python entry points. If dbt-snowflake isn’t installed (or is version-incompatible), the adapter registry won’t contain snowflake, and dbt raises KeyError: 'snowflake'.
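A quick way to confirm, from the worker's Python environment (a minimal sketch using only the standard library):
import importlib.util
import importlib.metadata

# None here means the adapter module is not importable in this environment
print(importlib.util.find_spec("dbt.adapters.snowflake"))
# Raises PackageNotFoundError if dbt-snowflake is not installed
print(importlib.metadata.version("dbt-snowflake"))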
How to fix
1) Ensure the adapter is installed in the worker/runtime:
- On Python 3.12, use recent dbt versions. For example:
pip install "dbt-core==1.8.*" "dbt-snowflake==1.8.*" "prefect-dbt>=0.5.0"
or, if you prefer extras, try:
pip install "prefect-dbt[snowflake]" # This should install dbt + the Snowflake adapter
2) If you’re using a Prefect worker image, bake it into your image:
FROM prefecthq/prefect:3-python3.12
RUN pip install --no-cache-dir "prefect-dbt" "dbt-core==1.8.*" "dbt-snowflake==1.8.*"
- Docker/Kubernetes workers: push and point your work pool to this image.
- Process worker or local runs: install the packages in that exact environment.
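As a sketch of pointing a Docker work pool at that image (assuming Prefect 3's flow.deploy; the pool name and image tag are placeholders for your setup):
from prefect import flow

@flow
def dbt_pipeline():
    ...

dbt_pipeline.deploy(
    name="dbt-pipeline",
    work_pool_name="docker-pool",            # hypothetical Docker work pool
    image="my-registry/prefect-dbt:latest",  # the image built above
    build=False,                             # image is built and pushed separately
)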
3) Optional sanity checks in the runtime:
dbt --version
python -c "import importlib.metadata as m; print('dbt-core:', m.version('dbt-core')); print('dbt-snowflake:', m.version('dbt-snowflake'))"
4) Using trigger_dbt_cli_command
- Passing your DbtCliProfile once with overwrite_profiles=True is fine if the profiles dir persists, but it’s also safe to pass it each time.
- I’d also run dbt deps once before model runs. Example:
# Write profiles and confirm environment by running deps once
await trigger_dbt_cli_command(
    command="dbt deps",
    dbt_cli_profile=self.dbt_cli_profile,
    overwrite_profiles=True,
    project_dir=str(self.project_dir)
)

# Then run builds per model
for model in models:
    extra = ["--select", model]
    if vars and model.endswith("unioned"):
        vars_yaml = yaml.dump({'counties': vars}, default_flow_style=True).strip()
        extra.append(f"--vars={vars_yaml}")
    run = await trigger_dbt_cli_command(
        command="dbt build",
        dbt_cli_profile=self.dbt_cli_profile,  # safe to include
        project_dir=str(self.project_dir),
        extra_command_args=extra
    )
    # Prefer checking run.success if available instead of string parsing
    if not run or not getattr(run, "success", False):
        raise Exception(f"dbt model {model} failed")
A couple of notes
- The “Snowflake connector” Prefect block is unrelated to dbt’s adapter. dbt specifically needs the dbt-snowflake package.
- Your first call shown with command=model looks unusual unless model actually contains a valid dbt command string (like "dbt deps"). If it’s just a model name, that won’t be a valid dbt CLI command.
If this still fails after installing the adapter, please share:
- Where the flow runs (local process, Docker worker, Kubernetes)
- Outputs of dbt --version and pip show dbt-snowflake
- Your prefect-dbt version
References
- Prefect dbt collection docs: https://docs.prefect.io/latest/integrations/collections/prefect-dbt/
- dbt Snowflake setup: https://docs.getdbt.com/docs/core/connect-data-platform/snowflake-setup

Ricardo Ibarra
08/21/2025, 12:44 AM
- dbt --version: 1.9.6
- prefect-dbt version: 0.5.0

Ricardo Ibarra
08/21/2025, 12:47 AMTask run failed with exception: Exception("Error running dbt transformations: [Errno 39] Directory not empty: 'dbt_packages/dbt-utils-1.3.0/integration_tests/data/sql'") - Retry 1/5 will start 5 second(s) from now
Marvin
08/21/2025, 12:49 AM
That "Directory not empty" OSError usually means two processes touched dbt_packages in the same project_dir at the same time, e.g. overlapping dbt deps/clean runs, or a retry starting while the previous attempt was still cleaning. Ways to make it robust:
1) Serialize deps/clean behind a file lock so only one run touches dbt_packages at a time:
from pathlib import Path
from filelock import FileLock

@task
async def dbt_setup_and_deps(self):
    lock_path = Path(self.project_dir) / ".dbt_deps.lock"
    with FileLock(str(lock_path)):
        # Clean first to reduce leftover files
        await trigger_dbt_cli_command(
            command="dbt clean",
            dbt_cli_profile=self.dbt_cli_profile,
            overwrite_profiles=True,  # only if you need to rewrite profiles
            project_dir=str(self.project_dir),
        )
        # Then deps
        await trigger_dbt_cli_command(
            command="dbt deps",
            dbt_cli_profile=self.dbt_cli_profile,
            project_dir=str(self.project_dir),
        )
Call this once before your per-model builds, and do not call deps again in the same run.
2) Run each flow in an isolated working directory
- Copy your project to a per-run temp dir and run dbt there so dbt_packages/target aren’t shared.
import tempfile, shutil
from pathlib import Path

tmp_dir = Path(tempfile.mkdtemp(prefix="dbt-"))
shutil.copytree(self.project_dir, tmp_dir, dirs_exist_ok=True,
                ignore=shutil.ignore_patterns("target", "dbt_packages"))

# Now run dbt in tmp_dir
await trigger_dbt_cli_command(
    command="dbt deps",
    dbt_cli_profile=self.dbt_cli_profile,
    overwrite_profiles=True,
    project_dir=str(tmp_dir),
)
# ...then your per-model builds with project_dir=str(tmp_dir)
3) Add an explicit clean + backoff before deps
- If you can’t introduce a lock/isolated dir, at least run “dbt clean” then “dbt deps” with a few retries and longer backoff to avoid overlapping attempts:
import asyncio

for i, delay in enumerate([2, 5, 10, 30]):
    try:
        await trigger_dbt_cli_command(command="dbt clean", project_dir=str(self.project_dir),
                                      dbt_cli_profile=self.dbt_cli_profile)
        await trigger_dbt_cli_command(command="dbt deps", project_dir=str(self.project_dir),
                                      dbt_cli_profile=self.dbt_cli_profile)
        break
    except Exception:
        if i == 3:  # last attempt: give up and re-raise
            raise
        await asyncio.sleep(delay)
4) Reduce contention
- Run “dbt deps” once on startup (or when the image is built) and avoid running it inside every flow.
- Increase your task retry delays (5s is often too short if a previous run is still cleaning).
- Ensure you’re not running multiple flows against the same on-disk project_dir concurrently on the same worker.
dbt-side configs to check
- In dbt_project.yml ensure clean-targets includes dbt_packages:
clean-targets: ["target", "dbt_packages"]
- If you want stronger isolation without copying the whole project, you can route packages to a per-run folder via env vars (a sketch follows below). The default is:
packages-install-path: "dbt_packages"
Set a unique env var in each flow run and include it in the path; make sure the templating actually renders in your setup.
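A minimal sketch of that templating, assuming your dbt version renders env_var() for this key and that you export a hypothetical DBT_RUN_ID env var per flow run:
# dbt_project.yml — DBT_RUN_ID is a hypothetical per-run env var you set yourself
packages-install-path: "dbt_packages_{{ env_var('DBT_RUN_ID', 'default') }}"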
If you can share:
- Are multiple flow runs overlapping on the same worker directory?
- Do you run “dbt deps” in each flow run or only sometimes?
Docs:
- Prefect dbt collection: https://docs.prefect.io/latest/integrations/collections/prefect-dbt/
- dbt deps: https://docs.getdbt.com/reference/commands/deps

Ricardo Ibarra
08/21/2025, 12:54 AM
zillow_scraper_processor = await zillowScraper.create(days, min_lot_size, max_lot_size)
data_processor = await dataProcessor.create(counties, dt)
dbt_processor = await dbtCore.create()
run_deps = await dbt_processor.run_deps()
logger.info(run_deps)

# Run all counties concurrently
logger.info("Starting concurrent county processing...")
futures = [scraper_orchestrator.submit(county, zillow_scraper_processor, data_processor, dbt_processor) for county in counties]
results = [future.result() for future in futures]
Marvin
08/21/2025, 12:55 AM
That pattern is the problem: all of those county runs share one project_dir, so concurrent dbt invocations race on dbt_packages and target. And if you pass overwrite_profiles=True in more than one concurrent call, you can race on profiles.yml as well.
Ways to fix (pick one)
A) Single dbt process, let dbt handle threading (safest)
- Run one dbt command (e.g., build or run) and let dbt’s threads setting in your profile control parallelism (see the profiles.yml sketch after this list).
- If you’re using per-county vars, consider refactoring so one run can process all counties (e.g., drive county logic inside SQL/macros or use tags/selection patterns). You avoid external concurrency entirely.
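For reference, the threads setting lives in the profile output; a sketch with placeholder names and credentials:
# profiles.yml (sketch; profile name and credentials are placeholders)
my_profile:
  target: prod
  outputs:
    prod:
      type: snowflake
      threads: 8   # dbt runs up to 8 models concurrently inside one process
      account: "<account>"
      user: "<user>"
      # ...remaining Snowflake credentials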
B) Isolate each concurrent dbt invocation to its own working directory
- Make a per-county temp copy of the project so dbt_packages and target are private per thread/process.
- Optionally pre-bake packages into your image to avoid running deps 19 times.
Example pattern:
import tempfile, shutil, uuid
from pathlib import Path

def prepare_isolated_project(base_project_dir: Path) -> Path:
    run_dir = Path(tempfile.mkdtemp(prefix=f"dbt-{uuid.uuid4().hex}-"))
    shutil.copytree(base_project_dir, run_dir, dirs_exist_ok=True,
                    ignore=shutil.ignore_patterns("target", "dbt_packages"))
    return run_dir

# before launching threads
base_dir = Path(self.project_dir)
profiles_dir = base_dir / ".profiles"  # write once, then reuse; don’t overwrite in threads

# ensure profiles exist exactly once (guard with a lock if multiple flows might race)
await trigger_dbt_cli_command(
    command="dbt debug",  # any harmless command that writes profiles.yml if needed
    dbt_cli_profile=self.dbt_cli_profile,
    overwrite_profiles=True,
    project_dir=str(base_dir),
    profiles_dir=str(profiles_dir),
)

# per county, in each thread:
proj_dir = prepare_isolated_project(base_dir)

# install packages in the isolated copy (or skip if pre-baked in image)
await trigger_dbt_cli_command(
    command="dbt deps",
    project_dir=str(proj_dir),
    profiles_dir=str(profiles_dir),  # reuse, but do NOT set overwrite_profiles=True here
)

# now run your build
await trigger_dbt_cli_command(
    command="dbt build",
    project_dir=str(proj_dir),
    profiles_dir=str(profiles_dir),
    extra_command_args=["--select", county_model, "--vars", vars_yaml_if_any],
)
C) Serialize deps/clean with a lock and avoid calling them in parallel
- If you must share one project_dir, protect deps/clean (and ideally all dbt commands) with a file lock so only one runs at a time:
from filelock import FileLock
from pathlib import Path

lock = FileLock(str(Path(self.project_dir) / ".dbt.lock"))
with lock:
    await trigger_dbt_cli_command(command="dbt deps", project_dir=str(self.project_dir),
                                  dbt_cli_profile=self.dbt_cli_profile)
    # run build calls either also under the lock, or isolate target/dbt_packages as in option B
- Increase backoff between retries; 5s is usually too short if another run is still cleaning.
D) Avoid deps/clean during runs
- Run dbt deps once at image build time or at worker startup to pre-populate dbt_packages, then don’t call deps in flow runs. This removes the dbt_packages race entirely (target can still race if you run builds concurrently in the same project_dir). A Dockerfile sketch follows below.
Additional tips
- Don’t set overwrite_profiles=True in concurrent calls. Write the profile once up front and then pass a fixed profiles_dir in all subsequent calls.
- If you must run concurrently against one project, serialize deps/clean or put the project on a per-run working directory.
- If you’re on Prefect Cloud, you can also add a concurrency limit to a small “dbt_deps” task/tag so only one deps runs at a time across all flows.
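A sketch of that pattern (the tag name "dbt-deps" is arbitrary; create the limit once with the Prefect CLI):
# one-time setup: prefect concurrency-limit create dbt-deps 1
from prefect import task

@task(tags=["dbt-deps"])  # at most one task with this tag runs at a time
async def run_deps_serialized(self):
    await trigger_dbt_cli_command(
        command="dbt deps",
        dbt_cli_profile=self.dbt_cli_profile,
        project_dir=str(self.project_dir),
    )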
Given your ThreadPool pattern and 19 counties:
- Recommended: either switch to a single dbt process with threads for intra-dbt parallelism, or isolate each dbt invocation into its own temp project_dir (Option B). Both remove the shared-folder race that’s causing the intermittent OSErrors.
If you share your current dbt_processor.run_deps() implementation and where you set `profiles_dir`/`overwrite_profiles`, I can suggest the smallest code change to make it safe under concurrency.