Is there an easy way to manage work pools as Python code for our CI/CD? I initially created my work ...
n
Is there an easy way to manage work pools as Python code for our CI/CD? I initially created my work via the
publish_as_work_pool
method. I now want to manage this work pool as code so any changes can be easily applied such as the image version etc. The docs specify work pools can be configured via the UI, CLI, REST API or Terraform. Other than using Terraform is there an easy way to have work pools defined as code so any updates will be applied?
I guess copying the base template to a json file in my git repo and then running the below is a good option:
Copy code
prefect work-pool update --base-job-template base-job-template.json my-work-pool
I would also need to run a create command if the work queue doesn't already exist This approach was mentioned here: https://prefect-community.slack.com/archives/CL09KU1K7/p1717079008117299?thread_ts=1716967402.918599&cid=CL09KU1K7
I created this function to easily create/update a work pool based on a job template
Copy code
from typing import Any, Dict

from prefect import get_client
from prefect.client.schemas.actions import WorkPoolCreate, WorkPoolUpdate
from prefect.exceptions import ObjectAlreadyExists

async def create_update_work_pool(work_pool_name: str, job_template: Dict[str, Any]) -> None:
    """Create or update a Prefect Work Pool

    Args:
        work_pool_name (str): name of the work pool to create/update
        job_template (Dict[str, Any]: work pool job template to create/update
    """
    async with get_client() as client:  # type: ignore [union-attr]
        try:
            await client.create_work_pool(
                WorkPoolCreate(
                    name=work_pool_name,
                    type="kubernetes",
                    base_job_template=job_template,
                    description="",
                )
            )
        except ObjectAlreadyExists:
            await client.update_work_pool(
                work_pool_name=work_pool_name,
                work_pool=WorkPoolUpdate(
                    base_job_template=job_template,
                    is_paused=False,
                    description="",
                    concurrency_limit=None,
                ),
            )
You can then call the function with:
Copy code
import asyncio
import json

with open("base_job_template.json", "r") as f:
    job_template = json.load(f)
asyncio.run(create_update_work_pool("test_work_pool", job_template)
FYI this now exists via
--overwrite
https://github.com/PrefectHQ/prefect/pull/14967
❤️ 1