Question regarding trigger.any_failed:
# ask-community
p
Question regarding trigger.any_failed:
1️⃣ 1
🙌 1
Given the example:
from prefect.triggers import all_finished, any_failed
from prefect import task, Flow
@task
def do_something_important():
# return True
raise ValueError("Non-deterministic error has occured.")
@task
def succeed():
print("Success")
@task(trigger=any_failed)
def fail():
print("\n--------------------\nUpstream Failure")
@task(trigger=all_finished)
def always_run():
print("Running regardless of upstream task's state")
with Flow("Trigger example") as flow:
success = succeed()
failed_task = fail(upstream_tasks=[do_something_important])
always_run(upstream_tasks=[do_something_important])
if __name__ == "__main__":
flow.run()
If I run the flow, the output will be: ...
prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Despite the fact an exception got raised. If however that exception is not raised the outcome is:
[2022-09-16 20:15:34+0100] INFO - prefect.TaskRunner | Task 'always_run': Finished task run for task with final state: 'Success'
[2022-09-16 20:15:34+0100] INFO - prefect.TaskRunner | Task 'fail': Starting task run...
[2022-09-16 20:15:34+0100] INFO - prefect.TaskRunner | Task 'fail': Finished task run for task with final state: 'TriggerFailed'
[2022-09-16 20:15:34+0100] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
Is this expected behavior? Ie if the task is decorated with any_failed and no exceptions are raised the run is considered "Failed", and if an exception is raised the run is considered Success?
Hello - is there potentially somewhere more visible I could post this?
c
Hi Philip, it seems to me like like it’s failing because you are calling fail() (a task) on an item with a trigger that only triggers if there is a failure?
if do_something_important doesn’t raise a valueError, there is no error, and fail (which is really just a print with a trigger) never actually triggers, but you are calling it and assigning it explicitly
p
the behavior I'm looking for is for a specific task to only occur if any upstream failures occur. The above does this. However if one of the tasks is decorated with
any_failure
, and there are no exceptions raised, the run will return a final state of
FAILED: some reference tasks failed
And vice versa. This seems strange, but maybe I'm not doing it correctly.
c
Indeed, a flow runs execution state if there is no return is the result of the sum of its upstream tasks
• If an exception is raised directly in the flow function, the flow run is marked as
FAILED
. • If the flow does not return a value (or returns
None
), its state is determined by the states of all of the tasks and subflows within it. If any task run or subflow run failed, then the final flow run state is marked as
FAILED
. • If a flow returns a manually created state, it is used as the state of the final flow run. This allows for manual determination of final state. • If the flow run returns any other object, then it is marked as successfully completed.
Can you maybe elaborate more on what you are trying to do?
p
Copy code
@task
def do_something_important():
    return True

@task
def succeed():
    print("Success")

@task(trigger=all_failed)
def fail():
    print("\n--------------------\nUpstream Failure")

@task(trigger=all_finished)
def always_run():
    print("Running regardless of upstream task's state")

with Flow("Trigger example") as flow:
    success = succeed()
    failed_task = fail(upstream_tasks=[do_something_important])
    always_run(upstream_tasks=[do_something_important])

if __name__ == "__main__":
    flow.run()
