# prefect-server
s
Hi, I am using prefect to orchestrate my spark jobs. The spark jobs are submitted with `spark-submit` using prefect's `ShellTask`. I have created a subclass of ShellTask to invoke the `spark-submit`. The spark jobs are running on k8s. I am facing an issue where prefect tasks are not completing and keep running continuously. I see this happening on the tasks which are slightly long running (> 10 mins). The master flow maps over a list and orchestrates the prefect `ShellTask`, e.g.
```python
K8sSparkSubmitTask.map(
  id=["id1", "id2", "idx"]
)
```
The task starts running for `id1` and the pod status is observed to be successful. However prefect does not move to the next task. Why are these tasks not getting completed?
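For context, the mapped task is wired into the flow roughly like this (the flow name, parameter, and task arguments below are illustrative rather than the exact code):
```python
from prefect import Flow, Parameter

# illustrative wiring of the mapped spark-submit task;
# master_uri and deploy_mode values are placeholders
spark_submit = K8sSparkSubmitTask(
    master_uri="k8s://https://<k8s-api-server>",
    deploy_mode="cluster",
)

with Flow("spark-orchestration") as flow:
    ids = Parameter("ids", default=["id1", "id2", "idx"])
    spark_submit.map(job_name=ids)
```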
k
Does id1 end as successful? Or does its state not update?
s
The prefect state does not update
k
Are you on a LocalDaskExecutor?
s
No, prefect is hosted on k8s and spark is also on k8s
k
That should be different from the Flow executor. Are you just using the default executor?
This is Prefect server right? I am wondering if you are not pointed to the API properly. Do you know if this works for simpler tasks? Like a ShellTask that just uses `echo`?
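For reference, in Prefect 1.x the executor is configured on the flow object itself; a minimal sketch, assuming the flow from above:
```python
from prefect import Flow
from prefect.executors import LocalDaskExecutor

with Flow("spark-orchestration") as flow:
    ...  # tasks as above

# the default is LocalExecutor; mapped children only run in parallel
# when an executor such as LocalDaskExecutor is set on the flow
flow.executor = LocalDaskExecutor()
```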
s
Yes, simple tasks are working
Even the above ones sometimes work without any issue; the issue is inconsistent
Can you look at this thread? Does it help you?
s
Hi, I had raised that issue myself. That was more about getting the status, and that issue is fixed
As you can see from the above image, other spark tasks are executing whereas the mapped tasks are stuck
d
Are you using a map task to create multiple spark jobs?
s
yes, I am using map to create multiple spark tasks
d
Sorry, missed that it was you as well on the other thread
s
I have added `"cluster-autoscaler.kubernetes.io/safe-to-evict": "false"` to my prefect job template
d
Do you have enough resources to create all the workers? Kubernetes will only create a pod if you have the resources, and the rest will queue
s
Hmm, I can check that, but that still does not explain why it does not move to the next task in the map and keeps the existing task as running even when the pod state is successful
d
Did you check the PodDisruptionBudget as well, to ensure that pods are removed nicely when the cluster is scaling down?
What is your task code?
s
I will check that, I don't have access to the cluster now
> What is your task code?
You mean the logic inside the spark job?
The spark jobs do some mathematical computation and append data to postgres via jdbc
The postgres data is also updated as part of the first mapped job
The `ShellTask` wrapper is:
```python
from typing import Any, List

from prefect.tasks.shell import ShellTask
from prefect.utilities.tasks import defaults_from_attrs


class K8sSparkSubmitTask(ShellTask):

    def __init__(
        self,
        command: str = None,
        master_uri: str = None,
        deploy_mode: str = None,
        job_name: str = None,
        conf_kwargs: dict = None,
        env: dict = None,
        helper_script: str = None,
        shell: str = "bash",
        return_all: bool = False,
        log_stderr: bool = False,
        **kwargs: Any,
    ):
        # attributes used by run() and defaults_from_attrs
        self.master_uri = master_uri
        self.deploy_mode = deploy_mode
        self.job_name = job_name
        self.conf_kwargs = conf_kwargs
        super().__init__(
            **kwargs,
            command=command,
            env=env,
            helper_script=helper_script,
            shell=shell,
            return_all=return_all,
            log_stderr=log_stderr,
        )

    @defaults_from_attrs("job_name", "conf_kwargs")
    def run(
        self,
        job_name: str = None,
        job_uri: str = None,
        job_args: List[str] = None,
        conf_kwargs: dict = None,
    ) -> str:
        # build the full spark-submit command and hand it to ShellTask.run
        command = self._build_command(
            master_uri=self.master_uri,
            deploy_mode=self.deploy_mode,
            job_name=job_name,
            job_uri=job_uri,
            job_args=job_args,
            conf_kwargs=conf_kwargs,
        )
        print(f"The spark-submit command is {command}")
        return super(K8sSparkSubmitTask, self).run(command=command)
```
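The `_build_command` helper is not shown above; it only assembles the spark-submit string. A simplified sketch of what such a method could look like (the exact flags and layout here are assumptions):
```python
    def _build_command(
        self,
        master_uri: str,
        deploy_mode: str,
        job_name: str,
        job_uri: str,
        job_args: List[str] = None,
        conf_kwargs: dict = None,
    ) -> str:
        # simplified illustration of a method on K8sSparkSubmitTask:
        # join the spark-submit pieces into a single shell command string
        parts = ["spark-submit", "--master", master_uri,
                 "--deploy-mode", deploy_mode, "--name", job_name]
        for key, value in (conf_kwargs or {}).items():
            parts += ["--conf", f"{key}={value}"]
        parts.append(job_uri)
        parts += job_args or []
        return " ".join(parts)
```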
d
Looks simple. Can you show me your spark.conf as well?
Can you ensure `spark.kubernetes.submission.waitAppCompletion` is set to true in your spark.conf?
s
```
# "spark.kubernetes.container.image" = "<image>"
# "spark.kubernetes.namespace" = "<namespace>"
# "spark.kubernetes.authenticate.driver.serviceAccountName" = "<service_principal>"
# "spark.kubernetes.driver.podTemplateFile" = "/path/to/podtemplate.yaml"
# "spark.kubernetes.executor.podTemplateFile" = "/path/to/podtemplate.yaml"
# "spark.executor.instances" = 3
# "spark.driver.cores" = "1"
# "spark.driver.memory" = "8g"
# "spark.executor.cores" = "1"
# "spark.executor.memory" = "8g"
# "spark.hadoop.fs.azure.account.auth.type" = "OAuth"
# "spark.hadoop.fs.azure.account.oauth.provider.type" = "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"
# "spark.hadoop.fs.azure.account.oauth2.client.endpoint" = "https://login.microsoftonline.com/<tenant>/oauth2/token"
# "spark.hadoop.fs.azure.account.oauth2.client.id" = "<azure_client_id>"
# "spark.hadoop.fs.azure.account.oauth2.client.secret" = "<azure_client_secret>"
```
I don't have the key `spark.kubernetes.submission.waitAppCompletion` explicitly in my conf. The documentation says it's `true` by default. Anyways, I will add it now and check back
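If useful, the key could be passed through the task's `conf_kwargs`; a minimal sketch (the other entries are placeholders):
```python
conf_kwargs = {
    # when "true", spark-submit stays attached until the driver pod
    # finishes, instead of returning as soon as the app is submitted
    "spark.kubernetes.submission.waitAppCompletion": "true",
    "spark.kubernetes.namespace": "<namespace>",
    "spark.executor.instances": "3",
}

spark_submit = K8sSparkSubmitTask(conf_kwargs=conf_kwargs)
```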
The issue I am having is that the behaviour is inconsistent. Yesterday’s run for the same job succeeded
d
Yes, nothing obvious here. Are you running Prometheus on your cluster, which could help you monitor what is happening?
s
No, I still need to set up monitoring
d
Yes. That will help you see what is happening with your Pods. We had some resource problems that the metrics helped us identify
s
One observation - I broke the flow into 2 smaller flows, each currently running in less than 10 minutes. In cases where the spark job completes in less than 10 mins, I don't see the issue.
d
Did you look in your logs to see if there are any errors with liveness?
s
In cases where the spark job was executing for more than 15 minutes, spark was logging and exited by closing the context