# prefect-community
b
Hello everyone! I am building a flow where inference with a PyTorch model is mapped over a series of images read from disk. Currently, I load the model in a dedicated task and pass it as an argument to the downstream `map` call wrapped in `unmapped`. I am receiving a warning about the size of the ML model being shared in the task graph (more details in the thread). Is it a problem that I am receiving that warning? And is there a better way to share the ML model across the tasks?
a
can you move code blocks into the thread?
b
definitely.
🙌 1
I am receiving the following warning message when running the flow:
```
UserWarning: Large object of size 168.32 MiB detected in task graph: 
  {'task': <Task: remove_bg>, 'state': None, 'upstre ... _parent': True}
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  warnings.warn(
```
Here is a sample flow for reference:
```python
from prefect import Flow, Parameter, task, unmapped
from prefect.executors import DaskExecutor

@task
def list_folder(root_fld):
    """List all the image paths in the root_fld"""

@task
def load_image(image_path):
    """Reads an image from the given image_path"""

@task
def load_model():
    """Loads the Pytorch ML model and returns it"""

@task
def apply_model(img, model):
    """Runs inference of the given model on the give image and returns the result"""

executor = DaskExecutor(cluster_kwargs={"n_workers": 1, "threads_per_worker": 6})
with Flow("resolution-flow", executor=executor) as flow:
    root_fld = Parameter("root_fld")
    image_paths = list_folder(root_fld)
    images = load_image.map(image_paths)
    model = load_model()
    inference_results = apply_model.map(images, unmapped(model))
```
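For completeness, a minimal way to run this flow locally would look something like the following (Prefect 1.x syntax; the "data/images" path is only an illustrative value for the parameter):
```python
# Run the flow once with a concrete value for the root_fld parameter.
# "data/images" is an example path, not something from the thread.
flow_state = flow.run(parameters={"root_fld": "data/images"})
```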
a
This is really conscientious, most people would ignore this warning. It comes from Dask being cautious about resources - I believe if your Dask cluster has the capacity to handle this data, it's safe to ignore. Your flow looks great and the syntax is perfectly valid in the current configuration. You may actually even switch to
`LocalDaskExecutor`
and I would expect this warning to be gone
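For example, the only change would be the executor line - a minimal sketch, assuming Prefect 1.x where LocalDaskExecutor lives in prefect.executors:
```python
from prefect.executors import LocalDaskExecutor

# Threads in a single process share memory, so the model does not have to be
# serialized into the Dask task graph (which is what triggers the size warning).
executor = LocalDaskExecutor(scheduler="threads", num_workers=6)
```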
b
thank you for your answer 😄
I would have another question on this topic, if you would be so kind to help: when I apply this flow to a large number of images, there is a high chance of hitting an OOM, because all the images get loaded into memory and are not released even after all the downstream `map` calls for a given image have completed. Am I missing something in the configuration of result caching?
a
Handling memory issues on Dask is a complex topic on its own: https://distributed.dask.org/en/stable/memory.html. In Prefect 2.0 we have both task- and flow-level retries to help with failures, but an OOM often results in a crash of the underlying infrastructure (e.g. container/pod), so it's more up to the execution layer to ensure enough memory is allocated for the runs. Prefect, as of today, generally doesn't perform low-level resource management; things like Kubernetes or cloud providers are more suitable for tracking resource utilization and scaling the infrastructure.
TL;DR: it's complicated and you'd need to figure out how to allocate enough memory on your own based on your infrastructure/execution layer; neither Prefect nor Dask can do that automatically
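One knob that is available from the flow side is the sizing of the Dask cluster itself - a sketch, assuming a local cluster and with made-up numbers that would depend on your hardware:
```python
from prefect.executors import DaskExecutor

# cluster_kwargs are forwarded to dask.distributed.LocalCluster; memory_limit
# caps each worker, and Dask will spill to disk and eventually restart workers
# that exceed it instead of taking down the whole machine.
executor = DaskExecutor(
    cluster_kwargs={
        "n_workers": 2,
        "threads_per_worker": 3,
        "memory_limit": "4GB",  # illustrative value, tune to your machine
    }
)
```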
b
Would there be a way to do the processing in chunks, instead of all at once?
a
A useful question to ask yourself is: how would you do that in Python without Prefect? Prefect is generally for orchestration - it's not a distributed compute or data processing framework. Once you've figured that out, you can use Prefect for orchestration, scheduling, operationalizing your flow, and observability.
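As a rough illustration of what that could look like once translated back into the flow - the chunk task, the chunk_size value, and the read_image/run_inference helpers are all hypothetical, only meant to show the batching idea:
```python
@task
def chunk(image_paths, chunk_size=100):
    """Split the full list of image paths into smaller batches."""
    return [
        image_paths[i : i + chunk_size]
        for i in range(0, len(image_paths), chunk_size)
    ]

@task
def process_batch(batch, model):
    """Load, run inference on, and release one image at a time within a batch."""
    results = []
    for path in batch:
        img = read_image(path)                      # hypothetical helper, not a Prefect task
        results.append(run_inference(model, img))   # hypothetical helper
    return results

with Flow("resolution-flow-chunked", executor=executor) as flow:
    root_fld = Parameter("root_fld")
    image_paths = list_folder(root_fld)
    batches = chunk(image_paths)
    model = load_model()
    inference_results = process_batch.map(batches, unmapped(model))
```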
🙌 1
b
I think I got an idea of how to handle that. Thank you!
🙌 1