What's the best way to have complex trigger logic? ...
# prefect-server
m
What's the best way to have complex trigger logic? For example, I have 3 upstream tasks. I want the target task to run if:
• upstream A is a mapped task; it must have at least 1 successful child task
• upstream B is a success
• upstream C can be in any state
k
I think this is very hard to accomplish. I was trying to make a custom trigger to distinguish them, but you don’t have access to which state came from which task at that point. My personal suggestion is to create an intermediate task with the always_run trigger that takes these inputs and handles the True/False logic or raises SKIP for the downstream task. I think the trigger is just lacking as an interface for this kind of complex logic.
m
Cool. Let me try that.
@Kevin Kho how do I access the inputted tasks' states, from within this trigger function?
k
Let me make an example one sec
Oops sorry I got caught up in stuff. Working on this.
I think this script should give ideas
from prefect import Flow, task
from prefect.triggers import always_run
import prefect
from prefect.engine.signals import SKIP

@task
def mapped_task(x):
    if x == 2:
        raise ValueError()
    else:
        return x + 1

@task
def other_task():
    return "succeeded"

@task
def state_upstream():
    raise ValueError("error")

@task(trigger=always_run)
def trigger_task(up_a, up_b, up_c):
    prefect.context.logger.info(up_a)
    prefect.context.logger.info(up_b)
    prefect.context.logger.info(type(up_c))
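    # upstream A: keep only the mapped results that are not errors
    cond_a = len([x for x in up_a if not isinstance(x, BaseException)]) > 0
    # upstream B: it succeeded if its string result came through
    cond_b = isinstance(up_b, str)
    # upstream C may be in any state, so it is not part of the condition
    if cond_a and cond_b:
        return True
    else:
        raise SKIP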

@task
def final_downstream(input):
    return input

with Flow('...') as flow:
    task_a = mapped_task.map([1,2,3])
    task_b = other_task()
    task_c = state_upstream()
    run_next = trigger_task(task_a, task_b, task_c)
    final_downstream(run_next)

flow.run()
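The decision itself (A has at least one successful child, B succeeded, C can be anything) can also be pulled out into a plain function so it can be checked without running a flow. This is only a rough sketch: it assumes that failed upstream results arrive as exception objects, as in the script above, and should_run is a made-up helper name, not part of Prefect.

```python
def should_run(up_a, up_b):
    """Decide whether the downstream task should run.

    up_a: list of results from the mapped upstream A. Failed children are
          ASSUMED to show up as exception instances.
    up_b: result of upstream B. ASSUMED to be an exception instance if B failed.
    Upstream C may be in any state, so it is not inspected at all.
    """
    # At least one mapped child produced a non-error result.
    a_has_success = any(not isinstance(x, BaseException) for x in up_a)
    # B produced a real result rather than an error.
    b_succeeded = not isinstance(up_b, BaseException)
    return a_has_success and b_succeeded


if __name__ == "__main__":
    # One mapped child failed, B succeeded.
    print(should_run([2, ValueError(), 4], "succeeded"))
    # Every mapped child failed.
    print(should_run([ValueError()], "succeeded"))
    # B failed.
    print(should_run([2], ValueError("error")))
```

Inside the intermediate task you could then call this helper on the A and B inputs and raise SKIP when it returns False.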
m
cool. let me try this