# ask-marvin
r
@Marvin I'm running dbt using the system blocks: dbt CLI Snowflake target configs, Snowflake connector, and Snowflake credentials. I first run this statement:
Copy code
run = await trigger_dbt_cli_command(
    command=model,
    dbt_cli_profile=self.dbt_cli_profile,
    overwrite_profiles=True,
    project_dir=str(self.project_dir)
)
and then I use this method to execute the dbt build commands like this:
Copy code
@task(name="Run dbt Transformations", retries=5, retry_delay_seconds=[5, 10, 60, 300, 900])
    async def run_dbt_transformations(self, 
                                      models: List = [],
                                      vars: Optional[str] = None) -> bool:
        """Run DBT transformations using dbt block, sequentially with dependency."""
        logger = get_run_logger()

        context = TaskRunContext.get()
        logger.info(f"Context: {context}")
        
        try:
            # Verify we're in the right directory
            dbt_project_file = self.project_dir /  "dbt_project.yml"
            if not dbt_project_file.exists():
                raise FileNotFoundError(
                    f"dbt project file not found at {dbt_project_file}. "
                    f"Current directory contents: {list(self.project_dir.glob('*'))}"
                )
            
            logger.info(f"Using dbt project directory: {self.project_dir}")
            
            for model in models:
                logger.info(f"Running dbt model: {model}, vars: {vars}")
                
                extra_commands = ["--select", model]

                if vars and model.endswith("unioned"):
                    vars_dict = {'counties': vars}
                    vars_yaml = yaml.dump(vars_dict, default_flow_style=True).strip()
                    extra_commands.append(f'--vars={vars_yaml}')

                logger.info(f"extra commands: {extra_commands}")
                # uncomment tag
                run = await trigger_dbt_cli_command(
                    command="dbt build", 
                    # dbt_cli_profile=self.dbt_cli_profile,
                    # overwrite_profiles=True,
                    project_dir=str(self.project_dir),
                    extra_command_args=extra_commands
                )

                logger.info(str(run))
                if 'dbtRunnerResult(success=True' not in str(run):
                    logger.info(str(run))
                    raise Exception(f"dbt model {model} failed, aborting further dbt runs.")
                
                logger.info(f"Successfully ran dbt model: {model}, result: {run}")

            logger.info("All dbt models completed successfully")
            return True
            
        except Exception as e:
            
            logger.error(f"Error running dbt transformations: {str(e)}")
            raise Exception(f"Error running dbt transformations: {str(e)}")
But the models are almost always returning this error:
Copy code
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/dbt/cli/requires.py", line 178, in wrapper
    result, success = func(*args, **kwargs)
                      ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dbt/cli/requires.py", line 128, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dbt/cli/requires.py", line 272, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dbt/cli/requires.py", line 303, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dbt/cli/requires.py", line 373, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dbt/cli/requires.py", line 350, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dbt/cli/requires.py", line 389, in wrapper
    setup_manifest(ctx, write=write, write_perf_info=write_perf_info)
  File "/usr/local/lib/python3.12/site-packages/dbt/cli/requires.py", line 416, in setup_manifest
    ctx.obj["manifest"] = parse_manifest(
                          ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dbt/parser/manifest.py", line 2123, in parse_manifest
    manifest = ManifestLoader.get_full_manifest(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dbt/parser/manifest.py", line 320, in get_full_manifest
    manifest = loader.load()
               ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dbt/parser/manifest.py", line 389, in load
    self.load_and_parse_macros(project_parser_files)
  File "/usr/local/lib/python3.12/site-packages/dbt/parser/manifest.py", line 707, in load_and_parse_macros
    self.macro_depends_on()
  File "/usr/local/lib/python3.12/site-packages/dbt/parser/manifest.py", line 796, in macro_depends_on
    adapter = get_adapter(self.root_project)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dbt/adapters/factory.py", line 207, in get_adapter
    return FACTORY.lookup_adapter(config.credentials.type)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dbt/adapters/factory.py", line 132, in lookup_adapter
    return self.adapters[adapter_name]
           ~~~~~~~~~~~~~^^^^^^^^^^^^^^
KeyError: 'snowflake'
How can I solve it?
m
thought for 509.2 seconds
Short answer: dbt can't find the Snowflake adapter in the environment where your flow is running. The Prefect blocks (DbtCliProfile + SnowflakeTargetConfigs / Snowflake credentials) provide credentials and write a profiles.yml, but they do not install the dbt Snowflake adapter. Install dbt-snowflake (matching your dbt-core version) in the same runtime where trigger_dbt_cli_command executes.

Why you're seeing KeyError: 'snowflake'
- dbt loads adapters via Python entry points. If dbt-snowflake isn't installed (or is version-incompatible), the adapter registry won't contain snowflake, and dbt raises KeyError: 'snowflake'.

How to fix
1) Ensure the adapter is installed in the worker/runtime. On Python 3.12, use recent dbt versions. For example:
Copy code
pip install "dbt-core==1.8.*" "dbt-snowflake==1.8.*" "prefect-dbt>=0.5.0"
or, if you prefer extras, try:
Copy code
pip install "prefect-dbt[snowflake]"  # This should install dbt + the Snowflake adapter
2) If you’re using a Prefect worker image, bake it into your image:
Copy code
FROM prefecthq/prefect:3-python3.12
RUN pip install --no-cache-dir "prefect-dbt" "dbt-core==1.8.*" "dbt-snowflake==1.8.*"
- Docker/Kubernetes workers: push the image and point your work pool at it.
- Process worker or local runs: install the packages in that exact environment.
3) Optional sanity checks in the runtime:
Copy code
dbt --version
python -c "import importlib.metadata as m; print('dbt-core:', m.version('dbt-core')); print('dbt-snowflake:', m.version('dbt-snowflake'))"
4) Using trigger_dbt_cli_command
- Passing your DbtCliProfile once with overwrite_profiles=True is fine if the profiles dir persists, but it's also safe to pass it each time.
- I'd also run dbt deps once before your model runs. Example:
Copy code
# Write profiles and confirm environment by running deps once
await trigger_dbt_cli_command(
  command="dbt deps",
  dbt_cli_profile=self.dbt_cli_profile,
  overwrite_profiles=True,
  project_dir=str(self.project_dir)
)

# Then run builds per model
for model in models:
    extra = ["--select", model]
    if vars and model.endswith("unioned"):
        vars_yaml = yaml.dump({'counties': vars}, default_flow_style=True).strip()
        extra.append(f"--vars={vars_yaml}")

    run = await trigger_dbt_cli_command(
        command="dbt build",
        dbt_cli_profile=self.dbt_cli_profile,  # safe to include
        project_dir=str(self.project_dir),
        extra_command_args=extra
    )
    # Prefer checking run.success if available instead of string parsing
    if not run or not getattr(run, "success", False):
        raise Exception(f"dbt model {model} failed")
A couple of notes
- The "Snowflake connector" Prefect block is unrelated to dbt's adapter. dbt specifically needs the dbt-snowflake package.
- Your first call with command=model looks unusual unless model actually contains a valid dbt command string (like "dbt deps"). If it's just a model name, that won't be a valid dbt CLI command.

If this still fails after installing the adapter, please share:
- Where the flow runs (local process, Docker worker, Kubernetes)
- Outputs of dbt --version and pip show dbt-snowflake
- Your prefect-dbt version

References
- Prefect dbt collection docs: https://docs.prefect.io/latest/integrations/collections/prefect-dbt/
- dbt Snowflake setup: https://docs.getdbt.com/docs/core/connect-data-platform/snowflake-setup
r
It's still failing. Here is the extra information requested:
- Where the flow runs: Prefect managed work pool
- Output of dbt --version: 1.9.6
- prefect-dbt version: 0.5.0
@Marvin I'm now getting this every few runs:
Copy code
Task run failed with exception: Exception("Error running dbt transformations: [Errno 39] Directory not empty: 'dbt_packages/dbt-utils-1.3.0/integration_tests/data/sql'") - Retry 1/5 will start 5 second(s) from now
m
thought for 93.0 seconds
This is almost always a race on the dbt_packages folder. dbt cleans and re-installs packages during deps/clean; if two runs touch the same project_dir (or a retry overlaps a previous attempt) you can hit "[Errno 39] Directory not empty" while dbt is removing a package directory.

Common scenarios that trigger this:
- Multiple flow runs on the same worker use the same project_dir concurrently
- A quick retry starts while a previous attempt is still cleaning/deps'ing
- Re-running dbt deps frequently in the same shared project_dir

Fix options (pick one that fits your setup):
1) Serialize deps/clean with a file lock
- Ensure only one process cleans/installs packages at a time.
- Example using filelock (pip install filelock):
Copy code
from pathlib import Path
from filelock import FileLock
import time

@task
async def dbt_setup_and_deps(self):
    lock_path = Path(self.project_dir) / ".dbt_deps.lock"
    with FileLock(str(lock_path)):
        # Clean first to reduce leftover files
        await trigger_dbt_cli_command(
            command="dbt clean",
            dbt_cli_profile=self.dbt_cli_profile,
            overwrite_profiles=True,    # only if you need to rewrite profiles
            project_dir=str(self.project_dir),
        )
        # Then deps
        await trigger_dbt_cli_command(
            command="dbt deps",
            dbt_cli_profile=self.dbt_cli_profile,
            project_dir=str(self.project_dir),
        )
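If both tasks live on the same class as run_dbt_transformations, the wiring could look roughly like this; the names follow the sketches above, so treat them as placeholders:
Copy code
# write profiles + install packages exactly once, under the lock, before any builds
await self.dbt_setup_and_deps()

# then run the per-model builds, which no longer need to touch dbt_packages
await self.run_dbt_transformations(models=models, vars=vars)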
Call this once before your per-model builds, and do not call deps again in the same run.
2) Run each flow in an isolated working directory
- Copy your project to a per-run temp dir and run dbt there so dbt_packages/target aren't shared.
Copy code
import tempfile, shutil
from pathlib import Path

tmp_dir = Path(tempfile.mkdtemp(prefix="dbt-"))
shutil.copytree(self.project_dir, tmp_dir, dirs_exist_ok=True,
                ignore=shutil.ignore_patterns("target", "dbt_packages"))

# Now run dbt in tmp_dir
await trigger_dbt_cli_command(
    command="dbt deps",
    dbt_cli_profile=self.dbt_cli_profile,
    overwrite_profiles=True,
    project_dir=str(tmp_dir),
)
# ...then your per-model builds with project_dir=str(tmp_dir)
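mkdtemp leaves that copy on disk, so if runs are frequent you may want to remove it at the end; a minimal sketch reusing the tmp_dir from above:
Copy code
import shutil

try:
    # ... run your dbt deps / builds against tmp_dir here ...
    pass
finally:
    # delete the per-run copy so temp space doesn't pile up between runs
    shutil.rmtree(tmp_dir, ignore_errors=True)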
3) Add an explicit clean + backoff before deps
- If you can't introduce a lock/isolated dir, at least run "dbt clean" then "dbt deps" with a few retries and longer backoff to avoid overlapping attempts:
Copy code
import asyncio

for i, delay in enumerate([2, 5, 10, 30]):
    try:
        await trigger_dbt_cli_command(command="dbt clean", project_dir=str(self.project_dir),
                                      dbt_cli_profile=self.dbt_cli_profile)
        await trigger_dbt_cli_command(command="dbt deps", project_dir=str(self.project_dir),
                                      dbt_cli_profile=self.dbt_cli_profile)
        break
    except Exception:
        if i == 3:
            raise
        await asyncio.sleep(delay)
4) Reduce contention
- Run "dbt deps" once on startup (or when the image is built) and avoid running it inside every flow.
- Increase your task retry delays (5s is often too short if a previous run is still cleaning).
- Ensure you're not running multiple flows against the same on-disk project_dir concurrently on the same worker.

dbt-side configs to check
- In dbt_project.yml, ensure clean-targets includes dbt_packages:
Copy code
clean-targets: ["target", "dbt_packages"]
- If you want stronger isolation without copying the whole project, you can route packages to a per-run folder via env vars:
Copy code
packages-install-path: "dbt_packages"
# set an env var in your flow run and include it in path if you template your project,
# but ensure it's unique and the templating works for your setup
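On the flow side, that could mean exporting a unique path before calling dbt. A sketch that assumes your dbt_project.yml reads the location via env_var (verify this templating works on your dbt version); the variable name is made up:
Copy code
import os
import uuid

# hypothetical env var that dbt_project.yml would read, e.g.
# packages-install-path: "{{ env_var('DBT_PACKAGES_DIR', 'dbt_packages') }}"
os.environ["DBT_PACKAGES_DIR"] = f"dbt_packages_{uuid.uuid4().hex}"

await trigger_dbt_cli_command(
    command="dbt deps",
    dbt_cli_profile=self.dbt_cli_profile,
    project_dir=str(self.project_dir),
)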
If you can share:
- Are multiple flow runs overlapping on the same worker directory?
- Do you run "dbt deps" in each flow run or only sometimes?

Docs:
- Prefect dbt collection: https://docs.prefect.io/latest/integrations/collections/prefect-dbt/
- dbt deps: https://docs.getdbt.com/reference/commands/deps
r
@Marvin dbt is being called from an instantiated object in a thread pool execution; here's part of the code. Inside, we're calling dbt for each of the 19 counties:
Copy code
zillow_scraper_processor = await zillowScraper.create(days, min_lot_size, max_lot_size)
data_processor = await dataProcessor.create(counties, dt)
dbt_processor = await dbtCore.create()

run_deps = await dbt_processor.run_deps()

logger.info(run_deps)

# Run all counties concurrently
logger.info("Starting concurrent county processing...")

futures = [scraper_orchestrator.submit(county, zillow_scraper_processor, data_processor, dbt_processor) for county in counties]

results = [future.result() for future in futures]
m
thought for 92.6 seconds
Thanks - that stack plus your ThreadPool pattern explains it. You're launching many dbt CLI processes against the same dbt project directory at the same time. dbt writes to and deletes from shared folders like dbt_packages and target. When multiple processes touch those paths concurrently, you get intermittent "[Errno 39] Directory not empty" and other cleanup/install races.

What's happening
- dbt deps/clean modifies dbt_packages; dbt build writes to target. Multiple concurrent processes in the same project_dir collide on those folders.
- If you also pass overwrite_profiles=True in more than one concurrent call, you can race on profiles.yml as well.

Ways to fix (pick one)
A) Single dbt process, let dbt handle threading (safest)
- Run one dbt command (e.g., build or run) and let dbt's threads: setting in your profile control parallelism.
- If you're using per-county vars, consider refactoring so one run can process all counties (e.g., drive county logic inside SQL/macros or use tags/selection patterns). You avoid external concurrency entirely.
B) Isolate each concurrent dbt invocation to its own working directory
- Make a per-county temp copy of the project so dbt_packages and target are private per thread/process.
- Optionally pre-bake packages into your image to avoid running deps 19 times.
Example pattern:
Copy code
import tempfile, shutil, uuid
from pathlib import Path

def prepare_isolated_project(base_project_dir: Path) -> Path:
    run_dir = Path(tempfile.mkdtemp(prefix=f"dbt-{uuid.uuid4().hex}-"))
    shutil.copytree(base_project_dir, run_dir, dirs_exist_ok=True,
                    ignore=shutil.ignore_patterns("target", "dbt_packages"))
    return run_dir

# before launching threads
base_dir = Path(self.project_dir)
profiles_dir = base_dir / ".profiles"  # write once, then reuse; don’t overwrite in threads

# ensure profiles exist exactly once (guard with a lock if multiple flows might race)
await trigger_dbt_cli_command(
    command="dbt debug",             # any harmless command that writes profiles.yml if needed
    dbt_cli_profile=self.dbt_cli_profile,
    overwrite_profiles=True,
    project_dir=str(base_dir),
    profiles_dir=str(profiles_dir),
)

# per county, in each thread:
proj_dir = prepare_isolated_project(base_dir)

# install packages in the isolated copy (or skip if pre-baked in image)
await trigger_dbt_cli_command(
    command="dbt deps",
    project_dir=str(proj_dir),
    profiles_dir=str(profiles_dir),  # reuse, but do NOT set overwrite_profiles=True here
)

# now run your build
await trigger_dbt_cli_command(
    command="dbt build",
    project_dir=str(proj_dir),
    profiles_dir=str(profiles_dir),
    extra_command_args=["--select", county_model, "--vars", vars_yaml_if_any],
)
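To slot that into your submit-per-county pattern, each county task could own its copy for the duration of the run; a sketch with hypothetical names and selection logic:
Copy code
import shutil
from pathlib import Path

from prefect import task
from prefect_dbt.cli.commands import trigger_dbt_cli_command


@task(name="Run county dbt build")  # hypothetical per-county task
async def run_county_build(county: str, base_dir: Path, profiles_dir: Path) -> bool:
    proj_dir = prepare_isolated_project(base_dir)  # helper from the sketch above
    try:
        await trigger_dbt_cli_command(
            command="dbt deps",
            project_dir=str(proj_dir),
            profiles_dir=str(profiles_dir),
        )
        run = await trigger_dbt_cli_command(
            command="dbt build",
            project_dir=str(proj_dir),
            profiles_dir=str(profiles_dir),
            # hypothetical selection; adapt to how your county models are named or tagged
            extra_command_args=["--select", f"tag:{county}"],
        )
        return bool(getattr(run, "success", False))
    finally:
        # drop the per-county copy so 19 concurrent runs don't accumulate on disk
        shutil.rmtree(proj_dir, ignore_errors=True)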
C) Serialize deps/clean with a lock and avoid calling them in parallel
- If you must share one project_dir, protect deps/clean (and ideally all dbt commands) with a file lock so only one runs at a time:
Copy code
from filelock import FileLock
from pathlib import Path

lock = FileLock(str(Path(self.project_dir) / ".dbt.lock"))
with lock:
    await trigger_dbt_cli_command(command="dbt deps", project_dir=str(self.project_dir),
                                  dbt_cli_profile=self.dbt_cli_profile)
# run build calls either also under the lock, or isolate target/dbt_packages as in option B
- Increase backoff between retries; 5s is usually too short if another run is still cleaning.
D) Avoid deps/clean during runs
- Run dbt deps once at image build time or at worker startup to pre-populate dbt_packages, then don't call deps in flow runs. This reduces the chance of the dbt_packages race (target can still race if you run builds concurrently in the same project_dir).

Additional tips
- Don't set overwrite_profiles=True in concurrent calls. Write the profile once up front and then pass a fixed profiles_dir in all subsequent calls.
- If multiple separate flow runs can overlap on the same machine/project_dir, use a machine-wide lock around deps/clean or put the project on a per-run working directory.
- If you're on Prefect Cloud, you can also add a concurrency limit to a small "dbt_deps" task/tag so only one deps runs at a time across all flows.

Given your ThreadPool pattern and 19 counties, the recommendation is: either switch to a single dbt process with threads for intra-dbt parallelism (Option A, sketched below), or isolate each dbt invocation into its own temp project_dir (Option B). Both remove the shared-folder race that's causing the intermittent OSErrors.

If you share your current dbt_processor.run_deps() implementation and where you set profiles_dir/overwrite_profiles, I can suggest the smallest code change to make it safe under concurrency.
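For the single-process route (Option A), the per-county loop collapses to one invocation and dbt's threads setting handles parallelism; a minimal sketch, assuming a hypothetical tag that selects all county models:
Copy code
# one dbt process for everything; parallelism comes from `threads` in the profile target
run = await trigger_dbt_cli_command(
    command="dbt build",
    dbt_cli_profile=self.dbt_cli_profile,
    overwrite_profiles=True,
    project_dir=str(self.project_dir),
    extra_command_args=["--select", "tag:counties"],  # hypothetical tag on the county models
)
if not getattr(run, "success", False):
    raise Exception("dbt build failed")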