Dileep Damodaran
02/08/2023, 7:50 AM
FAIL signal?
class K8sSparkSubmitTask(ShellTask):
    """Shell task that carries Spark-submit settings next to its command.

    The Spark-related arguments (``master_uri``, ``deploy_mode``,
    ``job_name`` and ``conf_kwargs``) are only stored on the instance by
    this class; every other argument is forwarded to the ``ShellTask``
    constructor.
    """

    def __init__(
        self,
        command: str = None,
        max_retries: int = 10,
        retry_delay_seconds: int = 3,
        master_uri: str = None,
        deploy_mode: str = None,
        job_name: str = None,
        conf_kwargs: dict = None,
        env: dict = None,
        helper_script: str = None,
        shell: str = "bash",
        return_all: bool = False,
        log_stderr: bool = False,
        **kwargs: Any,
    ):
        """Store the Spark settings and initialise the parent task.

        Args:
            command: Command string, kept on the instance and passed to the
                parent constructor.
            max_retries: Passed to the parent constructor as ``max_retries``.
            retry_delay_seconds: Delay in seconds; converted to a
                ``timedelta`` and passed to the parent as ``retry_delay``.
            master_uri: Stored on the instance as ``self.master_uri``.
            deploy_mode: Stored on the instance as ``self.deploy_mode``.
            job_name: Stored on the instance as ``self.job_name``.
            conf_kwargs: Stored on the instance; a falsy value (including
                ``None``) is replaced by a new empty dict.
            env: Passed through to the parent constructor.
            helper_script: Passed through to the parent constructor.
            shell: Passed through to the parent constructor.
            return_all: Passed through to the parent constructor.
            log_stderr: Passed through to the parent constructor.
            **kwargs: Any additional keyword arguments for the parent
                constructor.
        """
        self.command = command
        self.master_uri = master_uri
        self.deploy_mode = deploy_mode
        self.job_name = job_name
        # Fall back to a fresh dict so instances never share one mapping.
        self.conf_kwargs = conf_kwargs or {}
        super().__init__(
            **kwargs,
            command=command,
            max_retries=max_retries,
            retry_delay=timedelta(seconds=retry_delay_seconds),
            env=env,
            helper_script=helper_script,
            shell=shell,
            return_all=return_all,
            log_stderr=log_stderr,
        )

    def run(
        self,
        run_id: str = None,
    ) -> str:
        """Run the task, raising ``FAIL`` when a retry is wanted.

        Args:
            run_id: Identifier supplied by the caller; not yet used in the
                body (see the TODO below).

        Raises:
            FAIL: When ``some_condition`` is truthy.
        """
        # NOTE(review): `some_condition` is not defined in this snippet; it
        # is a placeholder for the real retry check.
        # NOTE(review): FAIL is presumably Prefect's failure signal - confirm
        # it is imported where this class is defined.
        if some_condition:
            # TODO : Update the value of run_id so that
            # new value of run_id will be reflected from next retry onwards
            raise FAIL("Retrying with an updated value of run_id")
        else:
            # Do something
            pass
# Build a flow named "Test" containing one K8sSparkSubmitTask.
with Flow("Test") as f:
    k = K8sSparkSubmitTask()
    # run_id is wrapped in unmapped(), so the same "123" is handed to the
    # task as a constant rather than being iterated over.
    k.map(run_id=unmapped("123"))

# Execute the flow once it has been fully defined.
f.run()