# ask-community
**Domantas:** Hello Prefect, I'm having a problem with `DaskExecutor`: after a flow run finishes, the Dask workers do not free the RAM they used. The Dask documentation has some material on memory management (https://distributed.dask.org/en/latest/memory.html), but it is hard to see how to apply those methods in a Prefect context. Has anybody dealt with this Prefect/Dask memory management problem and found a solution?
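(For reference, a minimal sketch of how the memory-management knobs from that Dask page relate to a Prefect flow pointed at an existing cluster. This assumes the Prefect 1.x `DaskExecutor` API; the scheduler address, thresholds, and example task are placeholders, and the thresholds only take effect when configured where the workers themselves start.)

```python
# Sketch only: the scheduler address and thresholds below are placeholders.
import dask
from prefect import Flow, task
from prefect.executors import DaskExecutor

# These keys mirror the settings in the linked Dask memory docs. They must be
# set in the workers' own config/environment (e.g. distributed.yaml or DASK_*
# env vars on the worker image); setting them here does not change a cluster
# that is already running.
dask.config.set({
    "distributed.worker.memory.target": 0.60,     # start spilling to disk
    "distributed.worker.memory.spill": 0.70,      # spill more aggressively
    "distributed.worker.memory.pause": 0.80,      # pause new task execution
    "distributed.worker.memory.terminate": 0.95,  # nanny restarts the worker
})


@task
def double(x):
    return x * 2


# Prefect only needs the scheduler address; memory behavior is a Dask setting.
executor = DaskExecutor(address="tcp://scheduler:8786")

with Flow("memory-example", executor=executor) as flow:
    double.map(list(range(100)))
```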
**Kevin Kho:** Hey @Domantas, some people find success switching from threads to processes in this case.
**Domantas:** Hello @Kevin Kho, I'm using an existing Dask cluster (`DaskExecutor(address="dask_address:port")`), and switching between parallelism methods has no effect on RAM cleanup in the Dask worker pods after the flow finishes. I'm attaching a screenshot of the Dask status UI showing ~29 GB of RAM still occupied by the results of Prefect tasks, with no Prefect flows or other tasks running in the background.
**Kevin Kho:** I see what you mean now. Prefect is currently not good at garbage collection. There are a couple of things you can do. First, delete objects you no longer need with `del` and hope Python collects them (the garbage collector is not always reliable). Second, persist intermediate data between tasks: save it upstream, load it downstream, and just return the location. That lowers the memory footprint because the result passed between tasks is smaller.
The last thing you can do is rely on Dask for garbage collection by using a resource manager and offshoring your work there.
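(A rough sketch of the "persist upstream, load downstream, return the location" pattern described above; the paths, file format, and task bodies are illustrative, not anything from the thread.)

```python
import gc
from pathlib import Path

import pandas as pd
from prefect import Flow, task


@task
def extract(day: str) -> str:
    df = pd.DataFrame({"day": [day], "value": [42]})  # stand-in for real work
    path = Path(f"/tmp/extract-{day}.csv")            # illustrative location
    df.to_csv(path, index=False)
    del df          # drop the big object ...
    gc.collect()    # ... and nudge the collector
    return str(path)  # only a small string is passed downstream


@task
def transform(path: str) -> str:
    df = pd.read_csv(path)  # load the data where it is needed
    out = path.replace(".csv", ".transformed.csv")
    df.assign(value=df["value"] * 2).to_csv(out, index=False)
    return out


with Flow("persist-locations") as flow:
    paths = extract.map(["2021-01-01", "2021-01-02"])
    transform.map(paths)
```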
**Sean Harkins:** @Domantas Are you able to provide an example of a Flow that results in this behavior? We are also tracking this issue and trying to isolate whether it (steadily increasing memory utilization on Dask workers) is purely Prefect-related or a lower-level Dask issue. This is the related issue we are using for tracking (please ignore some of the discussion above this comment, https://github.com/pangeo-forge/pangeo-forge-recipes/issues/151#issuecomment-854085539, as it is unrelated to this specific issue). We are able to replicate this behavior on our pre-existing Dask cluster running on Fargate with a very simple flow such as:
```python
import time

import pandas as pd
from prefect import Flow, Parameter, task, unmapped


def source_url(day: str) -> str:
    # Placeholder for the real URL builder (the full version appears later in the thread)
    return f"https://example.com/{day}.nc"


@task
def download(source_url, cache_location):
    time.sleep(0.4)


with Flow("memory-repro") as flow:  # illustrative flow name
    # (unused in this repro; sources are built directly below)
    days = Parameter(
        "days",
        default=pd.date_range("1981-09-01", "2021-01-05", freq="D")
        .strftime("%Y-%m-%d")
        .tolist(),
    )
    sources = list(
        map(
            source_url,
            pd.date_range("1981-09-01", "2021-01-05", freq="D")
            .strftime("%Y-%m-%d")
            .to_list(),
        )
    )
    # Unsure if the unmapped parameter is part of the issue
    nc_sources = download.map(
        sources,
        cache_location=unmapped("none"),
    )
```
@Zanie had been reviewing this with me previously.
**Tadas:** @Kevin Kho, using `del` on the biggest objects didn't help, and our results are very small. @Sean Harkins, I can't provide a simple flow, as ours is quite complex, but we are using multiple mapped runs. I found this on the Dask issue tracker: https://github.com/dask/distributed/issues/2757
It is a very frustrating issue, as it is blocking Prefect adoption for us šŸ˜ž
It would be possible to work around this with a temporary Dask cluster per flow, but we run 200 flows at a time, so as long as the agent has no concurrency limits it is not possible to do that; it would kill our infrastructure.
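(For completeness, a sketch of the per-flow temporary cluster workaround mentioned above, using Prefect 1.x's `DaskExecutor` with `cluster_class`/`cluster_kwargs`; the worker counts and task are placeholders. The trade-off is exactly the one raised here: one cluster per concurrent flow run.)

```python
from prefect import Flow, task
from prefect.executors import DaskExecutor

# Without an `address`, DaskExecutor creates a temporary cluster for the flow
# run and tears it down when the run ends, so worker memory is reclaimed along
# with the cluster itself.
executor = DaskExecutor(
    cluster_class="dask.distributed.LocalCluster",
    cluster_kwargs={"n_workers": 4, "threads_per_worker": 1},
)


@task
def noop(x):
    return x


with Flow("ephemeral-cluster-flow", executor=executor) as flow:
    noop.map(list(range(10)))
```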
**Kevin Kho:** Hey @Tadas, what would the ideal setup be for those 200 flows? Connecting to a cluster and running them there?
**Sean Harkins:** @Kevin Kho I think the most relevant question here is what prevents Dask worker memory from being freed when running many small tasks. As you can see with the small example flow I provided above (which essentially does nothing at the task level), any flow that executes a large number of tasks on a specific Dask worker will eventually result in an OOM error on that worker (how quickly depends on the worker's memory allocation). For us, Tom Augspurger is looking at this from the dask.distributed side, but it would be great if someone from Prefect with more knowledge of the internals could look at it from the Prefect side of the `TaskRunner`. We dynamically create our Dask cluster and scale it on a per-flow basis, but our number of tasks is large enough to hit OOM issues even on large containers.
**Zanie:** Hey @Sean Harkins -- I can try spinning up a Dask cluster and running your example, but I can't guarantee a fast timeline as I've got a ton on my plate right now.
Perhaps we can try explicitly purging futures from Dask on flow run completion.
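(One heavy-handed version of "purging on flow run completion" that can be tried today, sketched under the assumption that nothing else is using the shared cluster at that moment: a flow state handler that restarts the Dask workers via `distributed.Client.restart()`. The scheduler address and task are placeholders.)

```python
from distributed import Client
from prefect import Flow, task


def restart_dask_workers(flow, old_state, new_state):
    # Runs in the flow-run process on every state change; act only once the
    # flow run reaches a finished state.
    if new_state.is_finished():
        with Client("tcp://scheduler:8786") as client:
            client.restart()  # drops all futures/data and restarts the workers
    return new_state


@task
def noop(x):
    return x


with Flow("cleanup-on-finish", state_handlers=[restart_dask_workers]) as flow:
    noop.map(list(range(10)))
```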
**Sean Harkins:** šŸ‘ @Zanie No sweat, we probably won't be doing too much investigation on the Dask side until later next week. Thanks for investigating. I'll post any of Tom's findings here if we discover anything new.
**Tadas:** @Kevin Kho that was my idea: having one cluster where we can control allocatable resources and running all tasks there. But it could also go the other direction, with better control over how many flows run at one time. I don't have a preference here.
Btw, if there is a way I can help debug this issue, just shoot a message here.
**Sean Harkins:** @Zanie Just wanted to check in on this again and see if you had a chance to do any further investigation. We've fixed some of the previous `scheduler` memory issues we were experiencing due to serialization problems, so now this `worker` memory "leak" is the only issue blocking us. To revisit: simple flows with mapped tasks over large lists are not freeing worker memory. I put together this very simple example Flow (the worker task does nothing but sleep):
```python
from time import sleep

import pandas as pd
from prefect import Flow, Parameter, task, unmapped


@task
def source_url(day: str) -> str:
    day = pd.Timestamp(day)
    source_url_pattern = (
        "https://www.ncei.noaa.gov/data/"
        "sea-surface-temperature-optimum-interpolation/v2.1/access/avhrr/"
        "{day:%Y%m}/oisst-avhrr-v02r01.{day:%Y%m%d}.nc"
    )
    return source_url_pattern.format(day=day)


@task
def download(source_url, cache_location):
    sleep(0.4)


# flow_name, flow_storage, run_config, and executor are defined elsewhere in
# our setup (the executor points at the pre-existing Dask cluster).
with Flow(
    flow_name,
    storage=flow_storage,
    run_config=run_config,
    executor=executor,
) as flow:
    days = Parameter(
        "days",
        default=pd.date_range("1981-09-01", "2021-01-05", freq="D")
        .strftime("%Y-%m-%d")
        .tolist(),
    )
    sources = source_url.map(days)
    # Unsure if the unmapped parameter is part of the issue
    nc_sources = download.map(
        sources,
        cache_location=unmapped("none"),
    )
```
This results in steadily increasing memory usage on the workers as more tasks are executed, as you can see from the two captures of worker status below.
**Romain:** @Sean Harkins I am also facing something similar with a lot of small tasks (especially skipped ones), which causes the Dask scheduler's memory to blow up. You mentioned that you fixed some scheduler memory issues; could you let me know what you've done?
**Sean Harkins:** @Romain In our case we needed to slightly refactor the classes that were being serialized, removing some class properties that referenced potentially very large pandas objects and restructuring our internal references a bit. You can see the changes in this PR: https://github.com/pangeo-forge/pangeo-forge-recipes/pull/160
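(An illustrative before/after of the kind of refactor described above, not code from the linked PR: keep only a lightweight reference on the object that gets serialized and load the large pandas object lazily.)

```python
import pandas as pd


class RecipeBefore:
    def __init__(self, df: pd.DataFrame):
        self.df = df  # the whole DataFrame is pickled along with the object


class RecipeAfter:
    def __init__(self, path: str):
        self.path = path  # only a small string is pickled

    @property
    def df(self) -> pd.DataFrame:
        return pd.read_csv(self.path)  # loaded on demand, never serialized
```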
šŸ‘ 1
**Zanie:** Hey Sean, we're meeting with some of the Dask team soon and this will definitely be on the list of topics we go over.
šŸ™ 1
**Tadas:** We are eagerly awaiting any news on the worker memory issues šŸ™‚. Thanks.
@Zanie, is there any progress here?
**Kevin Kho:** Kind of? I talked to Sean in this thread, and Dask itself has memory issues when mapping over a large number of futures. Coiled is donating a sprint to try to address these, and we handed them feedback from our side.
**Tadas:** Is there a place where I can follow the progress on these issues?
**Kevin Kho:** I re-read your situation. I'd be happy to chat more so we can see if there's a way to work around it.
I don't know of a place to track it, or whether there is one, and I don't know the specifics of what they're going to do. I'll message you in the morning and we can look into the specifics of your use case (if you're willing).
**Tadas:** Yeah, I would be interested in chatting.