# ask-community
Marko
Hello 🙂 I’ve been searching through these channels but couldn’t find the answer to this question. I’m using the `RunNamespacedJob` task to run k8s jobs. When they fail they don’t get deleted, which is fine and described in the docstring: “If job is in the failed status, resources will not be removed from the cluster so that user can check the logs on the cluster.” The problem appears the next time I run a job, because it complains that a job with the same name already exists, and I don’t want to delete the old job manually every time this happens. What would be the ideal way to work around this? Thanks!
Ideally I would like the next run to delete the job with the same name if it exists, without having to create an extra task to do it.
Alternatively, I thought about adding a random salt to the job name so that the new run doesn’t collide with the previous one, plus a cleanup flow that removes all failed jobs on a defined schedule. Rough sketches of both ideas below.
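To make that concrete, something like this is what I have in mind (untested sketch on Prefect 1.x, assuming `RunNamespacedJob` accepts the manifest via `body` at runtime; the job spec is a placeholder):
```python
import uuid

from prefect import Flow, task
from prefect.tasks.kubernetes import RunNamespacedJob


@task
def build_job_body() -> dict:
    """Build the Job manifest at runtime so every flow run gets a fresh name."""
    salt = uuid.uuid4().hex[:7]
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": f"my-sync-job-{salt}"},  # placeholder job name
        "spec": {
            "template": {
                "spec": {
                    "containers": [{"name": "main", "image": "busybox", "command": ["true"]}],
                    "restartPolicy": "Never",
                }
            },
            "backoffLimit": 0,
        },
    }


run_job = RunNamespacedJob(namespace="prefect")

with Flow("salted-k8s-job") as flow:
    run_job(body=build_job_body())
```
and the scheduled cleanup flow could talk to the k8s API directly:
```python
from datetime import timedelta

import kubernetes
from prefect import Flow, task
from prefect.schedules import IntervalSchedule


@task
def delete_failed_jobs(namespace: str = "prefect") -> None:
    """Delete every Job in the namespace whose status reports failed pods."""
    kubernetes.config.load_incluster_config()  # or load_kube_config() when run locally
    batch = kubernetes.client.BatchV1Api()
    for job in batch.list_namespaced_job(namespace).items:
        if job.status.failed:  # non-zero count of failed pods
            batch.delete_namespaced_job(
                name=job.metadata.name,
                namespace=namespace,
                propagation_policy="Foreground",
            )


schedule = IntervalSchedule(interval=timedelta(days=1))

with Flow("cleanup-failed-jobs", schedule=schedule) as cleanup_flow:
    delete_failed_jobs()
```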
Mariia
Hello Marko! The second option seems reasonable for now. Some users have asked for the opposite feature, where jobs are not deleted even when they were created and ran successfully, but I see a reason for deleting failed jobs in the situation you’ve described. I would encourage you to open an issue in the Prefect repo.
Marko
Thanks for the quick reply Mariia 🙂 I was hoping there might be some configuration option I could use, but this is good as well 🙂
Joël
Hi Marko, I was also running into this. In my case I used a state handler to delete the job on failure:
```python
import prefect
from prefect import Task
from prefect.engine import state
from prefect.engine.state import State
from prefect.tasks.kubernetes import DeleteNamespacedJob


def cleanup_on_failure(sync_job: Task, old_state: State, new_state: State) -> State:
    """
    On job failure, cleans up the old job. If the job will be retried, this is necessary
    because otherwise a duplicate-job error is raised. If the job runs successfully, it is
    automatically cleaned up.
    """
    # Only clean up if the task was Running, i.e. the job was actually created. This state
    # handler is also triggered if e.g. the CloudSQL Proxy fails (old_state=Pending).
    if isinstance(old_state, state.Running) and isinstance(new_state, state.Failed):
        job_name, _ = generate_job_name(sync_job.spec)  # custom helper, see note below
        # TODO: get kubernetes logs on failure before cleaning up the job
        try:
            DeleteNamespacedJob(
                job_name=job_name,
                namespace="prefect",
                kubernetes_api_key_secret=None,
            ).run(delete_option_kwargs=dict(propagation_policy="Foreground"))

            prefect.context.logger.info(f"Cleaned up old job: {job_name}")
        # The delete may fail with kubernetes.client.ApiException
        except Exception as exception:
            prefect.context.logger.warning(
                f"Failed to remove old job, might be deleted already: {job_name}",
                exc_info=exception,
            )

    return new_state
```
_generate_job_name creates a deterministic job name for a given task_
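Wiring it up is just the standard Prefect 1 `state_handlers` kwarg on the task. Rough sketch (the `SyncJob` wrapper and manifest here are illustrative, not my actual code):
```python
from typing import Any

from prefect.tasks.kubernetes import RunNamespacedJob


class SyncJob(RunNamespacedJob):
    """Illustrative wrapper that keeps the manifest on the task so the
    state handler can derive the job name from `sync_job.spec`."""

    def __init__(self, spec: dict, **kwargs: Any) -> None:
        self.spec = spec
        super().__init__(body=spec, **kwargs)


job_spec = {"apiVersion": "batch/v1", "kind": "Job", "metadata": {"name": "my-sync-job"}}

sync_job = SyncJob(spec=job_spec, state_handlers=[cleanup_on_failure])
```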
Marko
Cool, thanks @Joël Luijmes! I’ll check out this solution as well 🙂