# prefect-dbt
r
Hi all! I'm running into an issue getting started with dbt and Prefect on the CLI. I already have dbt successfully running locally, with a Snowflake connection and profiles.yml file. Using Prefect 2.6.9, dbt 1.3.1 and Python 3.9.15 on Windows running locally. However, when I try to run a simple "dbt debug" command inside a Prefect flow, I get a "NotImplementedError" message (see thread). I've tried two different methods of running the dbt command: one referencing an existing project directory and profiles.yml file, and one manually hardcoding a profiles configuration (see thread for full code). Both give me the same "NotImplementedError". Any ideas what I'm missing?
a
can you move code into thread?
r
The NotImplementedError message is shown below:
09:43:33.083 | INFO| Flow run 'vehement-cougar'- Executing 'trigger_dbt_cli_command-321ca940-0' immediately...
09:43:33.129 | INFO| Task run 'trigger_dbt_cli_command-321ca940-0' - Running dbt command: dbt debug --profiles-dir C:\Users\10294643\.dbt
 
09:43:33.131 | ERROR| Task run 'trigger_dbt_cli_command-321ca940-0' - Encountered exception during execution:
 Traceback (most recent call last):
  File "C:\Users\10294643\Anaconda3\lib\site-packages\prefect\engine.py", line 1240, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "C:\Users\10294643\Anaconda3\lib\site-packages\prefect_dbt\cli\commands.py", line 158, in trigger_dbt_cli_command
    result = await shell_run_command.fn(command=command, **shell_run_command_kwargs)
  File "C:\Users\10294643\Anaconda3\lib\site-packages\prefect_shell\commands.py", line 86, in shell_run_command
    async with await open_process(
  File "C:\Users\10294643\Anaconda3\lib\site-packages\anyio\_core\_subprocesses.py", line 97, in open_process
    return await get_asynclib().open_process(command, shell=shell, stdin=stdin, stdout=stdout,
  File "C:\Users\10294643\Anaconda3\lib\site-packages\anyio\_backends\_asyncio.py", line 958, in open_process
    process = await asyncio.create_subprocess_exec(*command, stdin=stdin, stdout=stdout,
  File "C:\Users\10294643\Anaconda3\lib\asyncio\subprocess.py", line 236, in create_subprocess_exec
    transport, protocol = await loop.subprocess_exec(
  File "C:\Users\10294643\Anaconda3\lib\asyncio\base_events.py", line 1676, in subprocess_exec
    transport = await self._make_subprocess_transport(
  File "C:\Users\10294643\Anaconda3\lib\asyncio\base_events.py", line 498, in _make_subprocess_transport
    raise NotImplementedError
 NotImplementedError
I've tried two methods of calling this: 1. Where I reference the existing profiles.yml file and dbt project directory (note that when I run dbt debug --profiles-dir C:\Users\10294643\.dbt --project-dir 'C:\....\dbt\snowflake_conn' in the command line, it connects to Snowflake successfully):
from prefect import flow
from prefect_dbt.cli.commands import trigger_dbt_cli_command
 
@flow
def trigger_dbt_cli_command_flow() -> str:
   result = trigger_dbt_cli_command(
       "dbt debug",
       profiles_dir="C:/Users/10294643/.dbt",
       project_dir="C:/......../dbt/snowflake_conn",
       overwrite_profiles=False,
   )
   return result  # Returns the last line of the CLI output
 
trigger_dbt_cli_command_flow()
2. Where I treat it as if there is no existing profiles.yml file and hard-code the credentials:
from prefect import flow
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector
 
from prefect_dbt.cli.credentials import DbtCliProfile
from prefect_dbt.cli.commands import trigger_dbt_cli_command
from prefect_dbt.cli.configs import SnowflakeTargetConfigs
 
@flow
def trigger_dbt_cli_command_flow():
   connector = SnowflakeConnector(
       schema="SANDBOX",
       database="DB",
       warehouse="WAREHOUSE",
       credentials=SnowflakeCredentials(
           user="<myusername>",
           authenticator='externalbrowser',
           account="<servername.azure>",
           role="MYROLE",
       ),
   )
   target_configs = SnowflakeTargetConfigs(
       connector=connector
   )
   dbt_cli_profile = DbtCliProfile(
       name="snowflake_conn",
       target="prod",
       target_configs=target_configs,
   )
   result = trigger_dbt_cli_command(
       "dbt debug",
       dbt_cli_profile=dbt_cli_profile,
       overwrite_profiles=True
   )
   return result
 
trigger_dbt_cli_command_flow()
a
do your dbt commands work without prefect? can you try dbt debug from CLI? also, not sure you know, but you could e.g. use GitHub Codespaces to run it on Linux
r
yes they do work on the CLI! I can successfully connect with dbt debug and run my models without prefect. I'm not familiar with GitHub Codespaces but I can look into that, although I'm not sure that would work for me because we'll be using Azure DevOps for our version control, not GitHub
a
I was suggesting Codespaces only because it would give you a free Linux VM - could be easier to troubleshoot on Linux
r
Thanks for the suggestion @Anna Geller! I didn't get a chance to try that yet, but was able to resolve it today and determined that I was getting that error due to two different issues: 1. my project directory file path had spaces in it, so for whatever reason it would not successfully run 2. I was running this in Jupyter notebook. It works for me in Visual Studio Code (when I don't have spaces in my project directory file path) but not Jupyter notebook
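One way to see why a space in the project path can break a CLI call is to look at how a command string is split into arguments. The sketch below uses Python's `shlex`, which follows POSIX shell rules rather than Windows `cmd.exe` rules; the path and the `tokenize` helper are hypothetical, and whether prefect-shell splits commands this way is an assumption, not something confirmed in this thread.

```python
import shlex

# Hypothetical paths for illustration only (not the paths from this thread).
unquoted = "dbt debug --project-dir C:/Users/me/my project/dbt"
quoted = 'dbt debug --project-dir "C:/Users/me/my project/dbt"'


def tokenize(command: str) -> list:
    """Split a command string into arguments using POSIX shell rules."""
    return shlex.split(command)


if __name__ == "__main__":
    # Without quotes, the path is split at the space into two arguments.
    print(tokenize(unquoted))
    # With quotes, the path stays a single argument.
    print(tokenize(quoted))
```

If the path cannot be renamed, quoting it inside the command string is the usual workaround to try first.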
a
thanks for the update. to run it from a jupyter notebook, you'd need to make it async
r
@Anna Geller could you expand on your comment about needing to make it async? I'm now running those same commands days later in Visual Studio Code (nothing else has changed), and again running into the same errors, so it looks like my previous solutions above didn't solve the underlying issue. I see some mentions of async/await calls in the output error messages, so I'm wondering if I'm missing something like that in my flow? I can send the entire error message if you'd like but it's long
a
if you run your flows in VSCode as scripts, it will work just fine, but if you run it in Jupyter notebook cells, they must be async due to Jupyter notebook limitations
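The traceback earlier in the thread ends with asyncio's `_make_subprocess_transport` raising `NotImplementedError`. On Windows, that is what happens when the active event loop is a selector-based loop, which does not support subprocesses (the Proactor loop does). Whether that is the cause here is an assumption, but it can be checked with a stdlib-only sketch like the one below. The function name `probe_subprocess_support` is hypothetical and not part of Prefect.

```python
import asyncio
import sys


async def probe_subprocess_support() -> dict:
    """Report the running event loop's class and whether it can spawn a subprocess."""
    loop = asyncio.get_running_loop()
    report = {"loop_class": type(loop).__name__, "subprocess_ok": False}
    try:
        # Launch the current Python interpreter as a trivial child process.
        proc = await asyncio.create_subprocess_exec(
            sys.executable, "-c", "print('ok')",
            stdout=asyncio.subprocess.PIPE,
        )
    except NotImplementedError:
        # Same exception as in the traceback: this loop cannot create
        # subprocess transports.
        return report
    stdout, _ = await proc.communicate()
    report["subprocess_ok"] = proc.returncode == 0 and stdout.strip() == b"ok"
    return report


if __name__ == "__main__":
    # As a script, asyncio.run() creates a fresh event loop.
    print(asyncio.run(probe_subprocess_support()))
```

In a notebook cell, where a loop is already running, the call would be `await probe_subprocess_support()` instead of `asyncio.run(...)`. Comparing the reported `loop_class` between the notebook, the VSCode run button, and a plain terminal may show where the environments differ.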
r
Now I am running the following code as a script in VSCode and am getting the same errors as previously. It seems to actually be a broader issue with command line based flows because the following code gives me the same error:
from prefect import flow
from prefect_shell import shell_run_command

@flow
def example_shell_run_command_flow():
    return shell_run_command(command="ls .", return_all=True)

example_shell_run_command_flow()
a
I could not reproduce - can you try in a fresh new conda environment?
r
Okay, it did work in a new virtual environment, but only if I run from the command line (can't run in VSCode itself for some reason, even when I change the interpreter to point to the new virtual environment). Any idea why that could be? Appreciate you testing that and for your help!
a
hard to tell. when I use VSCode I always run flows from the CLI, which is definitely the most common way to do it -- you could also use ipython