Philip MacMenamin
11/22/2022, 4:50 PM
@task(name="Task A")
def task_a(x: int) -> int:
    if x == 2:
        raise signals.FAIL
    return x + 2

@task(name="Task B")
def task_b(x: int) -> int:
    return x + 2

@task(name="check")
def checker(x: int) -> None:
    if anything_upstream_broke():
        this_didn't_work(ID)
    else:
        this_was_ok(x)

with Flow("x example") as flow:
    l = [2,5,7]
    la = task_a.map(x=l)
    lb = task_b.map(x=la)
    lc = checker.map(x=lb)
That is, I have a list of things I want to run through the workflow, and I map over them. Sometimes some of the elements in the list won't run properly. I'd like a way to look through all of the upstream tasks, check whether any failed, and do a thing for that specific input.
from prefect.engine import signals
from prefect.exceptions import PrefectSignal
from prefect.triggers import all_successful, all_failed, always_run
from prefect import task, Flow

l = [2,5,7]

@task(name="Task A")
def task_a(x: int) -> int:
    # deliberately fail for one input so there is a broken branch to detect
    if x == 2:
        raise signals.FAIL
    return x + 2

@task(name="Task B")
def task_b(x: int) -> int:
    return x + 2

# always_run: the checker runs even when something upstream failed
@task(name="check", trigger=always_run)
def checker(x: int=None) -> None:
    # when something upstream broke, x arrives as a signal rather than an int
    if isinstance(x, PrefectSignal):
        print("THINGS ARE NOT OK")
    else:
        print("THINGS ARE OK")

with Flow("x example") as flow:
    la = task_a.map(x=l)
    lb = task_b.map(x=la)
    lc = checker.map(x=lb)

flow.run()
This kind of does what I'm talking about, I think, in case it's useful.
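On the open question of tying a failure back to its originating input, one option might be to carry the original value alongside the mapped result and pair them by position. The sketch below is plain Python, not Prefect: `UpstreamFailure`, the `original` parameter, and the returned tuples are all hypothetical names made up for illustration, and the functions return the failure instead of raising so the snippet runs on its own. In the flow above, the analogous move would presumably be mapping the checker over both lists, something like `checker.map(x=lb, original=l)`, but that is an untested assumption.

```python
from typing import List, Tuple, Union


class UpstreamFailure(Exception):
    """Hypothetical stand-in for the signal a failed upstream task hands downstream."""


def task_a(x: int) -> Union[int, Exception]:
    # Mirrors Task A: the input 2 fails, every other input gets 2 added.
    if x == 2:
        return UpstreamFailure(f"task_a failed for input {x}")
    return x + 2


def task_b(x: Union[int, Exception]) -> Union[int, Exception]:
    # A failure coming from upstream is passed along untouched.
    if isinstance(x, Exception):
        return x
    return x + 2


def checker(x: Union[int, Exception], original: int) -> Tuple[int, bool]:
    # Report (originating input, succeeded?) for a single mapped element.
    return (original, not isinstance(x, Exception))


inputs: List[int] = [2, 5, 7]
la = [task_a(x) for x in inputs]
lb = [task_b(x) for x in la]

# Pair each final result with the input at the same position.
report = [checker(x, original) for x, original in zip(lb, inputs)]
failed_inputs = [original for original, ok in report if not ok]

print(report)
print(failed_inputs)
```

Pairing by position only works if every stage keeps one output per input in the same order, which is the assumption this sketch leans on.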
The idea is to force the task to always run, and then assume that if something upstream broke, it's going to give you a PrefectSignal rather than what you were expecting. I'm not sure how to look up the originating input at the moment.
Mason Menges
11/23/2022, 8:46 PM
Philip MacMenamin
11/23/2022, 8:55 PM