Hi there, I'm trying to get a flow that looks like this to run. Currently test_reduce gets skipped, but I would like it to receive [1, 2, 4, 5].
Robert Hales
07/30/2021, 4:18 PM
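# Imports assume Prefect 1.x module paths.
import prefect
from prefect import task
from prefect.engine.signals import SKIP
from prefect.triggers import not_all_skipped

@task(trigger=not_all_skipped)
def test_reduce(test):
    return test

@task
def to_map():
    return [1, 2, 3, 4, 5]

@task
def test_map(x):
    print(x)
    if x == 3:
        raise SKIP

with prefect.Flow("Test") as flow:
    l = to_map()
    m = test_map.map(l)
    test_reduce(m)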
Kevin Kho
07/30/2021, 4:22 PM
Hey @Robert Hales, let me try to get this working. I suspect you need an intermediate step after test_map using a FilterTask to remove the SKIP.
Robert Hales
07/30/2021, 4:23 PM
Thank you, thought there might be something for it but couldn't find it!
Kevin Kho
07/30/2021, 4:27 PM
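# Imports assume Prefect 1.x module paths.
import prefect
from prefect import task
from prefect.engine.signals import SKIP
from prefect.tasks.control_flow import FilterTask

@task
def to_map():
    return [1, 2, 3, 4, 5]

@task
def test_map(x):
    print(x)
    if x == 3:
        raise SKIP
    return x

@task
def test_reduce(test):
    logger = prefect.context.get("logger")
    logger.info(test)
    return test

# Drop exception results (such as the SKIP signal) before the reduce step.
fil = FilterTask(lambda x: not isinstance(x, BaseException))

with prefect.Flow("Test") as flow:
    l = to_map()
    m = test_map.map(l)
    n = fil(m)
    test_reduce(n)

flow.run()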
Kevin Kho
07/30/2021, 4:28 PM
I think that trigger is more for multiple upstream tasks, whereas the mapped task is just treated as one. We can filter out the exceptions for the reduce step.
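For anyone reading along, here is a rough plain-Python sketch of what that filter predicate does to the mapped results. It does not use Prefect at all: SkipSignal and run_child are hypothetical stand-ins, and it assumes a skipped child shows up in the mapped list as an exception object, which is what the isinstance check in the FilterTask relies on.

```python
class SkipSignal(Exception):
    """Hypothetical stand-in for a skip signal; not Prefect's own class."""


def run_child(x):
    """Mimic one mapped child: return the value, or the signal if it skipped."""
    try:
        if x == 3:
            raise SkipSignal("skipping 3")
        return x
    except SkipSignal as signal:
        # Assumption: the skipped child's result is the exception object itself.
        return signal


def keep_result(x):
    """Same predicate as the lambda passed to FilterTask in the thread."""
    return not isinstance(x, BaseException)


mapped = [run_child(x) for x in [1, 2, 3, 4, 5]]
reduced = [x for x in mapped if keep_result(x)]
print(reduced)
```

The reduce step then only sees the values that were actually returned, so the skipped element never reaches it.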
Robert Hales
07/30/2021, 4:32 PM
Cheers, just got it working my end too! Many thanks for your super quick and helpful answer as always 😄