Hello ! Been trying to wrap my head around running...
# prefect-server
s
Hello! I've been trying to wrap my head around running an already existing k8s job from Prefect, but I can't seem to figure it out. Here's where I am:
from typing import Dict, List
from prefect import Flow, Parameter, task
import prefect
from prefect.tasks.kubernetes.job import ListNamespacedJob, RunNamespacedJob
from kubernetes.client import V1JobList, V1Job


@task
def get_job(jobs_list: V1JobList) -> Dict:
    candidates = jobs_list.items
    job = candidates[0]
    if len(candidates) > 1:
        prefect.context.get("logger").warning(
            f"Multiple candidates retrieved. Chose {job.metadata.name}."
        )
    return job.to_dict()


with Flow("Test") as flow:
    jobs = ListNamespacedJob(
        kube_kwargs={"field_selector": "metadata.name=JOB_NAME"},
        kubernetes_api_key_secret=None,
    )()

    job = get_job(jobs)

    job_result = RunNamespacedJob(
        kubernetes_api_key_secret=None,
        delete_job_after_completion=False,
    )(body=job)
Right now, this gets a 422 error starting with "Job.batch JOB_NAME is invalid..." from the k8s API when trying to run the job. Am I just doing it wrong?
a
I think I had the same issue, and I ended up adding a task beforehand that deletes the job, if it exists, before trying to create a new one. The problem is that Kubernetes can't run two jobs with the same name in the same namespace. Alternatively, you can use `delete_job_after_completion=True`, or change the job name in the body to e.g. a UUID.
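For the last option, here is a minimal sketch of giving the body a unique name before passing it to `RunNamespacedJob`. The helper name `with_unique_name` and the 63-character cap are assumptions made for illustration, not something from the thread. It only renames the job; whether other fields of a body read back from the cluster need adjusting is not covered here.

```python
import copy
import uuid


def with_unique_name(body: dict, max_len: int = 63) -> dict:
    """Return a copy of a Job body whose metadata.name has a random suffix.

    `max_len` is an assumed cap on the name length (hypothetical default).
    """
    new_body = copy.deepcopy(body)  # leave the caller's dict untouched
    suffix = uuid.uuid4().hex[:8]   # short random hex suffix
    base = new_body["metadata"]["name"]
    # Trim the base so that "<base>-<suffix>" stays within max_len characters.
    base = base[: max_len - len(suffix) - 1].rstrip("-")
    new_body["metadata"]["name"] = f"{base}-{suffix}"
    return new_body
```

In the flow above, this could be called inside `get_job` just before returning, e.g. `return with_unique_name(job.to_dict())`.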
s
Just to make sure `delete_job_after_completion=True` is what I want to do: are logs from the created job collected by Prefect in any way? If I wanted to check the logs even if the job succeeded, would I be able to with that option?
a
In the same example I shared before, there is an example of how to get logs from the pod before it gets deleted. This will display the logs in the UI: https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows_task_library/s3_kubernetes_run_RunNamespacedJob_and_get_logs.py#L110
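The linked flow isn't reproduced here, but the general idea can be sketched roughly as follows, assuming the Kubernetes Python client. The function name `collect_job_logs` is hypothetical; `core_api` is expected to be a `kubernetes.client.CoreV1Api` instance, and the sketch relies on the `job-name` label that Kubernetes puts on pods created by a Job.

```python
from typing import Any, Dict


def collect_job_logs(core_api: Any, job_name: str, namespace: str = "default") -> Dict[str, str]:
    """Return {pod_name: log_text} for every pod created by the given Job.

    `core_api` is assumed to be a kubernetes.client.CoreV1Api instance.
    """
    # Pods created by a Job carry the label job-name=<job name>.
    pods = core_api.list_namespaced_pod(
        namespace=namespace,
        label_selector=f"job-name={job_name}",
    )
    logs = {}
    for pod in pods.items:
        name = pod.metadata.name
        logs[name] = core_api.read_namespaced_pod_log(name=name, namespace=namespace)
    return logs
```

Called from a Prefect task that runs before the job is deleted, each entry could then be written with `prefect.context.get("logger").info(...)` so that it shows up in the UI.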
s
Oh that's great, thanks a lot!