Dominik Wagner
06/28/2022, 10:10 AM
1. run `dbt source freshness`
   a. If I get an error, fail the flow
2. run `dbt build`
I’ve figured out a way to fail the flow by scanning the result from PrefectFuture but somehow this doesn’t feel right (simplified code snippet in 🧵).
Two questions 🙏:
1. Is there a “better” way to handle this?
2. I’d also like to add some kind of notification when the flow fails (email and/or Slack). I can’t find any integration within Prefect itself, so would the recommended route be to send a webhook directly in the flow?

from prefect import flow, get_run_logger
from prefect_shell import shell_run_command

@flow
def dbt_flow(env: str = "dev"):
    logger = get_run_logger()
    # Run the freshness check and collect every line of its output
    freshness = shell_run_command(
        command=f"dbt source freshness -t {env}",
        return_all=True
    )
    result = freshness.result()
    # Fail the flow if any output line reports a stale source
    for res in result:
        assert "ERROR STALE" not in res
    logger.info("Source is fresh, continuing...")
    build = shell_run_command(
        command=f"dbt build -t {env}"
    )
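On question 2, one possible route is a plain Slack incoming webhook, which accepts a JSON body with a `text` field. The following is only a minimal sketch under that assumption, not a Prefect integration: `build_slack_payload`, `post_to_slack` and `run_with_alert` are hypothetical helper names, the webhook URL is something you would create in Slack yourself, and whether calling a flow raises on failure (rather than returning a failed state) depends on the Prefect version in use.

```python
import json
import urllib.request
from typing import Callable


def build_slack_payload(flow_name: str, error: BaseException) -> bytes:
    """Build the JSON body for a Slack incoming webhook."""
    text = f"Flow `{flow_name}` failed: {type(error).__name__}: {error}"
    return json.dumps({"text": text}).encode("utf-8")


def post_to_slack(webhook_url: str, payload: bytes) -> None:
    """POST the payload to the webhook URL (assumed to be a Slack incoming webhook)."""
    request = urllib.request.Request(
        webhook_url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        response.read()


def run_with_alert(
    flow_fn: Callable[[], object],
    send: Callable[[bytes], None],
    flow_name: str,
) -> object:
    """Run flow_fn; on any exception, send an alert and re-raise."""
    try:
        return flow_fn()
    except Exception as exc:
        send(build_slack_payload(flow_name, exc))
        raise  # keep the failure visible to the caller
```

Usage would look something like `run_with_alert(lambda: dbt_flow("dev"), lambda p: post_to_slack(url, p), "dbt_flow")`, where `url` is your own webhook URL. Passing the sender in as a callable keeps the alerting logic testable without network access.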
Bianca Hoch
06/28/2022, 4:00 PM
You could add an `if` statement to your flow to determine whether or not to run `dbt build` based on the result of the `check dbt source freshness` step. Here is some documentation where you can find an example of using results from tasks within the context of a flow.
Dominik Wagner
06/28/2022, 7:34 PM
Bianca Hoch
06/29/2022, 2:03 PM
... `dbt build` in response to the "ERROR STALE" error message specifically, then parsing the result to look for that error is a good idea.
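The `if`-statement approach discussed above could be sketched roughly as follows. This is an illustration rather than Prefect's API: `stale_lines` and `freshness_then_build` are hypothetical names, and `run` is a stand-in for whatever executes a shell command and returns its output lines (in the thread, `shell_run_command(..., return_all=True).result()`). It raises an explicit exception instead of using `assert`, since assertions are skipped when Python runs with `-O`.

```python
from typing import Callable, Iterable, List

STALE_MARKER = "ERROR STALE"  # marker string quoted in the thread


def stale_lines(output: Iterable[str]) -> List[str]:
    """Return every output line that reports a stale source."""
    return [line for line in output if STALE_MARKER in line]


def freshness_then_build(run: Callable[[str], List[str]], env: str = "dev") -> List[str]:
    """Check source freshness, then build only if nothing is stale.

    `run` is a hypothetical callable: it takes a shell command and
    returns the command's output as a list of lines.
    """
    stale = stale_lines(run(f"dbt source freshness -t {env}"))
    if stale:
        # Fail loudly and skip the build entirely
        raise RuntimeError(f"{len(stale)} stale source line(s): " + "; ".join(stale))
    return run(f"dbt build -t {env}")
```

Keeping the parsing in its own small function makes it easy to test against sample output, and to adjust if the marker text differs in your dbt version.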