Is there a way to ask "map" to create batches inst...
# ask-community
m
Is there a way to ask "map" to create batches instead of one task per value? For example if I have 1M events, I feel that seeing 1M tasks in the UI might make things bad. Maybe I am wrong on that From that
Copy code
@task
def A() -> List[int]:
    return [1, 2, 3]

@task
def B(y: int) -> int:
    return y + 100

with Flow('flat map') as f:
    a = A() # [1, 2, 3]
    b = B.map(x=a) # [101, 102, 103]
to
Copy code
@task
def A() -> List[int]:
    return [1, 2, 3]

@task
def B(values: List[int]) -> List[int]:
    return [y + 100 for y in values]

with Flow('flat map') as f:
    a = A() # [1, 2, 3]
    b = B.map(x=a, batches=2) # [101, 102, 103]
z
You could do this with a for loop, submit, and something like
Copy code
def chunked_iterable(iterable: Iterable, size: int):
    """
    Yield chunks of a certain size from an iterable

    Args:
        - iterable (Iterable): An iterable
        - size (int): The size of chunks to return

    Yields:
        tuple: A chunk of the iterable
    """
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, size))
        if not chunk:
            break
        yield chunk
I think we’d want a different operator (i.e.
batch
) than
map
if we were to expose something like this directly
h
If your list is an easy-to-fit-in-memory size, here's another simple function to just split it into chunks:
Copy code
def chunkify(xs, size):
    return (xs[pos : pos + size] for pos in range(0, len(xs), size))
(That's what we use to create batches of identifiers that we feed in via
map
) Honestly, though, this had more to do with coming from Airflow; being able to submit the tasks in a for-loop (as Michael proposes) is a really nice, easy-to-read feature of Prefect.