Matt Delacour

01/24/2023, 9:06 PM
Is there a way to ask "map" to create batches instead of one task per value? For example, if I have 1M events, I suspect that showing 1M tasks in the UI would cause problems, though I may be wrong about that. Something like going from this:
```
from typing import List

from prefect import Flow, task


@task
def A() -> List[int]:
    return [1, 2, 3]


@task
def B(y: int) -> int:
    return y + 100


with Flow('flat map') as f:
    a = A()  # [1, 2, 3]
    b = B.map(y=a)  # [101, 102, 103]
```
to
```
@task
def A() -> List[int]:
    return [1, 2, 3]


@task
def B(values: List[int]) -> List[int]:
    return [y + 100 for y in values]


with Flow('flat map') as f:
    a = A()  # [1, 2, 3]
    # `batches` is the proposed argument being asked about here
    b = B.map(values=a, batches=2)  # [101, 102, 103]
```

Zanie

01/24/2023, 9:27 PM
You could do this with a for loop, `submit`, and a helper like this:
```
import itertools
from typing import Iterable


def chunked_iterable(iterable: Iterable, size: int):
    """
    Yield chunks of a certain size from an iterable

    Args:
        - iterable (Iterable): An iterable
        - size (int): The size of chunks to return

    Yields:
        tuple: A chunk of the iterable
    """
    it = iter(iterable)
    while True:
        # islice pulls at most `size` items; an empty tuple means the iterator is exhausted
        chunk = tuple(itertools.islice(it, size))
        if not chunk:
            break
        yield chunk
```
I think we’d want a different operator (i.e. `batch`) than `map` if we were to expose something like this directly.
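As a rough sketch of the "for loop plus submit" idea: the example below uses `ThreadPoolExecutor.submit` from the standard library purely as a stand-in for submitting a Prefect task, so it runs without Prefect installed. The names `add_100` and `run_in_batches` are hypothetical, and the batch size of 3 is arbitrary.

```python
import itertools
from concurrent.futures import ThreadPoolExecutor
from typing import Iterable, Iterator, List, Tuple


def chunked_iterable(iterable: Iterable[int], size: int) -> Iterator[Tuple[int, ...]]:
    """Yield tuples of at most `size` items from `iterable`."""
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, size))
        if not chunk:
            break
        yield chunk


def add_100(values: Tuple[int, ...]) -> List[int]:
    # Hypothetical batch-level work: one call handles a whole chunk.
    return [y + 100 for y in values]


def run_in_batches(events: Iterable[int], size: int) -> List[int]:
    # Assumption: pool.submit stands in for submitting one task per batch.
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [
            pool.submit(add_100, chunk)
            for chunk in chunked_iterable(events, size)
        ]
        # Read results in submission order so they line up with the input.
        return [value for future in futures for value in future.result()]


result = run_in_batches(range(1, 8), size=3)
print(result)
```

With this shape, the number of submitted units of work is the number of chunks rather than the number of events.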

Hans Lellelid

01/25/2023, 1:34 AM
If your list is an easy-to-fit-in-memory size, here's another simple function to just split it into chunks:
```
def chunkify(xs, size):
    # Requires a sequence that supports len() and slicing (e.g. a list)
    return (xs[pos : pos + size] for pos in range(0, len(xs), size))
```
(That's what we use to create batches of identifiers that we feed in via `map`.) Honestly, though, this had more to do with coming from Airflow; being able to submit the tasks in a for-loop (as Michael proposes) is a really nice, easy-to-read feature of Prefect.
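To make the "batches fed in via `map`" pattern concrete, here is a minimal sketch in plain Python. The builtin `map` is only a stand-in for a mapped task, and `add_100_batch` plus the batch sizes are hypothetical choices for illustration.

```python
from typing import Iterator, List, Sequence


def chunkify(xs: Sequence[int], size: int) -> Iterator[Sequence[int]]:
    # Slice the sequence into consecutive pieces of at most `size` items.
    return (xs[pos : pos + size] for pos in range(0, len(xs), size))


def add_100_batch(values: Sequence[int]) -> List[int]:
    # Hypothetical per-batch function: processes a whole batch in one call.
    return [y + 100 for y in values]


ids = list(range(1, 8))
batches = list(chunkify(ids, 3))

# Assumption: builtin map stands in for mapping a task over the batches.
mapped = list(map(add_100_batch, batches))

# Flatten the per-batch results back into one list.
flat = [value for batch in mapped for value in batch]

# For the 1M-event case from the question, batches of 1,000 give 1,000 units of work.
n_batches = len(list(chunkify(range(1_000_000), 1_000)))

print(batches)
print(flat)
print(n_batches)
```

The trade-off is that each mapped call now returns a list, so downstream steps either accept batches too or flatten the results first.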