Constantino Schillebeeckx
08/28/2024, 3:19 PMConstantino Schillebeeckx
08/28/2024, 3:20 PMfrom typing import Dict, Optional
import time
from prefect import task
from hs_prefect_utils import flow
from hs_de_workflows.flows.config import (
    DATABRICKS_TOKEN,
    SLACK_TOKEN,
)
@task
def ssm_task(name):
    time.sleep(2)
    return name
@task(task_run_name="dbt-test")
def dbt_test(
    a,
    b,
):
    """This always fails but should be ignored and not fail the flow."""
    time.sleep(2)
    raise ValueError("should not fail flow")
    return
@task(task_run_name="dbt-docs generate")
def dbt_docs():
    time.sleep(2)
    return
@task(task_run_name="send_edr_report")
def edr():
    """When this fails, it should fail the flow."""
    time.sleep(2)
    raise ValueError("should fail flow")
@task(task_run_name="upload_to_s3")
def upload_docs_to_s3():
    time.sleep(2)
@flow(name="tmp-cs")
def post_dbt_transform(
    select: Optional[str] = "tag:canary",  # XXX temporary debug
    exclude: Optional[str] = "elementary",
    variables: Optional[Dict[str, str]] = None,
):
    databricks_token = ssm_task.submit(DATABRICKS_TOKEN)
    slack_token = ssm_task.submit(SLACK_TOKEN)
    dbttest = dbt_test.submit(a=databricks_token, b=slack_token).result(
        # so that any error raised doesn't get caught by the Flow
        # and we can continue executing downstream tasks
        raise_on_failure=False
    )
    edr_report = edr.submit(
        wait_for=[dbttest],
        return_state=True,
    )
    dbtdocs = dbt_docs.submit(
        return_state=True,
    )
    upload_docs_to_s3.submit(
        wait_for=[dbtdocs],
        return_state=True,
    )
    # this allows us to ignore failed dbt_test since, if a test fails, dbt returns a non-zero exit code
    # <https://docs.prefect.io/latest/concepts/flows/#final-state-determination>
    return dbtdocs, edr_report, upload_docs_to_s3
post_dbt_transform()FAILEDNate
08/28/2024, 3:21 PMConstantino Schillebeeckx
08/28/2024, 3:22 PMNate
08/28/2024, 3:27 PMNoneNonereturnCompletedNate
08/28/2024, 3:27 PMConstantino Schillebeeckx
08/28/2024, 3:28 PMIf a flow returns a mix of futures and states, the final state is determined by resolving all futures to states, then determining if any of the states are not.COMPLETED
Constantino Schillebeeckx
08/28/2024, 3:28 PMNate
08/28/2024, 3:30 PMtuple([PrefectFuture, PrefectFuture, Task])upload_docs_to_s3Constantino Schillebeeckx
08/28/2024, 3:32 PMSUCCESS, SUCCESS, FAILEDFAILEDSUCCESS, SUCCESS, SUCCESSSUCCESSPrefectFuture, PrefectFuture, PrefectFutureNate
08/28/2024, 3:33 PMupload_docs_to_s3.submit(
        wait_for=[dbtdocs],
        return_state=True,
    )
    return dbtdocs, edr_report, upload_docs_to_s3upload_docs_to_s3Constantino Schillebeeckx
08/28/2024, 3:35 PMConstantino Schillebeeckx
08/28/2024, 3:35 PMNate
08/28/2024, 3:43 PMdbtdocs = dbt_docs.submit(
        return_state=True,
    )dbttest = dbt_test.submit(a=databricks_token, b=slack_token).result(
        # so that any error raised doesn't get caught by the Flow
        # and we can continue executing downstream tasks
        raise_on_failure=False
    )dbttestConstantino Schillebeeckx
08/28/2024, 3:48 PMConstantino Schillebeeckx
08/28/2024, 3:48 PMNate
08/28/2024, 3:48 PM.result()Constantino Schillebeeckx
08/28/2024, 3:49 PMConstantino Schillebeeckx
08/28/2024, 3:49 PMNate
08/28/2024, 3:50 PMthe flow is just a dagš this was true in prefect 1, but is not in prefect>2 we discover the graph at runtime based on your python control flow
Constantino Schillebeeckx
08/28/2024, 3:58 PM