Hey y’all,
What’s the best way to batch items after running a map?
So, as an example, let’s say I’m processing rows from a CSV in parallel and writing them into a DB. Writing each row in a separate mapped task is quite inefficient. I can reduce them with a reducer task into batches of 10000K items, but then the reducer task becomes a single bottleneck.
Are there any other efficient approaches to batching items together?
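For readers outside Prefect, the pattern described above (transform rows in parallel, then group the results into fixed-size batches so each DB write is a bulk insert) can be sketched with just the standard library. This is a hypothetical illustration, not the poster's code: `transform_row`, the `rows` table, the in-memory SQLite database and the tiny batch size are all assumptions, and `ThreadPoolExecutor` stands in for Prefect's mapping.

```python
import sqlite3
from concurrent.futures import ThreadPoolExecutor
from itertools import islice


def transform_row(row):
    # Hypothetical per-row processing step (stands in for the mapped task).
    name, value = row
    return (name.strip().lower(), int(value) * 2)


def batched(iterable, size):
    # Yield successive lists of at most `size` items from any iterable.
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def load(rows, batch_size=3):
    conn = sqlite3.connect(":memory:")  # assumed target DB for the sketch
    conn.execute("CREATE TABLE rows (name TEXT, value INTEGER)")
    with ThreadPoolExecutor(max_workers=4) as pool:
        # pool.map runs transform_row in parallel and yields results in input order.
        processed = pool.map(transform_row, rows)
        for batch in batched(processed, batch_size):
            # One executemany call per batch instead of one INSERT per row.
            conn.executemany("INSERT INTO rows VALUES (?, ?)", batch)
    conn.commit()
    return conn


rows = [(" A ", "1"), ("B", "2"), ("c ", "3"), ("D", "4"), ("e", "5")]
conn = load(rows)
print(conn.execute("SELECT COUNT(*), SUM(value) FROM rows").fetchone())  # (5, 30)
```

Because the batches are consumed as results stream in, the grouping step never holds more than one batch at a time; it is still a single consumer, so it mirrors the reducer trade-off described in the question rather than removing it.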
Kevin Kho
05/19/2021, 5:48 PM
Hi @Walee, you need a subtask like this:
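import prefect
from prefect import task, Flow

@task
def a():
    # Produce the items to process
    return [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

@task
def b(x):
    # Per-item work, run once per element via b.map
    return x + 1

@task
def c(list_x):
    # Batching task: receives the full list of mapped results
    # and splits it into chunks of 2
    return [list_x[i:i + 2] for i in range(0, len(list_x), 2)]

@task
def d(list_x):
    # Runs once per batch via d.map (e.g. one bulk DB write per batch)
    logger = prefect.context.get("logger")
    logger.info(sum(list_x))
    return sum(list_x)

with Flow(" ") as flow:
    x = a()
    y = b.map(x)
    y_batch = c(y)  # not mapped, so it receives all results of b at once
    z = d.map(y_batch)

flow.run()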
Kevin Kho
05/19/2021, 5:48 PM
c is the task that does the batching (here, into chunks of 2).
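The slicing idiom inside `c` works for any batch size. A minimal plain-Python sketch of it, with the hard-coded 2 turned into a parameter (`make_batches` is a hypothetical name, not part of Prefect), traced on the values the example flow produces:

```python
def make_batches(items, batch_size):
    # Same slicing idiom as task `c`, with the batch size as a parameter
    # instead of the hard-coded 2. The last batch may be shorter.
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


# Outputs of b.map(a()) in the flow above: each of 1..10 plus one.
mapped = [x + 1 for x in range(1, 11)]

batches = make_batches(mapped, 2)
print(batches)                      # [[2, 3], [4, 5], [6, 7], [8, 9], [10, 11]]
print([sum(b) for b in batches])    # per-batch sums logged by d: [5, 9, 13, 17, 21]
print(make_batches(mapped, 4))      # [[2, 3, 4, 5], [6, 7, 8, 9], [10, 11]]
```

The number of downstream mapped tasks is roughly `len(items) / batch_size`, so the batch size is the knob that trades per-task overhead against parallelism.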
Walee
05/19/2021, 6:19 PM
Thanks @Kevin Kho. I’m pretty much doing the same thing.