
Philip MacMenamin

11/22/2022, 4:50 PM
Hello, I'm trying to implement the following kind of logic within a Prefect 1 flow:
@task(name="Task A")
def task_a(x: int) -> int:
    if x == 2:
        raise signals.FAIL
    return x + 2

@task(name="Task B")
def task_b(x: int) -> int:
    return x + 2

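@task(name="check")
def checker(x: int) -> None:
    # Pseudocode: the three helpers below are placeholders, not real functions.
    if anything_upstream_broke():
        this_did_not_work(ID)  # ID: whatever identifies the originating input
    else:
        this_was_ok(x)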


with Flow("x example") as flow:
    l = [2,5,7]
    la = task_a.map(x=l)
    lb = task_b.map(x=la)
    lc = checker.map(x=lb)
That is, I have a list of things I want to run through the workflow, and I map these. Sometimes some of the elements in the list won't run properly. I'd like a way to look through all of the upstream tasks, check whether any failed, and do something for that specific input.
Any ideas? Is my question unclear, maybe? I'd like to have some kind of terminal task that looks upstream along the mapped parents to see whether any errors occurred. In the above example I'd like, for instance, some print statements that said:
There WAS a problem with input: 2
There was no problem with input: 5
There was no problem with input: 7
Surely there's a way to do something like this? No? The Schematic tab in the GUI shows which mapped runs were skipped because of problems upstream.
from prefect.engine import signals
from prefect.exceptions import PrefectSignal
from prefect.triggers import always_run
from prefect import task, Flow


l = [2,5,7]
@task(name="Task A")
def task_a(x: int) -> int:
    if x == 2:
        raise signals.FAIL
    return x + 2

@task(name="Task B")
def task_b(x: int) -> int:
    return x + 2

@task(name="check", trigger=always_run)
def checker(x: int=None) -> None:
    if isinstance(x, PrefectSignal):
        print("THINGS ARE NOT OK")
    else:
        print("THINGS ARE OK")


with Flow("x example") as flow:
    la = task_a.map(x=l)
    lb = task_b.map(x=la)
    lc = checker.map(x=lb)
flow.run()
This kind of does what I'm talking about, I think, in case it's useful: force the checker task to always run, and then assume that if something upstream broke, it will receive a PrefectSignal rather than the value it was expecting. I'm not sure how to look up the originating input at the moment.
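One possible way to recover the originating input is to hand the checker the original list alongside the final mapped results, so each result is paired with the input that produced it. The sketch below shows only that pairing idea in plain Python, with no Prefect involved: `run_step` and `check` are hypothetical helpers, and a plain `ValueError` stands in for the signal. In a Prefect 1 flow the analogous step would presumably be a second mapped argument, something like `checker.map(x=lb, original=l)`, which is an assumption I have not run here.

```python
from typing import Callable, List, Union

# A result is either a real value or the exception that stopped the chain.
Result = Union[int, Exception]


def task_a(x: int) -> int:
    if x == 2:
        # Stand-in for `raise signals.FAIL` in the flow above.
        raise ValueError("task_a cannot handle 2")
    return x + 2


def task_b(x: int) -> int:
    return x + 2


def run_step(step: Callable[[int], int], upstream: Result) -> Result:
    """Hypothetical helper: run one step, passing an upstream failure through."""
    if isinstance(upstream, Exception):
        return upstream
    try:
        return step(upstream)
    except Exception as exc:
        return exc


def check(original: int, result: Result) -> str:
    """Hypothetical checker: report on the original input, not the result."""
    if isinstance(result, Exception):
        return f"There WAS a problem with input: {original}"
    return f"There was no problem with input: {original}"


inputs: List[int] = [2, 5, 7]
la = [run_step(task_a, x) for x in inputs]
lb = [run_step(task_b, r) for r in la]

# zip keeps each final result next to the input it came from.
report = [check(original, result) for original, result in zip(inputs, lb)]
for line in report:
    print(line)
```

The pairing relies on the results staying in the same order as the inputs, which is why the original list is zipped with the last mapped output rather than looked up afterwards.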

Mason Menges

11/23/2022, 8:46 PM
Hey @Philip MacMenamin, triggers are definitely what I'd suggest for your use case. You may also consider configuring results (https://docs-v1.prefect.io/core/concepts/results.html) to reference the output of an upstream task, which could help in identifying the inputs for your downstream tasks. You can query for this from the GraphQL client.

Philip MacMenamin

11/23/2022, 8:55 PM
Thanks Mason, I'll have a look