Rachel Molloy
11/29/2022, 5:03 PMAnna Geller
11/29/2022, 5:15 PMRachel Molloy
11/29/2022, 5:18 PM09:43:33.083 | INFO| Flow run 'vehement-cougar'- Executing 'trigger_dbt_cli_command-321ca940-0' immediately...
09:43:33.129 | INFO| Task run 'trigger_dbt_cli_command-321ca940-0' - Running dbt command: dbt debug --profiles-dir C:\Users\10294643\.dbt
09:43:33.131 | ERROR| Task run 'trigger_dbt_cli_command-321ca940-0' - Encountered exception during execution:
Traceback (most recent call last):
File "C:\Users\10294643\Anaconda3\lib\site-packages\prefect\engine.py", line 1240, in orchestrate_task_run
result = await task.fn(*args, **kwargs)
File "C:\Users\10294643\Anaconda3\lib\site-packages\prefect_dbt\cli\commands.py", line 158, in trigger_dbt_cli_command
result = await shell_run_command.fn(command=command, **shell_run_command_kwargs)
File "C:\Users\10294643\Anaconda3\lib\site-packages\prefect_shell\commands.py", line 86, in shell_run_command
async with await open_process(
File "C:\Users\10294643\Anaconda3\lib\site-packages\anyio\_core\_subprocesses.py", line 97, in open_process
return await get_asynclib().open_process(command, shell=shell, stdin=stdin, stdout=stdout,
File "C:\Users\10294643\Anaconda3\lib\site-packages\anyio\_backends\_asyncio.py", line 958, in open_process
process = await asyncio.create_subprocess_exec(*command, stdin=stdin, stdout=stdout,
File "C:\Users\10294643\Anaconda3\lib\asyncio\subprocess.py", line 236, in create_subprocess_exec
transport, protocol = await loop.subprocess_exec(
File "C:\Users\10294643\Anaconda3\lib\asyncio\base_events.py", line 1676, in subprocess_exec
transport = await self._make_subprocess_transport(
File "C:\Users\10294643\Anaconda3\lib\asyncio\base_events.py", line 498, in _make_subprocess_transport
raise NotImplementedError
NotImplementedError
I've tried two methods of calling this:
1. Where I reference the existing profiles.yml file and dbt project directory (note that when I run dbt debug --profiles-dir C:\Users\10294643\.dbt --project-dir 'C:\....\dbt\snowflake_conn' in the command line, it connects to Snowflake successfully):
from prefect import flow
from prefect_dbt.cli.commands import trigger_dbt_cli_command
@flow
def trigger_dbt_cli_command_flow() -> str:
result = trigger_dbt_cli_command("dbt debug",
profiles_dir = 'C:/Users/10294643/.dbt',
project_dir = 'C:/......../dbt/snowflake_conn',
overwrite_profiles = False)
return result # Returns the last line the in CLI output
trigger_dbt_cli_command_flow()
2. Where I treat it as if there is no existing profiles.yml file and hard-code the credentials:
from prefect import flow
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector
from prefect_dbt.cli.credentials import DbtCliProfile
from prefect_dbt.cli.commands import trigger_dbt_cli_command
from prefect_dbt.cli.configs import SnowflakeTargetConfigs
@flow
def trigger_dbt_cli_command_flow():
connector = SnowflakeConnector(
schema="SANDBOX",
database="DB",
warehouse="WAREHOUSE",
credentials=SnowflakeCredentials(
user="<myusername>",
authenticator='externalbrowser',
account="<servername.azure>",
role="MYROLE",
),
)
target_configs = SnowflakeTargetConfigs(
connector=connector
)
dbt_cli_profile = DbtCliProfile(
name="snowflake_conn",
target="prod",
target_configs=target_configs,
)
result = trigger_dbt_cli_command(
"dbt debug",
dbt_cli_profile=dbt_cli_profile,
overwrite_profiles=True
)
return result
trigger_dbt_cli_command_flow()
Anna Geller
11/29/2022, 8:41 PMRachel Molloy
11/29/2022, 8:58 PMAnna Geller
11/29/2022, 11:54 PMRachel Molloy
12/01/2022, 7:34 PMAnna Geller
12/01/2022, 8:55 PMRachel Molloy
12/14/2022, 5:19 PMAnna Geller
12/14/2022, 6:45 PMRachel Molloy
12/14/2022, 7:31 PMfrom prefect import flow
from prefect_shell import shell_run_command
@flow
def example_shell_run_command_flow():
return shell_run_command(command="ls .", return_all=True)
example_shell_run_command_flow()
Anna Geller
12/14/2022, 7:34 PMRachel Molloy
12/14/2022, 9:18 PMAnna Geller
12/14/2022, 11:43 PM