Mateo Merlo
05/23/2022, 2:11 PM
Anna Geller
05/23/2022, 2:18 PM
postgres_user = PrefectSecret("POSTGRES_USER")
postgres_pass = PrefectSecret("POSTGRES_PASS")
db_credentials = get_dbt_credentials(postgres_user, postgres_pass)
dbt_run = dbt(
    command="dbt run", task_args={"name": "DBT Run"}, dbt_kwargs=db_credentials
)
full example: https://www.prefect.io/blog/flow-of-flows-orchestrating-elt-with-prefect-and-dbt/
GCP_CREDENTIALS can be tackled the same way
Mateo Merlo
05/23/2022, 2:27 PM
@task
def get_dbt_credentials(gcp_json):
    return {"keyfile": gcp_json}

@task
def format_command(target_env):
    return f"dbt build --target {target_env}"

with Flow("DBT_flow") as flow:
    target_env = Parameter("target_env", required=True)
    credentials = PrefectSecret("GCP_CREDENTIALS")
    db_credentials = get_dbt_credentials(credentials)
    dbt_run = dbt(
        command=format_command(target_env),
        task_args={"name": "DBT Run"},
        dbt_kwargs=db_credentials
    )
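A note on the snippet above: with dbt's BigQuery `service-account` method, `keyfile` is read as a path to a JSON key file, so passing the secret's JSON content directly may not work. A minimal sketch of one possible workaround, assuming the `GCP_CREDENTIALS` secret holds the key as a JSON string or a dict: write it to a temporary file and hand dbt the path instead. `write_keyfile` is a hypothetical helper name, not part of Prefect or dbt; inside a flow it would be wrapped with `@task`.

```python
import json
import os
import tempfile


def write_keyfile(gcp_json, directory=None):
    """Write a service-account key to a temp file and return dbt kwargs.

    `gcp_json` may be a JSON string or an already-parsed dict (assumption:
    this is how the secret is stored). The file is created by mkstemp, so it
    is readable only by the current user.
    """
    key = json.loads(gcp_json) if isinstance(gcp_json, str) else gcp_json
    fd, path = tempfile.mkstemp(suffix=".json", dir=directory)
    with os.fdopen(fd, "w") as f:
        json.dump(key, f)
    # dbt receives the path to the key file, not its content.
    return {"keyfile": path}
```

The returned dict has the same shape as the one `get_dbt_credentials` returns, so it could be passed as `dbt_kwargs` in the same way. The temporary file is not cleaned up here; that is left to the caller.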
Anna Geller
05/23/2022, 2:29 PM
Mateo Merlo
05/23/2022, 2:30 PM
Anna Geller
05/23/2022, 2:36 PM
Mateo Merlo
05/23/2022, 2:44 PM
Anna Geller
05/23/2022, 2:47 PM
Mateo Merlo
05/23/2022, 2:47 PM
Anna Geller
05/23/2022, 2:50 PM
Mateo Merlo
05/23/2022, 2:51 PM
Joshua Greenhalgh
05/23/2022, 3:15 PM
Mateo Merlo
05/23/2022, 3:24 PM
flow.storage = GCS(bucket="influsense_test_bucket", secrets=["GCP_CREDENTIALS"])
flow.executor = LocalDaskExecutor(scheduler="threads")
flow.run_config = VertexRun(
    image="europe-west1-docker.pkg.dev/bi-staging-1-309112/my-image/my-image:latest",
    labels=["vertex"],
)
Joshua Greenhalgh
05/23/2022, 3:27 PM
Mateo Merlo
05/23/2022, 3:29 PM
Joshua Greenhalgh
05/23/2022, 3:30 PM
Mateo Merlo
05/23/2022, 3:31 PM
Joshua Greenhalgh
05/23/2022, 3:34 PM
data "sops_file" "dbt_bq_user_service_account_key" {
  source_file = "./secret_data/dbt-bq-user-sa-key.json"
}

resource "kubernetes_secret" "dbt_bq_user_service_account_key" {
  metadata {
    name      = "dbt-bq-user-service-account-key"
    namespace = "prefect"
  }
  data = { "dbt-bq-user-sa-key.json" = jsonencode(data.sops_file.dbt_bq_user_service_account_key.data) }
}
The sops_file bit is just so I can keep the file in a git repo encrypted - basically data.sops_file.dbt_bq_user_service_account_key is the full key for the service account...
Then I have the following agent template;
apiVersion: batch/v1
kind: Job
spec:
  template:
    spec:
      containers:
        - name: flow
          imagePullPolicy: Always
          volumeMounts:
            - mountPath: /service_account_key/
              name: dbt-bq-user-service-account-key
      volumes:
        - name: dbt-bq-user-service-account-key
          secret:
            secretName: dbt-bq-user-service-account-key
And my profile which is baked into my image looks like;
mbk_dwh:
  outputs:
    dev:
      dataset: mbk_dwh
      fixed_retries: 1
      location: EU
      method: service-account
      priority: interactive
      project: data-analytics-mbk
      threads: 1
      timeout_seconds: 300
      type: bigquery
      keyfile: /service_account_key/dbt-bq-user-sa-key.json
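The profile above only works if the secret is actually mounted at that path when the job starts, so a quick check before calling dbt can save a confusing failure. A minimal sketch, assuming a standard Google service-account key file (which carries `type` and `client_email` fields); `check_keyfile` is a hypothetical helper, not a dbt or Prefect function.

```python
import json
from pathlib import Path


def check_keyfile(path="/service_account_key/dbt-bq-user-sa-key.json"):
    """Return the service account email if the key file looks usable, else raise."""
    key_path = Path(path)
    if not key_path.is_file():
        raise FileNotFoundError(f"key file not mounted at {key_path}")
    key = json.loads(key_path.read_text())
    # Assumption: a regular service-account key, not another credential type.
    if key.get("type") != "service_account":
        raise ValueError(f"{key_path} is not a service account key")
    return key["client_email"]
```

Calling this at the start of the flow run, before the dbt task, would surface a missing volume mount as a clear error instead of a dbt authentication failure.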
Mateo Merlo
05/23/2022, 3:46 PM
Joshua Greenhalgh
05/23/2022, 3:55 PM
/service_account_key/dbt-bq-user-sa-key.json - all I have to do is bake the profile into the image with a ref to that location
Mateo Merlo
05/23/2022, 3:59 PM
Joshua Greenhalgh
05/23/2022, 4:00 PM
run_config=KubernetesRun(image=JOB_IMAGE_NAME, labels=[DEFAULT_AGENT_LABEL]),
latest
Mateo Merlo
05/23/2022, 4:03 PM
Joshua Greenhalgh
05/23/2022, 4:09 PM
Mateo Merlo
05/23/2022, 5:02 PM