# ask-community
s
On a similar note to the above question, is there a good way to set which Docker image a Kubernetes work pool uses? In our case we have a custom image that mirrors our code base and is rebuilt any time a push is made to our main branch. We want our CI/CD to update the image name on the work pool so we don't have to change it manually every time the image updates. I see in the API reference that there is an
Update Work Pool
PATCH endpoint, but the docs themselves don't give much information about how to use it or any example of it being used: https://app.prefect.cloud/api/docs#tag/Work-Pools/operation/update_work_pool_api_accounts_[…]t_id__workspaces__workspace_id__work_pools__name__patch
n
hmm, instead of constantly updating the work pool, would it be easier to set the work pool to use the latest tag for your image? And if you need to layer on dependencies on a per-deployment basis, you could do so by layering on top of your baseimage:latest in the Dockerfile
s
we cannot use a
:latest
tag, as our org doesn't allow them in production. That was our initial plan, tested in the lab, but it won't work in production. We can't add layers either, since the image is a package of our own code and utility functions; when the code changes, the image layer needs to be rebuilt
n
hmm, then perhaps you could template the image for the work pool to use, and use the SDK to update the value stored in that block as necessary?
s
but then wouldn't I have to redeploy all of my flows using that deployment.yaml so they pick up the updated image?
n
hmm, yeah, I guess so. Going back to your original suggestion, you could use the client method if you have a Python runtime in your CI, but that seems less than ideal. I can inquire about this
s
That's probably our best bet for our use case! Please let me know if you find a better way. Also, can you provide an example of this being used? Like I mentioned above, I couldn't figure it out from the docs. Would I do this with the Prefect CLI, or an HTTP PATCH request?
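[Editor's note: for reference, the same update can also be made with a plain HTTP PATCH against the endpoint linked above. A minimal sketch using only the standard library, assuming Prefect Cloud's URL shape from those API docs; the account ID, workspace ID, pool name, and API key below are placeholders.]

```python
# Sketch of calling the Update Work Pool endpoint directly over HTTP.
# The URL shape follows the Prefect Cloud API docs linked earlier in the
# thread; all IDs and the API key here are placeholder values.
import json
import urllib.request

API_BASE = "https://api.prefect.cloud/api"

def build_update_request(account_id: str, workspace_id: str, pool_name: str,
                         base_job_template: dict, api_key: str) -> urllib.request.Request:
    """Build a PATCH request that replaces a work pool's base job template."""
    url = (f"{API_BASE}/accounts/{account_id}/workspaces/{workspace_id}"
           f"/work_pools/{pool_name}")
    body = json.dumps({"base_job_template": base_job_template}).encode()
    return urllib.request.Request(
        url,
        data=body,
        method="PATCH",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
    )

# Build (but don't send) an example request; urllib.request.urlopen(req)
# would perform the actual update against real credentials.
req = build_update_request("acct-id", "ws-id", "k8s", {"variables": {}}, "pnu_...")
```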
n
import asyncio

from prefect import get_client
from prefect.client.schemas.actions import WorkPoolUpdate
from prefect.workers.utilities import get_default_base_job_template_for_infrastructure_type

async def main():
    # start from the default Kubernetes base job template
    tmpl = await get_default_base_job_template_for_infrastructure_type("kubernetes")
    # set the image ("foo" here just for illustration)
    tmpl["variables"]["properties"]["image"] = "foo"
    async with get_client() as client:
        await client.update_work_pool(
            work_pool_name="k8s",
            work_pool=WorkPoolUpdate(base_job_template=tmpl),
        )

asyncio.run(main())
where the whole template looks like this (you can also find it in the UI under work pool > edit > base job template > advanced):
{
  "variables": {
    "type": "object",
    "properties": {
      "env": {
        "type": "object",
        "title": "Environment Variables",
        "description": "Environment variables to set when starting a flow run.",
        "additionalProperties": {
          "type": "string"
        }
      },
      "name": {
        "type": "string",
        "title": "Name",
        "description": "Name given to infrastructure created by a worker."
      },
      "image": "foo",
      "labels": {
        "type": "object",
        "title": "Labels",
        "description": "Labels applied to infrastructure created by a worker.",
        "additionalProperties": {
          "type": "string"
        }
      },
      "command": {
        "type": "string",
        "title": "Command",
        "description": "The command to use when starting a flow run. In most cases, this should be left blank and the command will be automatically generated by the worker."
      },
      "namespace": {
        "type": "string",
        "title": "Namespace",
        "default": "default",
        "description": "The Kubernetes namespace to create jobs within."
      },
      "stream_output": {
        "type": "boolean",
        "title": "Stream Output",
        "default": true,
        "description": "If set, output will be streamed from the job to local standard output."
      },
      "cluster_config": {
        "allOf": [
          {
            "$ref": "#/definitions/KubernetesClusterConfig"
          }
        ],
        "title": "Cluster Config",
        "description": "The Kubernetes cluster config to use for job creation."
      },
      "finished_job_ttl": {
        "type": "integer",
        "title": "Finished Job TTL",
        "description": "The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If not set, jobs will be retained indefinitely."
      },
      "image_pull_policy": {
        "enum": [
          "IfNotPresent",
          "Always",
          "Never"
        ],
        "type": "string",
        "title": "Image Pull Policy",
        "default": "IfNotPresent",
        "description": "The Kubernetes image pull policy to use for job containers."
      },
      "service_account_name": {
        "type": "string",
        "title": "Service Account Name",
        "description": "The Kubernetes service account to use for job creation."
      },
      "job_watch_timeout_seconds": {
        "type": "integer",
        "title": "Job Watch Timeout Seconds",
        "description": "Number of seconds to wait for each event emitted by a job before timing out. If not set, the worker will wait for each event indefinitely."
      },
      "pod_watch_timeout_seconds": {
        "type": "integer",
        "title": "Pod Watch Timeout Seconds",
        "default": 60,
        "description": "Number of seconds to watch for pod creation before timing out."
      }
    },
    "definitions": {
      "KubernetesClusterConfig": {
        "type": "object",
        "title": "KubernetesClusterConfig",
        "required": [
          "config",
          "context_name"
        ],
        "properties": {
          "config": {
            "type": "object",
            "title": "Config",
            "description": "The entire contents of a kubectl config file."
          },
          "context_name": {
            "type": "string",
            "title": "Context Name",
            "description": "The name of the kubectl context to use."
          }
        },
        "description": "Stores configuration for interaction with Kubernetes clusters.\n\nSee `from_file` for creation.",
        "secret_fields": [],
        "block_type_slug": "kubernetes-cluster-config",
        "block_schema_references": {}
      }
    },
    "description": "Default variables for the Kubernetes worker.\n\nThe schema for this class is used to populate the `variables` section of the default\nbase job template."
  },
  "job_configuration": {
    "env": "{{ env }}",
    "name": "{{ name }}",
    "labels": "{{ labels }}",
    "command": "{{ command }}",
    "namespace": "{{ namespace }}",
    "job_manifest": {
      "kind": "Job",
      "spec": {
        "template": {
          "spec": {
            "containers": [
              {
                "env": "{{ env }}",
                "args": "{{ command }}",
                "name": "prefect-job",
                "image": "{{ image }}",
                "imagePullPolicy": "{{ image_pull_policy }}"
              }
            ],
            "completions": 1,
            "parallelism": 1,
            "restartPolicy": "Never",
            "serviceAccountName": "{{ service_account_name }}"
          }
        },
        "ttlSecondsAfterFinished": "{{ finished_job_ttl }}"
      },
      "metadata": {
        "labels": "{{ labels }}",
        "namespace": "{{ namespace }}",
        "generateName": "{{ name }}-"
      },
      "apiVersion": "batch/v1"
    },
    "stream_output": "{{ stream_output }}",
    "cluster_config": "{{ cluster_config }}",
    "job_watch_timeout_seconds": "{{ job_watch_timeout_seconds }}",
    "pod_watch_timeout_seconds": "{{ pod_watch_timeout_seconds }}"
  }
}
so you could hardcode as much of this template as you want and set it via the client method above
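[Editor's note: a small pure-Python helper like the following is one way a CI job could pin the image before pushing the template with the client method above. Rather than replacing the whole "image" property, this sketch sets its "default", which keeps the variable's JSON schema intact; the function name and template shape are illustrative, based on the template shown above.]

```python
# Helper a CI script could use to pin the image in a work pool's base job
# template dict before sending it via client.update_work_pool. Setting the
# "default" key (instead of replacing the whole property) preserves the
# variable's schema. A sketch, assuming the template shape shown above.
import copy

def pin_image(template: dict, image: str) -> dict:
    """Return a copy of the base job template with the image default pinned."""
    tmpl = copy.deepcopy(template)
    image_var = tmpl["variables"]["properties"].setdefault("image", {})
    if not isinstance(image_var, dict):  # e.g. it was hardcoded to a bare string
        image_var = {}
        tmpl["variables"]["properties"]["image"] = image_var
    image_var.update({
        "type": "string",
        "title": "Image",
        "default": image,
    })
    return tmpl

pinned = pin_image({"variables": {"properties": {}}}, "registry.example.com/app:abc123")
# pinned["variables"]["properties"]["image"]["default"] now holds the CI-built tag
```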
🙌 1
s
@Nate Trying to look into this now and come up with a script to do our update. My prefect package isn't able to find either of the following:
from prefect.workers.utilities import get_default_base_job_template_for_infrastructure_type
-- prefect.workers only has base and process submodules
from prefect.client.schemas.actions import WorkPoolUpdate
-- prefect.client.schemas doesn't have any subpackages
n
what version of prefect are you working with?
s
I currently have
Prefect 2.10.13
installed via pip. Is there a different package I should install?
n
hmm, is it just your IDE that is using a different interpreter? We moved the schemas into the client a couple releases ago, and that utility should also be there
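[Editor's note: a quick way to check for this kind of mismatch is to print which interpreter is actually active; if an IDE or second shell shows a different path here, it is resolving imports against a different environment. A minimal sketch:]

```shell
# Print the active interpreter's path and version; a mismatch between
# environments explains why one resolves imports and another does not.
python3 -c "import sys; print(sys.executable); print('.'.join(map(str, sys.version_info[:3])))"
# python3 -m pip show prefect   # would confirm which env has prefect, and its version
```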
s
hmm, it might be. Odd though, since I'm using a virtual environment to manage Python versions. I have Prefect 2.10.13 installed on both Python 3.8.16 and 3.8.12; 3.8.12 recognizes the packages but 3.8.16 doesn't. Interesting
scratch that, the 3.8.16 version WAS behind on a second look. Should be all good to go now, thanks!
n
👍 👍