I'm confused about <Final flow state> and <paralle...
# ask-community
c
I'm confused about Final flow state and parallelism in a Flow. 🧵
My flow looks like:
from typing import Dict, Optional
import time
from prefect import task

from hs_prefect_utils import flow

from hs_de_workflows.flows.config import (
    DATABRICKS_TOKEN,
    SLACK_TOKEN,
)


@task
def ssm_task(name):
    time.sleep(2)
    return name


@task(task_run_name="dbt-test")
def dbt_test(
    a,
    b,
):
    """This always fails but should be ignored and not fail the flow."""
    time.sleep(2)
    raise ValueError("should not fail flow")
    return


@task(task_run_name="dbt-docs generate")
def dbt_docs():
    time.sleep(2)
    return


@task(task_run_name="send_edr_report")
def edr():
    """When this fails, it should fail the flow."""
    time.sleep(2)
    raise ValueError("should fail flow")


@task(task_run_name="upload_to_s3")
def upload_docs_to_s3():
    time.sleep(2)


@flow(name="tmp-cs")
def post_dbt_transform(
    select: Optional[str] = "tag:canary",  # XXX temporary debug
    exclude: Optional[str] = "elementary",
    variables: Optional[Dict[str, str]] = None,
):
    databricks_token = ssm_task.submit(DATABRICKS_TOKEN)
    slack_token = ssm_task.submit(SLACK_TOKEN)

    dbttest = dbt_test.submit(a=databricks_token, b=slack_token).result(
        # so that any error raised doesn't get caught by the Flow
        # and we can continue executing downstream tasks
        raise_on_failure=False
    )
    edr_report = edr.submit(
        wait_for=[dbttest],
        return_state=True,
    )
    dbtdocs = dbt_docs.submit(
        return_state=True,
    )
    upload_docs_to_s3.submit(
        wait_for=[dbtdocs],
        return_state=True,
    )
    # this allows us to ignore failed dbt_test since, if a test fails, dbt returns a non-zero exit code
    # <https://docs.prefect.io/latest/concepts/flows/#final-state-determination>
    return dbtdocs, edr_report, upload_docs_to_s3


post_dbt_transform()
When I run it (see screenshot) I'm seeing two issues:
• I expect the final state of the flow to be FAILED
• The task "dbt-docs generate" should have executed in parallel with the "ssm_task(s)"
n
hi @Constantino Schillebeeckx - are you using prefect 2 or 3?
c
2
n
gotcha, i think the behavior is expected, as described by these two bullets from the docs you linked:
• If the flow does not return a value (or returns None), its state is determined by the states of all of the tasks and subflows within it.
• If the flow run returns any other object, then it is marked as completed.
so since you're not allowing those exceptions to raise, and you have a non-None return value, it's Completed
are you noticing a change in behavior, or is this just not what you expect?
c
what about:
If a flow returns a mix of futures and states, the final state is determined by resolving all futures to states, then determining if any of the states are not COMPLETED.
In my case, I'm returning multiple states, I guess the above does not apply?
n
in your example, you seem to be returning tuple([PrefectFuture, PrefectFuture, Task]), so yeah, it doesn't apply. perhaps you intended to return the future resulting from submitting upload_docs_to_s3?
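The rules quoted in this thread can be mimicked in plain Python to see why a tuple that contains a bare task object ends up Completed. This is only a toy model under stated assumptions: `StateType`, `final_state`, and `task_object` are hypothetical names, futures are assumed to be already resolved to states, and cancelled or manually created states are ignored. It is not Prefect's implementation.

```python
from enum import Enum


class StateType(Enum):
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"


def final_state(returned, task_states):
    """Toy model of the quoted rules; not Prefect's actual code."""
    if returned is None:
        # No return value: every task/subflow state in the run counts.
        candidates = list(task_states)
    elif (
        isinstance(returned, (list, tuple))
        and returned
        and all(isinstance(item, StateType) for item in returned)
    ):
        # Only states were returned (futures assumed resolved already).
        candidates = list(returned)
    else:
        # Any other object is treated as plain data.
        return StateType.COMPLETED
    if all(state is StateType.COMPLETED for state in candidates):
        return StateType.COMPLETED
    return StateType.FAILED


OK, FAILED = StateType.COMPLETED, StateType.FAILED
task_object = object()  # hypothetical stand-in for an un-submitted Task

run_states = [OK, FAILED, OK]
mixed_with_task = final_state((OK, FAILED, task_object), run_states)
states_only = final_state((OK, FAILED, OK), run_states)
no_return = final_state(None, run_states)
```

In this model the tuple with the task object is treated as ordinary data, while the tuple of states and the `None` return both surface the failure.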
c
In my mind, if a flow returned:
SUCCESS, SUCCESS, FAILED
it would be marked as FAILED; if instead it returned
SUCCESS, SUCCESS, SUCCESS
it would be marked as SUCCESS.
In my case, am I not returning the following?
PrefectFuture, PrefectFuture, PrefectFuture
n
upload_docs_to_s3.submit(
        wait_for=[dbtdocs],
        return_state=True,
    )

    return dbtdocs, edr_report, upload_docs_to_s3
upload_docs_to_s3 is a task object, no?
c
ah you're right!
that resolves final state determination; do you have any thoughts on why "dbt-docs generate" isn't executing at the very start of the flow (parallel to SSM)?
n
👍 hmm what happens if you move
dbtdocs = dbt_docs.submit(
        return_state=True,
    )
up before
dbttest = dbt_test.submit(a=databricks_token, b=slack_token).result(
        # so that any error raised doesn't get caught by the Flow
        # and we can continue executing downstream tasks
        raise_on_failure=False
    )
where .result() is blocking, waiting for the future from dbttest?
c
hmmm that works ....
this feels like a bug?
n
i don't think so! i think it's just because .result() is blocking, so if you submit the other work before you start blocking, you give it time to do its thing in another thread
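The same ordering effect can be reproduced with the standard library alone. The sketch below uses `concurrent.futures` rather than Prefect, and `run`, `docs`, and `test` are made-up stand-ins for the tasks in this thread; it only illustrates that work submitted after a blocking `.result()` call cannot overlap with the work being waited on.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def run(submit_docs_first: bool) -> bool:
    """Return True if 'docs' had started by the time 'test' finished."""
    docs_started = threading.Event()

    def docs():
        docs_started.set()

    def test():
        # Stand-in for real work: wait briefly and report whether docs started.
        return docs_started.wait(timeout=0.5)

    with ThreadPoolExecutor(max_workers=4) as pool:
        if submit_docs_first:
            docs_future = pool.submit(docs)
            overlapped = pool.submit(test).result()
        else:
            # .result() blocks here, so docs has not even been submitted yet.
            overlapped = pool.submit(test).result()
            docs_future = pool.submit(docs)
        docs_future.result()
    return overlapped


overlap_when_docs_first = run(submit_docs_first=True)
overlap_when_blocking_first = run(submit_docs_first=False)
```

Submitting `docs` before blocking lets it run in another thread while `test` is in flight; blocking first means `test` times out without ever seeing `docs` start.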
c
ah! my bad
ok that makes sense; although it doesn't feel very intuitive; the flow is just a dag, it shouldn't care where i define a task ...
n
the flow is just a dag
🙂 this was true in prefect 1, but is not in prefect 2+; we discover the graph at runtime based on your python control flow
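To make "discovered at runtime" concrete, here is a small plain-Python sketch in which dependency edges are recorded only as calls actually execute. `Result`, `call`, and `pipeline` are hypothetical helpers invented for this illustration and say nothing about how Prefect tracks dependencies internally.

```python
edges = []


class Result:
    """Pairs a value with the name of the call that produced it."""

    def __init__(self, name, value):
        self.name = name
        self.value = value


def call(name, fn, *inputs):
    """Run fn immediately and record an edge from each upstream result."""
    for upstream in inputs:
        edges.append((upstream.name, name))
    return Result(name, fn(*(item.value for item in inputs)))


def pipeline(run_report: bool):
    edges.clear()
    token = call("ssm", lambda: "token")
    tested = call("dbt_test", lambda t: t.upper(), token)
    if run_report:
        # Ordinary Python branch: this edge exists only if the line runs.
        call("edr", lambda t: None, tested)
    return list(edges)


graph_with_report = pipeline(run_report=True)
graph_without_report = pipeline(run_report=False)
```

The two runs produce different graphs from the same function, which is why statement order and branching matter in a way they would not for a statically declared DAG.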
c
thanks for all the help @Nate