Egil Bugge
05/03/2022, 7:18 PMimport prefect
from prefect import Flow, task
from prefect.storage import Docker
from prefect.run_configs import KubernetesRun
from prefect.executors import DaskExecutor
from dask_kubernetes import KubeCluster, make_pod_spec
PROJECT_NAME = ...
FLOW_NAME = ...
IMAGE_TAG = ...
storage = Docker(
registry_url = f"<http://eu.gcr.io/{PROJECT_NAME}|eu.gcr.io/{PROJECT_NAME}>",
image_name = FLOW_NAME,
image_tag = IMAGE_TAG,
dockerfile = "Dockerfile"
)
run_config = KubernetesRun(
image = f"<http://eu.gcr.io/{PROJECT_NAME}/{FLOW_NAME}:{IMAGE_TAG}|eu.gcr.io/{PROJECT_NAME}/{FLOW_NAME}:{IMAGE_TAG}>",
labels = ["dask"])
executor = DaskExecutor(
cluster_class=lambda: KubeCluster(
make_pod_spec(
image=prefect.context.image,
extra_pod_config= {
"nodeSelector": {"<http://cloud.google.com/gke-nodepool|cloud.google.com/gke-nodepool>": "worker-pool"}
}
)),
adapt_kwargs={"minimum": 1, "maximum": 3},
)
with Flow(
name = FLOW_NAME,
storage = storage,
executor = executor,
run_config = run_config) as flow:
#tasks here
Anna Geller
05/03/2022, 9:19 PMI'm having some issue getting the autoscaler to remove the nodesGenerally speaking, this is a Kubernetes issue, not a Prefect issue, and I'm also no DevOps expert, but let me try to help still. 1. You could try GKE Autopilot if you don't want to deal with this type of issues - it would remove this burden of autoscaling from you 2. If you want to make
Cluster Autoscaler
work on GKE
, you have to create Disruptions with proper information, how to create it can be found in How to set PDBs to enable CA to move kube-system pods? - more on that can be found here
From Prefect perspective, your flow is fineEgil Bugge
05/04/2022, 7:01 AM