Marko Jamedzija
06/18/2021, 3:40 PM
I'm using the RunNamespacedJob task to run k8s jobs. When they fail they don't get deleted, which is fine and described in the docstring: "If job is in the failed status, resources will not be removed from the cluster so that user can check the logs on the cluster." The problem appears the next time I run a job, because it complains that a job with the same name already exists, and I don't want to delete the job manually every time this happens. What would be the ideal solution to work around this? Thanks!
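(For context, a minimal sketch of this kind of setup; the flow name, image, and job manifest below are illustrative assumptions, not taken from the thread. The fixed metadata.name is what collides when a previous failed job was left in the cluster.)

from prefect import Flow
from prefect.tasks.kubernetes import RunNamespacedJob

# Hypothetical k8s Job manifest; reusing the same metadata.name across runs is
# what triggers the "job already exists" error after a failed run.
job_body = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {"name": "sync-job", "namespace": "prefect"},
    "spec": {
        "template": {
            "spec": {
                "containers": [{"name": "sync", "image": "my-registry/sync:latest"}],
                "restartPolicy": "Never",
            }
        }
    },
}

run_job = RunNamespacedJob(
    body=job_body,
    namespace="prefect",
    kubernetes_api_key_secret=None,  # None -> fall back to kube config / in-cluster credentials
)

with Flow("k8s-sync") as flow:
    run_job()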
Marko Jamedzija
06/18/2021, 3:41 PM

Marko Jamedzija
06/18/2021, 3:44 PM

Mariia Kerimova
06/18/2021, 4:07 PM

Marko Jamedzija
06/18/2021, 4:10 PM

Joël Luijmes
06/21/2021, 6:36 AM
import prefect
from prefect import Task
from prefect.engine import state
from prefect.engine.state import State
from prefect.tasks.kubernetes import DeleteNamespacedJob


def cleanup_on_failure(sync_job: Task, old_state: State, new_state: State) -> State:
    """
    On job failure, cleans up the old job. This is necessary when the job will be retried,
    because otherwise a duplicate job error is raised. If the job runs successfully, it is
    automatically cleaned up.
    """
    # Only clean up if the job was running, and thus was actually created. This state handler
    # is also triggered if e.g. the CloudSQL Proxy fails (old_state=Pending).
    if isinstance(old_state, state.Running) and isinstance(new_state, state.Failed):
        job_name, _ = generate_job_name(sync_job.spec)
        # TODO: get kubernetes logs on failure before cleaning up the job
        try:
            DeleteNamespacedJob(
                job_name=job_name,
                namespace="prefect",
                kubernetes_api_key_secret=None,
            ).run(delete_option_kwargs=dict(propagation_policy="Foreground"))
            prefect.context.logger.info(f"Cleaned up old job: {job_name}")
        # Deletion may fail with kubernetes.client.ApiException
        except Exception as exception:
            prefect.context.logger.warning(
                f"Failed to remove old job, might be deleted already: {job_name}", exc_info=exception
            )
    return new_state
_generate_job_name creates a deterministic job name for the given task_
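(A minimal sketch of how such a handler could be attached via state_handlers, the standard Prefect 1.x mechanism. The manifest, flow name, and image are assumptions; note that a plain RunNamespacedJob does not expose the .spec attribute that generate_job_name uses above, so this only illustrates the wiring, not the helper.)

from prefect import Flow
from prefect.tasks.kubernetes import RunNamespacedJob

# Hypothetical wiring: Prefect calls each state handler on every state
# transition of the task, including Running -> Failed, which is where
# cleanup_on_failure deletes the leftover k8s job.
sync_job = RunNamespacedJob(
    body={
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": "sync-job", "namespace": "prefect"},
        "spec": {"template": {"spec": {
            "containers": [{"name": "sync", "image": "my-registry/sync:latest"}],
            "restartPolicy": "Never",
        }}},
    },
    namespace="prefect",
    kubernetes_api_key_secret=None,
    state_handlers=[cleanup_on_failure],
)

with Flow("k8s-sync") as flow:
    sync_job()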
Marko Jamedzija
06/22/2021, 12:27 PM