
Sharath Chandra

03/15/2022, 3:16 PM
Hi, I am using Prefect to orchestrate my Spark jobs. The Spark jobs are submitted with `spark-submit` using Prefect's `ShellTask`. I have created a subclass of `ShellTask` to invoke `spark-submit`. The Spark jobs are running on k8s. I am facing an issue where Prefect tasks are not completing and keep running indefinitely. I see this happening on the tasks which are slightly long running (> 10 mins). The master flow maps over a list and orchestrates the Prefect `ShellTask`, e.g.
K8sSparkSubmitTask.map(
  id = ["id1", "id2", "idx"]
)
The task starts running for `id1` and the pod status is observed to be successful. However, Prefect does not move to the next task. Why are these tasks not getting completed?
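For reference, the flow looks roughly like this (a minimal sketch, assuming Prefect 1.x; the flow name, parameter, and the argument being mapped over are illustrative, not the exact code):

from prefect import Flow, Parameter, unmapped

# K8sSparkSubmitTask is the ShellTask subclass shared further down in this thread
spark_submit = K8sSparkSubmitTask(name="spark-submit")

with Flow("spark-orchestration") as flow:
    ids = Parameter("ids", default=["id1", "id2", "idx"])
    # One mapped child task (and one spark-submit process) per id;
    # the job_uri here is a placeholder, not the real artifact path
    spark_submit.map(job_name=ids, job_uri=unmapped("local:///app/job.py"))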

Kevin Kho

03/15/2022, 3:17 PM
Does id1 end as successful? Or does its state not update?

Sharath Chandra

03/15/2022, 3:18 PM
The prefect state does not update

Kevin Kho

03/15/2022, 3:18 PM
Are you on a LocalDaskExecutor?

Sharath Chandra

03/15/2022, 3:18 PM
No, Prefect is hosted on k8s and Spark is also on k8s

Kevin Kho

03/15/2022, 3:19 PM
That should be different from the Flow executor. Are you just using the default executor?
This is Prefect Server, right? I am wondering if you are not pointed to the API properly. Do you know if this works for simpler tasks? Like a ShellTask that just uses `echo`?
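For example, something like this (a sketch, assuming Prefect 1.x; the flow and task names are illustrative):

from prefect import Flow
from prefect.tasks.shell import ShellTask

echo_task = ShellTask(name="echo-test")

with Flow("shell-sanity-check") as flow:
    # If this run finishes and its state updates in the UI, the agent
    # and the API connection are working for simple shell tasks
    echo_task(command="echo hello")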

Sharath Chandra

03/15/2022, 3:21 PM
Yes, simple tasks are working.
Even the above ones sometimes work without any issue. The issue is inconsistent.
Can you look at this thread? Does it help you?

Sharath Chandra

03/15/2022, 3:23 PM
Hi, I had raised that issue myself. That was more about getting the status, and that issue is fixed.
As you can see from the above image, other spark tasks are executing whereas the mapped tasks are stuck.

davzucky

03/15/2022, 3:25 PM
Are you using a mapped task to create multiple spark jobs?

Sharath Chandra

03/15/2022, 3:25 PM
yes, I am using map to create multiple spark tasks

davzucky

03/15/2022, 3:26 PM
Sorry, missed that it was you on the other thread as well.

Sharath Chandra

03/15/2022, 3:26 PM
I have added `"cluster-autoscaler.kubernetes.io/safe-to-evict": "false"` to my prefect job template
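For context, that annotation goes into the flow run's job template via the run config, roughly like this (a sketch, assuming Prefect 1.x with KubernetesRun; only the parts relevant to the annotation are spelled out, the rest mirrors the default job template):

from prefect.run_configs import KubernetesRun

# Custom job template carrying the safe-to-evict annotation so the
# cluster autoscaler does not evict the flow-run pod mid-task
job_template = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "spec": {
        "template": {
            "metadata": {
                "annotations": {
                    "cluster-autoscaler.kubernetes.io/safe-to-evict": "false"
                }
            },
            "spec": {
                "containers": [{"name": "flow"}],
                "restartPolicy": "Never",
            },
        }
    },
}

flow.run_config = KubernetesRun(job_template=job_template)  # `flow` as defined elsewhere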

davzucky

03/15/2022, 3:27 PM
Do you have enough resources to create all the workers? Because Kubernetes will only create pods if you have the resources, and the rest will queue.

Sharath Chandra

03/15/2022, 3:28 PM
Hmm, I can check that, but that still does not explain why it does not move to the next task in the map,
and keeps the existing task as Running even when the pod state is successful.

davzucky

03/15/2022, 3:32 PM
Did you also check the PodDisruptionBudget to ensure that pods are removed cleanly when the cluster is scaling down?
What is your task code?

Sharath Chandra

03/15/2022, 3:33 PM
I will check that, I don't have access to the cluster now.
"What is your task code?" - you mean the logic inside the spark job?
The spark jobs do some mathematical computation and append data to Postgres via JDBC.
The Postgres data is also updated as part of the first mapped job.
The `ShellTask` wrapper is:

from typing import Any, List

from prefect.tasks.shell import ShellTask
from prefect.utilities.tasks import defaults_from_attrs


class K8sSparkSubmitTask(ShellTask):

    def __init__(
        self,
        command: str = None,
        master_uri: str = None,
        deploy_mode: str = None,
        job_name: str = None,
        conf_kwargs: dict = None,
        env: dict = None,
        helper_script: str = None,
        shell: str = "bash",
        return_all: bool = False,
        log_stderr: bool = False,
        **kwargs: Any,
    ):
        # Keep the spark-submit specific options around for run()
        self.master_uri = master_uri
        self.deploy_mode = deploy_mode
        self.job_name = job_name
        self.conf_kwargs = conf_kwargs
        super().__init__(
            **kwargs,
            command=command,
            env=env,
            helper_script=helper_script,
            shell=shell,
            return_all=return_all,
            log_stderr=log_stderr,
        )

    @defaults_from_attrs("job_name", "conf_kwargs")
    def run(
        self,
        job_name: str = None,
        job_uri: str = None,
        job_args: List[str] = None,
        conf_kwargs: dict = None,
    ) -> str:
        command = self._build_command(
            master_uri=self.master_uri,
            deploy_mode=self.deploy_mode,
            job_name=job_name,
            job_uri=job_uri,
            job_args=job_args,
            conf_kwargs=conf_kwargs,
        )
        print(f"The spark-submit command is {command}")
        return super(K8sSparkSubmitTask, self).run(command=command)
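The `_build_command` helper is not shown above; roughly, it just assembles the spark-submit command line, e.g. a method on the class along these lines (an assumed sketch, not the actual implementation):

    def _build_command(
        self,
        master_uri: str,
        deploy_mode: str,
        job_name: str,
        job_uri: str,
        job_args: List[str] = None,
        conf_kwargs: dict = None,
    ) -> str:
        # Assemble a standard spark-submit command line from the task options
        parts = [
            "spark-submit",
            f"--master {master_uri}",
            f"--deploy-mode {deploy_mode}",
            f"--name {job_name}",
        ]
        for key, value in (conf_kwargs or {}).items():
            parts.append(f'--conf "{key}={value}"')
        parts.append(job_uri)
        parts.extend(job_args or [])
        return " ".join(parts)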

davzucky

03/16/2022, 1:54 AM
Looks simple. Can you show me your spark.conf as well?
Can you ensure `spark.kubernetes.submission.waitAppCompletion` is set to true in your spark.conf?

Sharath Chandra

03/16/2022, 5:46 AM
# "spark.kubernetes.container.image" = "<image>"
# "spark.kubernetes.namespace" = "<namespace>"
# "spark.kubernetes.authenticate.driver.serviceAccountName" = "<service_principal>"
# "spark.kubernetes.driver.podTemplateFile" = "/path/to/podtemplate.yaml"
# "spark.kubernetes.executor.podTemplateFile" = "/path/to/podtemplate.yaml"
# "spark.executor.instances" = 3
# "spark.driver.cores" = "1"
# "spark.driver.memory" = "8g"
# "spark.executor.cores" = "1"
# "spark.executor.memory" = "8g"
# "spark.hadoop.fs.azure.account.auth.type" = "OAuth"
# "spark.hadoop.fs.azure.account.oauth.provider.type" = "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"
# "spark.hadoop.fs.azure.account.oauth2.client.endpoint" = "<https://login.microsoftonline.com/><tenant>/oauth2/token"
# "spark.hadoop.fs.azure.account.oauth2.client.id" = "<azure_client_id>"
# "spark.hadoop.fs.azure.account.oauth2.client.secret" = "<azure_client_secret>"
I don't have the key `spark.kubernetes.submission.waitAppCompletion` explicitly in my conf. The documentation says it is `true` by default. Anyway, I will add it explicitly now and check back.
The issue I am having is that the behaviour is inconsistent. Yesterday's run for the same job succeeded.
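For reference, adding it explicitly is just one more line in the same conf format as above:
# "spark.kubernetes.submission.waitAppCompletion" = "true"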

davzucky

03/16/2022, 5:53 AM
Yes, nothing obvious here. Are you running Prometheus on your cluster? That could help you monitor what is happening.

Sharath Chandra

03/16/2022, 6:51 AM
No, I still need to set up monitoring.

davzucky

03/16/2022, 7:02 AM
Yes. That will help you see what is happening with your pods. We had some resource problems that the metrics helped us identify.

Sharath Chandra

03/21/2022, 6:56 AM
One observation - I broke the flow into 2 smaller flows, each currently running in under 10 minutes. In cases where the spark job completes in less than 10 minutes, I don't see the issue.

davzucky

03/21/2022, 8:50 AM
Did you look in your logs to see whether there are any errors with liveness?

Sharath Chandra

03/21/2022, 11:44 AM
In cases where the spark job was executing for more than 15 minutes, spark was logging and exited by closing the context.