Amir Kamal
09/02/2025, 9:15 AM
Marvin
09/02/2025, 9:15 AM
Marvin
09/02/2025, 9:28 AM
from prefect import flow

@flow
def my_flow():
    ...

if __name__ == "__main__":
    my_flow.deploy(
        name="my-deploy",
        work_pool_name="k8s-pool",
        job_variables={"finished_job_ttl": 60},  # seconds; 0 means immediate
    )
2) In prefect.yaml (using prefect deploy)
deployments:
- name: my-deploy
  entrypoint: path/to/flow.py:my_flow
  work_pool:
    name: k8s-pool
    job_variables:
      finished_job_ttl: 60
3) Confirm the field exists in the default job template
You can inspect the default base job template; it includes:
"ttlSecondsAfterFinished": "{{ finished_job_ttl }}"
Run:
prefect work-pool get-default-base-job-template --type kubernetes
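If you save that command's JSON output, a small script can confirm the placeholder is present. This is only a sketch: the SAMPLE_TEMPLATE below is a hypothetical, heavily trimmed stand-in for the real output (the nesting under job_configuration and job_manifest is an assumption), and find_values is an illustrative helper, not a Prefect API.

```python
import json
from typing import Any, Iterator


def find_values(node: Any, key: str) -> Iterator[Any]:
    """Yield every value stored under `key` anywhere in a nested JSON structure."""
    if isinstance(node, dict):
        for k, v in node.items():
            if k == key:
                yield v
            # Keep descending so nested occurrences are found too.
            yield from find_values(v, key)
    elif isinstance(node, list):
        for item in node:
            yield from find_values(item, key)


# Hypothetical, trimmed stand-in for the CLI output; the real template is much larger.
SAMPLE_TEMPLATE = json.loads("""
{
  "job_configuration": {
    "job_manifest": {
      "kind": "Job",
      "spec": {
        "ttlSecondsAfterFinished": "{{ finished_job_ttl }}"
      }
    }
  }
}
""")

ttl_fields = list(find_values(SAMPLE_TEMPLATE, "ttlSecondsAfterFinished"))
print(ttl_fields)
```

To check your own pool, replace SAMPLE_TEMPLATE with json.load() on the file you saved. An empty list would suggest the template was customized and no longer maps finished_job_ttl onto the Job spec.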
Notes:
- This cleans up Jobs after any terminal outcome (success or failure). If you want to keep failed jobs for debugging, use Option B below.
- Your cluster must have the Kubernetes TTL-after-finished controller enabled. It has been stable and on by default since Kubernetes 1.23.
Docs:
- Kubernetes worker how-to
Option B: Delete only on success using a flow “on_completion” hook
If you want to delete Jobs only when the flow succeeds (and keep failures), add a hook that runs at completion and deletes the Job(s) labeled for the current flow run.
Example:
from prefect import flow, get_run_logger
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.jobs import list_namespaced_job, delete_namespaced_job
from kubernetes.client import V1DeleteOptions

# Runs when the flow finishes in a Completed state.
# Prefect calls flow state hooks with (flow, flow_run, state).
def cleanup_k8s_jobs_on_success(flow, flow_run, state):
    logger = get_run_logger()
    fr_id = flow_run.id
    namespace = "your-namespace"
    # Use in-cluster config by default; or configure kubeconfig in KubernetesCredentials
    creds = KubernetesCredentials()
    # Select the Job(s) for this Prefect flow run.
    # Keyword arguments avoid depending on the tasks' positional order.
    jobs = list_namespaced_job(
        kubernetes_credentials=creds,
        namespace=namespace,
        label_selector=f"prefect.io/flow-run-id={fr_id}",
    )
    for job in jobs.items:
        logger.info(f"Deleting Kubernetes Job {job.metadata.name}")
        delete_namespaced_job(
            kubernetes_credentials=creds,
            job_name=job.metadata.name,
            namespace=namespace,
            delete_options=V1DeleteOptions(propagation_policy="Background"),
        )

@flow(on_completion=[cleanup_k8s_jobs_on_success])
def my_flow():
    ...
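If you would rather not call prefect_kubernetes tasks from inside a hook, the same select-then-delete logic can be sketched against the official kubernetes Python client. Treat this as an assumption-laden sketch: delete_jobs_for_flow_run and FakeBatchApi are hypothetical names, and the fake exists only so the logic can be dry-run without a cluster. The two methods it mimics, list_namespaced_job and delete_namespaced_job, are the ones on kubernetes.client.BatchV1Api.

```python
from types import SimpleNamespace


def delete_jobs_for_flow_run(batch_api, namespace: str, flow_run_id: str) -> list[str]:
    """Delete every Job labeled with the given flow run ID; return the deleted names."""
    selector = f"prefect.io/flow-run-id={flow_run_id}"
    jobs = batch_api.list_namespaced_job(namespace, label_selector=selector)
    deleted = []
    for job in jobs.items:
        # Background propagation lets the API call return at once while
        # Kubernetes garbage-collects the Job's Pods afterwards.
        batch_api.delete_namespaced_job(
            job.metadata.name, namespace, propagation_policy="Background"
        )
        deleted.append(job.metadata.name)
    return deleted


class FakeBatchApi:
    """Hypothetical in-memory stand-in for BatchV1Api, used only for a dry run."""

    def __init__(self, job_names):
        self.job_names = list(job_names)
        self.calls = []

    def list_namespaced_job(self, namespace, label_selector=None):
        self.calls.append(("list", namespace, label_selector))
        items = [
            SimpleNamespace(metadata=SimpleNamespace(name=n)) for n in self.job_names
        ]
        return SimpleNamespace(items=items)

    def delete_namespaced_job(self, name, namespace, propagation_policy=None):
        self.calls.append(("delete", name, namespace, propagation_policy))


fake = FakeBatchApi(["job-a", "job-b"])
print(delete_jobs_for_flow_run(fake, "your-namespace", "abc"))
```

Inside a real hook you would load cluster config first (for example config.load_incluster_config() from the kubernetes package) and pass client.BatchV1Api() together with str(flow_run.id) instead of the fake.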
Notes and gotchas:
- RBAC: Your Job’s service account must be allowed to list and delete Jobs (and Pods). Grant permissions on batch/jobs and pods in the target namespace.
- Labels: Prefect labels Jobs with identifiers like the flow run ID (e.g., prefect.io/flow-run-id), which lets you reliably select the right resources.
- Self-deletion: Using TTL is safer than trying to delete the currently-running Job from within itself. If you do self-delete on completion, ensure your cleanup runs after the flow is done and uses background propagation.
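For the RBAC note, the permissions could look roughly like the Role below, built here as a Python dict so it can be dumped to JSON and applied with kubectl. The role name and namespace are placeholders, and build_cleanup_role is a hypothetical helper. You would still need a RoleBinding that attaches the Role to the service account your flow-run Pods use.

```python
import json


def build_cleanup_role(namespace: str, name: str = "prefect-job-cleanup") -> dict:
    """Return a namespaced Role manifest allowing list/delete on Jobs and Pods."""
    return {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "Role",
        "metadata": {"name": name, "namespace": namespace},
        "rules": [
            # Jobs live in the "batch" API group.
            {"apiGroups": ["batch"], "resources": ["jobs"], "verbs": ["list", "delete"]},
            # Pods live in the core API group, written as an empty string.
            {"apiGroups": [""], "resources": ["pods"], "verbs": ["list", "delete"]},
        ],
    }


role = build_cleanup_role("your-namespace")
print(json.dumps(role, indent=2))
```

kubectl apply -f accepts JSON as well as YAML, so the printed manifest can be saved to a file and applied as-is.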
Which do you prefer?
- Do you want to delete only successful runs and keep failed ones for debugging? If yes, go with Option B.
- If you’re happy to delete all finished Jobs after a short window, Option A is simplest and most robust.
Helpful references:
Marvin
09/02/2025, 9:28 AM