# ask-community
e
Hi, I'm running into an issue with the Dask distributed executor where the scheduler quickly runs out of memory. The entire workflow is:
```python
import prefect


@prefect.task
def do_nothing(n):
    pass


with prefect.Flow("Dummy Flow") as flow:
    listy = list(range(200000))
    do_nothing.map(listy)
```
The scheduler pod runs out of memory after around 300 tasks; a screenshot of the Dask dashboard is attached. Has anyone run into this issue, or does anyone have ideas for a fix? We normally run Prefect 0.14.16, but I've tried both 0.14.16 and the latest (0.15.5) with the same results.
k
Hey @Erik Amundson, what Dask version are you using? I think there were fixes for this in more recent versions of Dask. I appreciate the detailed write-up, but if you get the chance, could you please move the executor code to the thread so we don't crowd the main channel?
e
Our flow executor is:
```python
from dask_kubernetes import make_pod_spec
from prefect.executors import DaskExecutor

DaskExecutor(
    cluster_class="dask_kubernetes.KubeCluster",
    cluster_kwargs={
        "pod_template": make_pod_spec(
            image="my/image:tag",
            memory_limit="3G",
            memory_request="3G",
            cpu_limit=0.9,
            cpu_request=0.9,
            extra_pod_config={
                "imagePullSecrets": [{"name": "regcred"}],
                "tolerations": [
                    {
                        "key": "k8s.dask.org_dedicated",
                        "operator": "Equal",
                        "effect": "NoSchedule",
                        "value": "worker",
                    }
                ],
            },
        ),
        "env": {"MALLOC_TRIM_THRESHOLD_": "0"},
        "n_workers": 10,
    },
)
```
with the run configuration:
```python
from prefect.run_configs import KubernetesRun

KubernetesRun(
    image="my/image:tag",
    image_pull_secrets=["regcred"],
)
```
Yeah, sorry about that. We're using dask-kubernetes version 2021.03.1, which is the latest version, and it installs dask 2021.9.0 as a dependency.
k
No worries. I’ll have to talk to the team about this. I thought there were updates in 2021.06.1 that resolved this, and I see you set `MALLOC_TRIM_THRESHOLD_` as well. Will ask people for answers.
I talked to the Dask/Coiled folks and this is unexpected; the `MALLOC_TRIM_THRESHOLD_` setting does help most of the time. I’ve seen this issue before and I thought it was resolved (I haven’t seen it in this Slack channel lately). That said, I’ll try taking a look at it, but this will probably take a while. If you need something immediately, you likely need to batch up the elements of the list before passing them to the mapped task, so that you map over a smaller number of elements and get fewer futures back.
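Something like this rough sketch of the batching approach, adapted from your dummy flow above (the `make_batches` task and the batch size of 1,000 are just illustrative choices, not anything Prefect requires):
```python
import prefect


@prefect.task
def make_batches(items, batch_size=1000):
    # Chunk the full list so the map fans out over ~200 futures
    # instead of 200,000, keeping scheduler memory bounded.
    return [items[i : i + batch_size] for i in range(0, len(items), batch_size)]


@prefect.task
def do_nothing_batch(batch):
    # Process every element of one batch inside a single task run.
    for n in batch:
        pass


with prefect.Flow("Dummy Flow") as flow:
    listy = list(range(200000))
    do_nothing_batch.map(make_batches(listy))
```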
e
Ok I'll try that out, thank you!
k
Other people have also gotten around this with subflows, using `StartFlowRun` on the batches.
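Roughly like this (an untested sketch; the child flow name "Dummy Flow Batch", the "my-project" project, and the `batch` parameter are all placeholders you'd adapt, and the child flow has to be registered separately):
```python
import prefect
from prefect.tasks.prefect import StartFlowRun

# Kicks off one run of a registered child flow per batch; the child
# flow ("Dummy Flow Batch" here) must accept a "batch" parameter.
start_batch_flow = StartFlowRun(
    flow_name="Dummy Flow Batch",
    project_name="my-project",
    wait=True,
)


@prefect.task
def make_params(items, batch_size=10000):
    # Build one parameters dict per batch to map StartFlowRun over.
    return [
        {"batch": items[i : i + batch_size]}
        for i in range(0, len(items), batch_size)
    ]


with prefect.Flow("Dummy Parent Flow") as parent:
    listy = list(range(200000))
    start_batch_flow.map(parameters=make_params(listy))
```
With the ephemeral-cluster setup above, each child flow run gets its own Dask scheduler, so no single scheduler ever tracks all 200,000 futures at once.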