Sylvain Hazard
12/22/2021, 11:05 AMfrom typing import Dict, List
from prefect import Flow, Parameter, task
import prefect
from prefect.tasks.kubernetes.job import ListNamespacedJob, RunNamespacedJob
from kubernetes.client import V1JobList, V1Job
@task
def get_job(jobs_list: V1JobList) -> Dict:
candidates = jobs_list.items
job = candidates[0]
if len(candidates) > 1:
prefect.context.get("logger").warning(
f"Multiple candidates retrieved. Chose {graph_job.metadata.name}."
)
return job.to_dict()
with Flow("Test") as flow:
jobs = ListNamespacedJob(
kube_kwargs={"field_selector": "metadata.name=JOB_NAME"},
kubernetes_api_key_secret=None,
)()
job = get_job(jobs)
job_result = RunNamespacedJob(
kubernetes_api_key_secret=None,
delete_job_after_completion=False,
)(body=job)
Right now, this gets a 422 error starting with "Job.batch JOB_NAME is invalid..." from the k8s API when trying to run the job.
Am I just doing it wrong ?Anna Geller
delete_job_after_completion=True
, or change the job name in the body to e.g. uuidSylvain Hazard
12/22/2021, 12:25 PMdelete_job_after_completion=True
is what I want to do : are logs from the created job collected by Prefect in any way ? If I wanted to check logs even if the job succeeded, would I be able to with that option ?Anna Geller
Sylvain Hazard
12/22/2021, 1:18 PM