Christopher Schmitt
02/18/2022, 3:09 PMKevin Kho
raise SKIP
on a certain condition and it will propagateChristopher Schmitt
02/18/2022, 3:57 PMraise SKIP
would solve it, though, or not getting it yet at least. I mean it is probably equivalent to the FilterTask, but how would I filter the list of table_names?Kevin Kho
from prefect import Flow, task
from prefect.engine.signals import SKIP
@task
def numbers():
return [1,2]
@task
def letters():
return ["a","b", "c"]
@task
def cross_product(numbers, letters):
res = []
for letter in letters:
for num in numbers:
res.append({"num":num, "letter":letter})
return res
@task(nout=2)
def pull_numbers_and_letters(items):
numbers = [x["num"] for x in items]
letter = [x["letter"] for x in items]
return numbers, letter
@task
def pull_numbers():
return ["a","b", "c"]
@task
def step1(item):
if item["num"] == 1:
raise SKIP("Skipping based on condition")
return item
@task
def step2(item):
return item
@task
def step3(item, number, letter):
assert number == item["num"]
assert letter == item["letter"]
return item
with Flow('...') as flow:
n = numbers()
l = letters()
map_items = cross_product(n,l)
mapped_numbers, mapped_letters = pull_numbers_and_letters(map_items)
first = step1.map(map_items)
second = step2.map(first)
third = step3.map(second, mapped_numbers, mapped_letters)
flow.run()
SKIP
instead of Filter
, the lists remain a fixed length throughout and my assert statements in step3
workChristopher Schmitt
02/18/2022, 4:31 PM