Erik Amundson

    1 year ago
    Hi, I'm running into an issue with the Dask distributed executor where the scheduler quickly runs out of memory. The entire workflow is:
    import prefect
    
    
    @prefect.task
    def do_nothing(n):
        pass
    
    
    with prefect.Flow("Dummy Flow") as flow:
        listy = list(range(200000))
        do_nothing.map(listy)
    The scheduler pod runs out of memory after around 300 tasks; a screenshot of the Dask dashboard is attached. Has anyone run into this issue or have any ideas for a fix? We normally run Prefect 0.14.16, but I've tried both 0.14.16 and the latest (0.15.5) with the same results.
    Kevin Kho

    1 year ago
    Hey @Erik Amundson, what Dask version are you using? I think there were fixes in more recent versions of Dask. I appreciate the detailed write-up, but if you get the chance, could you please move the Executor code to the thread so we don’t crowd the main channel?
    Erik Amundson

    1 year ago
    Our flow executor is:
    # Imports needed for this snippet (Prefect 0.x / dask-kubernetes):
    from dask_kubernetes import make_pod_spec
    from prefect.executors import DaskExecutor

    DaskExecutor(
        cluster_class="dask_kubernetes.KubeCluster",
        cluster_kwargs={
            "pod_template": make_pod_spec(
                image="my/image:tag",
                memory_limit="3G",
                memory_request="3G",
                cpu_limit=0.9,
                cpu_request=0.9,
                extra_pod_config={
                    "imagePullSecrets": [{"name": "regcred"}],
                    "tolerations": [
                        {
                            "key": "k8s.dask.org_dedicated",
                            "operator": "Equal",
                            "effect": "NoSchedule",
                            "value": "worker",
                        }
                    ],
                },
            ),
            "env": {"MALLOC_TRIM_THRESHOLD_": "0"},
            "n_workers": 10,
        },
    )
    with the run configuration:
    KubernetesRun(
        image="my/image:tag",
        image_pull_secrets=["regcred"],
    )
    Yeah, sorry about that. We're using dask-kubernetes version 2021.03.1, which is the latest version, and it installs dask version 2021.9.0 as a dependency.
    Kevin Kho

    1 year ago
    No worries. I’ll have to talk to the team about this. I thought there were updates in 2021.06.1 that resolved this, and I see you’re also using MALLOC_TRIM_THRESHOLD_. Will ask people for answers.
    I talked to the Dask/Coiled folks and this is unexpected; MALLOC_TRIM_THRESHOLD_ does help most of the time. I’ve seen this issue before and I thought it was resolved (haven’t seen it in this Slack channel lately). That said, I’ll try taking a look at it, but this will probably take a while. If you need something immediately, you likely need to batch up the elements of the list before passing them to the mapped task, so that you map over fewer elements and get fewer futures back. A sketch of that batching idea is below.
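    A minimal sketch of that batching approach (assuming Prefect 0.x's functional API; the batch size of 1,000 and the task names here are illustrative, not part of the original flow):
    import prefect


    @prefect.task
    def make_batches(n, batch_size=1000):
        # Split range(n) into chunks so the mapped task produces roughly
        # n / batch_size futures instead of n.
        items = list(range(n))
        return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


    @prefect.task
    def do_nothing_batch(batch):
        # Handle an entire batch inside a single task run.
        for _ in batch:
            pass


    with prefect.Flow("Dummy Flow") as flow:
        batches = make_batches(200000)
        do_nothing_batch.map(batches)
    This keeps the total work the same but reduces the number of mapped task runs (and futures held by the scheduler) from 200,000 to around 200.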
    Erik Amundson

    1 year ago
    Ok I'll try that out, thank you!
    Kevin Kho

    1 year ago
    Other people have also gotten around this with subflows, using the StartFlowRun task on the batches.
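    A rough sketch of that subflow pattern (assuming Prefect 0.x's prefect.tasks.prefect.StartFlowRun; the child flow name, project name, and parameter shape are hypothetical):
    import prefect
    from prefect.tasks.prefect import StartFlowRun

    # One child flow run is started per batch; the child flow (registered
    # separately) would map over its own, much smaller, list of items.
    start_batch_flow = StartFlowRun(
        flow_name="Dummy Flow Batch",  # hypothetical child flow
        project_name="my-project",     # hypothetical project
        wait=True,
    )


    @prefect.task
    def make_batch_params(n, batch_size=1000):
        # Build one parameters dict per batch for the child flow runs.
        items = list(range(n))
        return [
            {"items": items[i:i + batch_size]}
            for i in range(0, len(items), batch_size)
        ]


    with prefect.Flow("Dummy Flow Parent") as parent_flow:
        batch_params = make_batch_params(200000)
        start_batch_flow.map(parameters=batch_params)
    Each child flow run then handles only its own batch, so no single scheduler ever has to track all 200,000 futures at once.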