Hi there, I'm working on migration a flow from Pre...
# ask-community
b
Hi there, I'm working on migration a flow from Prefect 1 to 2. I use Snowflake, and am having some trouble with the `prefect-dbt`/`prefect-snowflake` collections. I think there is an issue related to my use of private key Snowflake authentication. Here are some details... by the way, I know many of us have slowed down for the holidays and I'm not in a rush for a response.
👀 2
My flow looks like this... right now it just runs
dbt deps
and
dbt debug
.
Copy code
@flow
def trigger_dbt_cli_command_flow():
    with open(SNOWFLAKE_SVC_USER_PRIVATE_KEY_FILE) as key_file:
        connector = SnowflakeConnector(
            database=SNOWFLAKE_DATABASE,
            schema=SNOWFLAKE_SCHEMA,
            warehouse=SNOWFLAKE_WAREHOUSE,
            credentials=SnowflakeCredentials(
                user=SNOWFLAKE_USERNAME,  # users have default roles, so we don't provide role here
                private_key=key_file.read().encode(),
                account=SNOWFLAKE_ACCOUNT,
            ),
        )
    target_configs = SnowflakeTargetConfigs(connector=connector)
    dbt_cli_profile = DbtCliProfile(
        name=APP,
        target=DBT_ENVIRONMENT,
        target_configs=target_configs,
    )
    trigger_dbt_cli_command(
        "dbt deps",
        overwrite_profiles=True,
        dbt_cli_profile=dbt_cli_profile,
        project_dir=DBT_PROJECT_DIRECTORY,
    )
    result = trigger_dbt_cli_command(
        "dbt debug",
        overwrite_profiles=True,
        dbt_cli_profile=dbt_cli_profile,
        project_dir=DBT_PROJECT_DIRECTORY,
    )
    return result
Here's the error I get. The jinja is from my
dbt_project.yml
file.
Copy code
│ 19:50:02  Encountered an error:                                                                                                                                                                                                                   
│ Compilation Error                                                                                                                                                                                                                                 
│   Could not render {{ true if target.name == 'prod' else false }}: 'target' is undefined                                                                                                                                                          │                                                                                                                                                                                                                                
│ 19:50:03.145 | ERROR   | Task run 'trigger_dbt_cli_command-321ca940-0' - Encountered exception during execution:                                                                                                                                  
│ Traceback (most recent call last):                                                                                                                                                                                                                
│   File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1442, in orchestrate_task_run                                                                                                                                             
│     result = await task.fn(*args, **kwargs)                                                                                                                                                                                                       
│   File "/usr/local/lib/python3.9/site-packages/prefect_dbt/cli/commands.py", line 158, in trigger_dbt_cli_command                                                                                                                                 
│     result = await shell_run_command.fn(command=command, **shell_run_command_kwargs)                                                                                                                                                              
│   File "/usr/local/lib/python3.9/site-packages/prefect_shell/commands.py", line 103, in shell_run_command                                                                                                                                         
│     raise RuntimeError(msg)                                                                                                                                                                                                                       
│ RuntimeError: Command failed with exit code 2:                                                                                                                                                                                                    
│   Could not render {{ true if target.name == 'prod' else false }}: 'target' is undefined
I believe this error is misleading. If I comment out the corresponding lines in
dbt_project.yml
, I get this error when running the above flow:
Copy code
RuntimeError: Command failed with exit code 2:
  in _deep_map_render, expected one of (<class 'list'>, <class 'dict'>, <class 'int'>, <class 'float'>, <class 'str'>, <class 'NoneType'>, <class 'bool'>), got <class 'bytes'>
I am fairly certain that the error has to do with how my private key is being written to the dbt profile.
~/.dbt/profiles.yml
looks like this inside the running flow container:
Copy code
app:
  outputs:
    qa:
      account: XXXXXX.us-east-1
      authenticator: snowflake
      database: qa
      private_key: !!binary |
        XXX (redacted) XXX
      schema: public
      threads: 4
      type: snowflake
      user: svc_user_qa
      warehouse: qa_qh
  target: qa
