
Severin Ryberg [sevberg]

01/26/2021, 2:00 PM
Hi all! We've recently run into the issue that very short tasks which should run in less than a second (tested when using a LocalAgent) can take much longer than that when using a KubernetesAgent. Unsurprisingly, it appears the culprit is the overhead time it takes for the DaskKubernetesEnvironment to spin up and tear down a worker pod, which is done for each task invocation.
• Is it possible to avoid this overhead (or at least only pay it once) by telling the KubernetesAgent and/or DaskKubernetesEnvironment to always keep X worker pods alive, which continue to wait for and complete tasks without dying until the end of the flow?
• Or, if the above isn't possible, then perhaps the only thing to do here is have less finely resolved tasks so that the overhead is not so noticeable. In that case, what is the suggested single-task execution time when using a KubernetesAgent / DaskKubernetesEnvironment?
• Are there other possible routes?

Jim Crist-Harif

01/26/2021, 5:19 PM
Hi Severin, First, note that Environment-based configuration has been deprecated in favor of using flow.run_config and flow.executor (for DaskKubernetesEnvironment this would be a mix of a KubernetesRun run config and a DaskExecutor configured to use dask-kubernetes). Environment-based config will continue to work for a while (it's a slow deprecation), but we encourage users to transition.
When running with a k8s agent, each flow run will create a new k8s job, which will have some delay before startup. There's no way around this; if you need faster startup times than k8s can start jobs, you'll want to either use a different agent or make your flow runs less fine-grained so you don't need to run as many of them.
That said, if your flows make use of a large (distributed) dask cluster on k8s, you could speed up execution time by keeping a dask cluster running in the background that your flows connect to. You can do this by providing an address to a DaskExecutor configured on a flow:
Copy code
flow.executor = DaskExecutor(address=...)
You might make use of the helm chart (https://github.com/dask/helm-chart) for managing this.
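For illustration, a minimal sketch of pointing a flow at such a long-running Dask cluster; the scheduler address, flow name, and task body are assumptions, not values from this thread:
Copy code
from prefect import Flow, task
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun

@task
def say_hello():
    print("hello")

with Flow("shared-cluster-example") as flow:
    say_hello()

flow.run_config = KubernetesRun()
# Address of a scheduler already running in the cluster (e.g. deployed via the
# dask helm chart); the service name and port here are assumptions.
flow.executor = DaskExecutor(address="tcp://dask-scheduler:8786")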
Is it possible to avoid this overhead (or at least only pay it once), by telling the KubernetesAgent and/or DaskKubernetesEnvironment to always keep X worker pods alive which continue to wait for and complete tasks without dying until the end of the flow?
Dask shouldn't be starting a new worker for every task (the flow should start up a dask cluster, run all tasks on it, then shut down the cluster). If you're using adaptive scaling, the cluster might scale up/down during a flow run to add workers when needed and remove them when unneeded. There are some config options for how long a worker can sit idle before scaling down; if your flows have large gaps between work, this may be triggering this behavior. You can also run with a fixed set of workers instead (pass n_workers to cluster_kwargs in the DaskExecutor constructor), which will disable adaptive scaling.
Copy code
flow.executor = DaskExecutor(
    cluster_class=...,
    cluster_kwargs={"n_workers": 5, ...},
)
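If adaptive scaling is instead what's wanted, it can also be bounded explicitly via adapt_kwargs; a minimal sketch, where the image, resource values, and worker bounds are illustrative:
Copy code
from dask_kubernetes import KubeCluster, make_pod_spec
from prefect.executors import DaskExecutor

# Illustrative worker pod spec.
pod_spec = make_pod_spec(image="prefecthq/prefect:latest", memory_limit="2G", cpu_limit=1)

flow.executor = DaskExecutor(
    cluster_class=lambda: KubeCluster(pod_spec),
    # Scale between 2 and 10 workers based on load (forwarded to cluster.adapt()).
    adapt_kwargs={"minimum": 2, "maximum": 10},
)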
Also note that if you only need a small amount of parallelism, you may be better served by using a LocalDaskExecutor, which avoids all the complications of running multiple pods for a flow.
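For example, a minimal sketch of that option (the scheduler choice and worker count shown are illustrative):
Copy code
from prefect.executors import LocalDaskExecutor

# Run tasks in parallel with a thread pool inside the single flow-run pod.
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)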

Robin

01/26/2021, 8:59 PM
Dear @Jim Crist-Harif, thanks for the answers, which seem to cover all our questions (@Severin Ryberg [sevberg] will correct me tomorrow morning if not). šŸ™ˆ Is there a blog article or part of the documentation that describes how to set up the run config accordingly with the latest release? We are also open to using ECS, Fargate, or other similar backends instead of AWS EKS if it simplifies or improves the setup.
To give some more context about the flow: one of our flows currently consists of ~20 smaller tasks, of which some take much longer than others. Other flows instead have 3-5 bigger tasks. All flows are executed for tens of thousands of systems, hence high parallelization would be great (1,000s to 10,000s of parallel tasks, ideally).
As Sev described above, we thought of batching together tasks that execute fast but seem to have quite some overhead. But your explanations suggest that Prefect should normally not have such a big overhead and that batching tasks should not be required (it would also seem unpythonic/unintuitive). We would be happy to optimize our Prefect setup and not need to think about workarounds or batching etc. šŸ™‚

Severin Ryberg [sevberg]

01/26/2021, 10:06 PM
Hey @Jim Crist-Harif, really appreciate the detailed answer! Lots of new information to go on šŸ‘ Nevertheless, as @Robin feared, the issue is still a bit unclear from my side.
When running with a k8s agent, each flow run will create a new k8s job, which will have some delay before startup. There's no way around this... Dask shouldn't be starting a new worker for every task (the flow should start up a dask cluster, run all tasks on it, then shut down the cluster)
Sorry if I didn't pose the question clearly enough; we do in fact see only one k8s job, which only creates one dask cluster. These of course take time to start up, but that is negligible in comparison to the overall flow, and so isn't a big concern. Rather, it's the time required to create and destroy dask workers (once for each mapped iteration of a task) that is creating the issue. But perhaps this is behavior which has changed in the latest Prefect version? Just to be sure, I'd like to check this with a hypothetical: if a flow maps a task over an array of 100 items, but we have n_workers set to 5, then how many dask worker pods should we expect to see in total in relation to this task? Our current observations show 100, of which only 5 would be running at once. Is this the intended behavior? If so, then sadly our issue will still be present. If possible, what we would like to see would be only 5 dask worker pods (which each do, on average, 20 of the mapped sub-tasks).
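For concreteness, a minimal sketch of that hypothetical mapping (the flow name and task body are illustrative):
Copy code
from prefect import Flow, task

@task
def process(item):
    return item * 2  # illustrative task body

with Flow("mapping-example") as flow:
    # 100 mapped children; with n_workers set to 5, the hope is 5 worker pods
    # total, each handling roughly 20 of these children.
    process.map(list(range(100)))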

Jim Crist-Harif

01/26/2021, 10:13 PM
If a flow maps a task over an array of 100 items, but we have n_workers set to 5, then how many dask worker pods should we expect to see in total in relation to this task? Our current observations show 100, of which only 5 would be running at once.
This is certainly not the intended behavior. If you set the number of dask workers to 5 (so no adaptive scaling), then you should only see 5 dask workers ever. The number of active tasks would depend on the number of threads available per worker, but in general Prefect should keep all workers busy if there are enough tasks.
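As an aside, the per-worker thread count can be set when building the worker pod spec with dask-kubernetes; a sketch assuming make_pod_spec's threads_per_worker argument (image and resource values are illustrative):
Copy code
from dask_kubernetes import KubeCluster, make_pod_spec

# Two threads per worker: each worker pod can run up to 2 tasks concurrently.
pod_spec = make_pod_spec(
    image="prefecthq/prefect:latest",  # illustrative image
    threads_per_worker=2,
    memory_limit="2G",
    cpu_limit=1,
)
cluster = KubeCluster(pod_spec, n_workers=5)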

Severin Ryberg [sevberg]

01/26/2021, 10:34 PM
Great! Thanks for the clarification. Sounds like the problem is user error, which was exactly what I was hoping to hear 😃. I'll run a test with the newest Prefect and the run_config and get back to you.
Hello again @Jim Crist-Harif, I have an update for you: I've set up a run_config with KubernetesRun, and an executor with DaskExecutor (using a KubeCluster). This is what you had in mind, I hope? Feedback here would be highly appreciated šŸ™‚
Copy code
# Imports needed for this snippet
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun

# Create run config
flow.run_config = KubernetesRun(
    cpu_request=2, memory_request="2G", env={"AWS_DEFAULT_REGION": "eu-central-1"}
)


# Create Dask executor
def make_cluster(n_workers, image):
    """Start a cluster using the same image as the flow run"""
    from dask_kubernetes import KubeCluster, make_pod_spec

    pod_spec = make_pod_spec(
        image=image,
        memory_limit="1900M",
        memory_request="1900M",
        cpu_limit=0.5,
        cpu_request=0.5,
    )

    return KubeCluster(pod_spec, n_workers=n_workers)


flow.executor = DaskExecutor(
    cluster_class=make_cluster,
    cluster_kwargs={"n_workers": 10, "image": flow.storage.name},
)
Anyway, in the end it behaved exactly as you suggested it would šŸ˜„. As in, when mapping over a task with 100 iterations and n_workers=5, I saw only 5 dask worker pods, which worked throughout the run (very efficient!). If you're interested, you can see a full codebase of what I've written here, which will hopefully be helpful to others who are interested in a similar set-up. We also plan to turn this into a blog post sometime soon(ish), with information about how our team has mixed Prefect with Pulumi to unleash a robot army šŸ¤–
In any case, thanks again for the helpful feedback. And also, props to the Prefect team for solving the parallelization problem with the newest version before we had even asked our question!

Jim Crist-Harif

01/28/2021, 4:29 PM
Excellent! Glad you got things working. We'd love to see a blog post about what y'all are doing; looking forward to seeing that.
One small tip - you can access the current docker image used by the flow run at runtime via the prefect.context.image field. This would let you avoid passing in an image via cluster_kwargs to DaskExecutor, and let your make_cluster function determine the image automatically.
Copy code
import prefect


def make_cluster(n_workers, image=None):
    """Start a cluster using the same image as the flow run"""
    from dask_kubernetes import KubeCluster, make_pod_spec

    pod_spec = make_pod_spec(
        image=image or prefect.context.image,  # default to current active image
        memory_limit="1900M",
        memory_request="1900M",
        cpu_limit=0.5,
        cpu_request=0.5,
    )

    return KubeCluster(pod_spec, n_workers=n_workers)
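A possible corresponding executor setup, extrapolated from the earlier snippet rather than taken from this thread, would then drop the image kwarg entirely:
Copy code
# make_cluster now falls back to prefect.context.image, so no image kwarg is needed.
flow.executor = DaskExecutor(
    cluster_class=make_cluster,
    cluster_kwargs={"n_workers": 10},
)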
See https://docs.prefect.io/orchestration/flow_config/executors.html#using-a-temporary-cluster for more info.