# prefect-community
Hey all - my Prefect flow that calls a dbt Cloud job failed while checking the job's status, even though the dbt Cloud job itself finished successfully. Here's the error:
State message: Flow run encountered an exception. json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
It looks like there was some kind of issue with the JSON response from dbt Cloud (maybe it was empty?). We just upgraded dbt Cloud to 1.5 - has anyone else run into something similar? Is there any way to prevent a flow from failing because of a failed status API call? Maybe some way to let it retry?
Flow code in the comments.
Prefect version: 2.8.7
flow code:
import os
from prefect import flow, task

from prefect_dbt.cloud import DbtCloudCredentials, DbtCloudJob

dev_stage = os.environ["DEV_STAGE"]
# load the dbt Cloud credentials block
dbt_cloud_credentials = DbtCloudCredentials.load("dbt-cloud-credentials")

#TODO: put into queue: dbt-{dev-stage}?
    # yes, we can treat queues like labels in prefect 1
#TODO: figure out how to include the environment variable {"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"} 
#TODO: test this when picked up by the prefect agent running on ECS (deploy to queue "test")

# set job id's to run
# TODO: pull this from a different file (or block?) that can be shared with various flows
if dev_stage.lower() == "prd":
    default_job_ids = [
        41996,  # Production - Run
        47477,  # Production - Insurance AR API
    ]
else:
    default_job_ids = [
        41997, # Test - Test and Run on PR
        # 151777, #Test - test "new" flows
    ]  

dbt_cloud_jobs = [
    DbtCloudJob(
        dbt_cloud_credentials=dbt_cloud_credentials,
        job_id=job_id,
        timeout_seconds=3600,
    )
    for job_id in default_job_ids
]

print(dbt_cloud_jobs)

@task(
    name=f' {dev_stage} - dbt - task'
)
def run_dbt_cloud_job(dbt_cloud_job: DbtCloudJob):
    dbt_cloud_job_run = dbt_cloud_job.trigger()
    dbt_cloud_job_run.wait_for_completion()
    dbt_cloud_job_run.fetch_result()
    return dbt_cloud_job_run

@flow(
    name=f' {dev_stage} - dbt - flow',
    log_prints=True,
)
def trigger_dbt_cloud_job_run_flow():
    # run dbt job(s)
    dbt_results = run_dbt_cloud_job.map(dbt_cloud_jobs)

if __name__ == "__main__":
    trigger_dbt_cloud_job_run_flow()
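
On the retry question, here is a minimal sketch of one possible approach, not a confirmed fix. It assumes the JSONDecodeError is transient and is raised by the status-polling call; the traceback above does not confirm which call raised it. `call_with_retries` is a hypothetical helper written for this illustration and is not part of Prefect or prefect-dbt. It uses only the standard library and retries a zero-argument callable when it raises `json.JSONDecodeError`.

```python
import json
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    attempts: int = 3,
    delay_seconds: float = 5.0,
    retry_on: Tuple[Type[BaseException], ...] = (json.JSONDecodeError,),
) -> T:
    """Hypothetical helper: call fn(), retrying only on the listed exceptions."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            # Out of attempts: re-raise the original error so the caller sees it.
            if attempt == attempts:
                raise
            # Assumption: the failure is transient, so pause and try again.
            time.sleep(delay_seconds)
    # Unreachable: the loop always returns or raises.
    raise RuntimeError("unreachable")


if __name__ == "__main__":
    # Simulate an API that returns an empty body twice, then valid JSON.
    bodies = iter(["", "", '{"status": "success"}'])
    print(call_with_retries(lambda: json.loads(next(bodies)), delay_seconds=0))
```

In the task above, this could wrap only the status check, for example `call_with_retries(dbt_cloud_job_run.wait_for_completion)`, so the dbt Cloud job is not triggered a second time. Prefect 2 also accepts `retries` and `retry_delay_seconds` on `@task`, but because `trigger()` lives in the same task here, a task-level retry would start a new dbt Cloud run rather than re-check the existing one.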