Michael K
12/01/2022, 7:52 PMdeployment build
I get the following errors (in thread).
Perhaps they are related, but at this point I'm stumped. Not sure why it runs but cant be deployed.
Any help would be appreciated and I'm happy to provide more info as needed. Thanks!Traceback (most recent call last):
File "<frozen importlib._bootstrap_external>", line 850, in exec_module
File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
File "daily_run.py", line 20, in <module>
token = github_credentials_block.token
AttributeError: 'coroutine' object has no attribute 'token'
The above exception was the direct cause of the following exception:
Error 2:
sys:1: RuntimeWarning: coroutine 'Block.load' was never awaited
Jeff Hale
12/01/2022, 7:59 PMMichael K
12/01/2022, 8:54 PMfrom prefect import task, flow
from prefect.blocks.notifications import SlackWebhook
from prefect.filesystems import GitHub
from prefect.filesystems import GCS
from prefect_github import GitHubCredentials
from prefect_dbt.cli.commands import trigger_dbt_cli_command
from prefect_dbt.cli.configs import BigQueryTargetConfigs
from prefect_gcp import GcpCredentials
import shutil
import os
#Slack Credentials Block
slack_webhook_block = SlackWebhook.load("slack-webhook")
# GitHub Credentials Block
github_credentials_block = GitHubCredentials.load("get-dbt-repo")
token = github_credentials_block.token
repo = "<https://github.com/[owner]/[repo].git>"
github_configs = GitHub(
repository=repo,
access_token=token,
)
# BigQuery Credentials Block
gcp_credentials_block = GcpCredentials.load("gcp-credentials")
target_configs = BigQueryTargetConfigs(
schema="[dataset]",
project="[project]",
credentials=gcp_credentials_block,
)
# Google Storage Block
gcs_block = GCS.load("google-storage-block")
@task
def clone_dbt_repo():
shutil.rmtree("dbt_temp/", ignore_errors=True)
github_configs.get_directory(local_path="dbt_temp/")
print("Cloned GitHub directory.")
@flow
def run_dbt_deps():
try:
trigger_dbt_cli_command("dbt deps")
except Exception as e:
error_msg = repr(e)
print('error caught:: ', error_msg)
slack_webhook_block.notify(error_msg)
@flow
def run_dbt_build():
try:
trigger_dbt_cli_command("dbt build")
except Exception as e:
error_msg = repr(e)
print('error caught:: ', error_msg)
slack_webhook_block.notify(error_msg)
@task
def delete_dbt_repo():
shutil.rmtree("dbt_temp/", ignore_errors=True)
@task
def send_slack_message():
slack_webhook_block.notify("Run completed")
@flow
def main_wrapper_flow():
clone_dbt_repo()
os.chdir("dbt_temp")
run_dbt_deps()
run_dbt_build()
os.chdir("..")
delete_dbt_repo()
send_slack_message()
if __name__ == "__main__":
main_wrapper_flow()
Deploy
prefect deployment build daily_run.py:main_wrapper_flow --name "Test Deployment" --storage-block gcs/google-storage-block
Jeff Hale
12/01/2022, 10:54 PMMichael K
12/01/2022, 11:08 PMJeff Hale
12/02/2022, 1:44 AM