Etienne Callies
11/14/2024, 11:59 AM
@task(name="DBT - Command")
async def dbt_command(command):
    return await trigger_dbt_cli_command(
        command=command,
        overwrite_profiles=True,
        dbt_cli_profile=dbt_cli_profile,
        project_dir=project_dir_str,
    )

# Install dbt dependencies
await dbt_command("dbt deps")

# Source freshness test
await dbt_command(
    f"dbt source freshness --exclude {' '.join(exclude_sources)}"
)
How to make it stop at first task failure?
Marvin
11/14/2024, 11:59 AM
@task(name="Handle DBT Command Failure")
async def handle_dbt_command_failure(command):
    try:
        return await trigger_dbt_cli_command(
            command=command,
            overwrite_profiles=True,
            dbt_cli_profile=dbt_cli_profile,
            project_dir=project_dir_str,
        )
    except Exception as e:
        # Log the error and re-raise to stop the flow
        print(f"Task failed with error: {str(e)}")
        raise

# Install dbt dependencies
await handle_dbt_command_failure("dbt deps")

# Source freshness test
await handle_dbt_command_failure(
    f"dbt source freshness --exclude {' '.join(exclude_sources)}"
)
This is a simple illustrative solution. To adapt it to your needs, you can explore the Prefect 3.x Transactions documentation, available here.
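To see the log-and-re-raise pattern above in isolation, here is a minimal sketch in plain asyncio, with no Prefect or dbt involved. `run_command` is a hypothetical stand-in for `trigger_dbt_cli_command`, and `run_in_order`, `executed`, and `failing` are names made up for this illustration. It only shows that awaiting the commands one after another and letting the exception propagate skips everything after the first failure.

```python
import asyncio


async def run_command(command, failing=frozenset()):
    # Hypothetical stand-in for trigger_dbt_cli_command:
    # raises for any command listed in `failing`.
    if command in failing:
        raise RuntimeError(f"command failed: {command}")
    return f"ok: {command}"


async def run_in_order(commands, executed, failing=frozenset()):
    # Await each command in turn and record the ones that succeed.
    for command in commands:
        try:
            await run_command(command, failing)
        except RuntimeError as exc:
            # Log the error and re-raise so the remaining commands never run.
            print(f"Task failed with error: {exc}")
            raise
        executed.append(command)


if __name__ == "__main__":
    executed = []
    commands = ["dbt deps", "dbt source freshness", "dbt run"]
    try:
        asyncio.run(
            run_in_order(commands, executed, failing={"dbt source freshness"})
        )
    except RuntimeError:
        pass
    print(executed)
```

In this sketch only the commands before the failing one end up in `executed`. Whether the real `trigger_dbt_cli_command` raises on a failed dbt run depends on the installed prefect-dbt version, so that part is worth checking against its documentation.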
If you start talking to your flow, make sure it doesn’t reply... you might just have invented true AI!