... [2022-09-19 154409+0100] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed. So, in the above, no exceptions were raised, no errors were encountered. The run is marked as failed.
c
what level logging are you capturing? perhaps we can turn up the logging a bit? It might definitely be an issue in the behavior here, just looking to see if there is anything else we can get from the output
p
I'd like a task to occur ONLY if any issues are encountered, that would be one behaviour I'd like. But I DON'T want the run to fail if there are no issues.
Additionally, I'd like this
any_failure
type task to occur at the end of the run.
c
I understand the ask, I'm just curious if we can get more logging here
p
got it, so how should I turn up the logging?
c
Adding a task / logging handler - https://docs.prefect.io/concepts/logs/ Ideally capturing debug logs . There are separate handlers / contexts for flows and tasks , so one would be needed for the flow, and a separate one for the tasks . My suspicion would be that it's likely to do with the trigger on failed maybe
p
still using v1 - not sure how diff that is. So, essentially:
Copy code
logger = prefect.context.get("logger")
and there's some kind of log level setter somewhere?
Copy code
python3 trigger_fail.py 
[2022-09-19 16:03:43+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'Trigger example'
[2022-09-19 16:03:43+0100] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2022-09-19 16:03:43+0100] DEBUG - prefect.FlowRunner | Flow 'Trigger example': Handling state change from Scheduled to Running
[2022-09-19 16:03:43+0100] INFO - prefect.TaskRunner | Task 'succeed': Starting task run...
[2022-09-19 16:03:43+0100] DEBUG - prefect.TaskRunner | Task 'succeed': Handling state change from Pending to Running
[2022-09-19 16:03:43+0100] DEBUG - prefect.TaskRunner | Task 'succeed': Calling task.run() method...
Success
[2022-09-19 16:03:43+0100] DEBUG - prefect.TaskRunner | Task 'succeed': Handling state change from Running to Success
[2022-09-19 16:03:43+0100] INFO - prefect.TaskRunner | Task 'succeed': Finished task run for task with final state: 'Success'
[2022-09-19 16:03:43+0100] INFO - prefect.TaskRunner | Task 'do_something_important': Starting task run...
[2022-09-19 16:03:43+0100] DEBUG - prefect.TaskRunner | Task 'do_something_important': Handling state change from Pending to Running
[2022-09-19 16:03:43+0100] DEBUG - prefect.TaskRunner | Task 'do_something_important': Calling task.run() method...
[2022-09-19 16:03:43+0100] DEBUG - prefect.TaskRunner | Task 'do_something_important': Handling state change from Running to Success
[2022-09-19 16:03:43+0100] INFO - prefect.TaskRunner | Task 'do_something_important': Finished task run for task with final state: 'Success'
[2022-09-19 16:03:43+0100] INFO - prefect.TaskRunner | Task 'fail': Starting task run...
[2022-09-19 16:03:43+0100] DEBUG - prefect.TaskRunner | Task 'fail': TRIGGERFAIL signal raised during execution.
[2022-09-19 16:03:43+0100] DEBUG - prefect.TaskRunner | Task 'fail': Handling state change from Pending to TriggerFailed
[2022-09-19 16:03:43+0100] INFO - prefect.TaskRunner | Task 'fail': Finished task run for task with final state: 'TriggerFailed'
[2022-09-19 16:03:43+0100] INFO - prefect.TaskRunner | Task 'always_run': Starting task run...
[2022-09-19 16:03:43+0100] DEBUG - prefect.TaskRunner | Task 'always_run': Handling state change from Pending to Running
[2022-09-19 16:03:43+0100] DEBUG - prefect.TaskRunner | Task 'always_run': Calling task.run() method...
Running regardless of upstream task's state
[2022-09-19 16:03:43+0100] DEBUG - prefect.TaskRunner | Task 'always_run': Handling state change from Running to Success
[2022-09-19 16:03:43+0100] INFO - prefect.TaskRunner | Task 'always_run': Finished task run for task with final state: 'Success'
[2022-09-19 16:03:43+0100] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
[2022-09-19 16:03:43+0100] DEBUG - prefect.FlowRunner | Flow 'Trigger example': Handling state change from Running to Failed
c
Sorry, https://docs-v1.prefect.io/core/idioms/logging.html You can export an environment variable to turn up the logging as well , or set it in the logging config -‘PREFECT__LOGGING__LEVEL=DEBUG’
👍 1
p
So, seemingly the
trigger_fail
decorator is itself triggering a failure, despite there being no actual failure
c
I noted this on the API pages that I think targets the behavior you’re seeing?
Copy code
import random

from prefect.triggers import all_successful, all_failed
from prefect import task, Flow


@task(name="Task A")
def task_a():
    if random.random() > 0.5:
        raise ValueError("Non-deterministic error has occured.")

@task(name="Task B", trigger=all_successful)
def task_b():
    # do something interesting
    pass

@task(name="Task C", trigger=all_failed)
def task_c():
    # do something interesting
    pass


with Flow("Trigger example") as flow:
    success = task_b(upstream_tasks=[task_a])
    fail = task_c(upstream_tasks=[task_a])

## note that as written, this flow will fail regardless of the path taken
## because *at least one* terminal task will fail;
## to fix this, we want to set Task B as the "reference task" for the Flow
## so that its state uniquely determines the overall Flow state
flow.set_reference_tasks([success])

flow.run()
p
I'm looking at https://docs-v1.prefect.io/api/latest/triggers.html, and they seem to suggest that this is expected behaviour?
Copy code
## note that as written, this flow will fail regardless of the path taken
## because *at least one* terminal task will fail;
## to fix this, we want to set Task B as the "reference task" for the Flow
## so that its state uniquely determines the overall Flow state
flow.set_reference_tasks([success])
Looks like that in the example they're saying that
task_c
(the guy decorated with the
all_failed
decorator will run... yeah
👀 1
So it's saying... if you decorate the func with
all_failed
it will run if there's no failure, and raise an exception??
🤯 I mean... ok. I guess I can write things to conform here. Seems a bit strange. Was this behaviour changed in v2 by any chance?
c
I’m not sure but I can look into it