# prefect-server
Hi, I am using Prefect to submit a Spark job. Spark is managed via k8s (https://spark.apache.org/docs/latest/running-on-kubernetes.html). I have created a subclass of `ShellTask` to invoke `spark-submit`. The issue I am facing is that when the Spark job errors, the Prefect task still shows it as a success, even though I can see the pod status on k8s as failed, with errors in the log. How can we propagate the errors from the Spark job to the Prefect task? The `Task` is defined as:
```python
from typing import Any, List

from prefect.tasks.shell import ShellTask
from prefect.utilities.tasks import defaults_from_attrs


class K8sSparkSubmitTask(ShellTask):

    def __init__(
        self,
        command: str = None,
        master_uri: str = None,
        deploy_mode: str = None,
        job_name: str = None,
        conf_kwargs: dict = None,
        env: dict = None,
        helper_script: str = None,
        shell: str = "bash",
        return_all: bool = False,
        log_stderr: bool = False,
        **kwargs: Any,
    ):
        self.master_uri = master_uri
        self.deploy_mode = deploy_mode
        self.job_name = job_name
        self.conf_kwargs = conf_kwargs
        super().__init__(
            **kwargs,
            command=command,
            env=env,
            helper_script=helper_script,
            shell=shell,
            return_all=return_all,
            log_stderr=log_stderr,
        )

    @defaults_from_attrs("job_name", "conf_kwargs")
    def run(
        self,
        job_name: str = None,
        job_uri: str = None,
        job_args: List[str] = None,
        conf_kwargs: dict = None,
    ) -> str:
        # _build_command (not shown) assembles the spark-submit invocation
        command = self._build_command(
            master_uri=self.master_uri,
            deploy_mode=self.deploy_mode,
            job_name=job_name,
            job_uri=job_uri,
            job_args=job_args,
            conf_kwargs=conf_kwargs,
        )
        print(f"The spark-submit command is {command}")
        return super(K8sSparkSubmitTask, self).run(command=command)
```
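For background on why the task succeeds anyway: `ShellTask` only fails on a non-zero exit code, and `spark-submit` against k8s returns 0 even when the application fails (see the reply below). One way to surface the failure from a subclass like this is to check the Spark driver pod's phase after the command returns. A minimal sketch, not part of the original task, assuming the `kubernetes` Python client is available in the flow environment and Spark's default `<app-name>-...-driver` pod naming:

```python
# Hedged sketch: call this at the end of K8sSparkSubmitTask.run, after the
# spark-submit command has returned, to fail the Prefect task when the driver failed.
# Assumes in-cluster credentials and Spark's default "<app-name>-...-driver" naming.
from kubernetes import client, config
from prefect.engine import signals


def raise_if_driver_failed(namespace: str, app_name: str) -> None:
    config.load_incluster_config()  # use config.load_kube_config() outside the cluster
    pods = client.CoreV1Api().list_namespaced_pod(namespace=namespace)
    drivers = [
        p for p in pods.items
        if p.metadata.name.startswith(app_name) and p.metadata.name.endswith("-driver")
    ]
    if not drivers:
        raise signals.FAIL(f"No driver pod found for app {app_name}")
    # Pick the most recently started driver pod for this app name.
    latest = max(drivers, key=lambda p: p.status.start_time)
    if latest.status.phase != "Succeeded":
        raise signals.FAIL(
            f"Driver pod {latest.metadata.name} ended in phase {latest.status.phase}"
        )
```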
The Spark job itself is:
```python
def main(args):
    spark_session, spark_logger, config_dict = init_spark()
    try:
        ...  # logic here
    except Exception as err:
        spark_logger.error(err)
        raise
    finally:
        if spark_session is not None:
            spark_session.stop()
```
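One detail that matters for any pod-status check: the driver container has to exit non-zero when the job fails, so the re-raised exception must reach the top of the entrypoint. A minimal sketch of such an entrypoint, assuming the module is run as a script (the argument handling is a placeholder):

```python
import sys

if __name__ == "__main__":
    # An uncaught exception from main() makes the interpreter exit with a non-zero
    # status, which Kubernetes records as a failed container and Spark surfaces as a
    # driver pod in the Failed phase.
    main(sys.argv[1:])
```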
This is a bug in spark-submit that has never been fixed 😞 https://stackoverflow.com/questions/37206088/spark-submit-sh-returns-0-if-job-fails
We ran into it as well. The way we check the status is: after the run finishes, we check the status of the job that was created, and if it errored we fail the task. Let me look for the code later.
```python
import re
from datetime import timedelta
from typing import Dict

import prefect
from prefect import task, Flow, Parameter
from prefect.tasks.kubernetes.job import RunNamespacedJob
from prefect.tasks.kubernetes.pod import ListNamespacedPod


def getDefaultparam():
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": "prefect-generate-spark-job-{}"},
        "spec": {
            "startingDeadlineSeconds": 3600,
            "successfulJobsHistoryLimit": 3,
            "failedJobsHistoryLimit": 3,
            "concurrencyPolicy": "Forbid",
            "template": {
                "spec": {
                    "serviceAccountName": "deployer",
                    "containers": [
                        {
                            "name": "prefect-generate-spark-job-{}-{}",
                            "resources": {
                                "requests": {"memory": "1G", "cpu": "0.1"},
                                "limits": {"memory": "1G", "cpu": "0.5"},
                            },
                            "volumeMounts": [
                                {
                                    "mountPath": "/secrets",
                                    "name": "secrets-volume",
                                    "readOnly": True,
                                },
                                {
                                    "mountPath": "/spark-conf",
                                    "name": "config-volume",
                                    "readOnly": True,
                                },
                            ],
                            "image": "spark:3.1.2-bin-hadoop2.7-00514386",
                            "args": [
                                "/bin/sh",
                                "-c",
                                # command will be added here
                            ],
                        }
                    ],
                    "restartPolicy": "OnFailure",
                    "volumes": [
                        {
                            "name": "secrets-volume",
                            "secret": {
                                "defaultMode": 420,
                                "secretName": "keytab",
                            },
                        },
                        {
                            "name": "config-volume",
                            "configMap": {"name": "spark-conf"},
                        },
                    ],
                }
            },
        },
    }


@task
def generateConfig(use_case, spark_image, spark_core,
                   spark_mem, spark_executors, spark_partitions) -> Dict:

    conf = getDefaultparam()

    job_name = "prefect-generate-spark-job-{}-{}".format(
        use_case, prefect.context.get("today_nodash")
    )

    conf["metadata"]["name"] = job_name
    # Container conf
    conf["spec"]["template"]["spec"]["containers"][0]["name"] = job_name

    # Update Spark image
    conf["spec"]["template"]["spec"]["containers"][0]["image"] = spark_image

    # Set the args for the spark-submit script
    args = [
        "/opt/spark/bin/spark-submit",
        "--deploy-mode cluster",
        "--class spark-job.core",
        "--properties-file /spark-conf/spark.conf",
    ]
    jar_location = "local:///app/app.jar"

    # use_case
    args.append("--name prefect-spark-{}".format(use_case))
    args.append("--conf spark.kubernetes.driverEnv.USE_CASE={}".format(use_case))

    # cpu core
    args.append("--conf spark.executor.cores={}".format(spark_core))
    args.append("--conf spark.kubernetes.executor.limit.cores={}".format(spark_core))

    # memory
    args.append("--conf spark.executor.memory={}g".format(spark_mem))

    # executors
    args.append(
        "--conf spark.dynamicAllocation.maxExecutors={}".format(spark_executors)
    )

    # partitions
    args.append("--conf spark.sql.shuffle.partitions={}".format(spark_partitions))

    # finalise the args
    args.append(jar_location)
    conf["spec"]["template"]["spec"]["containers"][0]["args"].append(" ".join(args))

    return conf


@task(
    task_run_name=lambda **kwargs: f"{kwargs['conf']['metadata']['name']}",
    timeout=3600,
    max_retries=2,
    retry_delay=timedelta(seconds=60),
)
def runSparkJobTask(conf, namespace):

    prefect.context.get("logger").info(f"Sending {conf}")

    RunNamespacedJob(
        body=conf,
        namespace=namespace,
        kubernetes_api_key_secret="",
        kube_kwargs=None,
        job_status_poll_interval=5,
        log_level="debug",
        delete_job_after_completion=True,
    ).run()


@task(max_retries=2, retry_delay=timedelta(seconds=60))
def checkJobStatus(pod_list, use_case):
    latest_status = ""
    latest_ts = None
    pod_regex = "^prefect-spark-%s-[a-z0-9]{16}-driver$" % use_case
    for pod in pod_list.items:
        if re.match(pod_regex, pod.metadata.name):
            prefect.context.get("logger").info(
                f"==> Found driver pod {pod.metadata.name} matching the regex, "
                f"with status {pod.status.phase} "
                f"at time {pod.status.start_time}"
            )
            if latest_ts is None or pod.status.start_time > latest_ts:
                latest_ts = pod.status.start_time
                latest_status = pod.status.phase
    if latest_status == "Succeeded":
        return True
    raise ValueError("Spark driver pod did not report Succeeded")



with Flow("Run_Spark") as f:
    use_case_p = Parameter("use_case")
    namespace_p = Parameter("namespace")
    spark_image_p = Parameter("spark_image", "spark:3.1.2-bin-hadoop2.7-00514386")
    spark_core_p = Parameter("spark_core", 2)
    spark_mem_p = Parameter("spark_mem", 8)
    spark_executors_p = Parameter("spark_executors", 5)
    spark_partitions_p = Parameter("spark_partitions", 50)

    # Start the process
    config = generateConfig(
        use_case_p,
        spark_image_p,
        spark_core_p,
        spark_mem_p,
        spark_executors_p,
        spark_partitions_p,
    )
    spark_task = runSparkJobTask(config, namespace_p)

    # Check the task status
    pod_list_obj = ListNamespacedPod(
        kube_kwargs=None,
        kubernetes_api_key_secret=""
    )
    pod_list = pod_list_obj(
        namespace=namespace_p,
        upstream_tasks=[spark_task]
    )
    checkJobStatus(
        pod_list,
        use_case_p
    )
```
This is a cut-down version of our Spark job submit. Hope that will help you.
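For anyone adapting this: a quick way to exercise the flow locally is a plain Prefect 1.x `flow.run` with explicit parameter values. The values below are placeholders, not part of davzucky's setup:

```python
if __name__ == "__main__":
    # Local run of the flow defined above; parameter values are examples only.
    state = f.run(parameters={"use_case": "demo", "namespace": "spark-jobs"})
    assert state.is_successful()
```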
@davzucky thanks a lot. This really helps