# prefect-community
t
Hey 🙋 is it somehow possible to do something in between a map and a reduce? 🤔 e.g. let’s say i have the following pattern:
• a huge input CSV (let’s say 200K rows)
• that i then split into a ton of tiny CSVs (let’s say 1000 rows each)
• then some work is done on each one, and a new output CSV is generated per each tiny CSV
• and afterwards i wanna combine the outputs - but in “slightly bigger batches”, i.e. in groups of 5 or groups of 10 - and each one of those to turn into a “medium sized” CSV (with 5K or 10K rows) which is then uploaded to S3
obviously, i don’t want the last step (4) that combines them to wait for ALL the tiny CSVs to be processed (so i don’t want it to just be a simple reduce), just enough (e.g. 5, or 10) that could be grouped and uploaded
does that make any sense?
a
hmm, this also looks like a question that may be better answered by the https://dask.discourse.group/ or https://discuss.ray.io/ communities. for Prefect, my answer would be: how would you do it in Python? if you figure that out, you can use Prefect to orchestrate that work
t
in python i would maybe (naively) have a queue of jobs (executed in parallel by a few workers), and each one that finishes has its output added to a special queue class that accumulates data chunks. whenever a new data chunk is pushed to it, it checks if there are “enough chunks” already, and then applies a reduce on all the data it has accumulated (in this case: combine and upload), and then of course continues to accumulate more… i’m not a python expert though, i mostly coded in Java, JS, etc. 🙂 how can i use prefect to make that ^ work?
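something like this, maybe? (just a rough sketch with the stdlib - `process_chunk`, `combine_and_upload` and `chunks` are made-up placeholders for my actual steps)
```python
from concurrent.futures import ThreadPoolExecutor, as_completed

class Batcher:
    """accumulates finished chunks and flushes them in groups of `capacity`"""
    def __init__(self, capacity, on_flush):
        self.capacity = capacity
        self.on_flush = on_flush
        self.pending = []

    def push(self, chunk):
        self.pending.append(chunk)
        while len(self.pending) >= self.capacity:
            batch = self.pending[:self.capacity]
            self.pending = self.pending[self.capacity:]
            self.on_flush(batch)  # e.g. combine 5 outputs into one CSV and upload it

    def drain(self):
        # flush whatever is left over at the end, even a partial batch
        if self.pending:
            self.on_flush(self.pending)
            self.pending = []

batcher = Batcher(capacity=5, on_flush=combine_and_upload)  # hypothetical reducer
with ThreadPoolExecutor(max_workers=8) as pool:
    futures = [pool.submit(process_chunk, c) for c in chunks]  # hypothetical worker fn
    for fut in as_completed(futures):
        batcher.push(fut.result())  # outputs arrive as they finish, not in order
batcher.drain()
```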
i did something similar (in NodeJS) when i wanted to combine streaming inputs: i would accumulate them with a dedicated class that, when it fills up (let’s say, to a capacity of 100), fires the batch as a webhook to another destination (this was done to reduce the load on the destination, so it doesn’t have to deal with tons of incoming events)
alternatively (and maybe this is more natural, i don’t know) - instead of splitting 500K rows into 500 batches of 1000 rows, i would split the 500K into 50 batches of 10K rows, for example, and THEN split each 10K batch into 10 batches of 1K, so i have:
```
500K -> 50 batches of 10K -> 10 batches of 1K
```
so that the batches of 1K are “sub-tasks” of each “task” of 10K. the “task” of 10K finishes when all its “sub-tasks” of 1K finish (and are combined, and the combined file is uploaded to S3). is there a way to apply this kind of hierarchical mapping in Prefect?
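in plain Python, i imagine the two-level idea would look roughly like this (stdlib only; `split_csv`, `enrich`, `combine_csvs`, `upload_to_s3` are made-up helpers):
```python
from concurrent.futures import ThreadPoolExecutor

def process_big_batch(big_batch_path):
    # one "task": split a 10K-row batch into 1K-row sub-batches...
    sub_batches = split_csv(big_batch_path, rowsize=1000)
    with ThreadPoolExecutor(max_workers=10) as pool:
        # ...and run the "sub-tasks" (enrich each 1K sub-batch) in parallel
        outputs = list(pool.map(enrich, sub_batches))
    # the "task" finishes only when all its "sub-tasks" are done:
    # combine their outputs and upload the medium-sized CSV to S3
    upload_to_s3(combine_csvs(outputs))

big_batches = split_csv(MAIN_INPUT_FILE_PATH, rowsize=10_000)  # 500K -> 50 x 10K
with ThreadPoolExecutor(max_workers=5) as outer:
    # each 10K batch is independent, so uploads start as soon as any
    # single batch completes - no global reduce at the end
    list(outer.map(process_big_batch, big_batches))
```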
a
To orchestrate that with Prefect you would need to package it into functions, the same way you would when moving to the concurrent.futures module. Are we even talking about 1.0 or 2.0? Again, I would encourage you to ask in the Dask or Ray community, since they are much better at this distributed dataframes thing. Once you know how to do it, either using Python’s concurrent.futures or using Dask or Ray, you can orchestrate it using one of Prefect’s task runners.
t
1.0
t
@Anna Geller that doesn’t address the issue - that just means that if i have an input of size N, i can apply some computation X on it, and then have M results and apply some computation Y on those results. i was already doing that: i was splitting my 58K input CSV into 58 batches of 1K. each of the 58 batches was then mapped to the task `wrapped_enrich_shell_task`, and each such task yielded an output of 1K rows. then, each output (of size 1K) had the task `wrapped_s3_upload_task` applied on it. in Prefect it looked like:
```python
saved = save_as_csv(MAIN_INPUT_FILE_PATH, accounts_data)
chunks = split_csv(file=saved, rowsize=1000)
run_outputs = wrapped_enrich_shell_task.map(chunks)
my_uploads = wrapped_s3_upload_task.map(run_outputs)
```
now i changed the code so that it combines the outputs into a single CSV, but i’d prefer that to happen in batches of 5K or 10K, NOT to wait for the entire 58 batches to finish:
```python
saved = save_as_csv(MAIN_INPUT_FILE_PATH, accounts_data)
chunks = split_csv(file=saved, rowsize=1000)
run_outputs = wrapped_enrich_shell_task.map(chunks)
combined = combine_csvs(run_outputs)
my_uploads = wrapped_s3_upload_task(combined)
```
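i guess one workaround could be to pre-group the chunk paths BEFORE mapping, so each mapped task owns one whole group end-to-end? a rough Prefect 1.0 sketch - `process_group` and `enrich_one` are made-up helpers standing in for my wrapped tasks, and `combine_csvs`/`upload_to_s3` here are plain functions, not the tasks above:
```python
from prefect import task, Flow

@task
def group(paths, size=5):
    # chunk the flat list of tiny-CSV paths into groups of `size`
    return [paths[i:i + size] for i in range(0, len(paths), size)]

@task
def process_group(group_paths):
    # enrich every tiny CSV in this group, then combine and upload;
    # each group is its own mapped task, so no group waits for the
    # other 57 batches to finish
    outputs = [enrich_one(p) for p in group_paths]
    return upload_to_s3(combine_csvs(outputs))

with Flow("batched-reduce") as flow:
    saved = save_as_csv(MAIN_INPUT_FILE_PATH, accounts_data)
    chunks = split_csv(file=saved, rowsize=1000)
    my_uploads = process_group.map(group(chunks))
```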
a
I understand the problem you are facing, and I believe this post may help: https://medium.com/the-prefect-blog/map-faster-mapping-improvements-in-prefect-0-12-0-7cacc3f14e16 - it shows how to avoid having to wait for all mapped tasks to finish
t
^^ the DFE is relevant when the mapping is:
1 -> N -> M -> …
and so on, where M is larger than or equal to N. if i have
1 -> N -> M
where M is LESS than N (i.e. a reduce), i don’t see how it’s possible - how would prefect know how to reduce only “some” of the above layer?
each layer there has the same number of items, i.e. it’s a map followed by a map followed by a map, and so on (it would also work if the lower layer had more). there is no reduce there, which is what i’m talking about
a
Gotcha. I’ve said everything I know about the topic. Again, perhaps someone else from the community can help, or perhaps you can ask about it on the Dask/Ray discourse - good luck 🤞