<@ULVA73B9P> how do i create cleanup tasks that cl...
# ask-marvin
a
@Marvin how do i create cleanup tasks that cleans up the kubernetes jobs after they succeed in prefect?
m
thought for 795.1 seconds
Great question. Assuming you’re on Prefect 3.x, there are two good ways to clean up Kubernetes Jobs created by Prefect after runs finish. Option A (recommended): Let Kubernetes auto-delete finished Jobs - Prefect’s Kubernetes worker exposes the Job spec’s ttlSecondsAfterFinished as the variable finished_job_ttl. - Set this to a small number (e.g., 60 for 1 minute) or 0 for immediate cleanup after a Job finishes. - This deletes the Job and its Pods automatically without extra code. How to set it: 1) In your deployment (Python)
Copy code
from prefect import flow

@flow
def my_flow():
    ...

if __name__ == "__main__":
    my_flow.deploy(
        name="my-deploy",
        work_pool_name="k8s-pool",
        job_variables={"finished_job_ttl": 60}  # seconds; 0 means immediate
    )
2) In prefect.yaml (using prefect deploy)
Copy code
deployments:
- name: my-deploy
  entrypoint: path/to/flow.py:my_flow
  work_pool:
    name: k8s-pool
    job_variables:
      finished_job_ttl: 60
3) Confirm the field exists in the default job template You can inspect the default base job template; it includes:
Copy code
"ttlSecondsAfterFinished": "{{ finished_job_ttl }}"
Run:
Copy code
prefect work-pool get-default-base-job-template --type kubernetes
Notes: - This cleans up Jobs after any terminal outcome (success or failure). If you want to keep failed jobs for debugging, use Option B below. - Your cluster must have the Kubernetes TTL controller enabled. Most modern clusters do by default. Docs: - Kubernetes worker how-to Option B: Delete only on success using a flow “on_completion” hook If you want to delete Jobs only when the flow succeeds (and keep failures), add a hook that runs at completion and deletes the Job(s) labeled for the current flow run. Example:
Copy code
from prefect import flow, get_run_logger
from prefect.runtime import flow_run
from prefect_kubernetes.credentials import KubernetesCredentials
from <http://prefect_kubernetes.jobs|prefect_kubernetes.jobs> import list_namespaced_job, delete_namespaced_job
from kubernetes.client import V1DeleteOptions

# Runs when the flow completes in a Completed state
def cleanup_k8s_jobs_on_success(state):
    logger = get_run_logger()
    fr_id = flow_run.id
    namespace = "your-namespace"

    # Use in-cluster config by default; or configure kubeconfig in KubernetesCredentials
    creds = KubernetesCredentials()

    # Select the Job(s) for this Prefect flow run
    jobs = list_namespaced_job(
        creds,
        namespace=namespace,
        label_selector=f"<http://prefect.io/flow-run-id={fr_id}|prefect.io/flow-run-id={fr_id}>",
    )
    for job in jobs.items:
        <http://logger.info|logger.info>(f"Deleting Kubernetes Job {job.metadata.name}")
        delete_namespaced_job(
            creds,
            job_name=job.metadata.name,
            namespace=namespace,
            delete_options=V1DeleteOptions(propagation_policy="Background"),
        )

@flow(on_completion=[cleanup_k8s_jobs_on_success])
def my_flow():
    ...
Notes and gotchas: - RBAC: Your Job’s service account must be allowed to list and delete Jobs (and Pods). Grant permissions on batch/jobs and pods in the target namespace. - Labels: Prefect labels Jobs with identifiers like the flow run ID (e.g., prefect.io/flow-run-id), which lets you reliably select the right resources. - Self-deletion: Using TTL is safer than trying to delete the currently-running Job from within itself. If you do self-delete on completion, ensure your cleanup runs after the flow is done and uses background propagation. Which do you prefer? - Do you want to delete only successful runs and keep failed ones for debugging? If yes, go with Option B. - If you’re happy to delete all finished Jobs after a short window, Option A is simplest and most robust. Helpful references:
- Kubernetes worker how-to - Runtime context (accessing flow_run.id and job_variables)