Sylvain Hazard

    9 months ago
    Hello! I've been trying to wrap my head around running an already existing k8s job from Prefect, but I can't seem to figure it out. Here's where I am:
    from typing import Dict, List
    from prefect import Flow, Parameter, task
    import prefect
    from prefect.tasks.kubernetes.job import ListNamespacedJob, RunNamespacedJob
    from kubernetes.client import V1JobList, V1Job
    
    
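    @task
    def get_job(jobs_list: V1JobList) -> Dict:
        candidates = jobs_list.items
        # Fail loudly instead of raising an IndexError when nothing matched.
        if not candidates:
            raise ValueError("No job matched the field selector.")
        job = candidates[0]
        if len(candidates) > 1:
            prefect.context.get("logger").warning(
                f"Multiple candidates retrieved. Chose {job.metadata.name}."
            )
        return job.to_dict()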
    
    
    with Flow("Test") as flow:
        jobs = ListNamespacedJob(
            kube_kwargs={"field_selector": "metadata.name=JOB_NAME"},
            kubernetes_api_key_secret=None,
        )()
    
        job = get_job(jobs)
    
        job_result = RunNamespacedJob(
            kubernetes_api_key_secret=None,
            delete_job_after_completion=False,
        )(body=job)
    Right now, this fails with a 422 error from the k8s API, starting with "Job.batch JOB_NAME is invalid...", when it tries to run the job. Am I just doing it wrong?
    Anna Geller

    9 months ago
    I think I had the same issue, and I ended up adding a task beforehand that deletes the job if it exists before trying to create a new one. The problem is that Kubernetes can't have two jobs with the same name in the same namespace. Alternatively, you can use `delete_job_after_completion=True`, or change the job name in the body to something unique, e.g. a UUID.
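A minimal sketch of the rename approach, assuming the body is a camelCase Job manifest, for example the output of `kubernetes.client.ApiClient().sanitize_for_serialization(job)`. Note that `V1Job.to_dict()` returns snake_case keys such as `api_version` and `restart_policy`, which the API server does not recognise, so that alone may contribute to a 422. The helper name `fresh_job_body` and the exact list of server-populated fields it strips are assumptions, not part of Prefect's API.

```python
import copy
import uuid
from typing import Any, Dict

# Assumption: metadata fields the API server fills in on an existing Job,
# which should not be sent back when creating a new one.
SERVER_METADATA_FIELDS = (
    "uid", "resourceVersion", "creationTimestamp", "selfLink",
    "managedFields", "generation", "ownerReferences",
)
# Assumption: labels the Job controller adds to the selector and pod template.
CONTROLLER_LABELS = (
    "controller-uid", "batch.kubernetes.io/controller-uid",
    "job-name", "batch.kubernetes.io/job-name",
)


def fresh_job_body(existing: Dict[str, Any], base_name: str) -> Dict[str, Any]:
    """Return a copy of a camelCase Job manifest, renamed so it can be created again."""
    body = copy.deepcopy(existing)  # never mutate the caller's manifest
    body.pop("status", None)

    metadata = body.setdefault("metadata", {})
    for field in SERVER_METADATA_FIELDS:
        metadata.pop(field, None)
    # A random suffix keeps the name unique within the namespace.
    metadata["name"] = f"{base_name}-{uuid.uuid4().hex[:8]}"

    spec = body.setdefault("spec", {})
    spec.pop("selector", None)  # let the server generate a fresh selector

    job_labels = metadata.get("labels", {})
    template_labels = spec.get("template", {}).get("metadata", {}).get("labels", {})
    for label in CONTROLLER_LABELS:
        job_labels.pop(label, None)
        template_labels.pop(label, None)
    return body
```

In the flow above, `get_job` could then return `fresh_job_body(ApiClient().sanitize_for_serialization(job), job.metadata.name)` instead of `job.to_dict()`. The original Job is left in place, so no delete step would be needed.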
    Sylvain Hazard

    9 months ago
    Just to make sure `delete_job_after_completion=True` is what I want to do: are logs from the created job collected by Prefect in any way? If I wanted to check the logs even when the job succeeded, would I be able to with that option?
    Anna Geller

    9 months ago
    The same example I shared before shows how to get logs from the pod before it gets deleted; this will display them in the UI: https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows_task_library/s3_kubernetes_run_RunNamespacedJob_and_get_logs.py#L110
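A minimal sketch of fetching pod logs before the Job is deleted. It assumes an already configured `kubernetes.client.CoreV1Api` instance is passed in and that the Job's pods carry the `job-name=<job name>` label the Job controller adds. The function name `collect_job_logs` is hypothetical, and this is not necessarily how the linked example does it.

```python
from typing import Any, Dict


def collect_job_logs(core_api: Any, namespace: str, job_name: str) -> Dict[str, str]:
    """Return {pod_name: log_text} for every pod created by the given Job."""
    # Assumption: the Job controller labels its pods with job-name=<job name>.
    pods = core_api.list_namespaced_pod(namespace, label_selector=f"job-name={job_name}")
    logs: Dict[str, str] = {}
    for pod in pods.items:
        pod_name = pod.metadata.name
        # Pods with several containers would also need the `container` argument.
        logs[pod_name] = core_api.read_namespaced_pod_log(name=pod_name, namespace=namespace)
    return logs
```

Inside a Prefect task, you might call it with `CoreV1Api()` after `kubernetes.config.load_incluster_config()` and pass each entry to `prefect.context.get("logger")` so the output shows up in the UI.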
    Sylvain Hazard

    9 months ago
    Oh that's great, thanks a lot !