Michael Law
02/20/2024, 10:06 AM
Michael Law
# 02/20/2024, 10:06 AM
# Prefect KubernetesJob whose pod runs spark-submit in Kubernetes cluster mode.
# The submitting pod mounts the "pvc-deployments" PVC at /mnt/deployments; the
# PySpark entry point, its --py-files dependency, the driver/executor pod
# template, and the spark.kubernetes.file.upload.path staging directory all
# live under that mount.
# NOTE(review): confirm the installed Prefect version's KubernetesJob accepts a
# Job manifest through with_options().
KubernetesJob().with_options({
    "metadata": {
        # The API server appends a random suffix to generateName.
        "generateName": "some-name-",
        "namespace": "spark",
    },
    "spec": {
        "template": {
            "spec": {
                "serviceAccountName": "spark",
                # Pod runs as root (UID/GID 0). NOTE(review): presumably needed
                # for write access to the PVC; consider a non-root UID.
                "securityContext": {
                    "runAsUser": 0,
                    "fsGroup": 0,
                },
                # The container is not restarted inside the same pod; the Job
                # controller may still retry with a new pod per backoffLimit.
                "restartPolicy": "Never",
                "containers": [
                    {
                        "name": "spark-container",
                        "image": "my-image:latest",
                        "securityContext": {
                            "runAsUser": 0,
                        },
                        "volumeMounts": [
                            {
                                "name": "pvc-deployments",
                                "mountPath": "/mnt/deployments",
                                "readOnly": False,
                            }
                        ],
                        # Run through a shell; the whole spark-submit line is one argument.
                        "command": ["/bin/sh", "-c"],
                        "args": [
                            "/opt/spark/bin/spark-submit"
                            # Spark's Kubernetes master URL is k8s://<api-server-url>;
                            # "<k8s://https>://" was a chat link-formatting artifact.
                            " --master k8s://https://kubernetes.default.svc"
                            " --deploy-mode cluster"
                            " --name some-name"
                            " --conf spark.eventLog.enabled=true"
                            " --conf spark.eventLog.dir=/mnt/history"
                            " --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark"
                            " --conf spark.kubernetes.namespace=spark"
                            " --conf spark.executor.instances=1"
                            " --conf spark.executor.memory=4g"
                            " --conf spark.executor.cores=1"
                            " --conf spark.driver.memory=2g"
                            " --conf spark.driver.cores=1"
                            " --conf spark.kubernetes.container.image=my-image:latest"
                            " --conf spark.jars.ivy=/tmp"
                            " --conf spark.kubernetes.file.upload.path=/mnt/deployments/temp"
                            " --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"
                            " --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
                            " --conf spark.kubernetes.driver.podTemplateFile=/mnt/deployments/dev/templates/spark-pod.yaml"
                            " --conf spark.kubernetes.executor.podTemplateFile=/mnt/deployments/dev/templates/spark-pod.yaml"
                            " --py-files /mnt/deployments/dev/some/file.py"
                            " /mnt/deployments/dev/some/python.py --table some-name"
                        ],
                    }
                ],
                "volumes": [
                    {
                        "name": "pvc-deployments",
                        "persistentVolumeClaim": {
                            "claimName": "pvc-deployments",
                        },
                    }
                ],
            }
        }
    },
})
Michael Law
# 02/20/2024, 3:20 PM
def _spark_submit_command(job_name, table_name, namespace="spark"):
    """Return the shell command line that runs spark-submit in Kubernetes cluster mode.

    The result is executed via ``/bin/sh -c``, so every caller-supplied value
    is shell-quoted before it is interpolated.
    """
    import shlex

    conf = {
        "spark.eventLog.enabled": "true",
        "spark.eventLog.dir": "/mnt/history",
        "spark.kubernetes.authenticate.driver.serviceAccountName": "spark",
        "spark.kubernetes.namespace": namespace,
        "spark.executor.instances": "1",
        "spark.executor.memory": "4g",
        "spark.executor.cores": "1",
        "spark.driver.memory": "2g",
        "spark.driver.cores": "1",
        "spark.kubernetes.container.image": "my-image:latest",
        "spark.jars.ivy": "/tmp",
        "spark.kubernetes.file.upload.path": "/mnt/deployments/temp",
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        "spark.kubernetes.driver.podTemplateFile": "/mnt/deployments/dev/templates/spark-pod.yaml",
        "spark.kubernetes.executor.podTemplateFile": "/mnt/deployments/dev/templates/spark-pod.yaml",
    }
    parts = [
        "/opt/spark/bin/spark-submit",
        # Spark's Kubernetes master URL is k8s://<api-server-url>; the previous
        # "<k8s://https>://" was a chat link-formatting artifact.
        "--master", "k8s://https://kubernetes.default.svc",
        "--deploy-mode", "cluster",
        "--name", shlex.quote(job_name),
    ]
    for key, value in conf.items():
        parts += ["--conf", shlex.quote(f"{key}={value}")]
    parts += [
        "--py-files", "/mnt/deployments/dev/some/file.py",
        "/mnt/deployments/dev/some/python.py",
        "--table", shlex.quote(table_name),
    ]
    return " ".join(parts)