If I manually replace the
private_key
bytes with the following and rerun the dbt commands, it is successful:
Copy code
private_key_path: /path/to/key.p8
      private_key_passphrase: **redacted**
From examining
SnowflakeCredentials
,
SnowflakeConnector
,
DbtCliProfile
, etc I don't see a way to provide the private key path rather than the private key bytes, and I believe I am following the dbt docs on this subject. Is there a better way to do this, or have I perhaps stumbled into a gap between these two collections? I've tried to present all the info in the simplest way possible, sorry that this is a confusing issue. Thanks for your help!
I'm not sure what the "right" solution is, but I did get this working. I extended
SnowflakeCredentials
to add the desired behavior, and it works perfectly.
Copy code
class SnowflakePrivateKeyCredentials(SnowflakeCredentials):
    """
    SnowflakeCredentials expects private key bytes, which doesn't play
    nicely with dbt. To get around this, we use a custom model that has the
    right fields.
    """

    _block_type_name = "Snowflake Private Key Credentials"
    _block_type_slug = "snowflake-private-key-credentials"
    private_key_path: Optional[str] = Field(
        default=None, description="The path to the private key"
    )
    private_key_passphrase: Optional[SecretStr] = Field(
        default=None, description="The passphrase to decrypt the private key"
    )
👀 1
a
Thanks for reporting this and experimenting with a solution! Would you like to contribute a PR to fix this? It would involve editing: https://github.com/PrefectHQ/prefect-dbt/blob/main/prefect_dbt/cli/configs/snowflake.py#L71-L79 Then, if a private_key is detected, perform some logic to use the right keys; like how BigQueryTargetConfigs does it: https://github.com/PrefectHQ/prefect-dbt/blob/main/prefect_dbt/cli/configs/bigquery.py#L105-L121 added an issue here to track it: https://github.com/PrefectHQ/prefect-dbt/issues/108
b
Hi @Andrew Huang thanks for the suggestion... I'm happy to take a crack at it. If I'm properly understanding the separation of concerns in the
prefect-snowflake
and
prefect-dbt
packages, my proposed solution would include adding the
private_key_path
and
private_key_passphrase
attributes to
SnowflakeCredentials
along with a new validator method (e.g.
_validate_private_key_file
). I think
prefect-dbt
doesn't need to know anything about this, and
get_configs
can remain as pretty much just a passthrough method with no logic.
a
Yes, that’d be great! Thank you! I am not sure if we want a separate private_key_passphrase, since previously, we just used the password field for that, but I like the idea of
private_key_path
but perhaps we can change that behavior (deprecate password in favor for private_key_passphrase) so we don’t have to touch prefect-dbt.
👍 1
b
That makes sense.
I've noticed one more interesting thing... when I upgraded to the latest version of prefect-snowflake, my solution above fails in my flow and I receive the error
snowflake.connector.errors.ProgrammingError: 251006: Password is empty
. Pinning to
prefect-snowflake==0.2.3
fixes this error. So something in the latest release seems to affect the way the connection is authenticating, which is surprising to me. I see that that release is mostly one of your PRs, so I'm wondering if you have any thoughts as to what happened?
a
are you using a private key + password together? would you be able to provide a traceback?
also, in prefect-dbt, I’m planning to only allow specific keys to decouple prefect-dbt from other collections (in the works)
Copy code
configs_json = super().get_configs()
        rename_keys = {
            "account",
            "user",
            "password",
            "private_key": "private_key_path",
            "authenticator",
            "database",
            "warehouse",
            "schema",
        }
        for key in configs_json.keys():
            if key not in rename_keys:
                configs_json.pop(key)
