Billy McMonagle
12/23/2022, 3:40 AM
Billy McMonagle
12/23/2022, 3:41 AM
dbt deps and dbt debug.
@flow
def trigger_dbt_cli_command_flow():
    with open(SNOWFLAKE_SVC_USER_PRIVATE_KEY_FILE) as key_file:
        connector = SnowflakeConnector(
            database=SNOWFLAKE_DATABASE,
            schema=SNOWFLAKE_SCHEMA,
            warehouse=SNOWFLAKE_WAREHOUSE,
            credentials=SnowflakeCredentials(
                user=SNOWFLAKE_USERNAME,  # users have default roles, so we don't provide role here
                private_key=key_file.read().encode(),
                account=SNOWFLAKE_ACCOUNT,
            ),
        )
    target_configs = SnowflakeTargetConfigs(connector=connector)
    dbt_cli_profile = DbtCliProfile(
        name=APP,
        target=DBT_ENVIRONMENT,
        target_configs=target_configs,
    )
    trigger_dbt_cli_command(
        "dbt deps",
        overwrite_profiles=True,
        dbt_cli_profile=dbt_cli_profile,
        project_dir=DBT_PROJECT_DIRECTORY,
    )
    result = trigger_dbt_cli_command(
        "dbt debug",
        overwrite_profiles=True,
        dbt_cli_profile=dbt_cli_profile,
        project_dir=DBT_PROJECT_DIRECTORY,
    )
    return result
Here's the error I get. The Jinja is from my dbt_project.yml file.
19:50:02 Encountered an error:
Compilation Error
  Could not render {{ true if target.name == 'prod' else false }}: 'target' is undefined
19:50:03.145 | ERROR | Task run 'trigger_dbt_cli_command-321ca940-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1442, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect_dbt/cli/commands.py", line 158, in trigger_dbt_cli_command
    result = await shell_run_command.fn(command=command, **shell_run_command_kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect_shell/commands.py", line 103, in shell_run_command
    raise RuntimeError(msg)
RuntimeError: Command failed with exit code 2:
Could not render {{ true if target.name == 'prod' else false }}: 'target' is undefined
I believe this error is misleading. If I comment out the corresponding lines in dbt_project.yml, I get this error when running the above flow:
RuntimeError: Command failed with exit code 2:
in _deep_map_render, expected one of (<class 'list'>, <class 'dict'>, <class 'int'>, <class 'float'>, <class 'str'>, <class 'NoneType'>, <class 'bool'>), got <class 'bytes'>
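The second error suggests that dbt's renderer only accepts a fixed set of Python types, so the `bytes` private key in the profile is the likely culprit. A minimal sketch of that kind of check follows, using only the types listed in the error message. `find_unrenderable` and the sample profile are hypothetical and are not part of dbt or prefect-dbt.

```python
# Types taken from the "expected one of" list in the error message above.
ALLOWED = (list, dict, int, float, str, type(None), bool)


def find_unrenderable(value, path=""):
    """Return (path, type name) pairs for values whose type is not in ALLOWED."""
    if not isinstance(value, ALLOWED):
        return [(path, type(value).__name__)]
    problems = []
    if isinstance(value, dict):
        for key, item in value.items():
            child = f"{path}.{key}" if path else str(key)
            problems += find_unrenderable(item, child)
    elif isinstance(value, list):
        for index, item in enumerate(value):
            problems += find_unrenderable(item, f"{path}[{index}]")
    return problems


# Hypothetical profile shaped like the one the flow writes.
profile = {
    "app": {
        "target": "qa",
        "outputs": {
            "qa": {"type": "snowflake", "threads": 4, "private_key": b"key-bytes"}
        },
    }
}
print(find_unrenderable(profile))
```

Running a check like this on the profile before it is written would point at the offending key directly instead of surfacing as a Jinja rendering error.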
Billy McMonagle
12/23/2022, 3:41 AM
~/.dbt/profiles.yml looks like this inside the running flow container:
app:
  outputs:
    qa:
      account: XXXXXX.us-east-1
      authenticator: snowflake
      database: qa
      private_key: !!binary |
        XXX (redacted) XXX
      schema: public
      threads: 4
      type: snowflake
      user: svc_user_qa
      warehouse: qa_qh
  target: qa
If I manually replace the private_key bytes with the following and rerun the dbt commands, it is successful:
private_key_path: /path/to/key.p8
private_key_passphrase: **redacted**
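The manual fix above can also be written as a small transformation on the profile dictionary before it is saved to profiles.yml. This is only a sketch: `use_key_path` is a hypothetical helper, not something prefect-dbt provides, and the path and passphrase are placeholders.

```python
import copy


def use_key_path(profile, name, target, key_path, passphrase=None):
    """Return a copy of the profile in which the target's private_key bytes are
    replaced by private_key_path (and, optionally, private_key_passphrase)."""
    updated = copy.deepcopy(profile)
    output = updated[name]["outputs"][target]
    output.pop("private_key", None)  # drop the bytes value
    output["private_key_path"] = key_path
    if passphrase is not None:
        output["private_key_passphrase"] = passphrase
    return updated


# Hypothetical input shaped like the profile above.
profile = {
    "app": {
        "target": "qa",
        "outputs": {"qa": {"type": "snowflake", "private_key": b"..."}},
    }
}
fixed = use_key_path(profile, "app", "qa", "/path/to/key.p8", passphrase="placeholder")
print(fixed["app"]["outputs"]["qa"])
```

The helper works on a deep copy so the original profile object is left unchanged.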
Billy McMonagle
12/23/2022, 3:42 AM
SnowflakeCredentials, SnowflakeConnector, DbtCliProfile, etc. I don't see a way to provide the private key path rather than the private key bytes, and I believe I am following the dbt docs on this subject. Is there a better way to do this, or have I perhaps stumbled into a gap between these two collections?
I've tried to present all the info in the simplest way possible, sorry that this is a confusing issue. Thanks for your help!
Billy McMonagle
12/23/2022, 4:48 PM
SnowflakeCredentials to add the desired behavior, and it works perfectly.
class SnowflakePrivateKeyCredentials(SnowflakeCredentials):
    """
    SnowflakeCredentials expects private key bytes, which doesn't play
    nicely with dbt. To get around this, we use a custom model that has the
    right fields.
    """

    _block_type_name = "Snowflake Private Key Credentials"
    _block_type_slug = "snowflake-private-key-credentials"

    private_key_path: Optional[str] = Field(
        default=None, description="The path to the private key"
    )
    private_key_passphrase: Optional[SecretStr] = Field(
        default=None, description="The passphrase to decrypt the private key"
    )
Andrew Huang
12/23/2022, 5:27 PM
Billy McMonagle
01/03/2023, 3:43 PM
prefect-snowflake and prefect-dbt packages, my proposed solution would include adding the private_key_path and private_key_passphrase attributes to SnowflakeCredentials along with a new validator method (e.g. _validate_private_key_file). I think prefect-dbt doesn't need to know anything about this, and get_configs can remain as pretty much just a passthrough method with no logic.
Andrew Huang
01/03/2023, 5:30 PM
private_key_path but perhaps we can change that behavior (deprecate password in favor of private_key_passphrase) so we don’t have to touch prefect-dbt.
Billy McMonagle
01/03/2023, 8:24 PM
Billy McMonagle
01/03/2023, 8:27 PM
snowflake.connector.errors.ProgrammingError: 251006: Password is empty. Pinning to prefect-snowflake==0.2.3 fixes this error. So something in the latest release seems to affect the way the connection is authenticating, which is surprising to me. I see that that release is mostly one of your PRs, so I'm wondering if you have any thoughts as to what happened?
Andrew Huang
01/03/2023, 8:28 PM
Andrew Huang
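01/03/2023, 8:29 PM
configs_json = super().get_configs()
# Keys to keep; each value is the name dbt expects (only private_key is renamed).
rename_keys = {
    "account": "account",
    "user": "user",
    "password": "password",
    "private_key": "private_key_path",
    "authenticator": "authenticator",
    "database": "database",
    "warehouse": "warehouse",
    "schema": "schema",
}
# Build a new dict rather than popping while iterating, which raises RuntimeError.
configs_json = {
    rename_keys[key]: value
    for key, value in configs_json.items()
    if key in rename_keys
}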
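Filtering a dictionary by popping entries while iterating over its keys raises `RuntimeError` in Python, so the filter-and-rename idea is easiest to express by building a new dictionary. Here is a hedged sketch of that idea as a standalone function. `filter_and_rename`, `DBT_KEYS`, and the sample input are illustrative assumptions drawn from this discussion and are not the actual prefect-dbt implementation.

```python
# Keys to keep, mapped to the name dbt should see. Only private_key is renamed.
# This mapping is an assumption based on the discussion, not the library's code.
DBT_KEYS = {
    "account": "account",
    "user": "user",
    "password": "password",
    "private_key": "private_key_path",
    "authenticator": "authenticator",
    "database": "database",
    "warehouse": "warehouse",
    "schema": "schema",
}


def filter_and_rename(configs):
    """Keep only the keys listed in DBT_KEYS and apply the renames."""
    return {
        DBT_KEYS[key]: value for key, value in configs.items() if key in DBT_KEYS
    }


# Hypothetical input; "unused_option" stands in for any key dbt should not see.
raw = {
    "account": "acct",
    "user": "svc",
    "private_key": "/path/to/key.p8",
    "unused_option": "x",
}
print(filter_and_rename(raw))
```

Note that renaming `private_key` to `private_key_path` only makes sense when the value really is a path, which is the open question in this thread.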
Billy McMonagle
01/03/2023, 8:30 PM
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 637, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
  File "/usr/local/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "prefect/new_dbt.py", line 108, in trigger_dbt_cli_command_flow
    run_dbt_command("dbt deps")
  File "prefect/new_dbt.py", line 78, in run_dbt_command
    connector = SnowflakeConnector(
  File "/usr/local/lib/python3.9/site-packages/prefect/blocks/core.py", line 184, in __init__
    self.block_initialization()
  File "/usr/local/lib/python3.9/site-packages/prefect_snowflake/database.py", line 161, in block_initialization
    self._start_connection()
  File "/usr/local/lib/python3.9/site-packages/prefect_snowflake/database.py", line 156, in _start_connection
    self._connection = self.get_connection()
  File "/usr/local/lib/python3.9/site-packages/prefect_snowflake/database.py", line 149, in get_connection
    connection = self.credentials.get_client(**connect_kwargs, **connect_params)
  File "/usr/local/lib/python3.9/site-packages/prefect_snowflake/credentials.py", line 320, in get_client
    return snowflake.connector.connect(**connect_params)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/__init__.py", line 51, in Connect
    return SnowflakeConnection(**kwargs)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/connection.py", line 292, in __init__
    self.connect(**kwargs)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/connection.py", line 514, in connect
    self.__config(**kwargs)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/connection.py", line 908, in __config
    Error.errorhandler_wrapper(
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 276, in errorhandler_wrapper
    handed_over = Error.hand_to_other_handler(
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 334, in hand_to_other_handler
    connection.errorhandler(connection, cursor, error_class, error_value)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 210, in default_errorhandler
    raise error_class(
snowflake.connector.errors.ProgrammingError: 251006: Password is empty
Andrew Huang
01/03/2023, 8:31 PM
Billy McMonagle
01/03/2023, 8:31 PM
Billy McMonagle
01/03/2023, 8:31 PM
Billy McMonagle
01/03/2023, 8:33 PM
class SnowflakePrivateKeyCredentials(SnowflakeCredentials):
    """
    SnowflakeCredentials expects private key bytes, which doesn't play
    nicely with dbt. To get around this, we use a custom model that has the
    right fields.
    """

    _block_type_name = "Snowflake Private Key Credentials"
    _block_type_slug = "snowflake-private-key-credentials"

    private_key_path: Optional[str] = Field(
        default=None, description="The path to the private key"
    )
    private_key_passphrase: Optional[SecretStr] = Field(
        default=None, description="The passphrase to decrypt the private key"
    )


def run_dbt_command(command: str):
    """
    Run the provided dbt command using the correct snowflake/dbt configuration.
    This function is a wrapper for the `trigger_dbt_cli_command` prefect-dbt task.
    """
    connector = SnowflakeConnector(
        database=SNOWFLAKE_DATABASE,
        schema=SNOWFLAKE_SCHEMA,
        warehouse=SNOWFLAKE_WAREHOUSE,
        credentials=SnowflakePrivateKeyCredentials(
            user=SNOWFLAKE_USERNAME,
            authenticator="snowflake",
            private_key_path=SNOWFLAKE_SVC_USER_PRIVATE_KEY_FILE,
            private_key_passphrase=get_ssm_parameter(
                f"/snowflake/svc_user/{SSM_ENV}/PRIVATE_KEY_PASSPHRASE"
            ),
            account=SNOWFLAKE_ACCOUNT,
        ),
    )
    target_configs = SnowflakeTargetConfigs(connector=connector)
    dbt_cli_profile = DbtCliProfile(
        name=APP,
        target=DBT_ENVIRONMENT,
        target_configs=target_configs,
    )
    return trigger_dbt_cli_command(
        command=command,
        overwrite_profiles=True,
        dbt_cli_profile=dbt_cli_profile,
        project_dir=DBT_PROJECT_DIRECTORY,
    )
Andrew Huang
01/03/2023, 8:36 PM
Billy McMonagle
01/03/2023, 9:04 PM
Andrew Huang
01/03/2023, 9:11 PM
Andrew Huang
01/03/2023, 9:12 PM
Andrew Huang
01/03/2023, 9:43 PM
Billy McMonagle
01/03/2023, 9:49 PM
Billy McMonagle
01/03/2023, 9:51 PM
DbtCliProfile if it were possible, but this method is good by me if it fits with the larger design goals.
Andrew Huang
01/03/2023, 9:52 PM
Andrew Huang
01/03/2023, 9:53 PM
Andrew Huang
01/03/2023, 9:53 PM
Andrew Huang
01/03/2023, 9:54 PM
Billy McMonagle
01/03/2023, 9:56 PM
Billy McMonagle
01/03/2023, 9:56 PM
Andrew Huang
01/03/2023, 9:56 PM
Andrew Huang
01/04/2023, 1:08 AM
Billy McMonagle
01/04/2023, 2:56 PM
private_key bytes AND private_key_path?
Andrew Huang
01/04/2023, 4:16 PM
Andrew Huang
01/04/2023, 5:54 PM
Timo
02/07/2023, 12:02 PM
snowflake.connector.errors.ProgrammingError: 251006: Password is empty when using private_key_path with and without private_key_passphrase. I'm not using dbt at this point.
Example
from prefect_snowflake.database import SnowflakeConnector
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect import flow

@flow
def snowflake_flow():
    db_credentials = SnowflakeCredentials(
        account="<<account>>",
        user="<<user>>",
        private_key_path="/path/to/rsa_key.p8",
        role="<<role>>",
        authenticator="snowflake",
    )
    db_connection = SnowflakeConnector(
        credentials=db_credentials,
        schema="public",
        database="<<database>>",
        warehouse="<<warehouse>>",
    )
    db_connection.get_connection()
If I set the passphrase as a password, it works, but that should be deprecated in favor of `private_key_passphrase` (which isn't the case in the code; only the combination with the private_key parameter is deprecated). Anyway, I didn't need a passphrase at all to use a private key.
It seems there are some issues in this package that keep it from working as expected, especially when it is used for Snowflake only.
Any thoughts about that?
Timo
02/07/2023, 12:04 PM
password and private_key_passphrase is forbidden so we can't make this work that way.
Do not provide both password and private_key_passphrase; specify private_key_passphrase only instead.
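The rule quoted in that error message could be captured in a single check that also reports which authentication mode would be used. This is a sketch under assumptions: `check_auth_fields` is a hypothetical function, and the final rule (that at least one of the two fields is required) is a guess for illustration, not the documented behavior of prefect-snowflake.

```python
def check_auth_fields(password=None, private_key_path=None, private_key_passphrase=None):
    """Return a short label for the auth mode, or raise ValueError."""
    if password is not None and private_key_passphrase is not None:
        # Wording copied from the error quoted above.
        raise ValueError(
            "Do not provide both password and private_key_passphrase; "
            "specify private_key_passphrase only instead."
        )
    if private_key_path is not None:
        # The passphrase is optional: an unencrypted key does not need one.
        return "key-pair"
    if password is not None:
        return "password"
    # Assumption: with neither field set there is nothing to authenticate with.
    raise ValueError("Provide either password or private_key_path.")


print(check_auth_fields(private_key_path="/path/to/rsa_key.p8"))
```

Under these assumed rules, a key path without any passphrase passes the check, which matches the case reported above where an unencrypted key should work on its own.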
Andrew Huang
02/07/2023, 4:54 PM
Andrew Huang
02/07/2023, 4:55 PM
Andrew Huang
02/07/2023, 5:47 PM
Timo
02/08/2023, 12:40 PM
• password only
• private_key_path only
• private_key_path + private_key_passphrase
• private_key_path + password (should this be deprecated? There is no warning)
Timo
02/08/2023, 1:04 PM
profiles.yml is written without the credentials.
config: {}
myproj:
  outputs:
    dev:
      schema: dv
      threads: 8
      type: snowflake
  target: dev
Just using the dbt demo flow from the prefect-dbt website
from prefect import flow
from prefect_dbt.cli.credentials import DbtCliProfile
from prefect_dbt.cli.commands import trigger_dbt_cli_command

@flow
def trigger_dbt_cli_commands_flow():
    dbt_cli_profile = DbtCliProfile.load("dbtprofileblock")
    trigger_kwargs = dict(
        profiles_dir=".",
        overwrite_profiles=True,
        dbt_cli_profile=dbt_cli_profile,
    )
    trigger_dbt_cli_command("dbt deps", **trigger_kwargs)
    result = trigger_dbt_cli_command("dbt debug", **trigger_kwargs)
    return result

trigger_dbt_cli_commands_flow()
Could you confirm this?
Andrew Huang
02/08/2023, 5:54 PM
config: {}
jaffle_shop:
  outputs:
    dev:
      account: account.region.aws
      authenticator: snowflake
      database: database
      password: password
      role: role
      schema: public
      threads: 4
      type: snowflake
      user: user
      warehouse: warehouse
  target: dev
Can you try using this:
from prefect import flow
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector
from prefect_dbt.cli.credentials import DbtCliProfile
from prefect_dbt.cli.commands import trigger_dbt_cli_command
from prefect_dbt.cli.configs import SnowflakeTargetConfigs

@flow
def trigger_dbt_cli_command_flow():
    connector = SnowflakeConnector(
        schema="public",
        database="database",
        warehouse="warehouse",
        credentials=SnowflakeCredentials(
            user="user",
            password="password",
            account="account.region.aws",
            role="role",
        ),
    )
    target_configs = SnowflakeTargetConfigs(
        connector=connector
    )
    dbt_cli_profile = DbtCliProfile(
        name="jaffle_shop",
        target="dev",
        target_configs=target_configs,
    )
    dbt_cli_profile.save("example-profile", overwrite=True)
    dbt_cli_profile = DbtCliProfile.load("example-profile")
    result = trigger_dbt_cli_command(
        "dbt debug",
        overwrite_profiles=True,
        dbt_cli_profile=dbt_cli_profile,
    )

trigger_dbt_cli_command_flow()
Timo
02/09/2023, 7:54 AM
Timo
02/09/2023, 9:21 AM
Timo
02/09/2023, 9:31 AM
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector
from prefect_dbt.cli.credentials import DbtCliProfile
from prefect_dbt.cli.configs import SnowflakeTargetConfigs

if __name__ == "__main__":
    credentials = SnowflakeCredentials(
        user="user",
        password="password",
        account="account.region.aws",
        role="role",
    )
    credentials.save("sf-cred", overwrite=True)
    connector = SnowflakeConnector(
        schema="public",
        database="database",
        warehouse="warehouse",
        credentials=SnowflakeCredentials.load("sf-cred"),
    )
    connector.save("sf-conn", overwrite=True)
    target_configs = SnowflakeTargetConfigs(
        connector=SnowflakeConnector.load("sf-conn"), schema="TEST"
    )
    target_configs.save("dbt-targetconf", overwrite=True)
    dbt_cli_profile = DbtCliProfile(
        name="jaffle_shop",
        target="dev",
        target_configs=SnowflakeTargetConfigs.load("dbt-targetconf"),
    )
    dbt_cli_profile.save("example", overwrite=True)
    dbt_cli_profile = DbtCliProfile.load("example").get_profile()
    print(dbt_cli_profile)
Ends in a profile without credentials:
{'config': {}, 'jaffle_shop': {'target': 'dev', 'outputs': {'dev': {'type': 'snowflake', 'schema': 'TEST', 'threads': 4}}}}
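A quick way to catch this before running dbt is to check the generated profile for the connection fields. Below is a minimal sketch. `missing_credentials` is a hypothetical helper, and the list of required keys is an assumption based on the working profiles.yml shown earlier in the thread, not a definitive list from dbt.

```python
# Assumed minimum for a Snowflake target, based on the working profile in this thread.
REQUIRED_KEYS = ("account", "user", "database", "warehouse")


def missing_credentials(profile, name):
    """Return the required keys absent from the profile's active target output."""
    project = profile[name]
    output = project["outputs"][project["target"]]
    return [key for key in REQUIRED_KEYS if key not in output]


# The profile printed above.
profile = {
    "config": {},
    "jaffle_shop": {
        "target": "dev",
        "outputs": {"dev": {"type": "snowflake", "schema": "TEST", "threads": 4}},
    },
}
print(missing_credentials(profile, "jaffle_shop"))
```

Passing the result of `DbtCliProfile.load("example").get_profile()` to a check like this would flag the missing connector fields immediately.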
Timo
02/09/2023, 9:40 AM