Chris Arderne
02/10/2022, 4:02 PM
If I run the code below without the DaskExecutor, it gets to 40/50 GB before being killed because of OOM. However, if I run it as-is (i.e., on the DaskExecutor), it fails at 10 GB, retries three times, and then gives a distributed.scheduler.KilledWorker. Does Prefect do anything to the Dask worker memory limits? I'm facing this issue running with KubernetesRun and a DaskExecutor with a KubeCluster backend, where the worker spec has 50 GB of memory for both the k8s container and the Dask worker limit. It still fails around the 10 GB mark. (In practice this happens while loading PyTorch models and predicting.)
import numpy as np

import prefect
from prefect import Flow, task
from prefect.executors import DaskExecutor


@task
def test_memory_limits():
    logger = prefect.context.get("logger")
    # Allocate progressively larger arrays (1 GB .. 50 GB) to find the OOM point
    for size in [1, 2, 5, 10, 20, 30, 40, 50]:
        logger.warning(f"Creating array with {size=} GB")
        a = np.ones((size * 1000, 1000, 1000), dtype=np.uint8)
        logger.warning(f"Created with size {a.nbytes/1e9=} GB")
        del a
        logger.warning("Deleted")


with Flow(
    "test-memory",
    executor=DaskExecutor(),
) as flow:
    _ = test_memory_limits()

flow.run()
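A plausible cause, though not confirmed in this thread: with no address or cluster_class, DaskExecutor() spins up a temporary distributed.LocalCluster, whose default memory_limit divides the machine's memory across one worker per core, so each worker can be capped far below the machine total (roughly 10 GB per worker on a ~50 GB box with several workers would match the failure point). Below is a minimal sketch of passing explicit limits instead, assuming the Prefect 1.x DaskExecutor API; the KubeCluster pod values are illustrative, not taken from the thread.

from dask_kubernetes import make_pod_spec
from prefect.executors import DaskExecutor

# Local case: a single worker with an explicit limit, rather than
# letting LocalCluster split system memory across several workers.
local_executor = DaskExecutor(
    cluster_kwargs={"n_workers": 1, "memory_limit": "50GB"},
)

# KubeCluster case: keep the k8s container limit and the Dask worker
# memory limit in agreement in the pod spec.
kube_executor = DaskExecutor(
    cluster_class="dask_kubernetes.KubeCluster",
    cluster_kwargs={
        "pod_template": make_pod_spec(
            image="daskdev/dask:latest",
            memory_request="50G",
            memory_limit="50G",
        ),
    },
)

If the default split is the culprit, the single-worker run should get well past the 10 GB mark; the limit each worker actually received is visible on the worker pages of the Dask dashboard.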
Kevin Kho

Chris Arderne
02/10/2022, 4:26 PM

Kevin Kho