Constantino Schillebeeckx
08/28/2024, 3:19 PM
Constantino Schillebeeckx
08/28/2024, 3:20 PM
from typing import Dict, Optional
import time

from prefect import task
from hs_prefect_utils import flow
from hs_de_workflows.flows.config import (
    DATABRICKS_TOKEN,
    SLACK_TOKEN,
)


@task
def ssm_task(name):
    time.sleep(2)
    return name


@task(task_run_name="dbt-test")
def dbt_test(
    a,
    b,
):
    """This always fails but should be ignored and not fail the flow."""
    time.sleep(2)
    raise ValueError("should not fail flow")
    return


@task(task_run_name="dbt-docs generate")
def dbt_docs():
    time.sleep(2)
    return


@task(task_run_name="send_edr_report")
def edr():
    """When this fails, it should fail the flow."""
    time.sleep(2)
    raise ValueError("should fail flow")


@task(task_run_name="upload_to_s3")
def upload_docs_to_s3():
    time.sleep(2)


@flow(name="tmp-cs")
def post_dbt_transform(
    select: Optional[str] = "tag:canary",  # XXX temporary debug
    exclude: Optional[str] = "elementary",
    variables: Optional[Dict[str, str]] = None,
):
    databricks_token = ssm_task.submit(DATABRICKS_TOKEN)
    slack_token = ssm_task.submit(SLACK_TOKEN)
    dbttest = dbt_test.submit(a=databricks_token, b=slack_token).result(
        # so that any error raised doesn't get caught by the Flow
        # and we can continue executing downstream tasks
        raise_on_failure=False
    )
    edr_report = edr.submit(
        wait_for=[dbttest],
        return_state=True,
    )
    dbtdocs = dbt_docs.submit(
        return_state=True,
    )
    upload_docs_to_s3.submit(
        wait_for=[dbtdocs],
        return_state=True,
    )
    # this allows us to ignore failed dbt_test since, if a test fails, dbt returns a non-zero exit code
    # <https://docs.prefect.io/latest/concepts/flows/#final-state-determination>
    return dbtdocs, edr_report, upload_docs_to_s3


post_dbt_transform()
When I run it (see screenshot), I'm seeing two issues:
• I expect the final state of the flow to be FAILED.
• The task "dbt-docs generate" should have executed in parallel with the "ssm_task(s)".
Nate
08/28/2024, 3:21 PM
Constantino Schillebeeckx
08/28/2024, 3:22 PM
Nate
08/28/2024, 3:27 PM
• If the flow does not return a value (or returns None), its state is determined by the states of all of the tasks and subflows within it.
• If the flow run returns any other object, then it is marked as completed.
So since you're not allowing those exceptions to raise, and you have a non-None return value, it's Completed.
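The two quoted rules can be condensed into a toy function. This is a simplified model, not Prefect's implementation: the name `final_state` and the plain-string states are hypothetical, and it ignores the separate rules for returned states/futures and for cancelled runs.

```python
from typing import Any, List


def final_state(return_value: Any, task_states: List[str]) -> str:
    """Toy model of the two quoted rules (not Prefect's implementation)."""
    if return_value is None:
        # Nothing returned: the flow's state follows its task and subflow states.
        return "FAILED" if "FAILED" in task_states else "COMPLETED"
    # Any other returned object: the flow is marked as completed,
    # whatever happened to the tasks along the way.
    return "COMPLETED"


# Same task outcomes, different return values:
print(final_state(None, ["COMPLETED", "FAILED"]))                # FAILED
print(final_state(("docs", "report"), ["COMPLETED", "FAILED"]))  # COMPLETED
```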
Nate
08/28/2024, 3:27 PM
Constantino Schillebeeckx
08/28/2024, 3:28 PM
If a flow returns a mix of futures and states, the final state is determined by resolving all futures to states, then determining if any of the states are not COMPLETED.
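The quoted rule for mixed returns can be sketched with the standard library's `concurrent.futures.Future` standing in for Prefect futures. This is an analogy only: `run`, `to_state`, `aggregate` and the string states are hypothetical, and "any state not COMPLETED" is simplified to FAILED. Each future is waited on and mapped to a state, then the collection is checked.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Iterable, Union

# Analogy only: a "state" here is the plain string "COMPLETED" or "FAILED".
Item = Union[Future, str]


def run(ok: bool) -> str:
    """Stand-in task body that either succeeds or raises."""
    if not ok:
        raise ValueError("task failed")
    return "done"


def to_state(item: Item) -> str:
    """Resolve a future to a state string; pass states through unchanged."""
    if isinstance(item, Future):
        # exception() waits for the future and returns None if it succeeded.
        return "COMPLETED" if item.exception() is None else "FAILED"
    return item


def aggregate(items: Iterable[Item]) -> str:
    """FAILED if any resolved state is not COMPLETED, else COMPLETED."""
    states = [to_state(i) for i in items]
    return "COMPLETED" if all(s == "COMPLETED" for s in states) else "FAILED"


with ThreadPoolExecutor(max_workers=2) as pool:
    ok = pool.submit(run, True)
    bad = pool.submit(run, False)

print(aggregate([ok, "COMPLETED"]))       # COMPLETED
print(aggregate([ok, bad, "COMPLETED"]))  # FAILED
```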
Constantino Schillebeeckx
08/28/2024, 3:28 PM
Nate
08/28/2024, 3:30 PM
tuple([PrefectFuture, PrefectFuture, Task]), so yeah, it doesn't apply.
Perhaps you intended to return the future resulting from submitting upload_docs_to_s3?
Constantino Schillebeeckx
08/28/2024, 3:32 PM
SUCCESS, SUCCESS, FAILED
it would be marked as FAILED; if instead it returned SUCCESS, SUCCESS, SUCCESS, it would be marked as SUCCESS.
In my case, am I not returning PrefectFuture, PrefectFuture, PrefectFuture?
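In plain Python terms, the question is whether the third name in the `return` tuple refers to a submission handle or to the callable that was submitted. A stdlib analogy with `concurrent.futures` (not Prefect; the variable names are illustrative) makes the difference visible: if the value returned by `submit()` is discarded, returning the function's name returns the function object itself.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def upload_docs_to_s3() -> None:
    """Stand-in for the real upload task body."""


with ThreadPoolExecutor() as pool:
    # Pattern from the question: the value returned by submit() is discarded,
    # so the name used later still refers to the function object.
    pool.submit(upload_docs_to_s3)
    returned_as_is = upload_docs_to_s3

    # Keeping the handle returned by submit() and returning that instead.
    upload = pool.submit(upload_docs_to_s3)
    returned_fixed = upload

print(type(returned_as_is).__name__)  # function
print(type(returned_fixed).__name__)  # Future
```

In the flow above, the analogous change would be to bind the value returned by `upload_docs_to_s3.submit(...)` to a variable (for example a hypothetical `upload`) and return that variable instead of `upload_docs_to_s3`.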
Nate
08/28/2024, 3:33 PM
upload_docs_to_s3.submit(
    wait_for=[dbtdocs],
    return_state=True,
)
return dbtdocs, edr_report, upload_docs_to_s3
upload_docs_to_s3 is a task object, no?
Constantino Schillebeeckx
08/28/2024, 3:35 PM
Constantino Schillebeeckx
08/28/2024, 3:35 PM
Nate
08/28/2024, 3:43 PM
dbtdocs = dbt_docs.submit(
    return_state=True,
)
up before
dbttest = dbt_test.submit(a=databricks_token, b=slack_token).result(
    # so that any error raised doesn't get caught by the Flow
    # and we can continue executing downstream tasks
    raise_on_failure=False
)
where .result() is blocking, waiting for the future from dbttest?
Constantino Schillebeeckx
08/28/2024, 3:48 PM
Constantino Schillebeeckx
08/28/2024, 3:48 PM
Nate
08/28/2024, 3:48 PM
.result() is blocking, so if you submit the other work before you start blocking, you give it time to do its thing in another thread.
Constantino Schillebeeckx
08/28/2024, 3:49 PM
Constantino Schillebeeckx
08/28/2024, 3:49 PM
Nate
08/28/2024, 3:50 PM
"the flow is just a dag": this was true in Prefect 1, but not in Prefect 2 and later. We discover the graph at runtime based on your Python control flow.
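Because the graph is discovered as the Python code runs, the position of a blocking call decides what can overlap. The sketch below uses the standard library's `concurrent.futures` as an analogy (it is not Prefect, and `slow`/`elapsed` are hypothetical names): blocking on the first job before submitting the second serializes them, while submitting both first lets them run in parallel. With 0.2 s jobs, the blocking variant should take roughly twice as long.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow(seconds: float = 0.2) -> None:
    """Stand-in for a task body such as dbt test or dbt docs."""
    time.sleep(seconds)


def elapsed(block_early: bool) -> float:
    """Time two slow jobs, optionally blocking on the first before submitting the second."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=2) as pool:
        first = pool.submit(slow)
        if block_early:
            # Like calling .result() right after submit: the caller waits here,
            # so the second job is not even submitted until the first finishes.
            first.result()
        second = pool.submit(slow)
        first.result()
        second.result()
    return time.perf_counter() - start


print(f"block before submitting: {elapsed(True):.2f}s")
print(f"submit, then block:      {elapsed(False):.2f}s")
```

In the flow from the question, the equivalent move is to submit `dbt_docs` (and the upload) before calling `.result()` on the `dbt_test` future.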
Constantino Schillebeeckx
08/28/2024, 3:58 PM