Hey folks, I may be going mental and apologies if ...
# ask-community
m
Hey folks, I may be going mental, and apologies if I am, but I'm trying to find a way to define my KubernetesJob in Python code and not in a YAML file. Is this possible? Basically, I don't fancy having to manage my YAML files as well as my flow code. I have put the code in the thread to unblock this feed.
Copy code
# Define a Prefect KubernetesJob from an inline Python dict instead of a YAML file.
# NOTE(review): this manifest has no "apiVersion"/"kind" keys and is passed through
# `with_options`; confirm both against the installed Prefect version's KubernetesJob API
# (it may instead expect `KubernetesJob(job={...})` with a complete batch/v1 Job manifest).
KubernetesJob().with_options({
        "metadata": {
            "generateName": "some-name-",
            "namespace": "spark"
        },
        "spec": {
            "template": {
                "spec": {
                    "serviceAccountName": "spark",
                    "securityContext": {
                        "runAsUser": 0,
                        "fsGroup": 0
                    },
                    "restartPolicy": "Never",
                    "containers": [
                        {
                            "name": "spark-container",
                            "image": "my-image:latest",
                            "securityContext": {
                                "runAsUser": 0
                            },
                            "volumeMounts": [
                                {
                                    "name": "pvc-deployments",
                                    "mountPath": "/mnt/deployments",
                                    "readOnly": False
                                }
                            ],
                            "command": ["/bin/sh", "-c"],
                            "args": [
                                # Each trailing backslash is a Python line continuation inside
                                # the string literal, so /bin/sh receives one long command line.
                                # The master URL must be the literal k8s://https://... form; a
                                # chat client had rewritten it as "<k8s://https>://...", which the
                                # shell would parse as I/O redirections.
                                """
                                /opt/spark/bin/spark-submit \
                                --master k8s://https://kubernetes.default.svc \
                                --deploy-mode cluster \
                                --name some-name \
                                --conf spark.eventLog.enabled=true \
                                --conf spark.eventLog.dir=/mnt/history \
                                --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
                                --conf spark.kubernetes.namespace=spark \
                                --conf spark.executor.instances=1 \
                                --conf spark.executor.memory=4g \
                                --conf spark.executor.cores=1 \
                                --conf spark.driver.memory=2g \
                                --conf spark.driver.cores=1 \
                                --conf spark.kubernetes.container.image=my-image:latest \
                                --conf spark.jars.ivy=/tmp \
                                --conf spark.kubernetes.file.upload.path=/mnt/deployments/temp \
                                --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
                                --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \
                                --conf spark.kubernetes.driver.podTemplateFile=/mnt/deployments/dev/templates/spark-pod.yaml \
                                --conf spark.kubernetes.executor.podTemplateFile=/mnt/deployments/dev/templates/spark-pod.yaml \
                                --py-files /mnt/deployments/dev/some/file.py \
                                /mnt/deployments/dev/some/python.py --table some-name
                                """
                            ]
                        }
                    ],
                    "volumes": [
                        {
                            # Backs the "pvc-deployments" volumeMount on the container above.
                            "name": "pvc-deployments",
                            "persistentVolumeClaim": {
                                "claimName": "pvc-deployments"
                            }
                        }
                    ]
                }
            }
        }
    })
I think I got there:
Copy code
def _build_spark_job_manifest(job_name: str, table_name: str, namespace: str, image: str) -> dict:
    """Return a batch/v1 Job manifest whose single container runs spark-submit.

    ``job_name`` is used both as the Job's ``generateName`` prefix and as the Spark
    application name; ``table_name`` is forwarded to the driver script as ``--table``.
    """
    spark_conf = {
        "spark.eventLog.enabled": "true",
        "spark.eventLog.dir": "/mnt/history",
        "spark.kubernetes.authenticate.driver.serviceAccountName": "spark",
        "spark.kubernetes.namespace": namespace,
        "spark.executor.instances": "1",
        "spark.executor.memory": "4g",
        "spark.executor.cores": "1",
        "spark.driver.memory": "2g",
        "spark.driver.cores": "1",
        "spark.kubernetes.container.image": image,
        "spark.jars.ivy": "/tmp",
        "spark.kubernetes.file.upload.path": "/mnt/deployments/temp",
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        "spark.kubernetes.driver.podTemplateFile": "/mnt/deployments/dev/templates/spark-pod.yaml",
        "spark.kubernetes.executor.podTemplateFile": "/mnt/deployments/dev/templates/spark-pod.yaml",
    }
    # Exec-form argv (no /bin/sh -c): job_name/table_name are passed as literal
    # arguments, so spaces or shell metacharacters in them cannot break or alter
    # the command.
    spark_submit_args = [
        "--master", "k8s://https://kubernetes.default.svc",
        "--deploy-mode", "cluster",
        "--name", job_name,
        *(arg for key, value in spark_conf.items() for arg in ("--conf", f"{key}={value}")),
        "--py-files", "/mnt/deployments/dev/some/file.py",
        "/mnt/deployments/dev/some/python.py",
        "--table", table_name,
    ]
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {
            "generateName": f"{job_name}-",
            "namespace": namespace,
        },
        "spec": {
            "template": {
                "spec": {
                    "serviceAccountName": "spark",
                    "securityContext": {
                        "runAsUser": 0,
                        "fsGroup": 0,
                    },
                    "restartPolicy": "Never",
                    "containers": [
                        {
                            "name": "spark-container",
                            "image": image,
                            "securityContext": {
                                "runAsUser": 0,
                            },
                            "volumeMounts": [
                                {
                                    "name": "pvc-deployments",
                                    "mountPath": "/mnt/deployments",
                                    "readOnly": False,
                                }
                            ],
                            "command": ["/opt/spark/bin/spark-submit"],
                            "args": spark_submit_args,
                        }
                    ],
                    "volumes": [
                        {
                            "name": "pvc-deployments",
                            "persistentVolumeClaim": {
                                "claimName": "pvc-deployments",
                            },
                        }
                    ],
                }
            }
        },
    }


