Philip MacMenamin
09/16/2022, 7:21 PM
Philip MacMenamin
09/16/2022, 7:21 PM
from prefect.triggers import all_finished, any_failed
from prefect import task, Flow

@task
def do_something_important():
    # return True
    raise ValueError("Non-deterministic error has occurred.")

@task
def succeed():
    print("Success")

@task(trigger=any_failed)
def fail():
    print("\n--------------------\nUpstream Failure")

@task(trigger=all_finished)
def always_run():
    print("Running regardless of upstream task's state")

with Flow("Trigger example") as flow:
    success = succeed()
    failed_task = fail(upstream_tasks=[do_something_important])
    always_run(upstream_tasks=[do_something_important])

if __name__ == "__main__":
    flow.run()
If I run the flow, the output will be:
...
prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
This is despite the fact that an exception was raised.
If, however, that exception is not raised, the outcome is:
[2022-09-16 20:15:34+0100] INFO - prefect.TaskRunner | Task 'always_run': Finished task run for task with final state: 'Success'
[2022-09-16 20:15:34+0100] INFO - prefect.TaskRunner | Task 'fail': Starting task run...
[2022-09-16 20:15:34+0100] INFO - prefect.TaskRunner | Task 'fail': Finished task run for task with final state: 'TriggerFailed'
[2022-09-16 20:15:34+0100] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
Is this expected behavior? I.e., if the task is decorated with any_failed and no exceptions are raised, the run is considered "Failed", and if an exception is raised, the run is considered "Success"?
Philip MacMenamin
09/19/2022, 1:47 PM
Christopher Boyd
09/19/2022, 2:28 PM
Christopher Boyd
09/19/2022, 2:29 PM
Philip MacMenamin
09/19/2022, 2:37 PM
... any_failure, and there are no exceptions raised, the run will return a final state of FAILED: some reference tasks failed. And vice versa.
This seems strange, but maybe I'm not doing it correctly.
Christopher Boyd
09/19/2022, 2:41 PM
Christopher Boyd
09/19/2022, 2:42 PM
... FAILED.
• If the flow does not return a value (or returns None), its state is determined by the states of all of the tasks and subflows within it. If any task run or subflow run failed, then the final flow run state is marked as FAILED.
• If a flow returns a manually created state, it is used as the state of the final flow run. This allows for manual determination of final state.
• If the flow run returns any other object, then it is marked as successfully completed.
Christopher Boyd
09/19/2022, 2:43 PM
Philip MacMenamin
09/19/2022, 2:46 PM
from prefect.triggers import all_failed, all_finished
from prefect import task, Flow

@task
def do_something_important():
    return True

@task
def succeed():
    print("Success")

@task(trigger=all_failed)
def fail():
    print("\n--------------------\nUpstream Failure")

@task(trigger=all_finished)
def always_run():
    print("Running regardless of upstream task's state")

with Flow("Trigger example") as flow:
    success = succeed()
    failed_task = fail(upstream_tasks=[do_something_important])
    always_run(upstream_tasks=[do_something_important])

if __name__ == "__main__":
    flow.run()
...
[2022-09-19 15:44:09+0100] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
So, in the above, no exceptions were raised and no errors were encountered, yet the run is marked as failed.
Christopher Boyd
09/19/2022, 2:48 PM
Philip MacMenamin
09/19/2022, 2:49 PM
Philip MacMenamin
09/19/2022, 2:51 PM
... any_failure type task to occur at the end of the run.
Christopher Boyd
09/19/2022, 2:53 PM
Philip MacMenamin
09/19/2022, 2:53 PM
Christopher Boyd
09/19/2022, 2:55 PM
Philip MacMenamin
09/19/2022, 2:59 PM
... logger = prefect.context.get("logger") and there's some kind of log level setter somewhere?
Philip MacMenamin
09/19/2022, 3:05 PM
python3 trigger_fail.py
[2022-09-19 16:03:43+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'Trigger example'
[2022-09-19 16:03:43+0100] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2022-09-19 16:03:43+0100] DEBUG - prefect.FlowRunner | Flow 'Trigger example': Handling state change from Scheduled to Running
[2022-09-19 16:03:43+0100] INFO - prefect.TaskRunner | Task 'succeed': Starting task run...
[2022-09-19 16:03:43+0100] DEBUG - prefect.TaskRunner | Task 'succeed': Handling state change from Pending to Running
[2022-09-19 16:03:43+0100] DEBUG - prefect.TaskRunner | Task 'succeed': Calling task.run() method...
Success
[2022-09-19 16:03:43+0100] DEBUG - prefect.TaskRunner | Task 'succeed': Handling state change from Running to Success
[2022-09-19 16:03:43+0100] INFO - prefect.TaskRunner | Task 'succeed': Finished task run for task with final state: 'Success'
[2022-09-19 16:03:43+0100] INFO - prefect.TaskRunner | Task 'do_something_important': Starting task run...
[2022-09-19 16:03:43+0100] DEBUG - prefect.TaskRunner | Task 'do_something_important': Handling state change from Pending to Running
[2022-09-19 16:03:43+0100] DEBUG - prefect.TaskRunner | Task 'do_something_important': Calling task.run() method...
[2022-09-19 16:03:43+0100] DEBUG - prefect.TaskRunner | Task 'do_something_important': Handling state change from Running to Success
[2022-09-19 16:03:43+0100] INFO - prefect.TaskRunner | Task 'do_something_important': Finished task run for task with final state: 'Success'
[2022-09-19 16:03:43+0100] INFO - prefect.TaskRunner | Task 'fail': Starting task run...
[2022-09-19 16:03:43+0100] DEBUG - prefect.TaskRunner | Task 'fail': TRIGGERFAIL signal raised during execution.
[2022-09-19 16:03:43+0100] DEBUG - prefect.TaskRunner | Task 'fail': Handling state change from Pending to TriggerFailed
[2022-09-19 16:03:43+0100] INFO - prefect.TaskRunner | Task 'fail': Finished task run for task with final state: 'TriggerFailed'
[2022-09-19 16:03:43+0100] INFO - prefect.TaskRunner | Task 'always_run': Starting task run...
[2022-09-19 16:03:43+0100] DEBUG - prefect.TaskRunner | Task 'always_run': Handling state change from Pending to Running
[2022-09-19 16:03:43+0100] DEBUG - prefect.TaskRunner | Task 'always_run': Calling task.run() method...
Running regardless of upstream task's state
[2022-09-19 16:03:43+0100] DEBUG - prefect.TaskRunner | Task 'always_run': Handling state change from Running to Success
[2022-09-19 16:03:43+0100] INFO - prefect.TaskRunner | Task 'always_run': Finished task run for task with final state: 'Success'
[2022-09-19 16:03:43+0100] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
[2022-09-19 16:03:43+0100] DEBUG - prefect.FlowRunner | Flow 'Trigger example': Handling state change from Running to Failed
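The DEBUG lines above show 'fail' moving from Pending straight to TriggerFailed without its body ever being called. Here is a minimal sketch of that gating logic in plain Python. It does not use Prefect and is not Prefect's implementation: State, FAILURE_STATES, run_task and noop are hypothetical names made up for illustration, and any_failed / all_finished are simplified stand-ins for the triggers of the same name. It assumes, as the log suggests, that an unmet trigger yields a TriggerFailed state that counts as a failure.

```python
from enum import Enum


class State(Enum):
    SUCCESS = "Success"
    FAILED = "Failed"
    TRIGGER_FAILED = "TriggerFailed"


# Assumption: a TriggerFailed task is treated as a kind of failure.
FAILURE_STATES = {State.FAILED, State.TRIGGER_FAILED}


def any_failed(upstream_states):
    """Trigger is met only if at least one upstream task failed."""
    return any(s in FAILURE_STATES for s in upstream_states)


def all_finished(upstream_states):
    """Trigger is always met once every upstream task has a final state."""
    return True


def run_task(body, trigger, upstream_states):
    """Check the trigger first; only call the task body if it is met."""
    if not trigger(upstream_states):
        return State.TRIGGER_FAILED  # body is never called
    try:
        body()
    except Exception:
        return State.FAILED
    return State.SUCCESS


def noop():
    pass


# Same situation as the log: the single upstream task succeeded.
upstream = [State.SUCCESS]
fail_state = run_task(noop, any_failed, upstream)
always_run_state = run_task(noop, all_finished, upstream)
print(fail_state.value)        # TriggerFailed
print(always_run_state.value)  # Success
```

In this model, a task guarded by any_failed ends in a failure state precisely when nothing upstream went wrong, which matches the 'fail' lines in the log.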
Christopher Boyd
09/19/2022, 3:05 PM
Philip MacMenamin
09/19/2022, 3:06 PM
... trigger_fail decorator is itself triggering a failure, despite there being no actual failure
Christopher Boyd
09/19/2022, 3:17 PM
Christopher Boyd
09/19/2022, 3:17 PM
Christopher Boyd
09/19/2022, 3:17 PM
import random
from prefect.triggers import all_successful, all_failed
from prefect import task, Flow

@task(name="Task A")
def task_a():
    if random.random() > 0.5:
        raise ValueError("Non-deterministic error has occurred.")

@task(name="Task B", trigger=all_successful)
def task_b():
    # do something interesting
    pass

@task(name="Task C", trigger=all_failed)
def task_c():
    # do something interesting
    pass

with Flow("Trigger example") as flow:
    success = task_b(upstream_tasks=[task_a])
    fail = task_c(upstream_tasks=[task_a])

## note that as written, this flow will fail regardless of the path taken
## because *at least one* terminal task will fail;
## to fix this, we want to set Task B as the "reference task" for the Flow
## so that its state uniquely determines the overall Flow state
flow.set_reference_tasks([success])

flow.run()
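The comments in the example above say the flow state follows the terminal tasks unless reference tasks are set. A rough plain-Python model of that rule follows. It is a sketch only, not Prefect code: terminal_tasks, final_flow_state and the state strings are hypothetical helpers, and it assumes that TriggerFailed counts as a failure and that the default reference tasks are the tasks with nothing downstream.

```python
# Assumption: both of these final states count as failures for the flow.
FAILURE_STATES = {"Failed", "TriggerFailed"}


def terminal_tasks(tasks, edges):
    """Tasks with nothing downstream; edges are (upstream, downstream) pairs."""
    has_downstream = {upstream for upstream, _ in edges}
    return [t for t in tasks if t not in has_downstream]


def final_flow_state(task_states, reference_tasks):
    """Failed if any reference task ended in a failure state, else Success."""
    if any(task_states[t] in FAILURE_STATES for t in reference_tasks):
        return "Failed"
    return "Success"


tasks = ["Task A", "Task B", "Task C"]
edges = [("Task A", "Task B"), ("Task A", "Task C")]

# Task A succeeds: Task B runs, Task C's all_failed trigger is not met.
a_succeeds = {"Task A": "Success", "Task B": "Success", "Task C": "TriggerFailed"}
# Task A fails: Task C runs, Task B's all_successful trigger is not met.
a_fails = {"Task A": "Failed", "Task B": "TriggerFailed", "Task C": "Success"}

default_refs = terminal_tasks(tasks, edges)  # Task B and Task C

for label, states in [("A succeeds", a_succeeds), ("A fails", a_fails)]:
    print(label,
          "| default:", final_flow_state(states, default_refs),
          "| reference = Task B:", final_flow_state(states, ["Task B"]))
```

With the default reference tasks, both paths come out Failed, because one of the two terminal tasks always ends in TriggerFailed. With only Task B as the reference task, the flow state simply follows whether Task A succeeded.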
Philip MacMenamin
09/19/2022, 3:17 PM
## note that as written, this flow will fail regardless of the path taken
## because *at least one* terminal task will fail;
## to fix this, we want to set Task B as the "reference task" for the Flow
## so that its state uniquely determines the overall Flow state
flow.set_reference_tasks([success])
Looks like in the example they're saying that task_c (the guy decorated with the all_failed decorator) will run... yeah
Philip MacMenamin
09/19/2022, 3:18 PM
... all_failed it will run if there's no failure, and raise an exception??
Philip MacMenamin
09/19/2022, 3:19 PM
Christopher Boyd
09/19/2022, 3:20 PM