Severin Ryberg [sevberg]
01/26/2021, 2:00 PM

Jim Crist-Harif
01/26/2021, 5:19 PM
Environment based configuration has been deprecated in favor of using flow.run_config and flow.executor (for DaskKubernetesEnvironment this would be a mix of a KubernetesRun run config and a DaskExecutor configured to use dask-kubernetes). Environment based config will continue to work for a while (it's a slow deprecation), but we encourage users to transition.

If you already have a dask cluster running, you can instead connect to it with a DaskExecutor configured on a flow:

flow.executor = DaskExecutor(address=...)

You might make use of the helm chart (https://github.com/dask/helm-chart) for managing this.
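To make the KubernetesRun + dask-kubernetes combination above concrete, here is a rough sketch of what that migration might look like (the image name, pod spec, and worker count are placeholders, not recommendations, and your storage and labels will differ):

from prefect.run_configs import KubernetesRun
from prefect.executors import DaskExecutor
from dask_kubernetes import KubeCluster, make_pod_spec

# The flow run itself executes as a k8s job (replacing the Environment's job template)
flow.run_config = KubernetesRun(image="my-registry/my-flow:latest")  # placeholder image

# Tasks execute on a temporary dask-kubernetes cluster created for the run
flow.executor = DaskExecutor(
    cluster_class=KubeCluster,
    cluster_kwargs={
        "pod_template": make_pod_spec(image="my-registry/my-flow:latest"),  # placeholder
        "n_workers": 3,  # illustrative fixed worker count
    },
)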
> Is it possible to avoid this overhead (or at least only pay it once), by telling the KubernetesAgent and/or DaskKubernetesEnvironment to always keep X worker pods alive which continue to wait for and complete tasks without dying until the end of the flow?

Dask shouldn't be starting a new worker for every task (the flow should start up a dask cluster, run all tasks on it, then shut down the cluster). If you're using adaptive scaling, the cluster might scale up/down during a flow run to add workers when needed and remove them when unneeded. There are some config options for how long a worker can sit idle before scaling down; if your flows have large gaps between work, this may be triggering this behavior. You can also run with a fixed set of workers instead (pass n_workers to cluster_kwargs in the DaskExecutor constructor), which will disable adaptive scaling:
flow.executor = DaskExecutor(
    cluster_class=...,
    cluster_kwargs={"n_workers": 5, ...},
)
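If you do want adaptive scaling instead, the DaskExecutor also accepts adapt_kwargs; a minimal sketch (the bounds shown are illustrative):

flow.executor = DaskExecutor(
    cluster_class=...,
    # Scale between 1 and 10 workers based on load instead of keeping a fixed count
    adapt_kwargs={"minimum": 1, "maximum": 10},
)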
You might also consider the LocalDaskExecutor, which avoids all the complications of running multiple pods for a flow.
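A minimal sketch of that option (the scheduler choice and worker count are illustrative, not prescriptive):

from prefect.executors import LocalDaskExecutor

# All tasks run inside the flow-run pod itself, parallelized by a local
# dask scheduler -- no separate dask worker pods are created
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)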

Robin
01/26/2021, 8:59 PM

Severin Ryberg [sevberg]
01/26/2021, 10:06 PM
> When running with a k8s agent, each flow run will create a new k8s job, which will have some delay before startup. There's no way around this...
> Dask shouldn't be starting a new worker for every task (the flow should start up a dask cluster, run all tasks on it, then shut down the cluster)

Sorry if I didn't pose the question clearly enough; however, we do in fact see only one k8s job, which only creates one dask cluster. These of course take time to start up, but this is negligible in comparison to the overall flow, and so isn't a big concern. Rather, it's the time required to create and destroy dask workers (once for each mapped iteration of a task) which is creating the issue. But perhaps this is a behavior which has changed in the latest Prefect version?

Just to be sure, I'd like to check this with a hypothetical: if a flow maps a task over an array of 100 items, but we have n_workers set to 5, then how many dask worker pods should we expect to see in total in relation to this task? Our current observations show 100, of which only 5 would be running at once. Is this the intended behavior? If so, then sadly our issue will still be present. If possible, what we would like to see would be only 5 dask worker pods (which each do on average 20 of the mapped sub-tasks).

Jim Crist-Harif
01/26/2021, 10:13 PM
> If a flow maps a task over an array of 100 items, but we have n_workers set to 5, then how many dask worker pods should we expect to see in total in relation to this task? Our current observations show 100, of which only 5 would be running at once.

This is certainly not the intended behavior. If you set the number of dask workers to 5 (so no adaptive scaling), then you should only see 5 dask workers ever. The number of active tasks would depend on the number of threads available per worker, but in general prefect should keep all workers busy if there are enough tasks.
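For reference, if you're building worker pods with dask-kubernetes as sketched above, the make_pod_spec helper takes a threads_per_worker argument that controls how many tasks each worker can run concurrently (it sets the dask-worker --nthreads flag); a sketch with illustrative values:

from dask_kubernetes import make_pod_spec

# Each worker pod starts dask-worker with --nthreads 2, so up to two
# prefect tasks can be active per worker at a time
pod_spec = make_pod_spec(
    image="my-registry/my-flow:latest",  # placeholder image
    threads_per_worker=2,                # illustrative value
)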
Severin Ryberg [sevberg]
01/26/2021, 10:34 PM

from prefect.run_configs import KubernetesRun
from prefect.executors import DaskExecutor

# Create run config
flow.run_config = KubernetesRun(
    cpu_request=2, memory_request="2G", env={"AWS_DEFAULT_REGION": "eu-central-1"}
)

# Create Dask Executor
def make_cluster(n_workers, image):
    """Start a cluster using the same image as the flow run"""
    from dask_kubernetes import KubeCluster, make_pod_spec

    pod_spec = make_pod_spec(
        image=image,
        memory_limit="1900M",
        memory_request="1900M",
        cpu_limit=0.5,
        cpu_request=0.5,
    )
    return KubeCluster(pod_spec, n_workers=n_workers)

flow.executor = DaskExecutor(
    cluster_class=make_cluster,
    cluster_kwargs={"n_workers": 10, "image": flow.storage.name},
)
When running with n_workers=5, I saw only 5 dask worker pods which worked throughout the run (very efficient!)

If you're interested, you can see a full codebase of what I've written here, which will hopefully be helpful to others who are interested in a similar set-up. We also plan to turn this into a blog post sometime soon(ish), with information about how our team has mixed Prefect with Pulumi to unleash a robot army 🤖

Jim Crist-Harif
01/28/2021, 4:29 PM
You might also make use of the prefect.context.image field. This would let you avoid passing in an image via cluster_kwargs to DaskExecutor, and let your make_cluster function determine the image automatically:
import prefect

def make_cluster(n_workers, image=None):
    """Start a cluster using the same image as the flow run"""
    from dask_kubernetes import KubeCluster, make_pod_spec

    pod_spec = make_pod_spec(
        image=image or prefect.context.image,  # default to current active image
        memory_limit="1900M",
        memory_request="1900M",
        cpu_limit=0.5,
        cpu_request=0.5,
    )
    return KubeCluster(pod_spec, n_workers=n_workers)
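With that change, the executor configuration from earlier no longer needs to pass an image through cluster_kwargs, e.g.:

flow.executor = DaskExecutor(
    cluster_class=make_cluster,
    cluster_kwargs={"n_workers": 10},
)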
See https://docs.prefect.io/orchestration/flow_config/executors.html#using-a-temporary-cluster for more info.