Chris O'Brien
11/21/2019, 11:58 PM
from prefect import task, Flow
from prefect.triggers import any_failed, some_failed
from prefect.tasks.control_flow.conditional import ifelse, merge, switch
@task
def three_outcomes() -> str:
    """Return the name of the branch to follow; hard-coded to "dead_branch"."""
    chosen_branch = "dead_branch"
    return chosen_branch
@task
def fail_branch() -> None:
    """Print a marker line so the run output shows this branch executed."""
    print("i fail")
@task
def pass_branch() -> None:
    """Print a marker line so the run output shows this branch executed."""
    print("i pass")
@task
def dead_branch() -> None:
    """Print a marker line so the run output shows this branch executed."""
    print("im dead")
@task
def do_final_thing() -> None:
    """Print a marker line for the final step of the flow."""
    print("final")
with Flow("example") as flow:
    # Map each value `three_outcomes` may return to the task for that case.
    branches = {
        "dead_branch": dead_branch,
        "pass_branch": pass_branch,
        "fail_branch": fail_branch,
    }
    switch(three_outcomes, branches)

    # Only pass_branch and fail_branch feed the merge; dead_branch is left out.
    merged = merge(pass_branch, fail_branch)
    do_final_thing.set_upstream(merged)

# Run the flow and keep the resulting state.
flow_state = flow.run()
flow.visualize(flow_state=flow_state)
nicholas
skip_on_upstream_skip=False
from prefect import signals
def merge_trigger(upstream_states):
    if all(state.is_skipped() for state in upstream_states):
        raise signals.SKIP("All upstreams skipped")
    elif not all(state.is_successful() for state in upstream_states):
        raise signals.TRIGGERFAIL(
            'Trigger was "all_successful" but some of the upstream tasks failed.'
        )
    return True
Chris O'Brien
11/22/2019, 3:00 AM
custom_merge
def merge_trigger(upstream_states):
    if all(state.is_skipped() for state in upstream_states):
        raise signals.SKIP("All upstreams skipped")
    elif not all(state.is_successful() for state in upstream_states):
        raise signals.TRIGGERFAIL(
            'Trigger was "all_successful" but some of the upstream tasks failed.'
        )
    return True
class Merge(Task):
    """Task that passes through the first available upstream result.

    The task must be created with ``skip_on_upstream_skip=False``; this is
    also the value used when the caller does not supply the option.
    """

    def __init__(self, **kwargs) -> None:
        """Forward ``kwargs`` to ``Task`` after validating the skip flag.

        Raises:
            ValueError: if ``skip_on_upstream_skip`` is passed with a truthy
                value.
        """
        # Fill in the default first so the check below covers both the
        # explicit and the omitted case.
        kwargs.setdefault("skip_on_upstream_skip", False)
        if kwargs["skip_on_upstream_skip"]:
            raise ValueError("Merge tasks must have `skip_on_upstream_skip=False`.")
        super().__init__(**kwargs)

    def run(self, **task_results: Any) -> Any:
        """Return the first value that is not a ``NoResultType`` instance.

        Keyword arguments are visited in sorted key order. ``None`` is
        returned when every value is a ``NoResultType`` instance or when no
        arguments are given.
        """
        for _key, candidate in sorted(task_results.items()):
            if isinstance(candidate, NoResultType):
                continue
            return candidate
        return None
def custom_merge(*tasks: Task) -> Task:
    """Bind ``tasks`` to a new ``Merge`` task that uses ``merge_trigger``.

    Each positional task is passed to ``Merge.bind`` as a keyword argument
    named ``task_1``, ``task_2``, ... in the order given.

    Returns:
        The value returned by ``Merge.bind``.
    """
    merge_task = Merge(trigger=merge_trigger)
    bindings = {}
    for position, upstream in enumerate(tasks, start=1):
        bindings["task_{}".format(position)] = upstream
    return merge_task.bind(**bindings)