Jerry Thomas
08/23/2019, 10:22 AM
import prefect
from prefect import Flow, task

def skipped(item):
    return isinstance(item, prefect.engine.signals.SKIP) or isinstance(item, prefect.engine.result.NoResultType)

@task
def add(x, y):
    try:
        return x / y
    except ZeroDivisionError:
        # y == 0: skip this mapped child instead of failing it
        raise prefect.engine.signals.SKIP

with Flow("filter-skipped") as flow:
    res = add.map([10, 2, 4], [1, 0, 3])
    # res = FilterTask(res) #
Chris White
FilterTask. In the meantime, here is how it can be used:
from prefect.tasks.control_flow import FilterTask

with Flow("filter-skipped") as flow:
    res = add.map([10, 2, 4], [1, 0, 3])
    filtered_res = FilterTask()(task_results=res)
Note that FilterTasks need to first be initialized and then called (which is slightly different from @task tasks, which are already initialized); more info on this subtlety can be found here: https://docs.prefect.io/guide/tutorials/task-guide.html
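To make the initialize-then-call point concrete, here is a rough plain-Python analogy. It does not use Prefect and is not how Prefect is implemented; MiniTask, mini_task, KeepNonErrors, and double are hypothetical names made up for illustration. It only shows why a class-based task needs an extra pair of parentheses while a decorated function does not.

```python
# Plain-Python analogy only: MiniTask, mini_task, KeepNonErrors, and double
# are hypothetical names and are not part of Prefect.

class MiniTask:
    """Stand-in for a task class: the class constructs, the instance is callable."""

    def run(self, *args, **kwargs):
        raise NotImplementedError

    def __call__(self, *args, **kwargs):
        # Calling an instance forwards to run().
        return self.run(*args, **kwargs)


def mini_task(fn):
    """Stand-in for a decorator like @task: returns an already-initialized instance."""

    class _FunctionTask(MiniTask):
        def run(self, *args, **kwargs):
            return fn(*args, **kwargs)

    return _FunctionTask()


class KeepNonErrors(MiniTask):
    """Class-based task: drops any item that is an exception instance."""

    def run(self, task_results):
        return [r for r in task_results if not isinstance(r, Exception)]


@mini_task
def double(x):
    return 2 * x


# Decorated function: already an instance, so it is called directly.
doubled = double(4)

# Class-based task: initialize first, then call the resulting instance.
kept = KeepNonErrors()(task_results=[1, ValueError("bad"), 3])
```

Writing KeepNonErrors(task_results=...) without the first pair of parentheses would try to pass the data to the constructor, which is the same mistake as FilterTask(res) in the original snippet.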