Hi! quick question, what's the recommended approac...
# prefect-kubernetes
m
Hi! quick question, what's the recommended approach to define the KubernetesJob resources (cpu and RAM requests and limits) in a flow deployment to a Kubernetes based Work Pool?
Copy code
def deploy_hello_flow():
    hello_flow.deploy(
        name="hello_flow_k8s_deployment",
        work_pool_name="default-worker-pool", # k8s work pool
        image="europe-docker.pkg.dev/cambium-earth/prefect-runners-prod/default:latest",
        tags=["prod"],
        schedule=CronSchedule(cron="0 12 1 * *", timezone="UTC"),
    )
1
j
You can add the variables to the Base Job Template of the work pool. 1. Create a K8s work pool and click on the Advanced tab under Base Job Template 2. Edit the JSON to add the four variables you’d like and add them to the “job_configuration” . I’ll paste the JSON below. 3. You can then add defaults for those values in the Work Pool-> Base Job Template-> Defaults tab if you like. 4. In the deployment, pass a dictionary to .deploys’s
job_variables
param. Example code below.
Full base job template with four new variables:
Copy code
{
  "variables": {
    "type": "object",
    "properties": {
      "env": {
        "type": "object",
        "title": "Environment Variables",
        "description": "Environment variables to set when starting a flow run.",
        "additionalProperties": {
          "type": "string"
        }
      },
      "name": {
        "type": "string",
        "title": "Name",
        "description": "Name given to infrastructure created by a worker."
      },
      "image": {
        "type": "string",
        "title": "Image",
        "example": "<http://docker.io/prefecthq/prefect:2-latest|docker.io/prefecthq/prefect:2-latest>",
        "description": "The image reference of a container image to use for created jobs. If not set, the latest Prefect image will be used."
      },
      "labels": {
        "type": "object",
        "title": "Labels",
        "description": "Labels applied to infrastructure created by a worker.",
        "additionalProperties": {
          "type": "string"
        }
      },
      "command": {
        "type": "string",
        "title": "Command",
        "description": "The command to use when starting a flow run. In most cases, this should be left blank and the command will be automatically generated by the worker."
      },
      "cpu_limit": {
        "type": "string",
        "title": "CPU limit",
        "description": "CPU limit for pod"
      },
      "mem_limit": {
        "type": "string",
        "title": "Memory limit",
        "description": "Memory limit for pod"
      },
      "namespace": {
        "type": "string",
        "title": "Namespace",
        "default": "default",
        "description": "The Kubernetes namespace to create jobs within."
      },
      "cpu_request": {
        "type": "string",
        "title": "CPU request",
        "default": "500m",
        "description": "CPU request for pod"
      },
      "mem_request": {
        "type": "string",
        "title": "Memory request",
        "default": "128Mi",
        "description": "Memory request for pod"
      },
      "stream_output": {
        "type": "boolean",
        "title": "Stream Output",
        "default": true,
        "description": "If set, output will be streamed from the job to local standard output."
      },
      "cluster_config": {
        "allOf": [
          {
            "$ref": "#/definitions/KubernetesClusterConfig"
          }
        ],
        "title": "Cluster Config",
        "description": "The Kubernetes cluster config to use for job creation."
      },
      "finished_job_ttl": {
        "type": "integer",
        "title": "Finished Job TTL",
        "description": "The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If not set, jobs will be retained indefinitely."
      },
      "image_pull_policy": {
        "enum": [
          "IfNotPresent",
          "Always",
          "Never"
        ],
        "type": "string",
        "title": "Image Pull Policy",
        "default": "IfNotPresent",
        "description": "The Kubernetes image pull policy to use for job containers."
      },
      "service_account_name": {
        "type": "string",
        "title": "Service Account Name",
        "description": "The Kubernetes service account to use for job creation."
      },
      "job_watch_timeout_seconds": {
        "type": "integer",
        "title": "Job Watch Timeout Seconds",
        "description": "Number of seconds to wait for each event emitted by a job before timing out. If not set, the worker will wait for each event indefinitely."
      },
      "pod_watch_timeout_seconds": {
        "type": "integer",
        "title": "Pod Watch Timeout Seconds",
        "default": 60,
        "description": "Number of seconds to watch for pod creation before timing out."
      }
    },
    "definitions": {
      "KubernetesClusterConfig": {
        "type": "object",
        "title": "KubernetesClusterConfig",
        "required": [
          "config",
          "context_name"
        ],
        "properties": {
          "config": {
            "type": "object",
            "title": "Config",
            "description": "The entire contents of a kubectl config file."
          },
          "context_name": {
            "type": "string",
            "title": "Context Name",
            "description": "The name of the kubectl context to use."
          }
        },
        "description": "Stores configuration for interaction with Kubernetes clusters.\n\nSee `from_file` for creation.",
        "secret_fields": [],
        "block_type_slug": "kubernetes-cluster-config",
        "block_schema_references": {}
      }
    },
    "description": "Default variables for the Kubernetes worker.\n\nThe schema for this class is used to populate the `variables` section of the default\nbase job template."
  },
  "job_configuration": {
    "env": "{{ env }}",
    "name": "{{ name }}",
    "labels": "{{ labels }}",
    "command": "{{ command }}",
    "namespace": "{{ namespace }}",
    "job_manifest": {
      "kind": "Job",
      "spec": {
        "template": {
          "spec": {
            "containers": [
              {
                "env": "{{ env }}",
                "args": "{{ command }}",
                "name": "prefect-job",
                "image": "{{ image }}",
                "resources": {
                  "limits": {
                    "cpu": "{{ cpu_limit }}",
                    "memory": "{{ mem_limit }}"
                  },
                  "requests": {
                    "cpu": "{{ cpu_request }}",
                    "memory": "{{ mem_request }}"
                  }
                },
                "imagePullPolicy": "{{ image_pull_policy }}"
              }
            ],
            "completions": 1,
            "parallelism": 1,
            "restartPolicy": "Never",
            "serviceAccountName": "{{ service_account_name }}"
          }
        },
        "backoffLimit": 0,
        "ttlSecondsAfterFinished": "{{ finished_job_ttl }}"
      },
      "metadata": {
        "labels": "{{ labels }}",
        "namespace": "{{ namespace }}",
        "generateName": "{{ name }}-"
      },
      "apiVersion": "batch/v1"
    },
    "stream_output": "{{ stream_output }}",
    "cluster_config": "{{ cluster_config }}",
    "job_watch_timeout_seconds": "{{ job_watch_timeout_seconds }}",
    "pod_watch_timeout_seconds": "{{ pod_watch_timeout_seconds }}"
  }
}
Deployment example for testing:
Copy code
from prefect import flow


