Ron Budnar

06/07/2022, 9:31 PM
Hi! I have a question about pulling secrets when a flow is executing. I've followed this guide for setting up a Prefect/DBT flow and want to be able to grab fresh database credentials at runtime without having to re-register the flow. Following the guide, I've got basically this:
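```python
from prefect import Flow, Parameter, task
from prefect.tasks.secrets import PrefectSecret


@task
def get_dbt_kwargs(**kwargs):
    # Collect the secret values (resolved at run time) into one env dict.
    return kwargs


# docker_run, STORAGE, dbt (a DbtShellTask), dbt_command and pull_task are defined elsewhere.
with Flow(name="dbt_command_flow", run_config=docker_run, storage=STORAGE) as flow:
    dbt_repo = Parameter("dbt_repo_url", ...)

    # PrefectSecret is a task, so each flow run fetches the current secret values.
    dbt_env = get_dbt_kwargs(
        DBT_ENV_SECRET__SQL_SERVER=PrefectSecret("DBT_ENV_SECRET__SQL_SERVER"),
        DBT_ENV_SECRET__SQL_DATABASE=PrefectSecret("DBT_ENV_SECRET__SQL_DATABASE"),
        DBT_ENV_SECRET__SQL_USER=PrefectSecret("DBT_ENV_SECRET__SQL_USER"),
        DBT_ENV_SECRET__SQL_PASSWORD=PrefectSecret("DBT_ENV_SECRET__SQL_PASSWORD"),
    )
    ...  # a couple of tasks omitted for brevity
    dbt_run = dbt(
        env=dbt_env,
        command=dbt_command,
        task_args={"name": "Run DBT Shell command"},
        upstream_tasks=[pull_task, dbt_env],
    )
```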
My question is: is there a better or preferred way to do the above? Basically, I'm trying to ensure that if credentials/secrets are changed in Prefect Cloud, the next run picks them up correctly.
As a bit of context and a follow-up: I was previously defining the DBT shell task like this, outside the flow:
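```python
from prefect.client import Secret
from prefect.tasks.dbt import DbtShellTask

# Secret(...).get() runs right here, when this module is loaded (i.e. at registration),
# so the values are captured once in the task's init.
dbt = DbtShellTask(
    env={
        "DBT_ENV_SECRET__SQL_DATABASE": Secret("DBT_ENV_SECRET__SQL_DATABASE").get(),
        "DBT_ENV_SECRET__SQL_SERVER": Secret("DBT_ENV_SECRET__SQL_SERVER").get(),
        "DBT_ENV_SECRET__SQL_USER": Secret("DBT_ENV_SECRET__SQL_USER").get(),
        "DBT_ENV_SECRET__SQL_PASSWORD": Secret("DBT_ENV_SECRET__SQL_PASSWORD").get(),
    },
    # ... (other arguments elided)
)
```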
If I changed those secrets, subsequent runs did not seem to pick up the new values. Were the secrets being fetched once at registration time rather than at execution time?

Kevin Kho

06/07/2022, 10:02 PM
The first snippet you shared is exactly right. In the second one, those secrets are fetched and evaluated at flow registration time.
Note, though, that you should pass the credentials to the run method of DbtShellTask:
```python
dbt = DbtShellTask(...)
with Flow(...) as flow:
    dbt(pass_secrets_here)  # passed to run(): used at execution time
```
If you do:
```python
with Flow(...) as flow:
    dbt = DbtShellTask(pass_secrets_here)  # passed to __init__: used at registration time
    dbt()
```
those are evaluated at registration time too. DbtShellTask has an init and a run. The first `()` (in `DbtShellTask(...)`) is the init; the second (`dbt()`) is the `run()`. So if you pass the secrets to the init, they are used at build time. If you pass them to `run()`, they are used at execution time.
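The init-versus-run point comes down to when the secret lookup actually happens. Below is a minimal plain-Python sketch with no Prefect involved: `SECRET_STORE`, `get_secret` and `FakeShellTask` are hypothetical stand-ins for Prefect Cloud secrets, a secret lookup, and a task with separate init and run steps. In a real Prefect 1 flow, the deferral to execution time comes from passing `PrefectSecret` task outputs into the call, as in the first snippet.

```python
# Hypothetical stand-in for secrets stored in Prefect Cloud.
SECRET_STORE = {"DBT_ENV_SECRET__SQL_PASSWORD": "old-password"}


def get_secret(name):
    """Look up a secret at the moment this function is called."""
    return SECRET_STORE[name]


class FakeShellTask:
    """Hypothetical task with an init step (build time) and a run step (execution time)."""

    def __init__(self, env=None):
        # Whatever is passed here was already evaluated when the task was built.
        self.env = env or {}

    def run(self, env=None):
        # In this sketch, values passed to run() override those captured at init.
        return {**self.env, **(env or {})}


# "Registration": the secret is read once, while the flow is being built.
init_task = FakeShellTask(env={"PASSWORD": get_secret("DBT_ENV_SECRET__SQL_PASSWORD")})
run_task = FakeShellTask()

# The secret is rotated after registration.
SECRET_STORE["DBT_ENV_SECRET__SQL_PASSWORD"] = "new-password"

# "Execution": only the lookup done at run time sees the new value.
stale = init_task.run()
fresh = run_task.run(env={"PASSWORD": get_secret("DBT_ENV_SECRET__SQL_PASSWORD")})
print(stale["PASSWORD"])  # old-password
print(fresh["PASSWORD"])  # new-password
```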

Ron Budnar

06/07/2022, 10:06 PM
That makes sense to me. Thanks for clarifying!

Kevin Mullins

06/08/2022, 2:31 PM
I was recently looking at this and, based on my understanding of Prefect, I believe one needs to be careful with the get_dbt_kwargs approach. Because a task retrieves the values from secrets and returns them in a plain dictionary, that dictionary could accidentally be serialized as a task result and leak the credentials. To prevent this, I set the username and password values in dbt_kwargs to use dbt/Jinja's env_var function, which pulls each value from an environment variable supplied to every job pod via k8s secrets:
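```python
# Hypothetical wrapper: the function name and the snowflake_connection object are
# illustrative; only non-sensitive settings are read from that object.
def build_dbt_kwargs(snowflake_connection):
    return {
        "account": snowflake_connection.account,
        "database": snowflake_connection.database,
        "role": snowflake_connection.transformer_role,
        "schema": "common",
        "threads": 4,
        "type": "snowflake",
        # Credentials stay as dbt/Jinja placeholders, resolved from the pod's environment.
        "user": "{{ env_var('SNOWFLAKE_USERNAME') }}",
        "password": "{{ env_var('SNOWFLAKE_PASSWORD') }}",
        "warehouse": snowflake_connection.transformer_warehouse,
    }
```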
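To see why the placeholder approach avoids the leak, here is a small stdlib-only sketch. It assumes the task result ends up serialized with a pickle-style serializer (check your own result configuration), and `render_env_vars` is a hypothetical, heavily simplified stand-in for dbt's Jinja `env_var()` rendering, not dbt's implementation.

```python
import os
import pickle
import re

# Matches only the simple form {{ env_var('NAME') }}; dbt's real Jinja support is much richer.
ENV_VAR_PATTERN = re.compile(r"\{\{\s*env_var\('([^']+)'\)\s*\}\}")


def render_env_vars(value):
    """Hypothetical stand-in for dbt substituting env_var() when it reads the profile."""
    return ENV_VAR_PATTERN.sub(lambda m: os.environ[m.group(1)], value)


# Simulates the k8s secret being injected into the job pod's environment.
os.environ["SNOWFLAKE_PASSWORD"] = "s3cr3t-value"

leaky_result = {"password": os.environ["SNOWFLAKE_PASSWORD"]}  # secret baked into the result
safe_result = {"password": "{{ env_var('SNOWFLAKE_PASSWORD') }}"}  # only a placeholder

# Serializing each result: the first contains the secret, the second does not.
print(b"s3cr3t-value" in pickle.dumps(leaky_result))  # True
print(b"s3cr3t-value" in pickle.dumps(safe_result))   # False

# The placeholder is only resolved inside the pod, at run time.
print(render_env_vars(safe_result["password"]))  # s3cr3t-value
```

The design choice is that the secret value never passes through Prefect's task results at all; it only exists in the pod's environment when dbt runs.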