<@ULVA73B9P> Is there a way for me to configure my...
# ask-marvin
k
@Marvin Is there a way for me to configure my Kubernetes worker to delete Kubernetes jobs once the flow has finished?
m
Yes, you can have Kubernetes jobs deleted once the flow has finished by using state change hooks on the flow. This specific case isn't covered directly in the documentation, but state change hooks are the mechanism for this kind of clean-up task. Here's a link explaining how to use them in Prefect: State Change Hooks Documentation

For Kubernetes, you can set a state change hook that deletes the Kubernetes job when the flow run completes or fails. The example below follows the pattern used for Cloud Run jobs, adapted to delete a Kubernetes job with `kubectl`:
```python
import os
from prefect import flow


def delete_k8s_job(flow, flow_run, state):
    """Flow run state change hook that deletes a Kubernetes Job."""
    # Placeholder naming scheme: replace with however your jobs are actually named.
    k8s_job_name = flow.name + "-job"
    # Requires kubectl (and permission to delete jobs) wherever the hook runs.
    os.system(f"kubectl delete job {k8s_job_name}")


@flow(on_completion=[delete_k8s_job])
def my_flow():
    # Your flow tasks here
    pass


if __name__ == "__main__":
    my_flow()
```
### Example Adaptation for Kubernetes Jobs

Here’s how you could adapt the example code to suit Kubernetes jobs:
```python
import os
from prefect import flow, task


def delete_k8s_job(flow, flow_run, state):
    """Flow run state change hook that deletes a Kubernetes Job."""
    k8s_job_name = "<your-job-name>"  # Define the Kubernetes job name
    delete_job_command = f"kubectl delete job {k8s_job_name}"
    os.system(delete_job_command)


@task
def hello():
    print("Hello, world!")


@flow(on_completion=[delete_k8s_job])
def example_flow():
    hello()


if __name__ == "__main__":
    example_flow()
```
This example defines a simple flow that calls a single task. When the flow completes, the `delete_k8s_job` function will run and delete the Kubernetes job. Be sure to modify the job name and command parameters as necessary to fit your environment and naming conventions. For more detailed guidance, refer to the official documentation linked above.

### Links to References

- State Change Hooks Documentation
- Kubernetes Cluster AutoScaler and Prefect Issue

It’s always a good idea to adapt the hooks to your specific scenarios and handle any edge cases unique to your workflows.
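
The hooks above shell out with `os.system`, which builds a shell string and ignores the exit code. A slightly more defensive variant might look like the sketch below. It is only an illustration: `build_delete_command` is a hypothetical helper, the `default` namespace and `<your-job-name>` are placeholders, and it assumes `kubectl` is installed and authorized wherever the hook runs.

```python
import subprocess


def build_delete_command(job_name: str, namespace: str = "default") -> list[str]:
    """Build the kubectl argument list for deleting one Job (no shell involved)."""
    return [
        "kubectl", "delete", "job", job_name,
        "--namespace", namespace,
        "--ignore-not-found",  # do not fail if the Job is already gone
    ]


def delete_k8s_job(flow, flow_run, state):
    """State change hook: delete a Job and report failures instead of ignoring them."""
    # "<your-job-name>" and "default" are placeholders, as in the examples above.
    command = build_delete_command("<your-job-name>", namespace="default")
    try:
        result = subprocess.run(command, capture_output=True, text=True)
    except FileNotFoundError:
        # kubectl is not on PATH in this environment.
        print("kubectl not found; skipping Kubernetes Job cleanup")
        return
    if result.returncode != 0:
        print(f"kubectl delete failed: {result.stderr.strip()}")
```

The hook is attached the same way as before, with `@flow(on_completion=[delete_k8s_job])`. Passing the arguments as a list avoids shell quoting problems if the job name ever comes from user-controlled input.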
k
@Marvin is there a way to do this globally at the worker level?
m
To have a Kubernetes worker delete jobs globally after the flow has finished, customize the base job template in the work pool settings. Prefect lets you modify the base job template of a Kubernetes work pool to include job cleanup logic. Here’s a step-by-step guide:

### Step-by-Step Guide

1. Navigate to the Kubernetes Work Pool Settings
   Open the work pool in Prefect Cloud or the Prefect UI.
2. Edit the Base Job Template
   In the Advanced settings of the work pool, you can edit the JSON representation of the base job template and add configuration for job cleanup there. Below is an example snippet to include job deletion logic:
```json
{
  "apiVersion": "batch/v1",
  "kind": "Job",
  "metadata": {
    "name": "{{ name }}"
  },
  "spec": {
    "ttlSecondsAfterFinished": 0,
    "template": {
      "spec": {
        "containers": [
          {
            "name": "{{ name }}",
            "image": "{{ image }}",
            "resources": {
              "requests": {
                "cpu": "{{ cpu_request }}"
              }
            }
          }
        ],
        "restartPolicy": "Never"
      }
    }
  }
}
```
`ttlSecondsAfterFinished` is set to 0, which makes the job eligible for deletion as soon as it finishes, so Kubernetes cleans it up (along with its pods) right away. Adjust the value based on your specific needs.

3. Testing and Deployment
   After editing the job template, create a new deployment and specify this work pool. The jobs created by this work pool will now have the `ttlSecondsAfterFinished` setting, ensuring they get deleted after completion.

### Reference Links

You can read more about configuring work pools and job templates in the Prefect documentation:

- Kubernetes Deployment Guide
- Developing a New Worker Type

These guides provide additional context and details for setting up and managing your Kubernetes workers in Prefect. Adopting this global configuration ensures that all jobs spawned by your Kubernetes worker are automatically cleaned up, keeping the cluster tidy.
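
If you would rather script the template edit than make it by hand in the UI, a minimal sketch could look like the following. It assumes the exported base job template nests the Job manifest under `job_configuration` → `job_manifest` (check this against your own template), and `set_job_ttl` is a hypothetical helper, not part of Prefect.

```python
import copy
import json


def set_job_ttl(base_job_template: dict, ttl_seconds: int) -> dict:
    """Return a copy of the template with ttlSecondsAfterFinished set on the Job spec."""
    if ttl_seconds < 0:
        raise ValueError("ttl_seconds must be non-negative")
    updated = copy.deepcopy(base_job_template)
    # Assumed layout: job_configuration -> job_manifest -> spec.
    job_spec = (
        updated.setdefault("job_configuration", {})
        .setdefault("job_manifest", {})
        .setdefault("spec", {})
    )
    job_spec["ttlSecondsAfterFinished"] = ttl_seconds
    return updated


if __name__ == "__main__":
    # Tiny stand-in for a real template exported from the work pool.
    template = {
        "job_configuration": {
            "job_manifest": {"apiVersion": "batch/v1", "kind": "Job", "spec": {}}
        }
    }
    print(json.dumps(set_job_ttl(template, 300), indent=2))
```

As far as I know, the default Kubernetes work pool template also exposes a `finished_job_ttl` variable that feeds `ttlSecondsAfterFinished`, so setting that variable on the work pool may be enough without editing the manifest. Treat that as an assumption and confirm it in your work pool's settings.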
t
Hi Kayvon, the pod should no longer be running after the flow completes. Is this not what you are seeing?
k
I actually think I know what was going on. I previously did not have this set so the jobs were left forever. Testing it now.
Yep we are good now!
t
Ahhhh ok glad it's working!