Ron Budnar
06/07/2022, 9:31 PM@task
def get_dbt_kwargs(**kwargs):
return kwargs
with Flow(name="dbt_command_flow", run_config=docker_run, storage=STORAGE) as flow:
dbt_repo = Parameter(dbt_repo_url", ...)
dbt_env = get_dbt_kwargs(
DBT_ENV_SECRET__SQL_SERVER=PrefectSecret("DBT_ENV_SECRET__SQL_SERVER"),
DBT_ENV_SECRET__SQL_DATABASE=PrefectSecret("DBT_ENV_SECRET__SQL_DATABASE"),
DBT_ENV_SECRET__SQL_USER=PrefectSecret("DBT_ENV_SECRET__SQL_USER"),
DBT_ENV_SECRET__SQL_PASSWORD=PrefectSecret("DBT_ENV_SECRET__SQL_PASSWORD"),
)
... # a couple of tasks omitted for brevity
dbt_run = dbt(
env=dbt_env,
command=dbt_command,
task_args={"name": "Run DBT Shell command"},
upstream_tasks=[pull_task, dbt_env],
)
My question is - is there a better/preferred way to do the above? Basically, I'm trying to ensure that if credentials/secrets are changed in Prefect Cloud, then the next run picks them up correctly.dbt = DbtShellTask(
env={
"DBT_ENV_SECRET__SQL_DATABASE": Secret("DBT_ENV_SECRET__SQL_DATABASE").get(),
"DBT_ENV_SECRET__SQL_SERVER": Secret("DBT_ENV_SECRET__SQL_SERVER").get(),
"DBT_ENV_SECRET__SQL_USER": Secret("DBT_ENV_SECRET__SQL_USER").get(),
"DBT_ENV_SECRET__SQL_PASSWORD": Secret("DBT_ENV_SECRET__SQL_PASSWORD").get(),
},
...
)
If I changed those secrets, then subsequent runs did not appear to update correctly. Is it that the secrets were being fetched at registration time and not being updated during execution time?Kevin Kho
06/07/2022, 10:02 PMDbtShellTask
dbt = DbtShellTask(...)
with Flow(..) as flow:
dbt(pass_secrets_here)
If you do:
with Flow(...) as flow:
dbt = DbtShellTask(pass_secrets_here)
dbt()
those are evaluated during registration time too.
DbtShellTask has an init and a run. The first ()
is the init. The second is the run()
. So if you pass it to the init, it’s used during build time. If you pass it to the run()
, it’s used during execution timeRon Budnar
06/07/2022, 10:06 PMKevin Mullins
06/08/2022, 2:31 PMreturn {
"account": snowflake_connection.account,
"database": snowflake_connection.database,
"role": snowflake_connection.transformer_role,
"schema": "common",
"threads": 4,
"type": "snowflake",
"user": "{{ env_var('SNOWFLAKE_USERNAME') }}",
"password": "{{ env_var('SNOWFLAKE_PASSWORD') }}",
"warehouse": snowflake_connection.transformer_warehouse,
}