
Rachel Molloy

11/29/2022, 5:03 PM
Hi all! I'm running into an issue getting started with dbt and Prefect on the CLI. I already have dbt running successfully locally, with a Snowflake connection and a profiles.yml file. I'm using Prefect 2.6.9, dbt 1.3.1, and Python 3.9.15, running locally on Windows. However, when I try to run a simple "dbt debug" command inside a Prefect flow, I get a "NotImplementedError" message (see thread). I've tried two different methods of running the dbt command: one referencing an existing project directory and profiles.yml file, and one hard-coding a profiles configuration (see thread for full code). Both give me the same "NotImplementedError". Any ideas what I'm missing?

Anna Geller

11/29/2022, 5:15 PM
can you move code into thread?

Rachel Molloy

11/29/2022, 5:18 PM
The NotImplementedError message is shown below:
09:43:33.083 | INFO| Flow run 'vehement-cougar'- Executing 'trigger_dbt_cli_command-321ca940-0' immediately...
09:43:33.129 | INFO| Task run 'trigger_dbt_cli_command-321ca940-0' - Running dbt command: dbt debug --profiles-dir C:\Users\10294643\.dbt
 
09:43:33.131 | ERROR| Task run 'trigger_dbt_cli_command-321ca940-0' - Encountered exception during execution:
 Traceback (most recent call last):
  File "C:\Users\10294643\Anaconda3\lib\site-packages\prefect\engine.py", line 1240, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "C:\Users\10294643\Anaconda3\lib\site-packages\prefect_dbt\cli\commands.py", line 158, in trigger_dbt_cli_command
    result = await shell_run_command.fn(command=command, **shell_run_command_kwargs)
  File "C:\Users\10294643\Anaconda3\lib\site-packages\prefect_shell\commands.py", line 86, in shell_run_command
    async with await open_process(
  File "C:\Users\10294643\Anaconda3\lib\site-packages\anyio\_core\_subprocesses.py", line 97, in open_process
    return await get_asynclib().open_process(command, shell=shell, stdin=stdin, stdout=stdout,
  File "C:\Users\10294643\Anaconda3\lib\site-packages\anyio\_backends\_asyncio.py", line 958, in open_process
    process = await asyncio.create_subprocess_exec(*command, stdin=stdin, stdout=stdout,
  File "C:\Users\10294643\Anaconda3\lib\asyncio\subprocess.py", line 236, in create_subprocess_exec
    transport, protocol = await loop.subprocess_exec(
  File "C:\Users\10294643\Anaconda3\lib\asyncio\base_events.py", line 1676, in subprocess_exec
    transport = await self._make_subprocess_transport(
  File "C:\Users\10294643\Anaconda3\lib\asyncio\base_events.py", line 498, in _make_subprocess_transport
    raise NotImplementedError
 NotImplementedError
I've tried two methods of calling this:
1. Where I reference the existing profiles.yml file and dbt project directory (note that when I run dbt debug --profiles-dir C:\Users\10294643\.dbt --project-dir 'C:\....\dbt\snowflake_conn' on the command line, it connects to Snowflake successfully):
from prefect import flow
from prefect_dbt.cli.commands import trigger_dbt_cli_command
 
@flow
def trigger_dbt_cli_command_flow() -> str:
    result = trigger_dbt_cli_command(
        "dbt debug",
        profiles_dir="C:/Users/10294643/.dbt",
        project_dir="C:/......../dbt/snowflake_conn",
        overwrite_profiles=False,
    )
    return result  # Returns the last line in the CLI output
 
trigger_dbt_cli_command_flow()
2. Where I treat it as if there is no existing profiles.yml file and hard-code the credentials:
from prefect import flow
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector
 
from prefect_dbt.cli.credentials import DbtCliProfile
from prefect_dbt.cli.commands import trigger_dbt_cli_command
from prefect_dbt.cli.configs import SnowflakeTargetConfigs
 
@flow
def trigger_dbt_cli_command_flow():
    connector = SnowflakeConnector(
        schema="SANDBOX",
        database="DB",
        warehouse="WAREHOUSE",
        credentials=SnowflakeCredentials(
            user="<myusername>",
            authenticator="externalbrowser",
            account="<servername.azure>",
            role="MYROLE",
        ),
    )
    target_configs = SnowflakeTargetConfigs(
        connector=connector
    )
    dbt_cli_profile = DbtCliProfile(
        name="snowflake_conn",
        target="prod",
        target_configs=target_configs,
    )
    result = trigger_dbt_cli_command(
        "dbt debug",
        dbt_cli_profile=dbt_cli_profile,
        overwrite_profiles=True,
    )
    return result
 
trigger_dbt_cli_command_flow()

Anna Geller

11/29/2022, 8:41 PM
do your dbt commands work without prefect? can you try dbt debug from CLI? also, not sure you know, but you could e.g. use GitHub Codespaces to run it on Linux

Rachel Molloy

11/29/2022, 8:58 PM
yes they do work on the CLI! i can successfully connect with dbt debug and run my models without prefect. im not familiar with GitHub Codespaces but i can look into that, although im not sure that would work for me because we'll be using Azure DevOps for our version control, not GitHub

Anna Geller

11/29/2022, 11:54 PM
I was suggesting Codespaces only because it would give you a free Linux VM - could be easier to troubleshoot on Linux

Rachel Molloy

12/01/2022, 7:34 PM
Thanks for the suggestion @Anna Geller! I didn't get a chance to try that yet, but I was able to resolve it today and determined that I was getting that error due to two different issues:
1. My project directory file path had spaces in it, so for whatever reason it would not run successfully.
2. I was running this in a Jupyter notebook. It works for me in Visual Studio Code (when I don't have spaces in my project directory file path) but not in a Jupyter notebook.
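A possible illustration of the first issue: when a command is built as a single string, an unquoted path containing spaces is typically split into several arguments. The sketch below uses Python's `shlex.split`, which follows POSIX shell rules, purely to make the splitting visible. Windows shells tokenize differently, and whether prefect-dbt quotes `project_dir` internally is not established in this thread. The path and the helper name `build_debug_command` are hypothetical.

```python
import shlex


def build_debug_command(project_dir: str, quote: bool) -> str:
    """Build a dbt debug command string, optionally quoting the path."""
    path = f'"{project_dir}"' if quote else project_dir
    return f"dbt debug --project-dir {path}"


# Hypothetical project path that contains spaces.
project_dir = "C:/Users/me/my dbt project"

unquoted_args = shlex.split(build_debug_command(project_dir, quote=False))
quoted_args = shlex.split(build_debug_command(project_dir, quote=True))

print(unquoted_args)  # the path is broken into three separate arguments
print(quoted_args)    # the path survives as a single argument
```

If the unquoted form is what reaches the shell, dbt would receive only the first fragment as the project directory, which would be consistent with the command failing only for paths with spaces.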

Anna Geller

12/01/2022, 8:55 PM
thanks for the update. to run it from a jupyter notebook, you'd need to make it async

Rachel Molloy

12/14/2022, 5:19 PM
@Anna Geller could you expand on your comment about needing to make it async? I'm now running those same commands days later in Visual Studio Code (nothing else has changed) and am running into the same errors again, so it looks like my previous solutions above didn't solve the underlying issue. I see some mentions of async/await calls in the error output, so I'm wondering if I'm missing something like that in my flow? I can send the entire error message if you'd like, but it's long.

Anna Geller

12/14/2022, 6:45 PM
if you run your flows in VSCode as scripts, it will work just fine, but if you run it in Jupyter notebook cells, they must be async due to Jupyter notebook limitations
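One general reason notebook cells behave differently from scripts is that a Jupyter kernel already has an asyncio event loop running, so code that tries to start its own loop fails while a direct `await` works. Whether this is precisely the limitation meant above is an assumption. The sketch below uses only the standard library, and `work` and `simulated_notebook_cell` are hypothetical stand-ins, not Prefect APIs.

```python
import asyncio


async def work() -> str:
    """Stand-in for an async flow or task."""
    await asyncio.sleep(0)
    return "done"


async def simulated_notebook_cell():
    """Runs inside an active event loop, as notebook cells do."""
    coro = work()
    try:
        asyncio.run(coro)  # not allowed while a loop is already running
        error = None
    except RuntimeError as exc:
        error = str(exc)
        coro.close()  # avoid a "never awaited" warning
    result = await work()  # awaiting directly works
    return error, result


if __name__ == "__main__":
    print(asyncio.run(simulated_notebook_cell()))
```

In a real notebook cell the equivalent would be to define the flow with `async def` and call it with a top-level `await`.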

Rachel Molloy

12/14/2022, 7:31 PM
Now I am running the following code as a script in VSCode and am getting the same errors as previously. It seems to actually be a broader issue with command line based flows because the following code gives me the same error:
from prefect import flow
from prefect_shell import shell_run_command

@flow
def example_shell_run_command_flow():
    return shell_run_command(command="ls .", return_all=True)

example_shell_run_command_flow()
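Since the traceback ends in asyncio's `_make_subprocess_transport`, one way to narrow this down without Prefect or dbt is to check whether the current event loop can spawn a subprocess at all. On Windows, asyncio's selector-based event loop does not implement subprocess support, while the proactor loop does. Whether that is the cause here is an assumption. A minimal standard-library sketch, with `probe` and `check_subprocess_support` as hypothetical helper names:

```python
import asyncio
import sys


async def probe():
    """Report the running loop's class and whether it can spawn a child."""
    loop_name = type(asyncio.get_running_loop()).__name__
    try:
        proc = await asyncio.create_subprocess_exec(
            sys.executable, "-c", "print('ok')",
            stdout=asyncio.subprocess.PIPE,
        )
    except NotImplementedError:
        # Same exception as in the traceback: this loop type
        # has no subprocess transport.
        return loop_name, False
    out, _ = await proc.communicate()
    return loop_name, out.decode().strip() == "ok"


def check_subprocess_support():
    """Run the probe from a plain script (no loop running yet)."""
    return asyncio.run(probe())


if __name__ == "__main__":
    print(check_subprocess_support())
```

Running this in the same environment and launcher as the failing flow (script, notebook via `await probe()`, or IDE run button) would show whether the loop type differs between them.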

Anna Geller

12/14/2022, 7:34 PM
I could not reproduce - can you try in a fresh new conda environment?

Rachel Molloy

12/14/2022, 9:18 PM
Okay it did work in a new virtual environment but only if i run from the command line (can't run in VSCode itself for some reason, even when i change the interpreter to point to the new virtual environment). Any idea why that could be? Appreciate you testing that and for your help!
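A quick way to confirm which interpreter and environment a given launcher actually uses is to print them from inside the script and compare the output between the command line and the VSCode run button. This is only a diagnostic sketch, and `interpreter_info` is a hypothetical helper name.

```python
import sys


def interpreter_info() -> dict:
    """Collect basic facts about the interpreter running this code."""
    return {
        "executable": sys.executable,  # path to the python binary in use
        "prefix": sys.prefix,          # root of the active environment
        "version": sys.version.split()[0],
    }


if __name__ == "__main__":
    for key, value in interpreter_info().items():
        print(f"{key}: {value}")
```

If the two launchers print different `executable` paths, the IDE is not using the new environment despite the interpreter setting.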

Anna Geller

12/14/2022, 11:43 PM
hard to tell. when I use VSCode I always run flows from the CLI, and that's definitely the most common way to do it. you could also use ipython