Sharath Chandra
03/15/2022, 3:16 PM
I am running `spark-submit` using Prefect's `ShellTask`. I have created a subclass of `ShellTask` to invoke `spark-submit`; the Spark jobs are running on k8s.
I am facing an issue where the Prefect tasks are not completing and keep running. I see this happening on tasks that are slightly long-running (> 10 mins).
The master flow maps over a list and orchestrates the Prefect `ShellTask`, e.g.
K8sSparkSubmitTask.map(
    id=["id1", "id2", "idx"]
)
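For context, `Task.map` fans a single task out into one independent task run per list element. A plain-Python sketch of that fan-out (the `submit_job` stub here is hypothetical; it only builds a command string and does not contact a cluster):

```python
def submit_job(job_id: str) -> str:
    # Hypothetical stand-in for K8sSparkSubmitTask.run: build the
    # spark-submit command for one job id (no cluster is contacted).
    return f"spark-submit --name {job_id} local:///opt/app/job.py"

# K8sSparkSubmitTask.map(id=[...]) behaves roughly like calling the
# task once per element, each as an independent task run:
ids = ["id1", "id2", "idx"]
commands = [submit_job(i) for i in ids]
```

Each mapped run succeeds or fails on its own, which is why a single stuck run can hold up the downstream tasks that depend on the whole map.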
The task starts running for id1, and the pod status is observed to be successful. However, Prefect does not move on to the next task. Why are these tasks not getting completed?
Kevin Kho
03/15/2022, 3:17 PM
Sharath Chandra
03/15/2022, 3:18 PM
Kevin Kho
03/15/2022, 3:18 PM
Sharath Chandra
03/15/2022, 3:18 PM
Kevin Kho
03/15/2022, 3:19 PM
`echo`?
Sharath Chandra
03/15/2022, 3:21 PM
Sharath Chandra
03/15/2022, 3:23 PM
davzucky
03/15/2022, 3:25 PM
Sharath Chandra
03/15/2022, 3:25 PM
davzucky
03/15/2022, 3:26 PM
Sharath Chandra
03/15/2022, 3:26 PM"<http://cluster-autoscaler.kubernetes.io/safe-to-evict|cluster-autoscaler.kubernetes.io/safe-to-evict>": "false"
` to my prefect job templatedavzucky
03/15/2022, 3:27 PM
Sharath Chandra
03/15/2022, 3:28 PM
successful
davzucky
03/15/2022, 3:32 PM
Sharath Chandra
03/15/2022, 3:33 PM
> What is your task code?
My `ShellTask` wrapper is:

from typing import Any, List

from prefect.tasks.shell import ShellTask
from prefect.utilities.tasks import defaults_from_attrs


class K8sSparkSubmitTask(ShellTask):
    def __init__(
        self,
        command: str = None,
        master_uri: str = None,
        deploy_mode: str = None,
        job_name: str = None,
        conf_kwargs: dict = None,
        env: dict = None,
        helper_script: str = None,
        shell: str = "bash",
        return_all: bool = False,
        log_stderr: bool = False,
        **kwargs: Any,
    ):
        # Store the spark-submit settings so run() and
        # defaults_from_attrs can read them back.
        self.master_uri = master_uri
        self.deploy_mode = deploy_mode
        self.job_name = job_name
        self.conf_kwargs = conf_kwargs
        super().__init__(
            **kwargs,
            command=command,
            env=env,
            helper_script=helper_script,
            shell=shell,
            return_all=return_all,
            log_stderr=log_stderr,
        )

    @defaults_from_attrs("job_name", "conf_kwargs")
    def run(
        self,
        job_name: str = None,
        job_uri: str = None,
        job_args: List[str] = None,
        conf_kwargs: dict = None,
    ) -> str:
        command = self._build_command(
            master_uri=self.master_uri,
            deploy_mode=self.deploy_mode,
            job_name=job_name,
            job_uri=job_uri,
            job_args=job_args,
            conf_kwargs=conf_kwargs,
        )
        print(f"The spark-submit command is {command}")
        return super(K8sSparkSubmitTask, self).run(command=command)
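The `_build_command` helper is referenced above but not shown in the thread. A hedged sketch of what such a helper might do, using standard `spark-submit` flags (the function name and structure here are assumptions, not the author's actual code):

```python
from typing import List, Optional

def build_spark_submit_command(
    master_uri: str,
    deploy_mode: str,
    job_name: str,
    job_uri: str,
    job_args: Optional[List[str]] = None,
    conf_kwargs: Optional[dict] = None,
) -> str:
    """Assemble a spark-submit command line from the task's attributes."""
    parts = [
        "spark-submit",
        f"--master {master_uri}",
        f"--deploy-mode {deploy_mode}",
        f"--name {job_name}",
    ]
    # Every conf entry becomes a --conf key=value flag; for example,
    # spark.kubernetes.submission.waitAppCompletion=true makes
    # spark-submit block until the driver pod finishes.
    for key, value in (conf_kwargs or {}).items():
        parts.append(f"--conf {key}={value}")
    parts.append(job_uri)          # application jar / Python file
    parts.extend(job_args or [])   # arguments passed to the application
    return " ".join(parts)
```

Under this sketch, passing `conf_kwargs={"spark.kubernetes.submission.waitAppCompletion": "true"}` renders as an explicit `--conf` flag on the command line, which becomes relevant later in the thread.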
davzucky
03/16/2022, 1:54 AM
Is `spark.kubernetes.submission.waitAppCompletion` set to true in your spark conf?
Sharath Chandra
03/16/2022, 5:46 AM
# "spark.kubernetes.container.image" = "<image>"
# "spark.kubernetes.namespace" = "<namespace>"
# "spark.kubernetes.authenticate.driver.serviceAccountName" = "<service_principal>"
# "spark.kubernetes.driver.podTemplateFile" = "/path/to/podtemplate.yaml"
# "spark.kubernetes.executor.podTemplateFile" = "/path/to/podtemplate.yaml"
# "spark.executor.instances" = 3
# "spark.driver.cores" = "1"
# "spark.driver.memory" = "8g"
# "spark.executor.cores" = "1"
# "spark.executor.memory" = "8g"
# "spark.hadoop.fs.azure.account.auth.type" = "OAuth"
# "spark.hadoop.fs.azure.account.oauth.provider.type" = "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"
# "spark.hadoop.fs.azure.account.oauth2.client.endpoint" = "<https://login.microsoftonline.com/><tenant>/oauth2/token"
# "spark.hadoop.fs.azure.account.oauth2.client.id" = "<azure_client_id>"
# "spark.hadoop.fs.azure.account.oauth2.client.secret" = "<azure_client_secret>"
I have not set `spark.kubernetes.submission.waitAppCompletion` explicitly in my conf. The documentation says it's `true` by default. Anyway, I will add it now and check back.
davzucky
03/16/2022, 5:53 AM
Sharath Chandra
03/16/2022, 6:51 AM
davzucky
03/16/2022, 7:02 AM
Sharath Chandra
03/21/2022, 6:56 AM
davzucky
03/21/2022, 8:50 AM
Sharath Chandra
03/21/2022, 11:44 AM