# prefect-community
t
My flows use KubernetesJobEnvironment and I specify a custom job_spec, but I'm noticing that I need to manually delete the k8s job between flow runs or else subsequent runs will fail to create the k8s job due to the job already existing.
For instance, I have a job that loads Google Analytics data to Snowflake, and here's the error it throws when I don't manually delete the k8s job:
{
  "kind": "Status",
  "apiVersion": "v1",
  "metadata": {},
  "status": "Failure",
  "message": "jobs.batch \"load-google-analytics-data-to-snowflake\" already exists",
  "reason": "AlreadyExists",
  "details": {
    "name": "load-google-analytics-data-to-snowflake",
    "group": "batch",
    "kind": "jobs"
  },
  "code": 409
}
so every time I run the job, I do this:
kubectl --namespace=prefect delete job load-google-analytics-data-to-snowflake
I assume that once I put this job into production and have it run on a schedule, it will immediately break?
My main question is this: I must be missing something, right? How can I make the flow work without manual intervention?
Perhaps I just have to put a TTL directly in my job spec?
spec:
  ttlSecondsAfterFinished: 60
...
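(Illustrative sketch, not from the thread: one way a TTL-bearing job spec might get attached, assuming the Prefect 0.x-era KubernetesJobEnvironment API; the file name job_spec.yaml and the flow body are placeholders.)

# Sketch only (Prefect 0.x-style API): the TTL lives in the custom job spec
# file that the flow's KubernetesJobEnvironment points at.
from prefect import Flow, task
from prefect.environments import KubernetesJobEnvironment

@task
def load_google_analytics_data_to_snowflake():
    ...  # actual extract/load logic would go here

with Flow(
    "load-google-analytics-data-to-snowflake",
    # job_spec.yaml is assumed to contain, among other things:
    #   spec:
    #     ttlSecondsAfterFinished: 60
    environment=KubernetesJobEnvironment(job_spec_file="job_spec.yaml"),
) as flow:
    load_google_analytics_data_to_snowflake()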
n
Hi @Troy Sankey! The task library has a task that can help with this called DeleteNamespacedJob, which you can build into your flow so that it cleans up after itself on every run. You could also check whether the job exists and remove it before trying to recreate it. Otherwise, I think this is a problem with the KubernetesJobEnvironment that @josh is going to try to PR a fix for ASAP.
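(Illustrative sketch of the clean-up pattern described above, assuming the Prefect 0.x task library's DeleteNamespacedJob; the constructor arguments, auth configuration, and the load_data placeholder task are assumptions, not details from the thread.)

# Sketch: delete the flow's k8s job as the last step of every run so the
# next run can recreate it. Names mirror the thread; auth details omitted.
from prefect import Flow, task
from prefect.tasks.kubernetes import DeleteNamespacedJob

delete_job = DeleteNamespacedJob(
    job_name="load-google-analytics-data-to-snowflake",
    namespace="prefect",
)

@task
def load_data():
    ...  # placeholder for the real Google Analytics to Snowflake load

with Flow("load-google-analytics-data-to-snowflake") as flow:
    load = load_data()
    # only delete the job after the main work has finished
    delete_job(upstream_tasks=[load])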
t
cool, I'll take a look at DeleteNamespacedJob, but I should clarify that this seems to happen at a layer before my first task can even run (i.e. my code to check if the job exists would be running inside the job), so I think checking for the existence of the job would not work.
n
Ah, that's a good point! I'll let you know when that PR gets in, which should fix this by appending a run ID to the job (so there should be no collisions).
t
thank you!
n
j
Jinx
n
😂 @josh is too fast
j
@Troy Sankey also noting here that in the meantime you could use the Kubernetes agent’s resource manager to clean up completed jobs 🙂 https://docs.prefect.io/orchestration/agents/kubernetes.html#resource-manager
t
holy crap, that was fast
thanks for the pointer! I have noticed that the "first" job that the Prefect agent creates for a given flow run just lingers around, and since the job spec for those is completely hard-coded, I can't add something like a TTL to them.