Zachary Loertscher
07/13/2023, 1:45 PM
State message: Flow run encountered an exception. json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
Looks like there was some kind of issue with the JSON response from dbt Cloud (maybe it was empty?). We just upgraded dbt Cloud to 1.5 - has anyone else run into something similar? Is there any way to prevent a flow from failing because of a failed status API call? Maybe some way to let it retry?
Flow code in comments
Prefect version: 2.8.7

import os
from prefect import flow, task, unmapped
from prefect_dbt.cloud import DbtCloudCredentials, DbtCloudJob
dev_stage = os.environ["DEV_STAGE"]

# create dbt credentials block
dbt_cloud_credentials = DbtCloudCredentials.load("dbt-cloud-credentials")

# TODO: put into queue: dbt-{dev-stage}?
#   yes, we can treat queues like labels in prefect 1
# TODO: figure out how to include the environment variable {"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"}
# TODO: test this when picked up by the prefect agent running on ECS (deploy to queue "test")

# set job ids to run
# TODO: pull this from a different file (or block?) that can be shared with various flows
if dev_stage.lower() == "prd":
    default_job_ids = [
        41996,  # Production - Run
        47477,  # Production - Insurance AR API
    ]
else:
    default_job_ids = [
        41997,  # Test - Test and Run on PR
        # 151777,  # Test - test "new" flows
    ]

dbt_cloud_jobs = [
    DbtCloudJob(
        dbt_cloud_credentials=dbt_cloud_credentials,
        job_id=job_id,
        timeout_seconds=3600,
    )
    for job_id in default_job_ids
]
print(dbt_cloud_jobs)

@task(
    name=f' {dev_stage} - dbt - task'
)
def run_dbt_cloud_job(dbt_cloud_job: DbtCloudJob):
    dbt_cloud_job_run = dbt_cloud_job.trigger()
    dbt_cloud_job_run.wait_for_completion()
    dbt_cloud_job_run.fetch_result()
    return dbt_cloud_job_run

@flow(
    name=f' {dev_stage} - dbt - flow',
    log_prints=True,
)
def trigger_dbt_cloud_job_run_flow():
    # run dbt job(s)
    dbt_results = run_dbt_cloud_job.map(dbt_cloud_jobs)

if __name__ == "__main__":
    trigger_dbt_cloud_job_run_flow()
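
On the retry question: one option (a sketch, not verified against this setup) is Prefect's built-in task-level retries, so a transient JSONDecodeError from the status API retries the task instead of failing the whole flow. Note that because trigger, wait, and fetch all live in one task, a retry re-triggers the dbt Cloud job from scratch. The retry counts below are illustrative assumptions.

# Sketch: same task as above, with Prefect task-level retries added.
@task(
    name=f' {dev_stage} - dbt - task',
    retries=2,               # retry up to 2 more times if the task raises
    retry_delay_seconds=60,  # wait a minute between attempts
)
def run_dbt_cloud_job(dbt_cloud_job: DbtCloudJob):
    dbt_cloud_job_run = dbt_cloud_job.trigger()
    dbt_cloud_job_run.wait_for_completion()
    dbt_cloud_job_run.fetch_result()
    return dbt_cloud_job_run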