# prefect-community
**trapped**
hi guys, any way to speed up (i.e. avoid serializing the whole result and sending it back to Prefect) task mapping? I've got a ~1M item list/tuple computed with Dask Bags which I'd like to map over, and between completing computation of the bag itself and the first mapped task I'm observing a good deal of time spent waiting for the complete result list
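For context, a minimal sketch of the pattern being described, assuming the Prefect 1.x functional API; the task names and Bag contents are hypothetical:

```python
import dask.bag as db
from prefect import task, Flow

@task
def compute_ids():
    # heavy Bag computation on the Dask cluster; .compute() materializes
    # the full ~1M-item list and ships it back before mapping can begin
    bag = db.from_sequence(range(1_000_000), npartitions=100)
    return bag.map("id-{}".format).compute()

@task
def handle(identifier):
    # small per-item task mapped over the whole list
    return identifier.upper()

with Flow("map-over-bag") as flow:
    ids = compute_ids()
    results = handle.map(ids)
```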
**Zachary Hughes**
Hi @trapped! Speeding up mapping is definitely on our roadmap for the next week or two, and the upcoming changes should help your situation. That said, a certain amount of time is necessitated by Dask sending results to/from workers, so there's an upper bound on how much we can improve. We're working on it, but I want to set expectations accordingly.
**trapped**
hi @Zachary Hughes, that's great to know! in my particular case, the list I'd like to map over is composed of many small items (~12 bytes each; they're identifiers), so I'd expect transfer of each individual item to be quite fast. any clues on whether the mapping improvements you mentioned will help my use case?
**Joe Schmid**
@trapped We do a lot of mapping in Prefect & Dask so I'm really interested in your use case. One thought -- if you already have data in a Dask bag I assume what's most important is to distribute processing to your Dask cluster so that it happens in parallel, but it's not so critical that each individual item get distributed. Would it work to not map over all ~1M items, but to distribute chunks of work to say N workers, e.g. each worker takes ~1M/N items and processes those? Also, roughly how many workers are you using for your Dask cluster?
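A rough sketch of that chunking idea, with hypothetical names throughout; mapping over N chunks means Prefect tracks N tasks instead of ~1M:

```python
from prefect import task, Flow

def do_work(item):
    # stand-in for the real per-item processing
    return item * 2

@task
def build_items():
    # stand-in for the upstream Bag computation
    return list(range(1_000_000))

@task
def make_chunks(items, n_chunks):
    # ceiling-divide the list into roughly equal chunks, one per worker
    size = -(-len(items) // n_chunks)
    return [items[i:i + size] for i in range(0, len(items), size)]

@task
def process_chunk(chunk):
    # each mapped task loops over a whole chunk
    return [do_work(item) for item in chunk]

with Flow("chunked-map") as flow:
    items = build_items()
    chunks = make_chunks(items, n_chunks=8)  # e.g. N = 8 workers
    results = process_chunk.map(chunks)
```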
Another quick thought (depending on your use case): use one simple, non-mapped Prefect task that calls dask.bag.map() inside it. That might be a better approach, i.e. just let Dask handle the processing rather than doing it at the Prefect level.
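And a sketch of this second suggestion: a single, un-mapped Prefect task with all the parallelism at the Dask Bag level (again, the names and Bag contents are made up):

```python
import dask.bag as db
from prefect import task, Flow

@task
def process_with_bag():
    # one plain Prefect task; Dask distributes the Bag partitions, and
    # only the final aggregated result passes back through Prefect
    bag = db.from_sequence(range(1_000_000), npartitions=100)
    return bag.map(lambda x: x * 2).compute()

with Flow("bag-inside-task") as flow:
    results = process_with_bag()
```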
**trapped**
@Joe Schmid yes, the Bag computation is already running on the Dask cluster as it's somewhat heavy; if possible I'd rather handle each item as a separate task: while processing each item is not heavy computation-wise, it is rather time-consuming. being able to wrap the processing in Prefect tasks saves me from implementing caching, retries, etc. myself
I've only tested locally with a small LocalCluster so far; also, yes - I'm just going to map directly with Dask for now
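For reference, the per-task retries and caching mentioned above look roughly like this in Prefect 1.x (the durations and task body are made up):

```python
from datetime import timedelta
from prefect import task

@task(max_retries=3,
      retry_delay=timedelta(seconds=30),
      cache_for=timedelta(hours=1))
def process_item(item_id):
    # stand-in for the time-consuming per-item work; Prefect retries
    # and caches each mapped task run independently
    return str(item_id).upper()
```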
**Joe Schmid**
@trapped Gotcha, makes sense. If you have a chance, let us know how it goes!
**trapped**
@Joe Schmid sure! another thing I'd like to try would be not returning the Bag results but their "count" (or rather `list(range(bag.count().compute()))`), then accessing each item (persisted/scattered) by its index
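One way that index-based idea might look, using dask.distributed published datasets so only small integer indices flow through Prefect (the dataset name and task bodies here are hypothetical):

```python
from dask.distributed import get_client
from prefect import task, Flow

@task
def publish_items():
    # build the identifiers and park them on the cluster under a name;
    # only the tiny indices are returned through Prefect
    client = get_client()  # assumes this task runs on a Dask worker
    items = [f"id-{i}" for i in range(1_000_000)]
    [future] = client.scatter([items])  # one future holding the whole list
    client.datasets["ids"] = future
    return list(range(len(items)))

@task
def process_index(i):
    # each mapped task fetches the shared list from the cluster
    # (not through Prefect) and picks out its item by index
    client = get_client()
    items = client.datasets["ids"].result()
    return items[i].upper()

with Flow("map-by-index") as flow:
    indices = publish_items()
    results = process_index.map(indices)
```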
**Joe Schmid**
That makes a lot of sense. Since your Dask bag is already distributed on the cluster, no need to push that data around to Prefect tasks -- just access it in place. It's interesting to think about distributing processing on two levels: level 1 with Prefect task mapping and level 2 with Dask operations inside of Prefect tasks.
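Combining those two levels might look something like this (a hypothetical sketch, not an established pattern): Prefect maps over a handful of coarse partitions, and each mapped task uses a Dask Bag internally:

```python
import dask.bag as db
from prefect import task, Flow

@task
def process_partition(part):
    # level 2: Dask Bag parallelism inside a single Prefect task
    return db.from_sequence(part, npartitions=10).map(str.upper).compute()

with Flow("two-level") as flow:
    # level 1: Prefect task mapping over coarse chunks of identifiers
    partitions = [[f"id-{j}" for j in range(i, i + 250_000)]
                  for i in range(0, 1_000_000, 250_000)]
    results = process_partition.map(partitions)
```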