Maverick Humbert
06/24/2022, 11:55 AM
Kevin Kho
06/24/2022, 2:11 PM
Maverick Humbert
06/24/2022, 2:13 PM
def pull_dbt_repo():
    github_token = os.environ.get("GITHUB_ACCESS_TOKEN")
    shutil.rmtree("dbt", ignore_errors=True)  # Delete folder on run
    dbt_repo_name = "dbt-thesandbox"
    dbt_repo = (
        f"https://{github_token}:x-oauth-basic@github.com/xxx/{dbt_repo_name}"
    )
    pygit2.clone_repository(dbt_repo, "dbt")
    pathlib.Path(os.path.expanduser('~/.dbt/')).mkdir(parents=True, exist_ok=True)
    shutil.copy('dbt/dbt_thesandbox/profiles.yml', os.path.expanduser('~/.dbt/'))
I set up the dbt task (and point the profiles dir to ~/.dbt):
dbt = DbtShellTask(
    profiles_dir=os.path.expanduser('~/.dbt/'),
    log_stdout=True,
    log_stderr=True,
    overwrite_profiles=True,
    profile_name='dbt_thesandbox',
    helper_script="cd dbt/dbt_thesandbox",
)
Execute a specific dbt model:
try:
    history_marts = dbt(
        command='dbt run --select models/marts/tsb_history',
        upstream_tasks=[transfer_final]
    )
except Exception as e:
    print(e)
Result (with prefect[dbt]):
/tmp/prefect-q5dqm2jn: line 2: dbt: command not found
FAIL signal raised: FAIL('Command failed with exit code 127')
Result (with dbt-core & dbt-postgres dependencies + prefect[dbt] ):
Could not run dbt 'Command failed with exit code 2
BUT it works with dbt debug --config-dir
and fails with dbt run --select models/marts/tsb_history
Is my helper script wrong? Is the path wrong somewhere? Should I remove the dbt-core and dbt-postgres dependencies?
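The two results above fail in different ways, and a small check can tell them apart. Exit code 127 is the shell's "command not found", so it points at the environment (no dbt executable on PATH) rather than at the helper script or the --select path. A minimal sketch, assuming it runs in the same environment as the flow; `find_executable` and `explain_exit_code` are hypothetical helper names, not part of Prefect or dbt:

```python
import shutil


def find_executable(name: str = "dbt"):
    """Return the full path of `name` if it is on PATH, else None."""
    return shutil.which(name)


def explain_exit_code(code: int) -> str:
    """Give a short, generic reading of a shell exit code."""
    if code == 0:
        return "success"
    if code == 127:
        # The shell could not find the executable at all.
        return "command not found: executable is not on PATH"
    # Any other non-zero code: the command was found and ran, then reported failure.
    return f"command ran but failed (exit code {code})"


if __name__ == "__main__":
    print("dbt found at:", find_executable("dbt"))
    print(explain_exit_code(127))
    print(explain_exit_code(2))
```

Read this way, the second result (exit code 2) means dbt itself was found and started, so the dbt-core and dbt-postgres dependencies are doing their job and the remaining problem is in what dbt was asked to run.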
Thanks for your help!
Nate
06/24/2022, 2:36 PM
There is no dbt extra for the prefect pypi distribution. you should have seen something like
❯ pip install "prefect[dbt]"
Collecting prefect[dbt]
WARNING: prefect 1.2.2 does not provide the extra 'dbt'
when trying to install it
so, you'd need to install dbt-core and any other dbt packages yourself into your venv
Maverick Humbert
06/24/2022, 6:05 PM
helper_script is not linked with my command. My --select should be wrong with that helper_script, right?
dbt = DbtShellTask(
    profiles_dir=os.path.join(os.path.expanduser("~"), ".dbt"),
    log_stdout=True,
    log_stderr=True,
    overwrite_profiles=False,
    profile_name='dbt_thesandbox',
    helper_script="cd dbt/dbt_thesandbox",
)
schedule = Schedule(clocks=[CronClock("30 0 * * *")])
with Flow("[LAND's: Old & Current] Create history", schedule=schedule) as flow:
    pull_repo = pull_dbt_repo()
    transfer_final = tx_receipt()
    history_marts = dbt(
        command='dbt run --select ./dbt/dbt_thesandbox/models/marts/tsb_history',
        upstream_tasks=[transfer_final]
    )
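One way to reason about the --select question is to work out which directory the path ends up pointing at. The sketch below assumes the helper_script runs in the same shell just before the command (the earlier error mentions "line 2: dbt", which suggests the cd is line 1), so a relative --select path is read from inside dbt/dbt_thesandbox. `effective_path` is a hypothetical helper for illustration only:

```python
from pathlib import PurePosixPath

# Directory the helper_script changes into before dbt runs.
PROJECT_DIR = "dbt/dbt_thesandbox"


def effective_path(select_path: str, cwd: str = PROJECT_DIR) -> str:
    """Return where a relative --select path points once the shell is in cwd."""
    return str(PurePosixPath(cwd) / select_path)


if __name__ == "__main__":
    # Path used in the flow above: the project directory is repeated.
    print(effective_path("./dbt/dbt_thesandbox/models/marts/tsb_history"))
    # Path relative to the dbt project root.
    print(effective_path("models/marts/tsb_history"))
```

Under that assumption, the path in the flow above resolves to dbt/dbt_thesandbox/dbt/dbt_thesandbox/models/marts/tsb_history, which the cloned repo is unlikely to contain, while models/marts/tsb_history resolves inside the project. This only illustrates the path arithmetic; it does not show what dbt itself reported for the exit code 2.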