def _job_outcome(job):
    """Return "Succeeded" or "Failed" once ``job`` reports a terminal condition, else None.

    The terminal state is read from ``status.conditions`` ("Complete" / "Failed" with
    status "True") rather than the ``succeeded`` / ``failed`` pod counters, because
    the ``failed`` counter becomes set on the first failed pod even while the Job
    controller is still retrying it.
    """
    conditions = (job.status.conditions if job.status is not None else None) or []
    for condition in conditions:
        if condition.status != "True":
            continue
        if condition.type == "Complete":
            return "Succeeded"
        if condition.type == "Failed":
            return "Failed"
    return None


@task
def create_kubernetes_job(
    job_name: str,
    table_name: str,
    namespace: str = "spark",
    image: str = "my-image:latest",
    poll_interval: float = 30,
    timeout_seconds=None,
):
    """Create a spark-submit Kubernetes Job and block until it finishes.

    Args:
        job_name: Prefix for the Job's generated name and the Spark application name.
        table_name: Value passed to the driver script's ``--table`` option.
        namespace: Namespace the Job (and Spark driver/executors) run in.
        image: Container image for both the submitter pod and the Spark pods.
        poll_interval: Seconds to sleep between status polls.
        timeout_seconds: Optional overall wait limit in seconds; ``None`` waits forever.

    Returns:
        The server-generated name of the Job that succeeded.

    Raises:
        RuntimeError: If the Job reaches the "Failed" condition, so that Prefect
            records this task run as failed instead of completed.
        TimeoutError: If ``timeout_seconds`` elapses before the Job finishes.
    """
    import time

    # NOTE(review): assumes the kubernetes client configuration has already been loaded
    # (e.g. config.load_incluster_config()) elsewhere; it is not done here.
    api_instance = client.BatchV1Api()
    created = api_instance.create_namespaced_job(
        body=_build_spark_job_manifest(job_name, table_name, namespace, image),
        namespace=namespace,
    )
    # With generateName the server chooses the final name; poll that, not the prefix.
    created_name = created.metadata.name

    deadline = None if timeout_seconds is None else time.monotonic() + timeout_seconds
    while True:
        job = api_instance.read_namespaced_job_status(name=created_name, namespace=namespace)
        outcome = _job_outcome(job)
        if outcome is not None:
            print(f"Job {created_name} completed with status: {outcome}")
            if outcome == "Failed":
                raise RuntimeError(f"Kubernetes job {created_name} failed")
            return created_name
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Job {created_name} did not finish within {timeout_seconds} seconds"
            )
        print(f"Job {created_name} is still running...")
        time.sleep(poll_interval)
k
Is this with Prefect 1 or 2?
s
2
k
I guess either way it doesn't matter that much; I'm just curious why you're starting a k8s job from within a flow rather than having the flow itself be the k8s job, for the sake of observability — unless the job you want to run isn't written in Python.
s
Our job is a Python/Spark job, so we want to use Prefect to orchestrate and monitor our runs of the job. We have already created the job/env separately from within a Prefect flow, so we now just want to orchestrate it and multiple others.
k
gotcha
Not sure if you're using our `prefect-kubernetes` library, but you might find some of the functionality there convenient.
s
Yeah, we looked at that — I'm pretty sure that's where we saw the YAML stuff.