# ask-community
t
Hey! I’d like to clarify the best way to do a map-reduce style job with Prefect and Dask. We have some Parquet files in S3 containing a large number of S3 paths, and for each one we need to download the contents, run it through a library and collect the results into more Parquet files in S3. We could structure it like this using Dask:
```
import dask.dataframe as dd

dataframe = dd.read_parquet("s3://foo/bar")
dataframe = dataframe.apply(do_something_expensive, axis="columns")
dataframe.to_parquet("s3://foo/bar2")
```
The question is: do Prefect mapping tasks help here? As I understand it, a reduce task in Prefect cannot operate on each partition, so it needs the full set of mapped outputs passed to it at once. If those outputs don’t fit in memory, then it will fail? I must be misunderstanding something, because wouldn’t this limitation be quite… limiting?
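For concreteness, the Prefect shape I have in mind is roughly this (made-up task names, Prefect 1.x style):
```
from prefect import Flow, task

@task
def list_paths():
    # e.g. read the Parquet file of S3 paths
    return ["s3://foo/file1", "s3://foo/file2"]

@task
def process(path):
    # download the contents and run them through the library
    ...

@task
def collect(results):
    # reduce step: receives the *entire* list of mapped results in memory
    ...

with Flow("map-reduce") as flow:
    results = process.map(list_paths())
    collect(results)
```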
k
Hey @Tom Forbes, you are right, Prefect mapping will not help here. I think this discussion will show how to use Dask operations inside a Prefect task.
☝️ 1
s
I personally haven't done this, but my intuition is that the first task in the flow would start a Dask cluster, you would use that cluster to do the operations, then shut it down (using https://cloudprovider.dask.org/en/latest/).
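Something roughly like this (untested sketch; FargateCluster is just one example of a dask_cloudprovider cluster class, and the names and sizes are illustrative):
```
import dask.dataframe as dd
from dask.distributed import Client
from dask_cloudprovider.aws import FargateCluster
from prefect import Flow, task

@task
def run_dask_job():
    # spin up a temporary Dask cluster, run the dataframe pipeline on it, then tear it down
    with FargateCluster(n_workers=10) as cluster, Client(cluster):
        df = dd.read_parquet("s3://foo/bar")
        # do_something_expensive is the function from the snippet above
        df = df.apply(do_something_expensive, axis="columns")
        df.to_parquet("s3://foo/bar2")

with Flow("dask-in-a-task") as flow:
    run_dask_job()
```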
t
Yep, so we’re doing this with a temporary Dask cluster, but it’s not great. We like the fact that Prefect gives us a UI and progress for these tasks, and we have been told that it can be used with millions of elements. We lose all that if we go all-in on Dask for this, and the API is not as nice.
I was wondering if we were missing something. Just the ability to batch the outputs of a mapping task would solve this:
```
outputs = mapping_task.map(inputs=inputs)
save_to_s3_task.reduce(outputs, size=100)
```
to invoke `save_to_s3` with 100 elements at a time, as `mapping_task` completes. Or some such.
k
You can reshape the output of the map with an intermediate task to batch it, going from `[1,2,3,4,5,6]` to `[[1,2],[3,4],[5,6]]`. Did this not work for your use case?
t
the issue isn’t the shape, it’s that the reduce step requires the entire map output to be held in memory at once. What if each mapped task produces a matrix with 10 MB of data?
k
Thinking out loud here, but in a distributed environment you would need either a shuffle or to collect the results in order to batch elements together, especially since you don't control ahead of time which elements go to which worker. This sounds like functionality Dask would provide better. I understand, though, that you lose the Prefect monitoring if you do use Dask. Will raise this idea to the team. Was the pain point of using the temporary Dask cluster primarily that you don't get as much monitoring on the Dask operations?
Confirmed we will look into the batching of outputs. Thanks for the feedback!
t
Thanks! The pain point is more around having two ways to do similar things, and having to explain that one isn’t suitable for a very common operation.
👍 1
Consuming something from storage, mapping over it, and shoving it back into storage is perhaps the most common workload. It’s not just that you have to collect the results; it’s that you can’t do this workflow with Prefect alone unless everything fits into a single node’s memory, which kind of defeats the point of using Dask.