# ask-community
Nadav Nuni
hey people, I’m having some issues with a specific use case: I have a large number (e.g. 10000) of objects which I want to perform an operation on, in separate tasks. Each task has the overhead of loading external data, starting a pod, etc… so I want to perform this operation in chunks (e.g. 100 objects per task). The problem is that passing a mapped array of arrays doesn’t really work for me, and I get this (attached). Is there a way to mark the inner lists as `unmapped`? (code added in a reply)
```python
from typing import List

from prefect import Flow, task

@task
def get_objects_to_work_on() -> List[list]:
    objects = get_list_from_external_source()  # returns a flat [Obj, Obj, Obj, ...] list of ~10000 items
    chunked_objects = _split_to_sublists(objects, 100)  # returns a list of lists: [[Obj, ... 100 items], [Obj, ... 100 items], ...]
    return chunked_objects

@task
def perform_operation_on_chunk(objects: list) -> list:
    res = []
    for o in objects: # this is expected to be length ~ 100
        res.append(do_stuff(o))
        
    return res
        

with Flow("my-flow") as flow:
    chunked = get_objects_to_work_on()
    r = perform_operation_on_chunk(chunked)
    ...
```
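(`_split_to_sublists` isn’t shown in the thread; a minimal sketch of such a chunking helper, with an assumed name and signature, might look like this:)

```python
from typing import List


def _split_to_sublists(items: list, chunk_size: int) -> List[list]:
    # Slice the flat list into consecutive chunks of at most chunk_size items each.
    return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
```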
Kevin Kho
Hey @Nadav Nuni, this setup is possible. Wondering if it would be `perform_operation_on_chunk.map(chunked)`. This worked for me (and is similar to your intended structure):
```python
import prefect
from prefect import Flow, task

@task()
def fetch_data():
    data = [[1, 1, 1], [2, 2, 2], [3, 3, 3], [4, 4, 4]]
    return data

@task()
def sum_data(list_x):
    logger = prefect.context.get("logger")
    logger.info(sum(list_x))
    my_sum = 0
    for x in list_x:
        my_sum += x
    return my_sum

with Flow("test") as flow:
    data = fetch_data()
    sum_data.map(data)

flow.run()
```
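(Applied to the original flow, the same pattern would mean calling the chunk task with `.map()`, so each inner list becomes one task run. Prefect’s `unmapped` keeps an extra argument constant across mapped runs rather than marking the inner lists; a sketch, where `config`/`shared_config` are hypothetical names:)

```python
from prefect import Flow, unmapped

with Flow("my-flow") as flow:
    chunked = get_objects_to_work_on()
    # each inner (100-element) list becomes one mapped task run
    r = perform_operation_on_chunk.map(chunked)
    # hypothetical: a shared argument would stay constant across runs via
    # perform_operation_on_chunk.map(chunked, config=unmapped(shared_config))
```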
Nadav Nuni
@Kevin Kho thanks for your help! For some reason some of my inputs in the mapped task are received as `None`… I’m still investigating…
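(One way to narrow that down, sketched here rather than taken from the thread, is to log what each mapped run actually receives before processing it, reusing `do_stuff` from the original snippet:)

```python
import prefect
from prefect import task


@task
def perform_operation_on_chunk(objects: list) -> list:
    logger = prefect.context.get("logger")
    if objects is None:
        # surface which runs receive None instead of a chunk, then skip them
        logger.warning("received None instead of a chunk")
        return []
    logger.info("received a chunk of %d objects", len(objects))
    return [do_stuff(o) for o in objects]
```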