Sharath Chandra
02/16/2022, 8:04 AM
I am running spark-submit from a Prefect task. The issue I am facing is that in case of errors in the spark job, the Prefect task still shows it as a success.
I can see the pod status on k8s as failed with errors in the log.
How can we propagate errors from the spark job to the Prefect task?
The task is defined as:
from typing import Any, List

from prefect.tasks.shell import ShellTask
from prefect.utilities.tasks import defaults_from_attrs


class K8sSparkSubmitTask(ShellTask):
    def __init__(
        self,
        command: str = None,
        master_uri: str = None,
        deploy_mode: str = None,
        job_name: str = None,
        conf_kwargs: dict = None,
        env: dict = None,
        helper_script: str = None,
        shell: str = "bash",
        return_all: bool = False,
        log_stderr: bool = False,
        **kwargs: Any,
    ):
        # store spark-submit specific options so run() and defaults_from_attrs can use them
        self.master_uri = master_uri
        self.deploy_mode = deploy_mode
        self.job_name = job_name
        self.conf_kwargs = conf_kwargs
        super().__init__(
            **kwargs,
            command=command,
            env=env,
            helper_script=helper_script,
            shell=shell,
            return_all=return_all,
            log_stderr=log_stderr,
        )

    @defaults_from_attrs("job_name", "conf_kwargs")
    def run(
        self,
        job_name: str = None,
        job_uri: str = None,
        job_args: List[str] = None,
        conf_kwargs: dict = None,
    ) -> str:
        # _build_command (not shown) assembles the spark-submit command string
        command = self._build_command(
            master_uri=self.master_uri,
            deploy_mode=self.deploy_mode,
            job_name=job_name,
            job_uri=job_uri,
            job_args=job_args,
            conf_kwargs=conf_kwargs,
        )
        self.logger.info(f"The spark-submit command is {command}")
        return super(K8sSparkSubmitTask, self).run(command=command)
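A minimal usage sketch for the task above (the flow name, master URI, jar path, and argument values here are illustrative, not from the post):

from prefect import Flow

submit = K8sSparkSubmitTask(
    master_uri="k8s://https://kubernetes.default.svc",  # illustrative value
    deploy_mode="cluster",
    job_name="example-job",
)

with Flow("spark-submit-example") as flow:
    submit(job_uri="local:///app/app.jar", job_args=["--date", "2022-02-16"])

# flow.run() would then execute the generated spark-submit command via ShellTask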
The spark job is:
def main(args):
    spark_session, spark_logger, config_dict = init_spark()
    try:
        # logic here
        ...
    except Exception as err:
        spark_logger.error(err)
        raise
    finally:
        if spark_session is not None:
            spark_session.stop()
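A hedged note on the exit-code path: ShellTask fails a Prefect task when its command exits non-zero, and an uncaught exception already makes the Python driver process exit non-zero, so the gap may be that spark-submit itself can exit 0 in cluster deploy mode even when the driver pod fails. A minimal sketch of an entry point that at least makes the job's own exit status explicit (the wrapper below is an assumption, not part of the original job):

import sys

if __name__ == "__main__":
    # Sketch only: exit non-zero explicitly so the submitting process sees a
    # failing status; whether spark-submit propagates this back to the
    # ShellTask depends on the deploy mode.
    try:
        main(sys.argv[1:])
    except Exception:
        sys.exit(1)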
davzucky
02/16/2022, 8:27 AM
from datetime import timedelta
from typing import Optional, Dict
from mep.workflows.workflow_executor import Workflow, WorkflowExecutor
from prefect import task, Flow, Parameter
import prefect
import re
from prefect.tasks.kubernetes.job import RunNamespacedJob
from prefect.tasks.kubernetes.pod import ListNamespacedPod
def getDefaultparam():
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": "prefect-generate-spark-job-{}"},
        "spec": {
            "startingDeadlineSeconds": 3600,
            "successfulJobsHistoryLimit": 3,
            "failedJobsHistoryLimit": 3,
            "concurrencyPolicy": "Forbid",
            "template": {
                "spec": {
                    "serviceAccountName": "deployer",
                    "containers": [
                        {
                            "name": "prefect-generate-spark-job-{}-{}",
                            "resources": {
                                "requests": {"memory": "1G", "cpu": "0.1"},
                                "limits": {"memory": "1G", "cpu": "0.5"},
                            },
                            "volumeMounts": [
                                {
                                    "mountPath": "/secrets",
                                    "name": "secrets-volume",
                                    "readOnly": True,
                                },
                                {
                                    "mountPath": "/spark-conf",
                                    "name": "config-volume",
                                    "readOnly": True,
                                },
                            ],
                            "image": "spark:3.1.2-bin-hadoop2.7-00514386",
                            "args": [
                                "/bin/sh",
                                "-c",
                                # command will be added here
                            ],
                        }
                    ],
                    "restartPolicy": "OnFailure",
                    "volumes": [
                        {
                            "name": "secrets-volume",
                            "secret": {
                                "defaultMode": 420,
                                "secretName": "keytab",
                            },
                        },
                        {
                            "name": "config-volume",
                            "configMap": {"name": "spark-conf"},
                        },
                    ],
                }
            },
        },
    }
@task
def generateConfig(use_case, spark_image, spark_core,
                   spark_mem, spark_executors, spark_partitions) -> Dict:
    conf = getDefaultparam()
    job_name = "prefect-generate-spark-job-{}-{}".format(
        use_case, prefect.context.get("today_nodash")
    )
    conf["metadata"]["name"] = job_name
    # Container conf
    conf["spec"]["template"]["spec"]["containers"][0]["name"] = job_name
    # Update Spark image
    conf["spec"]["template"]["spec"]["containers"][0]["image"] = spark_image
    # Set the args for the spark-submit script
    args = [
        "/opt/spark/bin/spark-submit",
        "--deploy-mode cluster",
        "--class spark-job.core",
        "--properties-file /spark-conf/spark.conf",
    ]
    jar_location = "local:///app/app.jar"
    # use_case
    args.append("--name prefect-spark-{}".format(use_case))
    args.append("--conf spark.kubernetes.driverEnv.USE_CASE={}".format(use_case))
    # cpu core
    args.append("--conf spark.executor.cores={}".format(spark_core))
    args.append("--conf spark.kubernetes.executor.limit.cores={}".format(spark_core))
    # memory
    args.append("--conf spark.executor.memory={}g".format(spark_mem))
    # executors
    args.append(
        "--conf spark.dynamicAllocation.maxExecutors={}".format(spark_executors)
    )
    # partitions
    args.append("--conf spark.sql.shuffle.partitions={}".format(spark_partitions))
    # finalise the args
    args.append(jar_location)
    conf["spec"]["template"]["spec"]["containers"][0]["args"].append(" ".join(args))
    return conf
@task(
    task_run_name=lambda **kwargs: f"{kwargs['conf']['metadata']['name']}",
    timeout=3600,
    max_retries=2,
    retry_delay=timedelta(seconds=60),
)
def runSparkJobTask(conf, namespace):
    prefect.context.get("logger").info(f"Sending {conf}")
    RunNamespacedJob(
        body=conf,
        namespace=namespace,
        kubernetes_api_key_secret="",
        kube_kwargs=None,
        job_status_poll_interval=5,
        log_level="debug",
        delete_job_after_completion=True,
    ).run()
@task(max_retries=2, retry_delay=timedelta(seconds=60))
def checkJobStatus(pod_list, use_case):
    latest_status = ""
    latest_ts = None
    pod_regex = "^prefect-spark-job-%s-[a-z0-9]{16}-driver$" % use_case
    for pod in pod_list.items:
        if re.match(pod_regex, pod.metadata.name):
            prefect.context.get("logger").info(
                f"==> Found completed pod matching the regex {pod.metadata.name} "
                f"with status {pod.status.phase} "
                f"at time {pod.status.start_time}"
            )
            if latest_ts is None or pod.status.start_time > latest_ts:
                latest_ts = pod.status.start_time
                latest_status = pod.status.phase
    if latest_status == "Succeeded":
        return True
    raise ValueError("Job did not return as succeeded")
with Flow("Run_Spark") as f:
    use_case_p = Parameter("use_case")
    namespace_p = Parameter("namespace")
    spark_image_p = Parameter("spark_image", "spark:3.1.2-bin-hadoop2.7-00514386")
    spark_core_p = Parameter("spark_core", 2)
    spark_mem_p = Parameter("spark_mem", 8)
    spark_executors_p = Parameter("spark_executors", 5)
    spark_partitions_p = Parameter("spark_partitions", 50)

    # Start the process
    config = generateConfig(
        use_case_p,
        spark_image_p,
        spark_core_p,
        spark_mem_p,
        spark_executors_p,
        spark_partitions_p,
    )
    spark_task = runSparkJobTask(config, namespace_p)

    # Check the task status
    pod_list_obj = ListNamespacedPod(
        kube_kwargs=None,
        kubernetes_api_key_secret=""
    )
    pod_list = pod_list_obj(
        namespace=namespace_p,
        upstream_tasks=[spark_task]
    )
    checkJobStatus(
        pod_list,
        use_case_p
    )
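A short usage note (the parameter values below are placeholders, not from the thread): the flow can be exercised locally with Flow.run, and the flow run ends Failed if checkJobStatus raises:

state = f.run(parameters={"use_case": "my-use-case", "namespace": "spark"})
print(state)  # Failed if checkJobStatus raised, i.e. the driver pod did not succeed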
Sharath Chandra
02/16/2022, 5:53 PM