
Matt Delacour

01/24/2023, 9:06 PM
Is there a way to ask "map" to create batches instead of one task per value? For example, if I have 1M events, I feel that seeing 1M tasks in the UI might make things bad. Maybe I am wrong on that. From this:
from typing import List

from prefect import Flow, task  # Prefect 1.x API (Flow context manager)

@task
def A() -> List[int]:
    return [1, 2, 3]

@task
def B(y: int) -> int:
    return y + 100

with Flow('flat map') as f:
    a = A() # [1, 2, 3]
    b = B.map(y=a) # [101, 102, 103]; keyword must match B's parameter name
to this:
@task
def A() -> List[int]:
    return [1, 2, 3]

@task
def B(values: List[int]) -> List[int]:
    return [y + 100 for y in values]

with Flow('flat map') as f:
    a = A() # [1, 2, 3]
    b = B.map(values=a, batches=2) # proposed: "batches" is not an existing map argument; desired result [101, 102, 103]
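The `batches=2` argument in the proposal is not something `map` accepts, and the thread doesn't pin down its meaning. A minimal pure-Python sketch of one plausible reading, assuming `batches=2` means "split the input into (at most) two roughly equal batches, run the task once per batch, then flatten". `split_into_batches` and `add_100` are hypothetical helpers, not Prefect APIs.

```python
from typing import List


def split_into_batches(values: List[int], n_batches: int) -> List[List[int]]:
    """Split `values` into at most `n_batches` contiguous, roughly equal batches.

    Hypothetical helper illustrating the proposed `batches=` semantics;
    it is not part of Prefect.
    """
    if n_batches < 1:
        raise ValueError("n_batches must be >= 1")
    if not values:
        return []
    # Ceiling division so every value lands in some batch.
    size = -(-len(values) // n_batches)
    return [values[i : i + size] for i in range(0, len(values), size)]


def add_100(values: List[int]) -> List[int]:
    # Same body as the batched B task above, as a plain function.
    return [y + 100 for y in values]


batches = split_into_batches([1, 2, 3], n_batches=2)  # [[1, 2], [3]]
per_batch = [add_100(batch) for batch in batches]      # one call per batch, not per value
flat = [y for batch in per_batch for y in batch]       # [101, 102, 103]
```

Under this reading, 1M events split into 100 batches would mean 100 task runs rather than 1M.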

Zanie

01/24/2023, 9:27 PM
You could do this with a for loop, .submit(), and a helper like this:
import itertools
from typing import Iterable


def chunked_iterable(iterable: Iterable, size: int):
    """
    Yield chunks of a certain size from an iterable

    Args:
        - iterable (Iterable): An iterable
        - size (int): The size of chunks to return

    Yields:
        tuple: A chunk of the iterable
    """
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, size))
        if not chunk:
            break
        yield chunk
I think we’d want a different operator (i.e. "batch") than "map" if we were to expose something like this directly.
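A sketch of the "for loop + submit" suggestion, assuming Prefect 2's `Task.submit()`. So that it runs without Prefect installed, `concurrent.futures.ThreadPoolExecutor.submit` stands in for the task's `.submit()` here; in a real flow you would, roughly, call `.submit(chunk)` on a `@task`-decorated function inside a `@flow` and collect each future's `.result()`. `process_batch` and `run_batched` are hypothetical names.

```python
import itertools
from concurrent.futures import ThreadPoolExecutor
from typing import Iterable, List, Tuple


def chunked_iterable(iterable: Iterable, size: int):
    """Yield tuples of up to `size` items from `iterable` (same helper as above)."""
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, size))
        if not chunk:
            break
        yield chunk


def process_batch(values: Tuple[int, ...]) -> List[int]:
    # Stand-in for a batched task body: one call handles a whole chunk.
    return [y + 100 for y in values]


def run_batched(events: Iterable[int], size: int) -> List[int]:
    # executor.submit plays the role of task.submit: one submission per chunk.
    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(process_batch, chunk)
            for chunk in chunked_iterable(events, size)
        ]
        # Gather results in submission order and flatten them into one list.
        return [y for future in futures for y in future.result()]


print(run_batched(range(1, 6), size=2))  # [101, 102, 103, 104, 105]
```

With 5 events and `size=2` this makes 3 submissions. Because `chunked_iterable` consumes its input lazily via `itertools.islice`, it also works on generators that never materialize the full event list.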

Hans Lellelid

01/25/2023, 1:34 AM
If your list easily fits in memory, here's another simple function that just splits it into chunks:
def chunkify(xs, size):
    # Lazily yield consecutive slices of length `size` (the last one may be shorter).
    return (xs[pos : pos + size] for pos in range(0, len(xs), size))
(That's what we use to create batches of identifiers that we feed in via "map".) Honestly, though, this had more to do with coming from Airflow; being able to submit the tasks in a for-loop (as Michael proposes) is a really nice, easy-to-read feature of Prefect.
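A sketch of the "chunkify, then map over the batches" pattern: each slice of identifiers becomes one mapped element, so the number of task runs equals the number of chunks rather than the number of identifiers. This is plain Python: the list comprehension stands in for `map`, and `fetch_records` is a hypothetical batched task body.

```python
from typing import List, Sequence


def chunkify(xs: Sequence, size: int):
    # Generator of slices; requires a sized, sliceable sequence held in memory.
    return (xs[pos : pos + size] for pos in range(0, len(xs), size))


def fetch_records(ids: List[int]) -> List[str]:
    # Hypothetical batched task body: handles a whole batch of identifiers per call.
    return [f"record-{i}" for i in ids]


ids = list(range(7))
batches = list(chunkify(ids, 3))               # [[0, 1, 2], [3, 4, 5], [6]]
results = [fetch_records(b) for b in batches]  # one call per batch, like mapping over batches
print(len(batches))                            # 3 calls instead of 7
```

Unlike `chunked_iterable`, `chunkify` relies on `len()` and slicing, so it works on lists and tuples but not on generators; slicing a list yields lists and slicing a tuple yields tuples.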