# ask-community
Berty
Can someone suggest a best practice for mapping a large set to a task? My flow is failing with the error below. I'm just using a local Dask executor in a very simple single-task flow. I don't really understand this warning, and it's unclear how to use something like client.scatter?
```
python3.8/site-packages/distributed/worker.py:3373: UserWarning: Large object of size 126.73 MB detected in task graph: 
  {'task': <Task: blah>, 'state': None, 'ups ... _parent': True}
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  warnings.warn(
  
Killed

python3.8/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 48 leaked semaphore objects to clean up at shutdown
  warnings.warn('resource_tracker: There appear to be %d '
```
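For reference, the pattern the warning recommends looks like this in plain dask.distributed. This is a minimal runnable sketch expanding the warning's own bad/good example; func and big_data are placeholders, not parts of the flow in question:
```
from dask.distributed import Client
import numpy as np

def func(arr):
    return float(arr.sum())

if __name__ == '__main__':
    client = Client()  # local cluster

    big_data = np.random.random(10_000_000)  # ~80 MB stand-in payload

    # Bad: big_data gets serialized into the task graph itself,
    # so it is shipped through the scheduler on every submission.
    # future = client.submit(func, big_data)

    # Good: scatter once; the workers hold the data and only a
    # small future reference travels through the graph.
    big_future = client.scatter(big_data)
    future = client.submit(func, big_future)
    print(future.result())
```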
My flow looks like this:
```
from prefect import Flow
from prefect.executors import DaskExecutor

data = get_dataframe()  # returns 250k rows
with Flow('testing large set', executor=DaskExecutor()) as f:
    my_simple_task.map(data.pk.values, data.more_data.values)
```
Even reducing the mapped set to 10,000 rows fails! 😢
It finally works at 5,000 rows, with a task-graph size of 2.7 MB. I've resorted to iterating over a chunked set to get past this blocker, but I'd love to know the proper way to address this.
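A minimal sketch of that chunking workaround, assuming Prefect 0.14+ import paths; the chunk size and the build-and-run loop are guesses at the approach described, with get_dataframe and my_simple_task as defined elsewhere in the project:
```
from prefect import Flow
from prefect.executors import DaskExecutor

CHUNK_SIZE = 5_000  # the size that worked above

if __name__ == '__main__':
    data = get_dataframe()
    for start in range(0, len(data), CHUNK_SIZE):
        chunk = data.iloc[start:start + CHUNK_SIZE]
        # Build and run a small flow per chunk so no single task
        # graph carries the full 250k-row payload.
        with Flow('testing large set (chunked)', executor=DaskExecutor()) as f:
            my_simple_task.map(chunk.pk.values, chunk.more_data.values)
        f.run()
```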
Kevin Kho
Hey @Berty, I think this is conceptually incompatible: if you already have a DataFrame, you shouldn't use Prefect's map to operate row by row. You should probably use Dask operations on that DataFrame instead of mapping. Why are you using Prefect's map in this case? What operation are you trying to do?
Berty
Hi @Kevin Kho, this is not a Dask DataFrame. I'm just passing two large lists of values to map:
my_simple_task.map([250,000 values], [250,000 values])
This has the same issue.
Kevin Kho
Ah OK, I see what you mean. My bad. What does your task do? I assume it's just something simple for now, right?
Can you try wrapping your flow.run() under an if __name__ == '__main__': block and retrying?
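In other words, something along these lines (a sketch assuming Prefect 0.14+; data and my_simple_task are from the flow above):
```
from prefect import Flow
from prefect.executors import DaskExecutor

data = get_dataframe()  # returns 250k rows, as above

with Flow('testing large set', executor=DaskExecutor()) as f:
    my_simple_task.map(data.pk.values, data.more_data.values)

# The guard keeps Dask's spawned worker processes from re-running
# the flow when they re-import this module.
if __name__ == '__main__':
    f.run()
```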
Berty
@Kevin Kho Yes, that's what I've done regarding the if __name__ block. The task just fires off an API request and saves the response JSON to disk.
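A rough sketch of what such a task might look like; the HTTP client, endpoint URL, and output path are all illustrative assumptions, not details from the thread:
```
import json
import requests
from prefect import task

@task
def my_simple_task(pk, more_data):
    # Hypothetical endpoint; the real API isn't named in the thread.
    resp = requests.get(f"https://api.example.com/items/{pk}",
                        params={"q": more_data})
    resp.raise_for_status()
    # Save the JSON response to disk, keyed by primary key.
    with open(f"response_{pk}.json", "w") as fh:
        json.dump(resp.json(), fh)
```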
Kevin Kho
I see. I'll check with the team for any ideas and get back to you on Monday.
🙏 1