Vincenzo
03/21/2023, 9:05 AMVincenzo
03/21/2023, 9:06 AM
from prefect import flow, task
from prefect_dbt.cloud.jobs import run_dbt_cloud_job
from prefect_dbt.cloud import DbtCloudCredentials, DbtCloudJob
# dbt Cloud settings: replace each placeholder below before running.
your_dbt_api_key = "API"  # (1) insert your dbt Cloud API key
your_dbt_account_id = "ACCOUNT"  # (2) insert your dbt Cloud account ID
your_dbt_job_id = "JOB"  # (3) insert your dbt Cloud job ID

# Name under which the credentials block is saved and later loaded.
dbt_cloud_credential_name = "magic-the-gathering-credentials"

# Build the credentials block from the settings above, then save it under
# the chosen name; overwrite=True replaces a block already saved there.
_credentials_block = DbtCloudCredentials(
    api_key=your_dbt_api_key,
    account_id=your_dbt_account_id,
)
_credentials_block.save(dbt_cloud_credential_name, overwrite=True)

# Load the saved block back by name; this is the object the flows below use.
dbt_cloud_credentials = DbtCloudCredentials.load(dbt_cloud_credential_name)
@flow(log_prints=True, name="Run dbt Cloud Job")
def run_dbt_job_flow():
    """Run the configured dbt Cloud job and return the result.

    This wrapper is declared as a flow, not a task: ``run_dbt_cloud_job`` is
    itself a Prefect flow, and Prefect 2 does not allow a flow to be run from
    inside a task. Called from another flow, it runs as a subflow.

    Returns:
        Whatever ``run_dbt_cloud_job`` returns for the run.
    """
    # ``DbtCloudJob.load`` expects the *name of a saved block document*, so
    # passing it a credentials object and a job ID cannot work. No job block
    # is saved in this script, so build one directly from the loaded
    # credentials and the job ID instead.
    # NOTE(review): ``job_id`` should be the numeric dbt Cloud job ID --
    # confirm the placeholder in ``your_dbt_job_id`` has been replaced.
    dbt_cloud_job = DbtCloudJob(
        dbt_cloud_credentials=dbt_cloud_credentials,
        job_id=your_dbt_job_id,
    )
    return run_dbt_cloud_job(dbt_cloud_job=dbt_cloud_job, targeted_retries=5)
@flow(log_prints=True, name="[Magic: The Gathering] Load latest data to BigQuery")
def dbt_job_orchestration():
    """Top-level flow: invoke ``run_dbt_job_flow`` to run the dbt Cloud job.

    The value returned by ``run_dbt_job_flow`` is not propagated; this flow
    returns ``None``.
    """
    run_dbt_job_flow()
Chris Reuter
03/21/2023, 11:34 AMVincenzo
03/21/2023, 3:28 PMAdam
03/21/2023, 3:36 PMtrigger_dbt_cloud_job_run_and_wait_for_completion()
Adam
03/21/2023, 3:36 PM
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run_and_wait_for_completion
# Trigger the dbt Cloud job using the credentials block saved as "test-creds",
# with the poll frequency set to 60 seconds.
trigger_dbt_cloud_job_run_and_wait_for_completion(
    dbt_cloud_credentials=DbtCloudCredentials.load("test-creds"),
    job_id=x,  # placeholder: `x` is not defined in this snippet
    poll_frequency_seconds=60,
)
Adam
03/21/2023, 3:37 PMVincenzo
03/21/2023, 4:24 PMAdam
03/21/2023, 4:26 PMVincenzo
03/21/2023, 4:28 PMVincenzo
03/22/2023, 7:04 AMVincenzo
03/22/2023, 7:05 AMAdam
03/22/2023, 3:20 PM