Matt Delacour
01/24/2023, 9:06 PM
```python
from typing import List
from prefect import Flow, task  # Prefect 1.x API

@task
def A() -> List[int]:
    return [1, 2, 3]

@task
def B(y: int) -> int:
    return y + 100

with Flow('flat map') as f:
    a = A()  # [1, 2, 3]
    b = B.map(y=a)  # [101, 102, 103]
```
to
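```python
from typing import List
from prefect import Flow, task  # Prefect 1.x API

@task
def A() -> List[int]:
    return [1, 2, 3]

@task
def B(values: List[int]) -> List[int]:
    return [y + 100 for y in values]

with Flow('flat map') as f:
    a = A()  # [1, 2, 3]
    # `batches` is the proposed (hypothetical) argument
    b = B.map(values=a, batches=2)  # [101, 102, 103]
```
Zanie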
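```python
import itertools
from typing import Iterable, Iterator, Tuple

def chunked_iterable(iterable: Iterable, size: int) -> Iterator[Tuple]:
    """
    Yield chunks of a certain size from an iterable.

    Args:
        - iterable (Iterable): An iterable
        - size (int): The size of chunks to return

    Yields:
        tuple: A chunk of the iterable
    """
    it = iter(iterable)
    while True:
        # Take up to `size` items; an empty tuple means the iterator is exhausted
        chunk = tuple(itertools.islice(it, size))
        if not chunk:
            break
        yield chunk
```
Zanie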
batch) than map if we were to expose something like this directly.
Hans Lellelid
01/25/2023, 1:34 AM
```python
def chunkify(xs, size):
    # Slice a sequence into consecutive pieces of at most `size` items
    return (xs[pos : pos + size] for pos in range(0, len(xs), size))
```
(That's what we use to create the batches of identifiers that we feed in via map.)
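Both helpers in this thread split input into fixed-size batches, but they accept different inputs. As a quick sketch: `chunkify` slices, so it needs a sequence that supports `len()` and slicing, and it returns pieces of the same type (lists from a list, strings from a string). `chunked_iterable` consumes any iterable lazily and always yields tuples. On Python 3.12+, `itertools.batched` does the same job, so the variant of `chunked_iterable` below defers to it when present and falls back to the `islice` loop otherwise; that fallback wiring is an illustrative choice, not code from the thread.

```python
import itertools
from typing import Iterable, Iterator, Sequence, Tuple


def chunkify(xs: Sequence, size: int):
    # Slice-based: requires len() and slicing, keeps the input's type
    return (xs[pos : pos + size] for pos in range(0, len(xs), size))


def chunked_iterable(iterable: Iterable, size: int) -> Iterator[Tuple]:
    # Works on any iterable and yields tuples
    if hasattr(itertools, "batched"):  # Python 3.12+
        yield from itertools.batched(iterable, size)
        return
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, size))
        if not chunk:
            break
        yield chunk


ids = [101, 102, 103, 104, 105]
print(list(chunkify(ids, 2)))          # [[101, 102], [103, 104], [105]]
print(list(chunked_iterable(ids, 2)))  # [(101, 102), (103, 104), (105,)]

# A generator has no len(), so only chunked_iterable can batch it directly
print(list(chunked_iterable((i * i for i in range(5)), 2)))  # [(0, 1), (4, 9), (16,)]
```

In practice, `chunkify` is the simpler choice when the identifiers are already in a list. `chunked_iterable` is the safer choice when they arrive as a stream.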
Honestly, though, this had more to do with coming from Airflow; being able to submit the tasks in a for-loop (as Michael proposes) is a really nice, easy-to-read feature of Prefect.
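The proposed `B.map(values=a, batches=2)` from the top of the thread can be approximated without any new API. You chunk the input, submit one call per batch in a plain for-loop, and flatten the results. The sketch below uses `concurrent.futures.ThreadPoolExecutor` as a stand-in for a Prefect task runner. In Prefect 2, the loop body would roughly call the task's `.submit()` instead, but that mapping is an assumption and is not shown here. `batched_map` and `add_100_batch` are hypothetical names. The sketch also reads `batches=2` as "batch size 2" and flattens results back in input order to match the `[101, 102, 103]` comment in the proposal.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def chunkify(xs: Sequence[T], size: int):
    # Slice a sequence into consecutive batches of at most `size` items
    return (xs[pos : pos + size] for pos in range(0, len(xs), size))


def add_100_batch(values: Sequence[int]) -> List[int]:
    # Batch-level version of B from the proposal: one call handles many items
    return [y + 100 for y in values]


def batched_map(
    fn: Callable[[Sequence[T]], List[R]], items: Sequence[T], batch_size: int
) -> List[R]:
    # Hypothetical emulation of `fn.map(items, batches=batch_size)`
    with ThreadPoolExecutor() as pool:
        futures = []
        # Submit one unit of work per batch in a plain for-loop
        for batch in chunkify(items, batch_size):
            futures.append(pool.submit(fn, batch))
        # Flatten per-batch results into one list, preserving input order
        return [result for future in futures for result in future.result()]


print(batched_map(add_100_batch, [1, 2, 3], batch_size=2))  # [101, 102, 103]
```

Batches here are `[1, 2]` and `[3]`, so `add_100_batch` runs twice rather than three times. That reduction in calls is the point of batching when each call carries fixed overhead.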