def _build_job_manifest(job_name, table_name, namespace="spark"):
    """Return the batch/v1 Job manifest (as a dict) for one spark-submit run."""
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {
            # The API server appends a random suffix to generateName.
            "generateName": f"{job_name}-",
            "namespace": namespace,
        },
        "spec": {
            "template": {
                "spec": {
                    "serviceAccountName": "spark",
                    # NOTE(review): runs as root, presumably for PVC write access.
                    "securityContext": {"runAsUser": 0, "fsGroup": 0},
                    "restartPolicy": "Never",
                    "containers": [
                        {
                            "name": "spark-container",
                            "image": "my-image:latest",
                            "securityContext": {"runAsUser": 0},
                            "volumeMounts": [
                                {
                                    "name": "pvc-deployments",
                                    "mountPath": "/mnt/deployments",
                                    "readOnly": False,
                                }
                            ],
                            "command": ["/bin/sh", "-c"],
                            "args": [_spark_submit_command(job_name, table_name, namespace)],
                        }
                    ],
                    "volumes": [
                        {
                            "name": "pvc-deployments",
                            "persistentVolumeClaim": {"claimName": "pvc-deployments"},
                        }
                    ],
                }
            }
        },
    }


def _terminal_job_outcome(job):
    """Return "Succeeded" or "Failed" once ``job`` has a terminal condition, else None.

    A Job is finished only when its ``Complete`` or ``Failed`` condition has
    status ``"True"``. The ``status.failed`` counter is deliberately not used:
    it becomes non-null after the first failed pod even while the Job is still
    retrying under its backoffLimit.
    """
    status = job.status
    conditions = (status.conditions if status is not None else None) or []
    for condition in conditions:
        if condition.status != "True":
            continue
        if condition.type == "Complete":
            return "Succeeded"
        if condition.type == "Failed":
            return "Failed"
    return None


@task
def create_kubernetes_job(
    job_name: str,
    table_name: str,
    namespace: str = "spark",
    poll_interval_seconds: float = 30,
    timeout_seconds=None,
):
    """Create a Kubernetes Job that runs spark-submit, then wait for it to finish.

    Args:
        job_name: Prefix for the Job's generated name and the Spark app name.
        table_name: Value passed to the PySpark script's ``--table`` option.
        namespace: Namespace for the Job and for the Spark driver/executors.
        poll_interval_seconds: Delay between Job status checks.
        timeout_seconds: Maximum seconds to wait; ``None`` waits indefinitely.

    Returns:
        "Succeeded" or "Failed", taken from the Job's terminal condition. A
        failed Job is reported through this value, not by raising.

    Raises:
        TimeoutError: if ``timeout_seconds`` elapses before the Job finishes.
    """
    import time

    api_instance = client.BatchV1Api()
    job = api_instance.create_namespaced_job(
        body=_build_job_manifest(job_name, table_name, namespace),
        namespace=namespace,
    )
    # The server chose the final name from generateName; poll by that name.
    created_name = job.metadata.name
    deadline = None if timeout_seconds is None else time.monotonic() + timeout_seconds

    while True:
        job_status = api_instance.read_namespaced_job_status(
            name=created_name,
            namespace=namespace,
        )
        outcome = _terminal_job_outcome(job_status)
        if outcome is not None:
            print(f"Job {created_name} completed with status: {outcome}")
            return outcome
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Job {created_name} did not finish within {timeout_seconds} seconds"
            )
        print(f"Job {created_name} is still running...")
        time.sleep(poll_interval_seconds)
Kevin Grismore
02/20/2024, 3:23 PM
Sean Caldwell
02/20/2024, 3:27 PM
Kevin Grismore
02/20/2024, 3:27 PM
Sean Caldwell
02/20/2024, 3:31 PM
Kevin Grismore
02/20/2024, 3:31 PM
Kevin Grismore
02/20/2024, 3:37 PM
…prefect-kubernetes library, but you might find some of the functionality there convenient.
Sean Caldwell
02/20/2024, 3:40 PM