b
ah i see, that makes sense... i already had to bump prefect-core from 2.4 to 2.7 today to play nice with latest prefect-dbt 🙂 (due to new credentialblock thing) on the latest error, my traceback is...
Copy code
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 637, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
  File "/usr/local/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "prefect/new_dbt.py", line 108, in trigger_dbt_cli_command_flow
    run_dbt_command("dbt deps")
  File "prefect/new_dbt.py", line 78, in run_dbt_command
    connector = SnowflakeConnector(
  File "/usr/local/lib/python3.9/site-packages/prefect/blocks/core.py", line 184, in __init__
    self.block_initialization()
  File "/usr/local/lib/python3.9/site-packages/prefect_snowflake/database.py", line 161, in block_initialization
    self._start_connection()
  File "/usr/local/lib/python3.9/site-packages/prefect_snowflake/database.py", line 156, in _start_connection
    self._connection = self.get_connection()
  File "/usr/local/lib/python3.9/site-packages/prefect_snowflake/database.py", line 149, in get_connection
    connection = self.credentials.get_client(**connect_kwargs, **connect_params)
  File "/usr/local/lib/python3.9/site-packages/prefect_snowflake/credentials.py", line 320, in get_client
    return snowflake.connector.connect(**connect_params)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/__init__.py", line 51, in Connect
    return SnowflakeConnection(**kwargs)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/connection.py", line 292, in __init__
    self.connect(**kwargs)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/connection.py", line 514, in connect
    self.__config(**kwargs)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/connection.py", line 908, in __config
    Error.errorhandler_wrapper(
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 276, in errorhandler_wrapper
    handed_over = Error.hand_to_other_handler(
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 334, in hand_to_other_handler
    connection.errorhandler(connection, cursor, error_class, error_value)
  File "/usr/local/lib/python3.9/site-packages/snowflake/connector/errors.py", line 210, in default_errorhandler
    raise error_class(
snowflake.connector.errors.ProgrammingError: 251006: Password is empty
👀 1
a
thanks! can you show how you’re instantiating the block
b
i'm not using prefect-snowflake to actually connect to snowflake, i'm using it purely to populate the dbt profile and then run dbt. so as long as dbt thinks the profile is valid i am happy.
sure...
Copy code
class SnowflakePrivateKeyCredentials(SnowflakeCredentials):
    """
    SnowflakeCredentials expects private key bytes, which doesn't play
    nicely with dbt. To get around this, we use a custom model that has the
    right fields.
    """

    _block_type_name = "Snowflake Private Key Credentials"
    _block_type_slug = "snowflake-private-key-credentials"
    private_key_path: Optional[str] = Field(
        default=None, description="The path to the private key"
    )
    private_key_passphrase: Optional[SecretStr] = Field(
        default=None, description="The passphrase to decrypt the private key"
    )

def run_dbt_command(command: str):
    """
    Run the provided dbt command using the correct snowflake/dbt configuration.
    This function is a wrapper for the `trigger_dbt_cli_command` prefect-dbt task.
    """
    connector = SnowflakeConnector(
        database=SNOWFLAKE_DATABASE,
        schema=SNOWFLAKE_SCHEMA,
        warehouse=SNOWFLAKE_WAREHOUSE,
        credentials=SnowflakePrivateKeyCredentials(
            user=SNOWFLAKE_USERNAME,
            authenticator="snowflake",
            private_key_path=SNOWFLAKE_SVC_USER_PRIVATE_KEY_FILE,
            private_key_passphrase=get_ssm_parameter(
                f"/snowflake/svc_user/{SSM_ENV}/PRIVATE_KEY_PASSPHRASE"
            ),
            account=SNOWFLAKE_ACCOUNT,
        ),
    )
    target_configs = SnowflakeTargetConfigs(connector=connector)
    dbt_cli_profile = DbtCliProfile(
        name=APP,
        target=DBT_ENVIRONMENT,
        target_configs=target_configs,
    )
    return trigger_dbt_cli_command(
        command=command,
        overwrite_profiles=True,
        dbt_cli_profile=dbt_cli_profile,
        project_dir=DBT_PROJECT_DIRECTORY,
    )
🙏 1
a
I don’t have an answer right now, but I’m working on dbt today so I’ll get back to you shortly!
b
Spectacular. Happy to help if I can.
a
Here’s a draft of what I’m planning to do to decouple the two collections https://github.com/PrefectHQ/prefect-dbt/pull/112/files
testing your snippet now…
thanks for sharing your snippet! I think this PR fixes it: https://github.com/PrefectHQ/prefect-snowflake/pull/58
b
Ooh that seems like a nice improvement. Thanks!
Personally, I would happily provide a set of key/values directly to the
DbtCliProfile
if it were possible, but this method is good by me if it fits with the larger design goals.
a
thanks! to explain the earlier error: snowflake.connector.errors.ProgrammingError: 251006: Password is empty, I think it’s because snowflake-connector-python accepts “password” as kwarg, not private_key_passphrase, while dbt profile accepts private_key_passphrase
message has been deleted
also, yes you can use DbtCliProfile without the DbtTargetConfigs by using TargetConfigs + extras
b
that's super interesting, thanks for the explanation. that might be more appropriate in my situation.
this separation into different collection packages was an extremely good idea.
🙌 1
a
yes thank you for reporting it! 😄
Okay, I created a PR to support private_key_path + private_key_passphrase here: https://github.com/PrefectHQ/prefect-snowflake/pull/59 wondering if that works for you!
b
Nice! That looks perfect. Is it worth validating that the user hasn't provided both
private_key
bytes AND
private_key_path
?
a
Yes that is a good point
t
Hi @Andrew Huang, I'm facing this
snowflake.connector.errors.ProgrammingError: 251006: Password is empty
when using
private_key_path
with and without
private_key_passphrase
. I'm not using dbt in this point. Example
Copy code
from prefect_snowflake.database import SnowflakeConnector,
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect import flow

@flow
def snowfalke_flow():
    
    db_credentials = SnowflakeCredentials(
        account="<<account>>",
        user="<<user>>",
        private_key_path="/path/to/rsa_key.p8",
        role="<<role>>",
        authenticator="snowflake",
    )
    db_connection = SnowflakeConnector(
        credentials=db_credentials,
        schema="public",
        database="<<database>>",
        warehouse="<<warehouse>>",
    )
    db_connection.get_connection()
If I set the passphrase as a
password
, it works but it should be deprecated in favor of `private_key_passphrase`(which isn't the case in the code... only the combination with
private_key
parameter is deprecated). Anyway: I didn't need passphrase at all to use a private key. Seems there are some issues in this package to work like expected. Especially when using it as pure Snowflake. Any thoughts about that?
Edit: providing both,
password
and
private_key_passphrase
is forbidden so we can't make this work that way.
Do not provide both password and private_key_passphrase; specify private_key_passphrase only instead.
a
Hi Timo, thanks for bringing this up. Previously, my understanding was that password/private_key_passphrase was used to decode the private_der_key: https://github.com/PrefectHQ/prefect-snowflake/blob/main/prefect_snowflake/credentials.py#L228-L237 then, the password gets dropped entirely: https://github.com/PrefectHQ/prefect-snowflake/blob/main/prefect_snowflake/credentials.py#L352-L357 would you be interested in updating it and making a PR to fix it?
Okay I might have a fix here; let me know if this works for you! https://github.com/PrefectHQ/prefect-snowflake/pull/67
t
Thanks @Andrew Huang. I can confirm this version works when using... •
password
only •
private_key_path
only •
private key_path
+
private_key_passphrase
private_key_path
+
password
(this should be deprecated?, there is no warning)
But I now figured out that the dbt profiles are not written correctly. This is not related to your fix but maybe u have an idea. I configured a DbtCliProfile block which uses a Snowflake Target Config block which uses a Snowflake Connector block which uses a Snowflake Credentials block. But my
profiles.yml
is written without the credentials.
Copy code
config: {}
myproj:
  outputs:
    dev:
      schema: dv
      threads: 8
      type: snowflake
  target: dev
Just using the dbt demo flow from the prefect-dbt website
Copy code
from prefect import flow

from prefect_dbt.cli.credentials import DbtCliProfile
from prefect_dbt.cli.commands import trigger_dbt_cli_command


@flow
def trigger_dbt_cli_commands_flow():
    dbt_cli_profile = DbtCliProfile.load("dbtprofileblock")

    trigger_kwargs = dict(
        profiles_dir=".",
        overwrite_profiles=True,
        dbt_cli_profile=dbt_cli_profile,
    )

    trigger_dbt_cli_command("dbt deps", **trigger_kwargs)

    result = trigger_dbt_cli_command("dbt debug", **trigger_kwargs)

    return result


trigger_dbt_cli_commands_flow()
Could you confirm this?
a
I appreciate you testing! I’ll wrap up that PR today and look into the deprecation warning not showing. In regards to the profile not written correctly, I cannot reproduce:
Copy code
config: {}
jaffle_shop:
  outputs:
    dev:
      account: account.region.aws
      authenticator: snowflake
      database: database
      password: password
      role: role
      schema: public
      threads: 4
      type: snowflake
      user: user
      warehouse: warehouse
  target: dev
Can you try using this:
Copy code
from prefect import flow
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector

from prefect_dbt.cli.credentials import DbtCliProfile
from prefect_dbt.cli.commands import trigger_dbt_cli_command
from prefect_dbt.cli.configs import SnowflakeTargetConfigs

@flow
def trigger_dbt_cli_command_flow():
    connector = SnowflakeConnector(
        schema="public",
        database="database",
        warehouse="warehouse",
        credentials=SnowflakeCredentials(
            user="user",
            password="password",
            account="account.region.aws",
            role="role",
        ),
    )
    target_configs = SnowflakeTargetConfigs(
        connector=connector
    )
    dbt_cli_profile = DbtCliProfile(
        name="jaffle_shop",
        target="dev",
        target_configs=target_configs,
    )
    dbt_cli_profile.save("example-profile", overwrite=True)

    dbt_cli_profile = DbtCliProfile.load("example-profile")
    result = trigger_dbt_cli_command(
        "dbt debug",
        overwrite_profiles=True,
        dbt_cli_profile=dbt_cli_profile,
    )

trigger_dbt_cli_command_flow()
t
Yeah this is working. But you only safe a CLIProfile. When you look at the block config there is no TargetConfig linked (screenshot). I think this is saved in the CLIProfile anyway. But in my case I linked all the blocks together and this is not working. Every block config is a own block object which can then be edited in the UI.
So the main problem seems to be, that the CLIProfile block configuration in combination with a SnowflakeTargetConfig don't populate the credentials/connection which are configured in the SnowflakeTargetConfig. This is maybe because the normal TargetConfigs don't have a connector block which we can apply.
Doing block configuration like this:
Copy code
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector

from prefect_dbt.cli.credentials import DbtCliProfile
from prefect_dbt.cli.configs import SnowflakeTargetConfigs


if __name__ == "__main__":
    credentials = SnowflakeCredentials(
        user="user",
        password="password",
        account="account.region.aws",
        role="role",
    )
    credentials.save("sf-cred", overwrite=True)

    connector = SnowflakeConnector(
        schema="public",
        database="database",
        warehouse="warehouse",
        credentials=SnowflakeCredentials.load("sf-cred"),
    )
    connector.save("sf-conn", overwrite=True)

    target_configs = SnowflakeTargetConfigs(
        connector=SnowflakeConnector.load("sf-conn"), schema="TEST"
    )
    target_configs.save("dbt-targetconf", overwrite=True)

    dbt_cli_profile = DbtCliProfile(
        name="jaffle_shop",
        target="dev",
        target_configs=SnowflakeTargetConfigs.load("dbt-targetconf"),
    )
    dbt_cli_profile.save("example", overwrite=True)

    dbt_cli_profile = DbtCliProfile.load("example").get_profile()

    print(dbt_cli_profile)
Ends in a profile without credentials:
{'config': {}, 'jaffle_shop': {'target': 'dev', 'outputs': {'dev': {'type': 'snowflake', 'schema': 'TEST', 'threads': 4}}}}