Hi :slightly_smiling_face: im trying to trigger k8...
# ask-community
r
Hi 🙂 I'm trying to trigger a k8s job from a Prefect flow. I have a self-hosted Prefect with a Kubernetes worker pool, and I want the flow to use the in-cluster Kubernetes client. When I try to use the run_namespaced_job flow, I get this error:
Copy code
aiohttp.client_exceptions.InvalidUrlClientError: /apis/batch/v1/namespaces/prefect-worker/jobs
That means it is not using the in-cluster config. When I run the job with my own Kubernetes client, it works fine. Any help is appreciated, thanks.
Copy code
from kubernetes import config
from kubernetes.client import V1Job, V1JobSpec, V1ObjectMeta, V1PodTemplateSpec, V1PodSpec, V1Container, BatchV1Api
from prefect import task, flow
from prefect_kubernetes import KubernetesCredentials, run_namespaced_job
from prefect_kubernetes.jobs import KubernetesJob


# Task to create and run a Kubernetes job
@task
def generate_job():
    """Assemble the ``V1Job`` object for a one-shot busybox "hello" job.

    Returns:
        A ``V1Job`` with ``api_version="batch/v1"`` and ``kind="Job"``. Its
        metadata sets only ``generate_name="hello-job-"``, and its pod
        template holds a single ``busybox`` container running
        ``echo "Hello, Kubernetes!"`` with ``restart_policy="Never"``.
    """
    # The only container in the pod: print a greeting.
    hello_container = V1Container(
        name="hello",
        image="busybox",
        command=["echo", "Hello, Kubernetes!"],
    )

    pod_template = V1PodTemplateSpec(
        spec=V1PodSpec(
            containers=[hello_container],
            restart_policy="Never",
        ),
    )

    return V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=V1ObjectMeta(generate_name="hello-job-"),
        # backoff_limit is the retry limit if the job fails.
        spec=V1JobSpec(template=pod_template, backoff_limit=4),
    )


@task()
def run_job(job: V1Job, namespace: str):
    """Create *job* in *namespace* through the plain Kubernetes client.

    Loads the in-cluster configuration first, then submits the job with a
    ``BatchV1Api`` client.

    Args:
        job: The job object to submit.
        namespace: Namespace in which to create the job.

    Returns:
        Whatever ``BatchV1Api.create_namespaced_job`` returns for the request.
    """
    config.load_incluster_config()
    return BatchV1Api().create_namespaced_job(namespace=namespace, body=job)


@flow
def k8s_job_flow():
    """Submit the generated job twice: via the raw client and via prefect-kubernetes.

    The job object from ``generate_job`` is first created with ``run_job``
    (plain Kubernetes client, in-cluster config) and then handed to
    ``run_namespaced_job`` wrapped in a ``KubernetesJob``.

    NOTE(review): ``KubernetesCredentials()`` is built with no arguments, so
    how it resolves the cluster connection is up to prefect-kubernetes.
    Confirm against the installed version's documentation that this yields
    the in-cluster configuration.
    """
    # Local import: only this flow needs ApiClient, for serialisation.
    from kubernetes.client import ApiClient

    namespace = "prefect-worker"
    job_obj = generate_job()

    # Reference submission through the plain Kubernetes client.
    run_job(job_obj, namespace)

    # V1Job.to_dict() emits the Python attribute names (api_version,
    # generate_name, restart_policy, ...). A manifest dict needs the API
    # field names (apiVersion, generateName, restartPolicy, ...), which
    # sanitize_for_serialization produces.
    manifest = ApiClient().sanitize_for_serialization(job_obj)

    run_namespaced_job(
        kubernetes_job=KubernetesJob(
            credentials=KubernetesCredentials(),
            v1_job=manifest,
            namespace=namespace,
        )
    )


if __name__ == "__main__":
    # Script entry point: call serve() on the flow object.
    # NOTE(review): presumably serve() executes flow runs in the process that
    # runs this script rather than through the Kubernetes work pool. Confirm
    # against the Prefect docs, since load_incluster_config() only works
    # inside a pod.
    k8s_job_flow.serve()