# ask-community
g
Hi all. I'm new to Prefect. I'm trying to use it to process a large dataset of 3D images. It works nicely for a small subset of the dataset, but when I try it on the whole dataset I find that "unmanaged memory" (as per Dask's dashboard) accumulates in the dask workers and they end up OOMing. In broad terms, the flow does the following:
• read in each image and apply a couple of simple transformations (cropping, aligning, ...)
• compute some global stats across the whole dataset
• use the global stats as parameters to further process the images
So "map - reduce - map". I'm trying to avoid IO ops (and maybe I just shouldn't), so I do expect Dask's memory usage to increase as it stores intermediate results, but I would expect that to show up as "managed memory".
• Could it be that this is just Prefect storing intermediate results, and Dask doesn't "know" about them?
• Is it possible I'm messing something up in how I'm programming the workflow, leading to unnecessary serialization of large objects? (I do get some messages about this.) I tried to use Prefect's constructs everywhere to avoid messing this up.
◦ e.g. a Prefect task reads a file with a list of paths, a read-file task is mapped over this list, and so on, so I don't see where I could be passing large objects to be serialized
• I also tried manually garbage collecting inside the Prefect task where memory seems to be accumulating.
Any ideas on how to further debug this? Thanks in advance!!
this is what the flow looks like
and this is what the dashboard looks like:
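[Editor's note] The "map - reduce - map" shape described above can be sketched with plain Python functions (names and data are hypothetical stand-ins; in the real flow each function would be a Prefect task, and the list comprehensions would be `task.map(...)` calls). The sketch makes the memory-relevant point explicit: the reduce step collects *all* mapped results at once.

```python
def read_and_align(path: str) -> list:
    """Map step 1: load one image and apply cheap transforms (stand-in data)."""
    return [len(path)] * 4  # placeholder for pixel values

def global_stats(images: list) -> float:
    """Reduce: one pass over all intermediates to get a dataset-wide stat.
    This is where every mapped result is held in memory simultaneously."""
    flat = [px for img in images for px in img]
    return sum(flat) / len(flat)

def normalize(img: list, mean: float) -> list:
    """Map step 2: reprocess each image using the global stat."""
    return [px - mean for px in img]

paths = ["a.tif", "bb.tif"]
imgs = [read_and_align(p) for p in paths]    # like read_task.map(paths)
mean = global_stats(imgs)                    # reduce gathers ALL results
result = [normalize(i, mean) for i in imgs]  # like norm_task.map(imgs, unmapped(mean))
```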
k
Hi @Gui Pires, Prefect doesn't manage memory, so as you said, you can probably decrease the memory footprint by controlling the IO. You can write the file out as an intermediate step and then return its location; the downstream task can read from that location. Do you have results enabled on these tasks? The default behavior is to checkpoint the task output as a result, though this should show up as disk space rather than RAM. Are you familiar with this video? Maybe it can also help reduce unmanaged memory.
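[Editor's note] The "write the intermediate to disk, return the location" pattern suggested here can be sketched with stdlib-only code (helper names and the pickle format are hypothetical; in a real Prefect 1.x flow each function would carry a `@task` decorator). Only small path strings flow between tasks, so the scheduler never serializes the large image objects.

```python
import pickle
import tempfile
from pathlib import Path

WORKDIR = Path(tempfile.mkdtemp())  # stand-in for shared storage

def process_image(src: str) -> str:
    """Read, transform, write the intermediate to disk, return its location."""
    data = {"name": src, "pixels": [0] * 10}  # stand-in for a large image
    out = WORKDIR / (Path(src).stem + ".pkl")
    with open(out, "wb") as f:
        pickle.dump(data, f)
    return str(out)  # only a small string is passed downstream

def compute_stats(paths: list) -> float:
    """Reduce step: stream each intermediate from disk instead of holding
    every image in RAM at once."""
    total = n = 0
    for p in paths:
        with open(p, "rb") as f:
            data = pickle.load(f)
        total += sum(data["pixels"])
        n += len(data["pixels"])
    return total / n
```

The trade-off is exactly the IO the original question was trying to avoid: each intermediate is written once and read once, in exchange for a flat memory profile.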
I assume the messages about serialization of large objects appear when a reduce step is involved? The results of the mapped tasks will all be collected there.
g
I see those messages while the read-rescale-align task is being mapped across the image filenames. I had seen that video and tried using its code snippet, but it didn't help 😕 I'll try writing the images to disk and returning the file paths instead. If that works, at least I'll know it's because I'm storing big images.
k
Do you think those read-rescale-align steps need to be separate tasks, or could they be one task that calls ordinary Python functions?
g
didn't get the second question
k
Oops, sorry, I edited it. I think if these operations succeed and you don't need observability around them, you can just treat them as one bigger task.
g
hm. I do want it to operate in parallel for as many images as possible
k
I think you can still make it parallel.
def a(x):
    return x + 1

def b(x):
    return x + 1

def c(x):
    return x + 1

@task
def a_b_c(x):
    return a(b(c(x)))

with Flow(...) as flow:
    a_b_c.map([1, 2, 3, 4])
versus
@task
def a(x):
    return x + 1

@task
def b(x):
    return x + 1

@task
def c(x):
    return x + 1

with Flow(...) as flow:
    _a = a.map([1, 2, 3, 4])
    _b = b.map(_a)
    _c = c.map(_b)
The first snippet represents combining read-rescale-align into one bigger task.
g
sorry, I misunderstood your point initially. read-rescale-align is already a composition of tasks, similar to what you propose
k
Ah sorry I misunderstood. Got it