@flow(log_prints=True)
def hi():
    print("Hi")


if __name__ == "__main__":
    hi.deploy(
        job_variables={"cpu_request": "200m", "mem_request": "64Mi"},
        push=False,
        image="discdiver/k8s:1",
        work_pool_name="k8s1",
        name="k8s-resources",
    )
In k9s I can see that the work pool default is used for the container, and that it can be overridden by the deployment.
m
thanks @Jeff Hale let me give this a try and get back to you!
that worked perfectly!! thanks a lot @Jeff Hale
🙌 1
any way to configure this programmatically from the CI or else via the Helm chart upon installation?
j
Hi Miguel. Great to hear! You’ve got a few options: Option 1: Use a bash command in your CI workflow to create a work pool that has a modified base job template: Command:
prefect work-pool create --type kubernetes --base-job-template path_to_template
If needed in the future, you can get the base job template for the K8s work pool and adjust it with
prefect work-pool get-default-base-job-template --type kubernetes
Option 2: use the Terraform Provider see: Use a JSON file to load a base job configuration here. I didn’t see a Helm chart option.
m
mmm we're using ArgoCD to deploy the work pool (via Helm)
Copy code
apiVersion: <http://argoproj.io/v1alpha1|argoproj.io/v1alpha1>
kind: Application
metadata:
  name: prefect-worker
  namespace: argocd
spec:
  project: default
  sources:
    # Chart from the Prefect chart repo
    - chart: prefect-worker
      repoURL: <https://prefecthq.github.io/prefect-helm>
      targetRevision: 2023.10.5
      helm:
        valueFiles:
          - $values/releases/dev/prefect/worker/values.yaml
    # Values from Git repository
    - repoURL: <https://github.com/cambium-earth/data-canopy-argocd.git>
      targetRevision: HEAD
      ref: values
  destination:
    namespace: prefect
    server: <https://kubernetes.default.svc>
  syncPolicy:
    automated:
      selfHeal: true
      prune: true
    syncOptions:
      - CreateNamespace=true
j
That looks like it's for the worker rather than the work pool. That should still work fine.
m
in the
values.yaml
there's the configuration for the work pool
Copy code
## general configuration of the worker
  config:
    # -- the work pool that your started worker will poll.
    workPool: "default-worker-pool"
    # -- one or more work queue names for the worker to pull from. if not provided, the worker will pull from all work queues in the work pool
    workQueues: []
    # -- how often the worker will query for runs
    queryInterval: 5
    # -- when querying for runs, how many seconds in the future can they be scheduled
    prefetchSeconds: 10
    # -- connect using HTTP/2 if the server supports it (experimental)
    http2: true
    # -- Maximum number of flow runs to start simultaneously (default: unlimited)
    limit: null
    ## You can set the worker type here.
    ## The default image includes only the type "kubernetes".
    ## Custom workers must be properly registered with the prefect cli.
    ## See the guide here: <https://docs.prefect.io/2.11.3/guides/deployment/developing-a-new-worker-type/>
    # -- specify the worker type
    type: kubernetes
(I have not created the work-pool manually/programmatically using the CLI or similar, it was automatically created when I deployed the chart)
j
Ah good call. And you can specify the base job template file for the worker to create the work pool. See here: https://github.com/PrefectHQ/prefect-helm/blob/main/charts/prefect-worker/values.schema.json#L161
m
ah I see also that if the work pool already exists the param is ignored
so if I'm reading this correctly, I can either delete the work pool programmatically through the CI and let ArgoCD recreate it with the updated template or just recreate it through the CLI command you shared (I think/hope ArgoCD does not mess up when this happens)
anyway, I'm not really expecting this to change too much, I was more concerned about the fact that if anything else changed in the configuration the work pool could be updated and the configuration applied through the UI lost, which is not the case if I understood correctly
j
Yeah, the work pool could be updated in the UI and the configuration for that work pool shouldn’t be lost.
squirtle cool 1
m
cool
thx!!
243 Views