Why is it when a task is set to only trigger if it...
# prefect-community
w
Why is it when a task is set to only trigger if it's upstream fails the said task is then marked as failed? The task was never executed, shouldn't it be marked as skipped?
I suppose the same is try for all_successful trigger, as in if the upstream failed the downstream tasks are marked as failure, but again they didn't actually run
so would have thought they would be marked as skipped
m
I suspect this is because
reference_tasks
, which determines the state of the flow as a whole, would then be
Skipped
? when in principle it's failed.
w
I believe if you have 1 task as failed and the others skipped/successful the whole flow would be marked as failed. If not you could just add it to the reference tasks array?
flow.set_reference_tasks([failedTask, skippedTask, skippedTask])
for example
I dunno, just seems odd to me that a task is marked as failure when it didn't even execute
c
@William Smith Matt is correct - this is to ensure the final flow state reflects the presence of failure; if you want the behavior you describe, simply use a different trigger on your tasks so that Failure of an upstream task is converted to a
Skipped
state on the downstream tasks
w
Sorry, not sure I follow, how would you convert a state to "Skipped"?
c
You can use a signal within the trigger like so:
Copy code
from prefect import signals

def my_skipped_trigger(upstream_states: Dict[Edge, State]):
    if any(s.is_failed() for s in upstream_states.values()):
        raise signals.SKIP("Some upstreams failed")

@task(trigger=my_skipped_trigger)
…
w
Nice, wasn't aware you could do this
Thanks!
c
You’re welcome!
@Marvin archive “How to make my tasks skip instead of triggerfail anytime an upstream task fails”
w
I actually get an error when trying to run this :
def my_skipped_trigger(upstream_states: Dict[Edge, State]):
TypeError: 'SignatureValidator' object is not subscriptable
c
can you share the full code you wrote?
w
Here is my function trying to use trigger:
@task(name = "Unhappy Path", result=LocalResult(dir='~/Desktop/Credit/results'), trigger = my_skipped_trigger)
def netting_data_failed():
logger = prefect.context.get("logger")
<http://logger.info|logger.info>("Caught failed flow!")
The skipped trigger:
def my_skipped_trigger(upstream_states: Dict[Edge, State]):
if any(s.is_failed() for s in upstream_states.values()):
raise signals.SKIP("Some upstreams failed")
Had to import, Dict, Edge and State
from prefect.tasks.core.collections import Dict
from prefect.engine.state import State
from  prefect.core.edge import Edge
There are other tasks etc in my code but this is just what I've added
c
OH, I see - that
Dict
should be imported from `typing`: `from typing import Dict`; the type annotation is not necessary, I just provided it so you knew what the argument task type was
w
and now it doesn't run
Ohhh