m

Marc Lipoff

04/20/2022, 4:19 PM
Whats the best way to have complex trigger logic? For example, I have 3 upstream tasks. I want the target task to run if: • upstream A. is a mapped task. it must have at least 1 successful child task • upstream B is success. • upstream C can be state.
k

Kevin Kho

04/20/2022, 4:25 PM
I think this is very hard to accomplish. I was trying to make a custom trigger to distinguish but you don’t have access to which state came from which task at that point. My personal suggestion is to create an intermediate task with trigger
``always_run``
that takes these inputs and handles the
``True/False``
logic or raises SKIP for the downstream task. The trigger is just lacking as an interface for this kind of complex logic I think.
m

Marc Lipoff

04/20/2022, 4:26 PM
Cool. Let me try that.
@Kevin Kho how do I access the inputted tasks' states, from within this trigger function?
k

Kevin Kho

04/20/2022, 4:29 PM
Let me make an example one sec
Oops sorry I got caught up in stuff. Working on this.
I think this script should give ideas
Copy code
``````from prefect import Flow, task
from prefect.triggers import always_run
import prefect
from prefect.engine.signals import FAIL, SKIP
from prefect.exceptions import PrefectSignal

if x == 2:
raise ValueError()
else:
return x + 1

return "succeeded"

def state_upstream():
raise ValueError("error")

<http://prefect.context.logger.info|prefect.context.logger.info>(up_a)
<http://prefect.context.logger.info|prefect.context.logger.info>(up_b)
<http://prefect.context.logger.info|prefect.context.logger.info>(type(up_c))
# get none error values
cond_a = (len([x for x in up_a if not isinstance(x, BaseException)]) > 0)
cond_b = isinstance(up_b, str)
cond_c = isinstance(up_c, BaseException)
if cond_a or cond_b or cond_c:
return True
else:
raise SKIP

def final_downstream(input):
return input

with Flow('...') as flow: