Domantas
06/09/2021, 8:23 AM
DaskExecutor - after the flow run is finished, the Dask workers are not freeing up the RAM they used. There are some references in the Dask documentation about memory management (https://distributed.dask.org/en/latest/memory.html), but it is hard to understand how to apply those methods in a Prefect code context. Has anybody dealt with this Prefect/Dask memory management problem and found a solution?
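A minimal sketch of applying the advice from that Dask memory page against a running cluster; the scheduler address is illustrative, and the malloc_trim trick comes from the linked docs (it only works on Linux with glibc):

import ctypes
import gc

from distributed import Client

def trim_memory() -> int:
    # Ask glibc to return freed arenas to the OS (per the Dask memory docs).
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

# Connect to the same cluster the DaskExecutor points at (address is illustrative).
client = Client("tcp://dask_address:port")

# Run garbage collection on every worker, then trim freed memory back to the OS.
client.run(gc.collect)
client.run(trim_memory)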
Domantas
06/09/2021, 2:01 PM
I have tried different executor setups (e.g. DaskExecutor(address="dask_address:port")), and it seems switching between parallelism methods has no effect on RAM cleanup in the Dask worker pods after the flow is finished. I'm attaching a screenshot of the Dask status UI showing ~29 GB of RAM used by stored Prefect task results, while no Prefect flows or other tasks are running in the background.
Kevin Kho
You can call del on the large objects and hope Python collects them (the garbage collector is not always good). You can then persist intermediate data between tasks: save it upstream, load it downstream, and just return the location. This makes the memory footprint lower because the result is smaller.
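A sketch of that persist-and-pass-the-location pattern in Prefect 1.x, assuming pyarrow is available for parquet I/O; the file path and flow name are illustrative, not from the thread:

import pandas as pd
from prefect import task, Flow

@task
def extract(day: str) -> str:
    # Build the large intermediate result...
    df = pd.DataFrame({"day": [day] * 1_000_000, "value": range(1_000_000)})
    # ...persist it, and return only its location, so the task result the
    # Dask worker holds on to is a small string instead of the DataFrame.
    path = f"/tmp/intermediate-{day}.parquet"  # illustrative location
    df.to_parquet(path)
    del df  # drop the local reference and hope the GC reclaims it
    return path

@task
def transform(path: str) -> int:
    # The downstream task loads the data from the location itself.
    df = pd.read_parquet(path)
    return len(df)

with Flow("persist-intermediate-example") as flow:
    transform(extract("2021-01-05"))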
Sean Harkins
06/10/2021, 2:54 PM

@task
def download(source_url, cache_location):
    time.sleep(0.4)

with Flow(flow_name) as flow:  # Flow arguments were elided in the original message
    days = Parameter(
        "days",
        default=pd.date_range("1981-09-01", "2021-01-05", freq="D")
        .strftime("%Y-%m-%d")
        .tolist(),
    )
    sources = list(
        map(
            source_url,
            pd.date_range("1981-09-01", "2021-01-05", freq="D")
            .strftime("%Y-%m-%d")
            .to_list(),
        )
    )
    # Unsure if the unmapped parameter is part of the issue
    nc_sources = download.map(
        sources,
        cache_location=unmapped("none"),
    )
Sean Harkins
06/11/2021, 7:34 PM
TaskRunner. We dynamically create our Dask cluster and scale it on a per-flow basis, but our number of tasks is large enough to hit OOM issues even on large containers.
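For reference, Prefect 1.x's DaskExecutor can create such a per-flow temporary cluster itself and scale it adaptively; a sketch, with the cluster class, worker sizing, and scaling bounds all illustrative:

from prefect import Flow
from prefect.executors import DaskExecutor

executor = DaskExecutor(
    cluster_class="dask_kubernetes.KubeCluster",  # illustrative; any cluster class works
    cluster_kwargs={"n_workers": 2},              # illustrative sizing
    adapt_kwargs={"minimum": 2, "maximum": 10},   # let the cluster scale with the map
)

# The temporary cluster is created when the flow run starts and torn down when
# it finishes, so worker memory is reclaimed along with the workers themselves.
with Flow("per-flow-cluster-example", executor=executor) as flow:
    pass  # tasks go here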
Sean Harkins
06/24/2021, 7:40 PM
The scheduler memory issues we were experiencing due to serialization problems are resolved, so now this worker memory "leak" is the only issue blocking us. To revisit: simple flows with mapped tasks over large lists are not freeing worker memory. I put together this very simple example Flow (which uses nothing but sleep in the worker task):
from time import sleep

import pandas as pd

from prefect import task, storage, Flow, Parameter, unmapped

@task
def source_url(day: str) -> str:
    day = pd.Timestamp(day)
    source_url_pattern = (
        "https://www.ncei.noaa.gov/data/"
        "sea-surface-temperature-optimum-interpolation/v2.1/access/avhrr/"
        "{day:%Y%m}/oisst-avhrr-v02r01.{day:%Y%m%d}.nc"
    )
    return source_url_pattern.format(day=day)

@task
def download(source_url, cache_location):
    sleep(0.4)

# flow_name, flow_storage, run_config, and executor are defined elsewhere in our setup
with Flow(
    flow_name,
    storage=flow_storage,
    run_config=run_config,
    executor=executor,
) as flow:
    days = Parameter(
        "days",
        default=pd.date_range("1981-09-01", "2021-01-05", freq="D")
        .strftime("%Y-%m-%d")
        .tolist(),
    )
    sources = source_url.map(days)
    # Unsure if the unmapped parameter is part of the issue
    nc_sources = download.map(
        sources,
        cache_location=unmapped("none"),
    )
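One way to check whether the workers are actually holding that memory after such a run (a sketch; the scheduler address is illustrative, and psutil is already a dependency of distributed):

import psutil
from distributed import Client

def worker_rss() -> int:
    # Resident set size of the worker process, in bytes.
    return psutil.Process().memory_info().rss

client = Client("tcp://dask_address:port")  # same cluster the flow ran on

# Maps each worker address to its RSS; compare before and after a flow run
# (and after the gc/trim calls above) to see what is actually reclaimed.
print(client.run(worker_rss))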