Marc Lipoff
04/20/2022, 4:19 PM
Kevin Kho
04/20/2022, 4:25 PM
`always_run` that takes these inputs and handles the True/False logic or raises SKIP for the downstream task. I think the trigger is just lacking as an interface for this kind of complex logic.
Marc Lipoff
04/20/2022, 4:26 PM
Kevin Kho
04/20/2022, 4:29 PM
from prefect import Flow, task
from prefect.triggers import always_run
import prefect
from prefect.engine.signals import FAIL, SKIP
from prefect.exceptions import PrefectSignal
@task
def mapped_task(x):
    """Return ``x + 1``, deliberately failing for the input ``2``.

    The flow in this example maps this task over ``[1, 2, 3]``, so exactly
    one mapped child fails while the others succeed.

    Raises:
        ValueError: when ``x == 2``.
    """
    # Happy path first; only the value 2 falls through to the failure.
    if x != 2:
        return x + 1
    raise ValueError()
@task
def other_task():
    """Return the fixed string ``"succeeded"`` (an upstream that always works)."""
    outcome = "succeeded"
    return outcome
@task
def state_upstream():
    """Always fail, giving the downstream task a failed upstream to inspect.

    Raises:
        ValueError: unconditionally, with the message ``"error"``.
    """
    failure = ValueError("error")
    raise failure
@task(trigger=always_run)
def trigger_task(up_a, up_b, up_c):
    """Decide from three upstream results whether downstream work should run.

    The task is declared with ``trigger=always_run`` so that it can look at
    the upstream results itself instead of relying on a trigger function.
    The checks below expect a failed upstream to arrive as an exception
    instance in place of its result.

    Args:
        up_a: Iterable of results from the mapped upstream task; individual
            entries may be exception instances.
        up_b: Result of the second upstream; counts as a match when it is
            a ``str``.
        up_c: Result of the third upstream; counts as a match when it is an
            exception instance.

    Returns:
        ``True`` when at least one of the three conditions holds.

    Raises:
        SKIP: when none of the conditions hold, so the downstream task is
            skipped.
    """
    logger = prefect.context.logger
    logger.info(up_a)
    logger.info(up_b)
    logger.info(type(up_c))

    # At least one mapped child produced a non-error value.
    cond_a = any(not isinstance(x, BaseException) for x in up_a)
    cond_b = isinstance(up_b, str)
    cond_c = isinstance(up_c, BaseException)

    if cond_a or cond_b or cond_c:
        return True
    raise SKIP
@task
def final_downstream(input):
    """Pass the received value through unchanged.

    The parameter is named ``input`` (shadowing the builtin) because the
    name is part of the task's keyword interface.
    """
    passthrough = input
    return passthrough
# Build the example flow: three upstream tasks feed the gating task, whose
# result (or SKIP signal) is passed on to the final downstream task.
with Flow('...') as flow:
    task_a = mapped_task.map([1, 2, 3])
    task_b = other_task()
    task_c = state_upstream()
    run_next = trigger_task(up_a=task_a, up_b=task_b, up_c=task_c)
    final_downstream(input=run_next)

# Run the flow locally.
flow.run()
Marc Lipoff
04/20/2022, 5:52 PM