Nadav Nuni
09/14/2021, 8:21 AMunmapped
? (code added in a reply)Nadav Nuni
09/14/2021, 8:28 AM@task
def get_objects_to_work_on() -> List[list]:
objects = get_list_from_external_source() # this return a [Obj, Obj, Obj ....] list of size ~ 10000
chunked_objects = _split_to_sublists(objects, 100) # this returns a [[Obj, Obj Obj, Obj....100], [Obj, Obj Obj, Obj....100]] list of lists
return chunked_objects
@task
def perform_operation_on_chunk(objects: list) -> list:
res = []
for o in objects: # this is expected to be length ~ 100
res.append(do_stuff(o))
return res
with Flow("my-flow") as flow:
chunked = get_objects_to_work_on()
r = perform_operation_on_chunk(chunked)
...
Kevin Kho
perform_operation_on_chunk.map(chunked)
. This worked for me (and is similar yo your intended structure)
@task()
def fetch_data():
data = [[1,1,1], [2,2,2], [3,3,3], [4,4,4]]
return data
@task()
def sum_data(list_x):
logger = prefect.context.get("logger")
<http://logger.info|logger.info>(sum(list_x))
my_sum = 0
for _ in list_x:
my_sum += _
return my_sum
with Flow("test") as flow:
data = fetch_data()
sum_data.map(data)
flow.run()
Nadav Nuni
09/14/2021, 5:09 PM