# prefect-community
I need some help understanding memory limits in `DaskExecutor`! If I run the code below without the `DaskExecutor`, it gets to 40/50 GB before being killed because of OOM. However, if I run it as-is (i.e., on the `DaskExecutor`), it fails at 10 GB, tries three times, and then gives a `KilledWorker` error. Does Prefect do anything to the Dask worker memory limits? I'm facing this issue running with `KubernetesRun` and a `DaskExecutor` with a `KubeCluster` backend, where the worker spec has 50 GB of memory for both the k8s limit and the Dask worker limit. It still fails around the 10 GB mark. (In practice this is while loading PyTorch models and predicting.)
```python
import numpy as np
import prefect
from prefect.executors import DaskExecutor
from prefect import Flow, task

@task
def test_memory_limits():
    logger = prefect.context.get("logger")
    for size in [1, 2, 5, 10, 20, 30, 40, 50]:
        logger.warning(f"Creating array with {size=} GB")
        # size * 1000 * 1000 * 1000 uint8 elements == size GB
        a = np.ones((size * 1000, 1000, 1000), dtype=np.uint8)
        logger.warning(f"Created with size {a.nbytes / 1e9=} GB")
        del a

# The flow name and executor argument were truncated in the original post;
# these values are assumed.
with Flow("memory-test", executor=DaskExecutor()) as flow:
    _ = test_memory_limits()
```
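For context, the `KubernetesRun` + `KubeCluster` deployment described above might look roughly like the sketch below. The image, memory values, and the classic `dask-kubernetes` `make_pod_spec` API are assumptions for illustration, not taken from the original post.

```python
# Sketch of a Prefect 1.x flow configured to run on Kubernetes with a
# Dask KubeCluster whose workers have matching k8s and Dask memory limits.
from dask_kubernetes import KubeCluster, make_pod_spec
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun

flow.run_config = KubernetesRun()
flow.executor = DaskExecutor(
    cluster_class=KubeCluster,
    cluster_kwargs={
        "pod_template": make_pod_spec(
            image="prefecthq/prefect:latest",  # assumed image
            memory_limit="50G",                # Dask worker memory limit
            memory_request="50G",              # k8s memory request
        ),
    },
)
```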
I don’t think we do anything to the Dask worker memory limits.
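One way to narrow this down is to check what memory limit the workers actually report to the scheduler, since that can differ from the pod spec. A diagnostic sketch, assuming you can attach a `distributed.Client` to the running cluster (the scheduler address is a placeholder):

```python
# Print the memory limit each Dask worker actually reports; if this shows
# ~10 GB rather than 50 GB, the limit is being set somewhere else.
from dask.distributed import Client

client = Client("tcp://scheduler-address:8786")  # placeholder address
for addr, info in client.scheduler_info()["workers"].items():
    print(addr, info["memory_limit"] / 1e9, "GB")
```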
This is hard to answer here, but there is also ongoing research into Dask performance for Prefect 2.0, so I suggest you post this as an issue on GitHub so the devs will see it.
Okay thanks @Kevin Kho, will do. (Once I've done a bit more scratching to see if I can replicate it without Prefect.)
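For reference, replicating the test without Prefect could be as simple as submitting the same loop straight to a Dask cluster. A minimal sketch, assuming a single-worker `LocalCluster` with the same 50 GB limit:

```python
# Run the allocation loop as a plain Dask task, with no Prefect involved,
# to see whether the ~10 GB KilledWorker failure still reproduces.
import numpy as np
from dask.distributed import Client, LocalCluster

def test_memory_limits():
    for size in [1, 2, 5, 10, 20, 30, 40, 50]:
        a = np.ones((size * 1000, 1000, 1000), dtype=np.uint8)
        print(f"Created array of {a.nbytes / 1e9} GB")
        del a

cluster = LocalCluster(n_workers=1, memory_limit="50GB")  # assumed config
client = Client(cluster)
client.submit(test_memory_limits).result()
```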
Thank you!