Kelby

11/11/2021, 2:40 PM
Good morning. In Orion (Prefect 2.0a4), I’m starting to test out subflows. I’m noticing that when a subflow's return value is passed to a task’s `wait_for` argument, Orion seems to ignore the `wait_for` and runs the task even when the subflow fails.
import sys
from prefect import task, flow
sys.tracebacklimit = 0

@task
def should_not_run():
    print('THIS SHOULD NOT APPEAR')


@task
def fail():
    raise Exception('Fail')

@flow()
def subflow():
    fail()


@flow()
def my_flow():
    ss = fail()
    should_not_run(wait_for=ss)


@flow()
def my_flow_using_subflow():
    ss = subflow()
    should_not_run(wait_for=ss)

if __name__ == "__main__":
    print('-' * 50)
    print('NO SUBFLOW')
    print('-' * 50)
    my_flow()

    print('-' * 50)
    print('WITH SUBFLOW')
    print('-' * 50)
    my_flow_using_subflow()
--------------------------------------------------
NO SUBFLOW
--------------------------------------------------
09:39:32.463 | Beginning flow run 'glorious-teal' for flow 'my-flow'...
09:39:32.463 | Starting executor `SequentialExecutor`...
09:39:32.518 | Submitting task run 'fail-6aa020f4-0' to executor...
09:39:32.537 | Task run 'fail-6aa020f4-0' encountered exception:
Exception: Fail
09:39:32.559 | Task run 'fail-6aa020f4-0' finished in state Failed(message='Task run encountered an exception.', type=FAILED)
09:39:32.579 | Submitting task run 'should_not_run-9f323c93-0' to executor...
09:39:32.593 | Task run 'should_not_run-9f323c93-0' finished in state NotReady(message="Upstream task run '0680e64f-2da8-4eaf-9c07-a0c2112dc753' did not reach a 'COMPLETED' state.", type=PENDING)
09:39:32.594 | Shutting down executor `SequentialExecutor`...
09:39:32.608 | Flow run 'glorious-teal' finished in state Failed(message='1/2 states failed.', type=FAILED)
--------------------------------------------------
WITH SUBFLOW
--------------------------------------------------
09:39:32.638 | Beginning flow run 'glittering-hound' for flow 'my-flow-using-subflow'...
09:39:32.638 | Starting executor `SequentialExecutor`...
09:39:32.755 | Beginning subflow run 'tuscan-numbat' for flow 'subflow'...
09:39:32.755 | Starting executor `SequentialExecutor`...
09:39:32.797 | Submitting task run 'fail-6aa020f4-1' to executor...
09:39:32.814 | Task run 'fail-6aa020f4-1' encountered exception:
Exception: Fail
09:39:32.831 | Task run 'fail-6aa020f4-1' finished in state Failed(message='Task run encountered an exception.', type=FAILED)
09:39:32.832 | Shutting down executor `SequentialExecutor`...
09:39:32.855 | Subflow run 'tuscan-numbat' finished in state Failed(message='1/1 states failed.', type=FAILED)
09:39:32.873 | Submitting task run 'should_not_run-9f323c93-1' to executor...
THIS SHOULD NOT APPEAR
09:39:32.908 | Task run 'should_not_run-9f323c93-1' finished in state Completed(message=None, type=COMPLETED)
09:39:32.909 | Shutting down executor `SequentialExecutor`...
09:39:32.922 | Flow run 'glittering-hound' finished in state Failed(message='1/2 states failed.', type=FAILED)
In the above code, in `my_flow`, I have a task that fails, so the `should_not_run` task should be skipped. That is working as expected.
In `my_flow_using_subflow`, I introduce a subflow which has a task that fails. Given that the subflow failed, I expected that the `should_not_run` task would be skipped. However, it still runs. Is this the expected behavior? Thanks!

Anna Geller

11/11/2021, 3:05 PM
I will look at it in more detail, but just to clarify: in the flow below, you want `should_not_run` not to start at all if the child flow `ss` fails?
@flow()
def my_flow_using_subflow():
    ss = subflow()
    should_not_run(wait_for=ss)

Kelby

11/11/2021, 3:06 PM
Correct. Thanks.

Anna Geller

11/11/2021, 3:21 PM
@Kelby I think this is how you can do it:
@flow()
def my_flow_using_subflow():
    ss = subflow()
    ss_result = ss.result(raise_on_failure=True)
    should_not_run(wait_for=ss_result)
or even this:
@flow()
def my_flow_using_subflow():
    ss = subflow()
    ss_result = ss.result()
    should_not_run(wait_for=ss_result)
I think the issue is that we need to explicitly wait until the child flow is finished, and calling `.result()` does that.
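One possible reading of why the `.result()` workaround stops the downstream task: if the call re-raises the subflow's exception inside the parent flow's body, the line that submits `should_not_run` is never reached. The plain-Python sketch below illustrates only that control flow. `ToyState` and `parent_flow_body` are hypothetical stand-ins written for this example and are not Prefect's classes or implementation.

```python
from typing import Any, List


class ToyState:
    """Hypothetical stand-in for a finished run's state (not Prefect's class)."""

    def __init__(self, type: str, data: Any) -> None:
        self.type = type
        self.data = data

    def result(self, raise_on_failure: bool = True) -> Any:
        # Re-raise the stored exception when the run failed, else return the data.
        if self.type == "FAILED" and raise_on_failure:
            raise self.data
        return self.data


ran: List[str] = []


def parent_flow_body(subflow_state: ToyState) -> None:
    subflow_state.result()        # raises here if the subflow failed
    ran.append("should_not_run")  # never reached in that case


try:
    parent_flow_body(ToyState("FAILED", Exception("Fail")))
    outcome = "completed"
except Exception as exc:
    outcome = f"parent failed: {exc}"

print(outcome, ran)
```

In this toy version the downstream step is skipped because the parent body aborts, not because any upstream check ran.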

Kelby

11/11/2021, 3:36 PM
Yes, functionally, that appears to work. It does repeat the exception in the log though.
--------------------------------------------------
WITH SUBFLOW
--------------------------------------------------
10:26:51.063 | Beginning flow run 'misty-llama' for flow 'my-flow-using-subflow'...
10:26:51.063 | Starting executor `SequentialExecutor`...
10:26:51.178 | Beginning subflow run 'rich-turaco' for flow 'subflow'...
10:26:51.178 | Starting executor `SequentialExecutor`...
10:26:51.241 | Submitting task run 'fail-6aa020f4-1' to executor...
10:26:51.262 | Task run 'fail-6aa020f4-1' encountered exception:
Exception: Fail
10:26:51.285 | Task run 'fail-6aa020f4-1' finished in state Failed(message='Task run encountered an exception.', type=FAILED)
10:26:51.286 | Shutting down executor `SequentialExecutor`...
10:26:51.320 | Subflow run 'rich-turaco' finished in state Failed(message='1/1 states failed.', type=FAILED)
10:26:51.320 | Flow run 'misty-llama' encountered exception:
Exception: Fail
10:26:51.321 | Shutting down executor `SequentialExecutor`...
10:26:51.339 | Flow run 'misty-llama' finished in state Failed(message='Flow run encountered an exception.', type=FAILED)

Anna Geller

11/11/2021, 3:43 PM
The exception is logged once for the task run and once for the parent flow run ('misty-llama'), where `.result()` re-raises it, so it appears twice.

Zanie

11/11/2021, 3:44 PM
I think this is unintended.
You shouldn't have to throw an exception in the flow to prevent a downstream from running.

Kelby

11/11/2021, 3:46 PM
In my case, it was the opposite. If I have an exception in my subflow, I don’t want the later tasks to execute.

Zanie

11/11/2021, 3:47 PM
Yep. I'm saying unpacking the subflow exception with `.result()` to reraise it in the parent isn't something I want you to have to do.

Kelby

11/11/2021, 3:47 PM
Ahh. Agreed.

Zanie

11/11/2021, 3:47 PM
We're only checking for failures in futures passed downstream, not states.
We wanted people to be able to pass states downstream manually so they could work with them, so we can't just stop your task from running if it receives failed states.
This is tricky with a subflow since it returns a state instead of a future.
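The distinction described above (failures are checked on futures, while states pass through as ordinary data) can be sketched in plain Python. This is a toy model under stated assumptions, not Orion's code: `ToyState`, `ToyFuture`, `upstream_ok`, and `run_downstream` are all hypothetical names invented for this illustration.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class ToyState:
    """Hypothetical stand-in for a final run state."""
    type: str  # "COMPLETED" or "FAILED"


@dataclass
class ToyFuture:
    """Hypothetical stand-in for a task-run future that resolves to a state."""
    state: ToyState


def upstream_ok(wait_for: List[Any]) -> bool:
    """Return False only if a *future* in wait_for did not complete.

    Bare states are treated as ordinary data and never block the task.
    """
    for item in wait_for:
        if isinstance(item, ToyFuture) and item.state.type != "COMPLETED":
            return False
    return True


def run_downstream(fn: Callable[[], None], wait_for: List[Any]) -> str:
    """Run fn only when the upstream check passes."""
    if not upstream_ok(wait_for):
        return "NotReady"
    fn()
    return "Completed"


ran: List[str] = []
failed = ToyState("FAILED")

# A task call yields a future, so the failure is detected.
task_outcome = run_downstream(lambda: ran.append("after task"), [ToyFuture(failed)])

# A subflow call yields a state, so the failure passes through unchecked.
subflow_outcome = run_downstream(lambda: ran.append("after subflow"), [failed])

print(task_outcome, subflow_outcome, ran)
```

Under this model, blocking on every failed state would also stop tasks that are deliberately handed a failed state to inspect, which is the trade-off mentioned above.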

Anna Geller

11/11/2021, 3:50 PM
@Zanie how should it ideally be defined then, like this?
@flow()
def flow_with_subflow():
    ss = subflow()
    should_not_run(wait_for=ss)
If so, I can open an issue.