My flow looks like:
from typing import Dict, Optional
import time
from prefect import task
from hs_prefect_utils import flow
from hs_de_workflows.flows.config import (
DATABRICKS_TOKEN,
SLACK_TOKEN,
)
@task
def ssm_task(name):
time.sleep(2)
return name
@task(task_run_name="dbt-test")
def dbt_test(
a,
b,
):
"""This always fails but should be ignored and not fail the flow."""
time.sleep(2)
raise ValueError("should not fail flow")
return
@task(task_run_name="dbt-docs generate")
def dbt_docs():
time.sleep(2)
return
@task(task_run_name="send_edr_report")
def edr():
"""When this fails, it should fail the flow."""
time.sleep(2)
raise ValueError("should fail flow")
@task(task_run_name="upload_to_s3")
def upload_docs_to_s3():
time.sleep(2)
@flow(name="tmp-cs")
def post_dbt_transform(
select: Optional[str] = "tag:canary", # XXX temporary debug
exclude: Optional[str] = "elementary",
variables: Optional[Dict[str, str]] = None,
):
databricks_token = ssm_task.submit(DATABRICKS_TOKEN)
slack_token = ssm_task.submit(SLACK_TOKEN)
dbttest = dbt_test.submit(a=databricks_token, b=slack_token).result(
# so that any error raised doesn't get caught by the Flow
# and we can continue executing downstream tasks
raise_on_failure=False
)
edr_report = edr.submit(
wait_for=[dbttest],
return_state=True,
)
dbtdocs = dbt_docs.submit(
return_state=True,
)
upload_docs_to_s3.submit(
wait_for=[dbtdocs],
return_state=True,
)
# this allows us to ignore failed dbt_test since, if a test fails, dbt returns a non-zero exit code
# <https://docs.prefect.io/latest/concepts/flows/#final-state-determination>
return dbtdocs, edr_report, upload_docs_to_s3
post_dbt_transform()
When I run it (see screen shot) I'm seeing two issues:
⢠I expect final state of the flow to be
FAILED
⢠The task "dbt-docs generate" should have executed in parallel with the "ssm_tasks(s)"