Kelby
11/11/2021, 2:40 PM
wait_for argument, Orion seems to ignore the wait_for and processes the tasks even when the subflow fails.
import sys
from prefect import task, flow
sys.tracebacklimit = 0

@task
def should_not_run():
    print('THIS SHOULD NOT APPEAR')

@task
def fail():
    raise Exception('Fail')

@flow()
def subflow():
    fail()

@flow()
def my_flow():
    ss = fail()
    should_not_run(wait_for=ss)

@flow()
def my_flow_using_subflow():
    ss = subflow()
    should_not_run(wait_for=ss)

if __name__ == "__main__":
    print('-' * 50)
    print('NO SUBFLOW')
    print('-' * 50)
    my_flow()
    print('-' * 50)
    print('WITH SUBFLOW')
    print('-' * 50)
    my_flow_using_subflow()
--------------------------------------------------
NO SUBFLOW
--------------------------------------------------
09:39:32.463 | Beginning flow run 'glorious-teal' for flow 'my-flow'...
09:39:32.463 | Starting executor `SequentialExecutor`...
09:39:32.518 | Submitting task run 'fail-6aa020f4-0' to executor...
09:39:32.537 | Task run 'fail-6aa020f4-0' encountered exception:
Exception: Fail
09:39:32.559 | Task run 'fail-6aa020f4-0' finished in state Failed(message='Task run encountered an exception.', type=FAILED)
09:39:32.579 | Submitting task run 'should_not_run-9f323c93-0' to executor...
09:39:32.593 | Task run 'should_not_run-9f323c93-0' finished in state NotReady(message="Upstream task run '0680e64f-2da8-4eaf-9c07-a0c2112dc753' did not reach a 'COMPLETED' state.", type=PENDING)
09:39:32.594 | Shutting down executor `SequentialExecutor`...
09:39:32.608 | Flow run 'glorious-teal' finished in state Failed(message='1/2 states failed.', type=FAILED)
--------------------------------------------------
WITH SUBFLOW
--------------------------------------------------
09:39:32.638 | Beginning flow run 'glittering-hound' for flow 'my-flow-using-subflow'...
09:39:32.638 | Starting executor `SequentialExecutor`...
09:39:32.755 | Beginning subflow run 'tuscan-numbat' for flow 'subflow'...
09:39:32.755 | Starting executor `SequentialExecutor`...
09:39:32.797 | Submitting task run 'fail-6aa020f4-1' to executor...
09:39:32.814 | Task run 'fail-6aa020f4-1' encountered exception:
Exception: Fail
09:39:32.831 | Task run 'fail-6aa020f4-1' finished in state Failed(message='Task run encountered an exception.', type=FAILED)
09:39:32.832 | Shutting down executor `SequentialExecutor`...
09:39:32.855 | Subflow run 'tuscan-numbat' finished in state Failed(message='1/1 states failed.', type=FAILED)
09:39:32.873 | Submitting task run 'should_not_run-9f323c93-1' to executor...
THIS SHOULD NOT APPEAR
09:39:32.908 | Task run 'should_not_run-9f323c93-1' finished in state Completed(message=None, type=COMPLETED)
09:39:32.909 | Shutting down executor `SequentialExecutor`...
09:39:32.922 | Flow run 'glittering-hound' finished in state Failed(message='1/2 states failed.', type=FAILED)
In my_flow, I have a task that fails, so the should_not_run function should be skipped. That is working as expected.
In my_flow_using_subflow, I introduce a subflow which has a task that fails. Given that the subflow failed, I expected that the should_not_run task would be skipped. However, it is still getting run. Is this the expected behavior? Thanks!
Anna Geller
11/11/2021, 3:05 PM
should_not_run doesn’t even start the execution if the child flow ss fails?
@flow()
def my_flow_using_subflow():
    ss = subflow()
    should_not_run(wait_for=ss)
Kelby
11/11/2021, 3:06 PM
Anna Geller
11/11/2021, 3:21 PM
@flow()
def my_flow_using_subflow():
    ss = subflow()
    ss_result = ss.result(raise_on_failure=True)
    should_not_run(wait_for=ss_result)

@flow()
def my_flow_using_subflow():
    ss = subflow()
    ss_result = ss.result()
    should_not_run(wait_for=ss_result)
I think the issue is that we need to explicitly wait until the child flow is finished and with .result() we do that.
Kelby
11/11/2021, 3:36 PM
--------------------------------------------------
WITH SUBFLOW
--------------------------------------------------
10:26:51.063 | Beginning flow run 'misty-llama' for flow 'my-flow-using-subflow'...
10:26:51.063 | Starting executor `SequentialExecutor`...
10:26:51.178 | Beginning subflow run 'rich-turaco' for flow 'subflow'...
10:26:51.178 | Starting executor `SequentialExecutor`...
10:26:51.241 | Submitting task run 'fail-6aa020f4-1' to executor...
10:26:51.262 | Task run 'fail-6aa020f4-1' encountered exception:
Exception: Fail
10:26:51.285 | Task run 'fail-6aa020f4-1' finished in state Failed(message='Task run encountered an exception.', type=FAILED)
10:26:51.286 | Shutting down executor `SequentialExecutor`...
10:26:51.320 | Subflow run 'rich-turaco' finished in state Failed(message='1/1 states failed.', type=FAILED)
10:26:51.320 | Flow run 'misty-llama' encountered exception:
Exception: Fail
10:26:51.321 | Shutting down executor `SequentialExecutor`...
10:26:51.339 | Flow run 'misty-llama' finished in state Failed(message='Flow run encountered an exception.', type=FAILED)
Anna Geller
11/11/2021, 3:43 PM
Zanie
11/11/2021, 3:44 PM
Kelby
11/11/2021, 3:46 PM
Zanie
11/11/2021, 3:47 PM
.result() to reraise it in the parent isn't something I want you to have to do.
Kelby
11/11/2021, 3:47 PM
Zanie
11/11/2021, 3:47 PM
Anna Geller
11/11/2021, 3:50 PM
@flow()
def flow_with_subflow():
    ss = subflow()
    should_not_run(wait_for=ss)
if so, I can open an issue.
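
For anyone reading this thread later, here is a rough sketch of the gating Kelby expected, written in plain Python without Prefect. It is a toy model only: State, run, and calls are hypothetical names made up for illustration, and nothing here claims to show how Orion implements wait_for internally. It assumes a downstream step should run only when every upstream state passed in wait_for is COMPLETED, which matches the NotReady result in the NO SUBFLOW log above, and that result(raise_on_failure=True) re-raises the upstream exception, which matches the log from the .result() workaround.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable, Optional


@dataclass
class State:
    """Hypothetical stand-in for a run's final state (not Prefect's class)."""
    type: str                      # "COMPLETED", "FAILED", or "PENDING"
    data: Any = None               # return value, or the exception on failure
    message: Optional[str] = None

    def result(self, raise_on_failure: bool = True) -> Any:
        # Re-raise the stored exception, like the workaround in the thread.
        if self.type == "FAILED" and raise_on_failure:
            raise self.data
        return self.data


def run(fn: Callable[[], Any], wait_for: Iterable[State] = ()) -> State:
    """Run fn only if every upstream state in wait_for is COMPLETED."""
    for upstream in wait_for:
        if upstream.type != "COMPLETED":
            # The downstream function is never called in this branch.
            return State("PENDING", message="Upstream did not reach a 'COMPLETED' state.")
    try:
        return State("COMPLETED", data=fn())
    except Exception as exc:
        return State("FAILED", data=exc)


def fail():
    raise Exception("Fail")


calls = []  # records whether should_not_run was ever executed


def should_not_run():
    calls.append("ran")


# upstream plays the role of the failed task or failed subflow.
upstream = run(fail)
downstream = run(should_not_run, wait_for=[upstream])
```

In this model the failed upstream leaves downstream in a PENDING state and should_not_run is never called, regardless of whether the upstream was a task or a subflow. That is the behavior the WITH SUBFLOW run was expected to show.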