
William Smith

08/28/2020, 10:16 AM
Why is it that when a task is set to trigger only if its upstream fails, that task is then marked as failed? The task was never executed, so shouldn't it be marked as skipped?
I suppose the same is true for the all_successful trigger: if the upstream failed, the downstream tasks are marked as failed, but again they didn't actually run,
so I would have thought they would be marked as skipped

Matt Wong-Kemp

08/28/2020, 10:18 AM
I suspect this is because `reference_tasks`, which determines the state of the flow as a whole, would then be `Skipped`? when in principle it's failed.

William Smith

08/28/2020, 10:32 AM
I believe if you have 1 task as failed and the others skipped/successful, the whole flow would be marked as failed. If not, you could just add it to the reference tasks array?
flow.set_reference_tasks([failedTask, skippedTask, skippedTask])
for example
I dunno, it just seems odd to me that a task is marked as failed when it didn't even execute

Chris White

08/28/2020, 3:45 PM
@William Smith Matt is correct - this is to ensure the final flow state reflects the presence of failure; if you want the behavior you describe, simply use a different trigger on your tasks so that Failure of an upstream task is converted to a `Skipped` state on the downstream tasks
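A minimal sketch of the reasoning above, assuming a simplified model in which a trigger failure counts as a failure and a skip counts as a success when the flow's final state is derived from its reference tasks. `TaskState` and `final_flow_state` are hypothetical names for illustration, not Prefect's API, and states such as pending or running are ignored.

```python
from enum import Enum
from typing import Iterable


class TaskState(Enum):
    """Hypothetical stand-in for task states; not Prefect's State classes."""

    SUCCESS = "Success"
    SKIPPED = "Skipped"
    FAILED = "Failed"
    TRIGGER_FAILED = "TriggerFailed"

    def is_failed(self) -> bool:
        # In this model a trigger failure is treated as a kind of failure.
        return self in (TaskState.FAILED, TaskState.TRIGGER_FAILED)


def final_flow_state(reference_states: Iterable[TaskState]) -> str:
    """Simplified model: the flow fails if any reference task failed."""
    if any(state.is_failed() for state in reference_states):
        return "Failed"
    return "Success"
```

Under this model, a downstream reference task that ends in a trigger failure keeps the flow marked as failed, while one that ends as skipped would let the flow report success.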

William Smith

08/28/2020, 3:48 PM
Sorry, not sure I follow, how would you convert a state to "Skipped"?

Chris White

08/28/2020, 3:52 PM
You can use a signal within the trigger like so:
from prefect import signals

def my_skipped_trigger(upstream_states: Dict[Edge, State]):
    if any(s.is_failed() for s in upstream_states.values()):
        raise signals.SKIP("Some upstreams failed")
    # a trigger must return True for the task to run; a falsy return is treated as a trigger failure
    return True

@task(trigger=my_skipped_trigger)
…

William Smith

08/28/2020, 3:53 PM
Nice, wasn't aware you could do this
Thanks!

Chris White

08/28/2020, 3:53 PM
You’re welcome!
@Marvin archive “How to make my tasks skip instead of triggerfail anytime an upstream task fails”

William Smith

08/28/2020, 4:01 PM
I actually get an error when trying to run this :
def my_skipped_trigger(upstream_states: Dict[Edge, State]):
TypeError: 'SignatureValidator' object is not subscriptable

Chris White

08/28/2020, 4:01 PM
can you share the full code you wrote?

William Smith

08/28/2020, 4:03 PM
Here is my function trying to use trigger:
@task(name="Unhappy Path", result=LocalResult(dir='~/Desktop/Credit/results'), trigger=my_skipped_trigger)
def netting_data_failed():
    logger = prefect.context.get("logger")
    logger.info("Caught failed flow!")
The skipped trigger:
def my_skipped_trigger(upstream_states: Dict[Edge, State]):
    if any(s.is_failed() for s in upstream_states.values()):
        raise signals.SKIP("Some upstreams failed")
Had to import Dict, Edge and State:
from prefect.tasks.core.collections import Dict
from prefect.engine.state import State
from prefect.core.edge import Edge
There are other tasks etc. in my code, but this is just what I've added

Chris White

08/28/2020, 4:05 PM
Oh, I see - that `Dict` should be imported from `typing`: `from typing import Dict`; the type annotation is not necessary, I just provided it so you knew what type the argument was
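A minimal, self-contained sketch of how such a trigger might be evaluated, assuming the runner treats a raised skip signal as a skip and a falsy return value as a trigger failure. `SKIP`, `State`, and `resolve` here are hypothetical stand-ins so the example runs without Prefect installed; they are not Prefect's classes, and plain strings replace `Edge` keys. Only `Dict` from `typing` is needed for the annotation.

```python
from typing import Callable, Dict


class SKIP(Exception):
    """Stand-in for prefect.signals.SKIP (illustration only)."""


class State:
    """Stand-in for an upstream task state; it only tracks failure."""

    def __init__(self, failed: bool = False) -> None:
        self._failed = failed

    def is_failed(self) -> bool:
        return self._failed


def my_skipped_trigger(upstream_states: Dict[str, State]) -> bool:
    # Skip this task if any upstream task failed.
    if any(s.is_failed() for s in upstream_states.values()):
        raise SKIP("Some upstreams failed")
    # Otherwise signal that the task may run.
    return True


def resolve(
    trigger: Callable[[Dict[str, State]], bool],
    upstream_states: Dict[str, State],
) -> str:
    """Simplified model of trigger evaluation, not Prefect's actual code."""
    try:
        ok = trigger(upstream_states)
    except SKIP:
        return "Skipped"
    return "Run" if ok else "TriggerFailed"
```

In this model, a trigger that neither raises nor returns a truthy value resolves to a trigger failure, which is why the sketch ends with an explicit `return True`.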

William Smith

08/28/2020, 4:05 PM
and now it doesn't run
Ohhh