# prefect-community
j
Hey everyone looking for ideas on how to solve a Prefect / Dask interaction I am having. In a general form I have a flow that does the following:
```python
ids = [some_iterable_of_data_ids_to_retrieve]
with Flow("example") as flow:
    big_mixed_result = get_data.map(ids)
    filtered_result = filter_data(big_mixed_result)
    processed = process_data.map(filtered_result)
```
I run the flow using a `distributed.LocalCluster` / `DaskExecutor`, and when it hits the `process_data` task I get: "UserWarning: Large object ... detected in task graph... consider using Client.scatter". Prefect / Dask tries to continue but fails and restarts the flow after the workers hit their memory limit. What confuses me is that I am doing a `map` operation on the task, so I wouldn't expect any large object to be transferred between workers. I would have assumed that the `map` call only sends each small iteration, but I guess that's not the case? One idea I was considering: instead of using a `List[Dict]`, use a `dask.bag` to split the memory across workers? I don't really know; any and all ideas welcome.
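[Editor's note] The `dask.bag` idea mentioned above can be sketched as follows. This is a minimal, self-contained illustration with hypothetical data and filter/process logic standing in for `get_data`, `filter_data`, and `process_data`; it shows how a `List[Dict]` can be partitioned so no single worker has to hold the whole list.

```python
import dask.bag as db

# Hypothetical stand-in for big_mixed_result; in the real flow this
# would come from get_data.map(ids).
big_mixed_result = [{"id": i, "value": i * 10} for i in range(100)]

# Split the list across partitions so the data is distributed rather
# than shipped around as one large object.
bag = db.from_sequence(big_mixed_result, npartitions=4)

# Filter and process per-partition (toy predicates for illustration).
filtered = bag.filter(lambda rec: rec["value"] > 200)
processed = filtered.map(lambda rec: {**rec, "value": rec["value"] + 1})

# scheduler="synchronous" keeps this example single-process and deterministic.
result = processed.compute(scheduler="synchronous")
```

Note this bypasses Prefect's task mapping entirely; whether that is desirable depends on how much of Prefect's retry/state tracking you want per record.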
k
Hello @Jackson Maxfield Brown! I’ll ask the team for insight here, but hopefully others have some suggestions.
j
Hi Jackson, the flow as written should work as is. But due to some issues in Prefect's Dask-related code, the tasks Prefect submits to Dask can be rather large in size (the large object Dask is warning you about is likely the Prefect task itself, not your data). The good news is that this has been resolved on master, which should be released in the next couple of days as 0.12.0. If the problem persists after the next release, there may be another issue, but that's my first guess.
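[Editor's note] The effect described here, a large serialized task rather than large mapped data, can be demonstrated generically. The snippet below is illustrative only (the names and payload are hypothetical, not Prefect internals): a callable that has a big object bound into it serializes to megabytes, even though each mapped input stays tiny, which is exactly the kind of object Dask's "Large object detected in task graph" warning flags.

```python
import pickle
from functools import partial

# Hypothetical stand-in for a large in-memory dataset.
big_payload = list(range(500_000))

def add_offset(payload, x):
    """Toy task; its logic is irrelevant, only its serialized size matters."""
    return x + len(payload)

# Binding the data into the callable makes the *serialized task* huge,
# even though each mapped input (x) would be a single small integer:
fat_task = partial(add_offset, big_payload)
serialized_size = len(pickle.dumps(fat_task))   # megabytes

# A plain module-level function pickles by reference and stays tiny:
lean_size = len(pickle.dumps(add_offset))       # tens of bytes
```

This is why `Client.scatter` (or pre-scattering data and passing futures) is the usual Dask remedy: the bulky object travels to workers once, and the task graph carries only a small reference.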
upvote 2
j
Will install from master and give it a go. Thanks for the info!
Reporting back: upgrading to current git head solved the issue! 🎉 The dask / bokeh dashboard looks a bit funky after the upgrade but the actual data processing ran without hiccups!
🎉 1