# ask-community
r
Hi there, I'm trying to get a flow that looks like this to run. Currently `test_reduce` gets skipped, but I would like it to receive `[1, 2, 4, 5]`:
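# Imports assumed for Prefect 1.x (not shown in the original message)
import prefect
from prefect import task
from prefect.engine.signals import SKIP
from prefect.triggers import not_all_skipped


@task(trigger=not_all_skipped)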
def test_reduce(test):
    return test


@task
def to_map():
    return [1, 2, 3, 4, 5]


@task
def test_map(x):
    print(x)
    if x == 3:
        raise SKIP


with prefect.Flow("Test") as flow:
    l = to_map()
    m = test_map.map(l)
    test_reduce(m)
k
Hey @Robert Hales, let me try to get this working. I suspect you need an intermediate step after `test_map` using a `FilterTask` to remove the SKIP.
r
Thank you, I thought there might be something for it but couldn't find it!
k
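# Imports assumed for Prefect 1.x (not shown in the original message)
import prefect
from prefect import task
from prefect.engine.signals import SKIP
from prefect.tasks.control_flow import FilterTask

@task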
def to_map():
    return [1, 2, 3, 4, 5]

@task
def test_map(x):
    print(x)
    if x == 3:
        raise SKIP
    return x

@task()
def test_reduce(test):
    logger = prefect.context.get("logger")
    logger.info(test)
    return test

# Keep only real results; drop exception objects such as the SKIP signal
fil = FilterTask(lambda x: not isinstance(x, BaseException))

with prefect.Flow("Test") as flow:
    l = to_map()
    m = test_map.map(l)
    n = fil(m)
    test_reduce(n)

flow.run()
I think that trigger is meant more for multiple upstream tasks, whereas the mapped task is treated as a single upstream task. Instead, we can filter out the exceptions before the reduce step.
r
Cheers, just got it working on my end too! Many thanks for your super quick and helpful answer as always 😄