# prefect-server
k
Good morning. In Orion (Prefect 2.0a4), I’m starting to test out subflows. I’m noticing that when using a subflow as a future passed to a task’s wait_for argument, Orion seems to ignore the wait_for and runs the task even when the subflow fails.
import sys
from prefect import task, flow
sys.tracebacklimit = 0

@task
def should_not_run():
    print('THIS SHOULD NOT APPEAR')


@task
def fail():
    raise Exception('Fail')

@flow()
def subflow():
    fail()


@flow()
def my_flow():
    ss = fail()
    should_not_run(wait_for=ss)


@flow()
def my_flow_using_subflow():
    ss = subflow()
    should_not_run(wait_for=ss)

if __name__ == "__main__":
    print('-' * 50)
    print('NO SUBFLOW')
    print('-' * 50)
    my_flow()

    print('-' * 50)
    print('WITH SUBFLOW')
    print('-' * 50)
    my_flow_using_subflow()
Output:
--------------------------------------------------
NO SUBFLOW
--------------------------------------------------
09:39:32.463 | Beginning flow run 'glorious-teal' for flow 'my-flow'...
09:39:32.463 | Starting executor `SequentialExecutor`...
09:39:32.518 | Submitting task run 'fail-6aa020f4-0' to executor...
09:39:32.537 | Task run 'fail-6aa020f4-0' encountered exception:
Exception: Fail
09:39:32.559 | Task run 'fail-6aa020f4-0' finished in state Failed(message='Task run encountered an exception.', type=FAILED)
09:39:32.579 | Submitting task run 'should_not_run-9f323c93-0' to executor...
09:39:32.593 | Task run 'should_not_run-9f323c93-0' finished in state NotReady(message="Upstream task run '0680e64f-2da8-4eaf-9c07-a0c2112dc753' did not reach a 'COMPLETED' state.", type=PENDING)
09:39:32.594 | Shutting down executor `SequentialExecutor`...
09:39:32.608 | Flow run 'glorious-teal' finished in state Failed(message='1/2 states failed.', type=FAILED)
--------------------------------------------------
WITH SUBFLOW
--------------------------------------------------
09:39:32.638 | Beginning flow run 'glittering-hound' for flow 'my-flow-using-subflow'...
09:39:32.638 | Starting executor `SequentialExecutor`...
09:39:32.755 | Beginning subflow run 'tuscan-numbat' for flow 'subflow'...
09:39:32.755 | Starting executor `SequentialExecutor`...
09:39:32.797 | Submitting task run 'fail-6aa020f4-1' to executor...
09:39:32.814 | Task run 'fail-6aa020f4-1' encountered exception:
Exception: Fail
09:39:32.831 | Task run 'fail-6aa020f4-1' finished in state Failed(message='Task run encountered an exception.', type=FAILED)
09:39:32.832 | Shutting down executor `SequentialExecutor`...
09:39:32.855 | Subflow run 'tuscan-numbat' finished in state Failed(message='1/1 states failed.', type=FAILED)
09:39:32.873 | Submitting task run 'should_not_run-9f323c93-1' to executor...
THIS SHOULD NOT APPEAR
09:39:32.908 | Task run 'should_not_run-9f323c93-1' finished in state Completed(message=None, type=COMPLETED)
09:39:32.909 | Shutting down executor `SequentialExecutor`...
09:39:32.922 | Flow run 'glittering-hound' finished in state Failed(message='1/2 states failed.', type=FAILED)
In the above code, in my_flow, I have a task that fails, so the should_not_run function should be skipped. That is working as expected.
In my_flow_using_subflow, I introduce a subflow which has a task that fails. Given that the subflow failed, I expected that the should_not_run task would be skipped. However, it is still getting run. Is this the expected behavior? Thanks!
a
I will look at it in more detail, but just to clarify: in the flow below, you want should_not_run not to start executing at all if the child flow ss fails?
@flow()
def my_flow_using_subflow():
    ss = subflow()
    should_not_run(wait_for=ss)
k
Correct. Thanks.
a
@Kelby I think this is how you can do it:
@flow()
def my_flow_using_subflow():
    ss = subflow()
    ss_result = ss.result(raise_on_failure=True)
    should_not_run(wait_for=ss_result)
or even this:
@flow()
def my_flow_using_subflow():
    ss = subflow()
    ss_result = ss.result()
    should_not_run(wait_for=ss_result)
I think the issue is that we need to explicitly wait until the child flow is finished, and calling .result() does that.
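A toy sketch of why calling .result() keeps the downstream task from running, assuming a failed state re-raises its exception when raise_on_failure is true. ToyFailedState and toy_parent_flow are hypothetical stand-ins for illustration, not Prefect's classes or implementation.

```python
class ToyFailedState:
    """Hypothetical stand-in for a failed flow run state (not Prefect's class)."""

    def __init__(self, exception: Exception):
        self.exception = exception

    def result(self, raise_on_failure: bool = True):
        # Re-raise the stored exception unless the caller opts out.
        if raise_on_failure:
            raise self.exception
        return self.exception


calls = []  # records which downstream steps actually ran


def toy_parent_flow():
    state = ToyFailedState(Exception("Fail"))
    state.result()  # raises here, so the next line is never reached
    calls.append("should_not_run")


try:
    toy_parent_flow()
except Exception as exc:
    outcome = f"parent failed: {exc}"
```

In this sketch the downstream step is skipped only because the exception aborts the parent function body, not because of any wait_for check.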
k
Yes, functionally, that appears to work. It does repeat the exception in the log though.
--------------------------------------------------
WITH SUBFLOW
--------------------------------------------------
10:26:51.063 | Beginning flow run 'misty-llama' for flow 'my-flow-using-subflow'...
10:26:51.063 | Starting executor `SequentialExecutor`...
10:26:51.178 | Beginning subflow run 'rich-turaco' for flow 'subflow'...
10:26:51.178 | Starting executor `SequentialExecutor`...
10:26:51.241 | Submitting task run 'fail-6aa020f4-1' to executor...
10:26:51.262 | Task run 'fail-6aa020f4-1' encountered exception:
Exception: Fail
10:26:51.285 | Task run 'fail-6aa020f4-1' finished in state Failed(message='Task run encountered an exception.', type=FAILED)
10:26:51.286 | Shutting down executor `SequentialExecutor`...
10:26:51.320 | Subflow run 'rich-turaco' finished in state Failed(message='1/1 states failed.', type=FAILED)
10:26:51.320 | Flow run 'misty-llama' encountered exception:
Exception: Fail
10:26:51.321 | Shutting down executor `SequentialExecutor`...
10:26:51.339 | Flow run 'misty-llama' finished in state Failed(message='Flow run encountered an exception.', type=FAILED)
a
The exception is logged once for the task run and once for the flow run of the subflow, so it appears twice.
z
I think this is unintended.
You shouldn't have to throw an exception in the flow to prevent a downstream task from running.
k
In my case, it was the opposite. If I have an exception in my subflow, I don’t want the later tasks to execute.
z
Yep. I'm saying unpacking the subflow exception with .result() to reraise it in the parent isn't something I want you to have to do.
k
Ahh. Agreed.
z
We're only checking for failures in futures passed downstream, not states.
We wanted people to be able to pass states downstream manually so they could work with them, so we can't just stop your task from running if it receives failed states.
This is tricky with a subflow since it returns a state instead of a future.
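The distinction between futures and states described above can be sketched with a toy model. ToyState, ToyFuture, and run_task are hypothetical names invented for this illustration; the upstream check is an assumption about the general shape of the rule, not Orion's actual code.

```python
from dataclasses import dataclass
from typing import Callable, Iterable


@dataclass
class ToyState:
    """Hypothetical state object: just a type label and a message."""
    type: str  # e.g. "COMPLETED", "FAILED", "PENDING"
    message: str = ""


@dataclass
class ToyFuture:
    """Hypothetical future: wraps the final state of an upstream task run."""
    state: ToyState


def run_task(fn: Callable[[], None], wait_for: Iterable[object] = ()) -> ToyState:
    """Toy rule: only futures in wait_for are inspected for failure."""
    for upstream in wait_for:
        if isinstance(upstream, ToyFuture) and upstream.state.type != "COMPLETED":
            return ToyState("PENDING", "NotReady: upstream did not reach COMPLETED")
        # A bare ToyState is treated as ordinary data and is not inspected.
    fn()
    return ToyState("COMPLETED")


ran = []
# A failed task future blocks the downstream call.
blocked = run_task(lambda: ran.append("after future"),
                   wait_for=[ToyFuture(ToyState("FAILED"))])
# A failed state (what a subflow call returns here) does not block it.
passed = run_task(lambda: ran.append("after state"),
                  wait_for=[ToyState("FAILED")])
```

Under this toy rule, the second call runs even though it received a failed state, which mirrors the WITH SUBFLOW log earlier in the thread.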
a
@Zanie how should it ideally be defined then, like this?
@flow()
def flow_with_subflow():
    ss = subflow()
    should_not_run(wait_for=ss)
If so, I can open an issue.