Bogdan Serban
07/11/2022, 10:51 AM
map function in an unmapped call.
I am receiving a warning related to the size of the ML model that is being shared in the task graph (more details in the thread).
Is it a problem that I am receiving this warning? And is there a better way to share the ML model across the tasks?
Anna Geller
07/11/2022, 11:05 AM
Bogdan Serban
07/11/2022, 11:06 AM
```
UserWarning: Large object of size 168.32 MiB detected in task graph:
  {'task': <Task: remove_bg>, 'state': None, 'upstre ... _parent': True}
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  warnings.warn(
```
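The 168.32 MiB in the warning is roughly what the model costs to serialize and ship along with the tasks that reference it. To check how large an object is once pickled, a helper like the hypothetical `pickled_size_mib` below can be used, assuming the model is picklable; Dask computes its own estimate, so the numbers may not match exactly.

```python
import pickle


def pickled_size_mib(obj) -> float:
    """Approximate size of `obj` once pickled, in MiB (1 MiB = 2**20 bytes)."""
    return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)) / 2**20


if __name__ == "__main__":
    # Hypothetical stand-in for model weights: 4 MiB of zero bytes.
    fake_weights = bytes(4 * 2**20)
    print(f"{pickled_size_mib(fake_weights):.2f} MiB")
```

For a real PyTorch model, passing the model object (or its `state_dict()`) to the helper gives a comparable figure.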
```python
from prefect import Flow, Parameter, task, unmapped
from prefect.executors import DaskExecutor

# Task bodies are elided; each docstring describes what the task does.


@task
def list_folder(root_fld):
    """List all the image paths in the root_fld"""


@task
def load_image(image_path):
    """Reads an image from the given image_path"""


@task
def load_model():
    """Loads the PyTorch ML model and returns it"""


@task
def apply_model(img, model):
    """Runs inference of the given model on the given image and returns the result"""


executor = DaskExecutor(cluster_kwargs={"n_workers": 1, "threads_per_worker": 6})

with Flow("resolution-flow", executor=executor) as flow:
    root_fld = Parameter("root_fld")
    image_paths = list_folder(root_fld)
    images = load_image.map(image_paths)  # one task run per image path
    model = load_model()  # the model is loaded once
    # unmapped() passes the same model to every mapped apply_model call
    inference_results = apply_model.map(images, unmapped(model))
```
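`apply_model.map(images, unmapped(model))` runs one `apply_model` call per image and hands each call the same `model`. The plain-Python sketch below mimics that broadcast with a thread pool; `FakeModel` and `map_with_unmapped` are hypothetical names, not Prefect APIs. Because threads share one address space, every call receives the very same in-memory object, which is roughly why a threaded local executor such as `LocalDaskExecutor` is not expected to trigger the warning, whereas a distributed `DaskExecutor` presumably has to serialize the model into the task graph sent to its worker processes.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import repeat


class FakeModel:
    """Hypothetical stand-in for the PyTorch model returned by load_model()."""

    def predict(self, img):
        return img * 2  # placeholder "inference"


def apply_model(img, model):
    # Also return id(model) so we can check which object each call received.
    return model.predict(img), id(model)


def map_with_unmapped(fn, items, constant, max_workers=6):
    """Call fn(item, constant) for every item; `constant` is broadcast, like unmapped()."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map zips its iterables, so the infinite repeat() stops with `items`.
        return list(pool.map(fn, items, repeat(constant)))


if __name__ == "__main__":
    model = FakeModel()  # loaded once, like load_model()
    results = map_with_unmapped(apply_model, [1, 2, 3], model)
    # Every call saw the same object: nothing was copied or serialized.
    assert all(model_id == id(model) for _, model_id in results)
    print([out for out, _ in results])  # [2, 4, 6]
```

Swapping `ThreadPoolExecutor` for `ProcessPoolExecutor` would force the model to be pickled and sent to each worker process, which is the kind of overhead the Dask warning points at.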
Anna Geller
07/11/2022, 11:16 AM
LocalDaskExecutor and I would expect this warning to be gone.
Bogdan Serban
07/11/2022, 11:51 AM
map calls are applied for that specific image. Am I missing something in the configuration of result caching?
Anna Geller
07/11/2022, 12:00 PM
Bogdan Serban
07/11/2022, 12:02 PM
Anna Geller
07/11/2022, 12:04 PM
Bogdan Serban
07/11/2022, 12:37 PM