Can someone suggest a best practice for mapping a large set to a task? My flow is failing with the error below. I'm just using a local DaskExecutor in a very simple single-task flow. I don't really understand this warning, and it's unclear how to use something like client.scatter here.
python3.8/site-packages/distributed/worker.py:3373: UserWarning: Large object of size 126.73 MB detected in task graph:
{'task': <Task: blah>, 'state': None, 'ups ... _parent': True}
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers
future = client.submit(func, big_data) # bad
big_future = client.scatter(big_data) # good
future = client.submit(func, big_future) # good
warnings.warn(
Killed
python3.8/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 48 leaked semaphore objects to clean up at shutdown
warnings.warn('resource_tracker: There appear to be %d '
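For reference, here is a minimal sketch of the pattern the warning is recommending, using plain dask.distributed (outside of Prefect). The `big_data` bytes object is a stand-in for any large payload; the point is that `client.scatter` ships the data to worker memory once and returns a small future you can pass to `client.submit` instead of the data itself:

```python
from dask.distributed import Client

# Start an in-process scheduler/workers purely for illustration.
client = Client(processes=False)

big_data = b"x" * 1_000_000  # stand-in for a large object

# Bad: the large object gets serialized into every task graph
# sent through the scheduler:
# future = client.submit(len, big_data)

# Good: ship the data to worker memory once, then pass the
# lightweight future in its place:
big_future = client.scatter(big_data)
future = client.submit(len, big_future)
result = future.result()

client.close()
```

Note that with Prefect's DaskExecutor you don't call `client.submit` yourself; the executor builds the task graph, which is why the warning surfaces when large mapped arguments get baked into that graph.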
Berty
06/18/2021, 6:48 PM
my flow is like:
from prefect import Flow
from prefect.executors import DaskExecutor

data = get_dataframe()  # returns a 250k-row DataFrame

with Flow('testing large set', executor=DaskExecutor()) as f:
    my_simple_task.map(data.pk.values, data.more_data.values)
Berty
06/18/2021, 6:59 PM
Even reducing the set to map down to 10,000 fails! 😢
Berty
06/18/2021, 7:28 PM
Finally it works at 5,000 rows, with a graph size of 2.7 MB. I've resorted to iterating over a chunked set to get past this as a blocker, but I'd love to know the proper way to address this.
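The chunked workaround described above can be sketched with a small helper (the `chunked` function and the row counts are illustrative, not from the original flow): split the mapped arguments into batches so that each batch keeps the task graph small.

```python
# Hypothetical helper: split the mapped arguments into fixed-size
# batches so each batch produces a small task graph.
def chunked(seq, size):
    """Yield successive slices of `seq`, each with at most `size` items."""
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

pks = list(range(250_000))           # stand-in for data.pk.values
batches = list(chunked(pks, 5_000))  # 50 batches of 5,000 rows each
```

Each batch can then be mapped (or the flow run repeatedly), at the cost of serializing the overall run into per-chunk iterations.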
Kevin Kho
06/18/2021, 8:35 PM
Hey @Berty, I think this is a conceptual mismatch: if you already have a DataFrame, you shouldn't use Prefect's map to perform operations row by row. You should probably use Dask operations on that DataFrame instead of mapping. Why are you using Prefect's map in this case? What operation are you trying to do?
Berty
06/18/2021, 8:47 PM
Hi @Kevin Kho, this isn't a Dask DataFrame. I'm just passing two large lists of values to map.