William Smith
08/28/2020, 10:16 AM
Matt Wong-Kemp
08/28/2020, 10:18 AM
`reference_tasks`, which determines the state of the flow as a whole, would then be `Skipped`? when in principle it's failed.
William Smith
08/28/2020, 10:32 AM
flow.set_reference_tasks([failedTask, skippedTask, skippedTask])
Chris White
08/28/2020, 3:45 PM
`Skipped` state on the downstream tasks
William Smith
08/28/2020, 3:48 PM
Chris White
08/28/2020, 3:52 PM
from prefect import signals

def my_skipped_trigger(upstream_states: Dict[Edge, State]):
    if any(s.is_failed() for s in upstream_states.values()):
        raise signals.SKIP("Some upstreams failed")

@task(trigger=my_skipped_trigger)
…
William Smith
08/28/2020, 3:53 PM
Chris White
08/28/2020, 3:53 PM
Marvin
08/28/2020, 3:54 PM
William Smith
08/28/2020, 4:01 PM
def my_skipped_trigger(upstream_states: Dict[Edge, State]):
TypeError: 'SignatureValidator' object is not subscriptable
Chris White
08/28/2020, 4:01 PM
William Smith
08/28/2020, 4:03 PM
@task(name = "Unhappy Path", result=LocalResult(dir='~/Desktop/Credit/results'), trigger = my_skipped_trigger)
def netting_data_failed():
    logger = prefect.context.get("logger")
    logger.info("Caught failed flow!")

def my_skipped_trigger(upstream_states: Dict[Edge, State]):
    if any(s.is_failed() for s in upstream_states.values()):
        raise signals.SKIP("Some upstreams failed")

from prefect.tasks.core.collections import Dict
from prefect.engine.state import State
from prefect.core.edge import Edge
Chris White
08/28/2020, 4:05 PM
`Dict` should be imported from `typing`: `from typing import Dict`; the type annotation is not necessary, I just provided it so you knew what the argument task type was
William Smith
08/28/2020, 4:05 PM