    Walee

    1 year ago
    Hey y’all, what’s the best way to batch items after running a map? For example, say I’m processing rows from a CSV in parallel and writing them into a DB. Writing each row in a separate mapped task is quite inefficient. I can reduce them with a reducer into batches of 10000K items, but then the reducer task becomes a single bottleneck. Are there other efficient approaches to batching items together?

    Kevin Kho

    1 year ago
    Hi @Walee, you need an intermediate batching task, like this:
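    import prefect
    from prefect import task, Flow
    
    @task
    def a():
        # Source data: stands in for the rows read from the CSV
        return [1,2,3,4,5,6,7,8,9,10]
    
    @task
    def b(x):
        # Per-item work, mapped so each item gets its own task run
        return x + 1
    
    @task
    def c(list_x):
        # Batching task: split the collected results into chunks of 2
        return [list_x[i:i+2] for i in range(0, len(list_x), 2)]
    
    @task
    def d(list_x):
        # Per-batch work (e.g. one bulk DB write), mapped over the batches
        logger = prefect.context.get("logger")
        logger.info(sum(list_x))
        return sum(list_x)
    
    with Flow(" ") as flow:
        x = a()
        y = b.map(x)        # one task run per item
        y_batch = c(y)      # unmapped task: receives the mapped results as one list
        z = d.map(y_batch)  # one task run per batch
    flow.run()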
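    c is my task that does the batching. It only slices the list into chunks, which is cheap; the expensive work (for example, a bulk DB write) goes in d, which is mapped over the batches, so each batch gets its own task run and the batches can be processed in parallel when the flow uses a parallel executor.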
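    A minimal, Prefect-independent sketch of the same pattern using only the Python standard library, assuming a plain list of already-transformed rows. chunk mirrors what c does but takes the batch size as a parameter (in a real pipeline it would be in the thousands, not 2 or 4), and write_batches plays the role of d.map by running one write per batch on a thread pool. The names chunk, write_batches and fake_write are hypothetical illustrations, not Prefect APIs, and the ThreadPoolExecutor is a swapped-in stand-in for Prefect's executor.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def chunk(items: Sequence[T], size: int) -> List[List[T]]:
    """Split items into consecutive batches of at most `size` elements (like task c)."""
    if size <= 0:
        raise ValueError("size must be positive")
    return [list(items[i:i + size]) for i in range(0, len(items), size)]


def write_batches(
    batches: List[List[T]],
    write_batch: Callable[[List[T]], R],
    max_workers: int = 4,
) -> List[R]:
    """Call write_batch once per batch, concurrently (like d.map).

    Results come back in the same order as the input batches.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(write_batch, batches))


if __name__ == "__main__":
    # Stand-in for the mapped task b: transform each item independently.
    rows = [x + 1 for x in range(1, 11)]
    batches = chunk(rows, 4)
    print(batches)  # [[2, 3, 4, 5], [6, 7, 8, 9], [10, 11]]

    # Hypothetical writer: a real one would issue a single bulk INSERT per batch.
    def fake_write(batch: List[int]) -> int:
        return len(batch)

    print(write_batches(batches, fake_write))  # [4, 4, 2]
```

    Keeping the batching step a pure slice means it stays cheap even for large inputs, while the per-batch writes are the part that fans out.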

    Walee

    1 year ago
    Thanks @Kevin Kho. I’m pretty much doing the same thing.