Matt Delacour
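01/24/2023, 9:06 PM
from typing import List
from prefect import Flow, task  # Prefect 1.x-style API, matching the Flow(...) usage below
@task
def A() -> List[int]:
    return [1, 2, 3]
@task
def B(y: int) -> int:
    return y + 100
with Flow('flat map') as f:
    a = A()  # [1, 2, 3]
    b = B.map(y=a)  # [101, 102, 103]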
to
@task
def A() -> List[int]:
    return [1, 2, 3]
@task
def B(values: List[int]) -> List[int]:
    return [y + 100 for y in values]
with Flow('flat map') as f:
    a = A()  # [1, 2, 3]
    b = B.map(values=a, batches=2)  # [101, 102, 103]
Zanie
01/24/2023, 9:27 PM
import itertools
from typing import Iterable
def chunked_iterable(iterable: Iterable, size: int):
    """
    Yield chunks of a certain size from an iterable
    Args:
        - iterable (Iterable): An iterable
        - size (int): The size of chunks to return
    Yields:
        tuple: A chunk of the iterable
    """
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, size))
        if not chunk:
            break
        yield chunk
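A quick usage sketch of the helper above, in plain Python with no Prefect involved. The sample inputs are made up for illustration; the point is that the islice-based version accepts any iterable, including generators, and that the last chunk may be shorter than `size`.

```python
import itertools
from typing import Iterable, Iterator, Tuple


def chunked_iterable(iterable: Iterable, size: int) -> Iterator[Tuple]:
    """Yield tuples of up to `size` items from any iterable."""
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, size))
        if not chunk:
            break
        yield chunk


# Works on a list; the last chunk may be shorter than `size`.
list_chunks = list(chunked_iterable([1, 2, 3], 2))
print(list_chunks)  # [(1, 2), (3,)]

# Also works on a generator, which has no len() and cannot be sliced.
gen_chunks = list(chunked_iterable((n * n for n in range(5)), 2))
print(gen_chunks)  # [(0, 1), (4, 9), (16,)]
```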
batch
) than map
if we were to expose something like this directly
Hans Lellelid
01/25/2023, 1:34 AM
def chunkify(xs, size):
    return (xs[pos : pos + size] for pos in range(0, len(xs), size))
(That's what we use to create batches of identifiers that we feed in via map)
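For illustration, here is a minimal sketch of that batch-then-map pattern in plain Python, with no Prefect calls. `add_100_to_batch` is a hypothetical stand-in for the batched task `B` from the question, and the list comprehension stands in for whatever mapping mechanism actually runs each batch. Note that the slice-based `chunkify` needs a sequence that supports `len()` and slicing, so it will not accept a generator.

```python
from typing import List


def chunkify(xs, size):
    # Slice-based batching: needs a sequence that supports len() and slicing.
    return (xs[pos : pos + size] for pos in range(0, len(xs), size))


def add_100_to_batch(values: List[int]) -> List[int]:
    # Hypothetical stand-in for the batched task B from the question.
    return [y + 100 for y in values]


batches = list(chunkify([1, 2, 3], 2))
print(batches)  # [[1, 2], [3]]

# One call per batch (the step a mapped task would perform),
# then flatten the per-batch results back into a single list.
per_batch = [add_100_to_batch(batch) for batch in batches]
flat = [value for result in per_batch for value in result]
print(flat)  # [101, 102, 103]
```

Flattening at the end is what recovers the single list shown in the question; without it the result stays grouped per batch.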
Honestly, though, this had more to do with coming from Airflow; being able to submit the tasks in a for-loop (as Michael proposes) is a really nice, easy-to-read feature of Prefect.