# ask-community
w
Hey y’all, what’s the best way to batch items after running a map? As an example, let’s say I’m processing rows from a CSV in parallel and writing them into a DB. Writing each row in a separate mapped task is quite inefficient. I can reduce them into batches of 10000K items with a single task, but then that reducer task becomes a bottleneck. Are there any other efficient approaches to batching items together?
k
Hi @Walee, you need a subtask like this:
```python
import prefect
from prefect import task, Flow

@task
def a():
    # Produce the items to process
    return [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

@task
def b(x):
    # Per-item work, run in parallel via b.map
    return x + 1

@task
def c(list_x):
    # Batching step: split the mapped results into chunks of 2
    return [list_x[i:i+2] for i in range(0, len(list_x), 2)]

@task
def d(list_x):
    # Per-batch work (e.g. one DB write per batch), run in parallel via d.map
    logger = prefect.context.get("logger")
    logger.info(sum(list_x))
    return sum(list_x)

with Flow(" ") as flow:
    x = a()
    y = b.map(x)
    y_batch = c(y)
    z = d.map(y_batch)

flow.run()
```
c is my task that does the batching
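Task `c` hard-codes a batch size of 2. A minimal plain-Python sketch of the same slicing with the batch size as a parameter (`chunk` and `size` are hypothetical names, not part of Prefect), which could be called from inside a task such as `c`:

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")

def chunk(items: Sequence[T], size: int) -> List[List[T]]:
    """Split items into consecutive batches of at most `size` elements."""
    if size <= 0:
        raise ValueError("size must be positive")
    # Same slicing as task `c`, with the hard-coded step of 2 made configurable
    return [list(items[i:i + size]) for i in range(0, len(items), size)]

batches = chunk(list(range(2, 12)), 4)
print(batches)  # [[2, 3, 4, 5], [6, 7, 8, 9], [10, 11]]
```

Because this step only slices a list, it is usually cheap compared with the per-batch writes, which still fan out in parallel through `d.map`.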
w
Thanks @Kevin Kho. I’m pretty much doing the same thing.
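One alternative worth considering, sketched under assumptions: chunk the parsed rows before the parallel step, so each worker transforms and bulk-inserts a whole batch and no single task ever collects every mapped result. Here `concurrent.futures.ThreadPoolExecutor` stands in for Prefect's `map`, an in-memory SQLite table stands in for the real database, and `chunk` / `process_and_write` are hypothetical names:

```python
import sqlite3
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import List

# In-memory DB standing in for the real target, shared across worker threads
conn = sqlite3.connect(":memory:", check_same_thread=False)
conn.execute("CREATE TABLE rows (value TEXT)")
lock = threading.Lock()

def chunk(items: List[str], size: int) -> List[List[str]]:
    """Split items into consecutive batches of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]

def process_and_write(batch: List[str]) -> int:
    """Transform one batch, then insert it with a single executemany call."""
    transformed = [(row.strip().upper(),) for row in batch]
    # Serialize writes on the shared connection; a real DB client would
    # typically give each worker its own connection instead
    with lock:
        conn.executemany("INSERT INTO rows (value) VALUES (?)", transformed)
        conn.commit()
    return len(transformed)

rows = [f"row{i}\n" for i in range(25)]  # placeholder for parsed CSV rows
with ThreadPoolExecutor(max_workers=4) as pool:
    written = list(pool.map(process_and_write, chunk(rows, 10)))

print(written)  # [10, 10, 5]
print(conn.execute("SELECT COUNT(*) FROM rows").fetchone()[0])  # 25
```

In the Prefect flow above, the same shape would be a task that returns the batches followed by a `.map` over them, so the per-row work and the writes both happen per batch.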