# ask-community
d
(Somewhat related to my question above, but I’ll keep it separate) If a mapped task raises `prefect.engine.signals.SKIP`, does it need to return a `SKIP` as its result? Can I configure the (mapped or mapper) task such that it only returns a result (say, a string) if the task did not skip? Or [does/should] it have to return a `SKIP`?
k
A bit unclear on the question, because `raise SKIP` effectively is the return, so if you do:
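from prefect import task
from prefect.engine.signals import SKIP


@task
def abc(x):
    # Skip this run when the condition is met; otherwise return a value.
    if x == 2:
        raise SKIP
    else:
        return x + 1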
This will skip when the condition is met. Otherwise it will return a value. Does that answer your question (I think not)?
d
Hey @Kevin Kho — I think I mean the following outcome:
from prefect import Parameter
from prefect import Flow
from prefect import mapped
from prefect import task
from prefect.engine.signals import SKIP
from prefect.engine.state import State


@task()
def skipper(x: int) -> int:
    if x == 2:
        raise SKIP
    return x + 1


with Flow(name="test") as flow:
    xs = Parameter(name="xs", default=[1, 2, 3])
    ys = skipper(x=mapped(xs))
    zs = skipper(x=mapped(ys))

flow_state: State = flow.run()

print(flow_state.result[ys].result)
# [2, SKIP(None), 4]

print(flow_state.result[zs].result)
# [SKIP(None), None, 5]
Even if you raise `SKIP` you’re still getting a returned object, right? I would expect that raising a `SKIP` would exit without actually returning anything? (Edit: I could see it being helpful if you want to always know that N inputs gives you N outputs. In that case we can just check (in a downstream task) whether or not some input `isinstance(x, SKIP)` and go from there. But just wondering if that’s by design.)
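The "N inputs gives you N outputs" idea in the edit above can be sketched in plain Python, without Prefect. This is only an illustration: `SkipMarker` and `downstream` are hypothetical names, not Prefect's `SKIP` class or API, and list comprehensions stand in for mapping.

```python
from typing import Union


class SkipMarker:
    """Hypothetical stand-in for a skipped mapped child (not Prefect's SKIP)."""

    def __repr__(self) -> str:
        return "SkipMarker()"


def skipper(x: int) -> Union[int, SkipMarker]:
    # Mirrors the task above: 2 is skipped, everything else is incremented.
    if x == 2:
        return SkipMarker()
    return x + 1


def downstream(y: Union[int, SkipMarker]) -> Union[int, SkipMarker]:
    # Pass markers through untouched so positions stay aligned with the inputs.
    if isinstance(y, SkipMarker):
        return y
    return y * 10


xs = [1, 2, 3]
ys = [skipper(x) for x in xs]
zs = [downstream(y) for y in ys]
print(zs)  # one output slot per input, with a marker where the skip happened
```

Because every input keeps its slot, a downstream step can tell which element was skipped purely by position.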
k
Ah, you do, because in general the state needs to be passed on to the next task. If the task returned nothing, the SKIP signal wouldn’t be able to propagate. If you need to, though, you can use the `FilterTask` to remove these states. That will collect all the states, though, and prevent depth-first execution.
That is by design for tasks in general, even without mapping. But for mapping over Dask specifically, depth-first execution is preferred, so the two stages here are grouped and submitted to Dask together. If you add an intermediate stage to remove the SKIPs, you force a collection of all of the results, which makes it breadth-first.
This also helps with the N-inputs-to-N-outputs case, because mapping guarantees the order of the list.
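A rough way to picture the depth-first versus breadth-first difference is the plain-Python sketch below. It runs sequentially and does not use Prefect or Dask; `SKIPPED`, `make_stage`, `depth_first`, and `breadth_first_with_filter` are all hypothetical names chosen for illustration, and the filter step only imitates what a filtering stage would do.

```python
from typing import Callable, List

SKIPPED = object()  # hypothetical sentinel standing in for a skipped child


def make_stage(name: str, log: List[str]) -> Callable[[object], object]:
    def run(x: object) -> object:
        if x is SKIPPED:
            return SKIPPED  # propagate an upstream skip without doing any work
        log.append(f"{name}({x})")
        return SKIPPED if x == 2 else x + 1

    return run


def depth_first(xs: List[int], log: List[str]) -> List[object]:
    first, second = make_stage("first", log), make_stage("second", log)
    # Each element goes through both stages before the next element starts.
    return [second(first(x)) for x in xs]


def breadth_first_with_filter(xs: List[int], log: List[str]) -> List[object]:
    first, second = make_stage("first", log), make_stage("second", log)
    collected = [first(x) for x in xs]  # all first-stage results must exist
    kept = [y for y in collected if y is not SKIPPED]  # the filtering step
    return [second(y) for y in kept]


depth_log: List[str] = []
depth_result = depth_first([1, 2, 3], depth_log)
breadth_log: List[str] = []
breadth_result = breadth_first_with_filter([1, 2, 3], breadth_log)

print(depth_log)    # stages interleave per element
print(breadth_log)  # the whole first stage finishes before the second starts
```

In the filtered version the result list is also shorter than the input list, so the positional link between inputs and outputs is lost.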
d
Gotcha! Thanks again @Kevin Kho! 🙌
k
I think in general, you need something different from `None` because `None` can be a valid input in some cases
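The point about `None` can be shown with a small plain-Python sketch. The `lookup_nickname` function, its data, and the `SKIPPED` sentinel are made up for illustration and are not part of Prefect.

```python
SKIPPED = object()  # hypothetical sentinel, distinct from every real value

# None is a legitimate value here: it means "this user has no nickname".
NICKNAMES = {"ada": "Countess", "bob": None}


def lookup_nickname(user: str) -> object:
    if user not in NICKNAMES:
        return SKIPPED  # unknown user: nothing was computed
    return NICKNAMES[user]


results = [lookup_nickname(u) for u in ["ada", "bob", "eve"]]
was_skipped = [r is SKIPPED for r in results]
was_none = [r is None for r in results]
print(was_skipped, was_none)
```

If skipped elements were represented as `None`, "bob" and "eve" would be indistinguishable downstream.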