https://prefect.io logo
Title
d

Dileep Damodaran

02/08/2023, 7:50 AM
Hi, I want to retry a task with updated arguments. The values of these arguments are generated dynamically inside the task during the run. Is there a way to pass the updated arguments to the next retry by raising a `FAIL` signal?
class K8sSparkSubmitTask(ShellTask):
    """ShellTask subclass that keeps Spark-submit settings on the instance.

    The Spark-specific arguments (``master_uri``, ``deploy_mode``,
    ``job_name``, ``conf_kwargs``) are only stored as attributes here; the
    remaining arguments are forwarded to the parent constructor.
    """

    def __init__(
        self,
        command: str = None,
        max_retries: int = 10,
        retry_delay_seconds: int = 3,
        master_uri: str = None,
        deploy_mode: str = None,
        job_name: str = None,
        conf_kwargs: dict = None,
        env: dict = None,
        helper_script: str = None,
        shell: str = "bash",
        return_all: bool = False,
        log_stderr: bool = False,
        **kwargs: Any,
    ):
        """Store the Spark settings and initialise the parent task.

        Args:
            command: Stored on the instance and forwarded to the parent.
            max_retries: Forwarded to the parent constructor unchanged.
            retry_delay_seconds: Converted to a ``timedelta`` and forwarded
                to the parent as ``retry_delay``.
            master_uri: Stored on the instance.
            deploy_mode: Stored on the instance.
            job_name: Stored on the instance.
            conf_kwargs: Stored on the instance; a falsy value is replaced
                by a new empty dict.
            env: Forwarded to the parent constructor unchanged.
            helper_script: Forwarded to the parent constructor unchanged.
            shell: Forwarded to the parent constructor unchanged.
            return_all: Forwarded to the parent constructor unchanged.
            log_stderr: Forwarded to the parent constructor unchanged.
            **kwargs: Any further keyword arguments, forwarded to the parent
                constructor.
        """
        # Spark-specific settings live on the instance only.
        self.conf_kwargs = conf_kwargs if conf_kwargs else {}
        self.job_name = job_name
        self.deploy_mode = deploy_mode
        self.master_uri = master_uri
        self.command = command

        # The parent takes the delay as a timedelta rather than raw seconds.
        retry_delay = timedelta(seconds=retry_delay_seconds)

        super().__init__(
            command=command,
            max_retries=max_retries,
            retry_delay=retry_delay,
            env=env,
            helper_script=helper_script,
            shell=shell,
            return_all=return_all,
            log_stderr=log_stderr,
            **kwargs,
        )

    def run(self, run_id: str = None) -> str:
        """Placeholder run body that raises ``FAIL`` to request a retry.

        NOTE(review): ``some_condition`` is not defined in this snippet; it
        stands in for whatever check decides that a retry is needed.

        Args:
            run_id: Identifier passed in by the caller; not read by the
                current body.

        Returns:
            Nothing at present: the non-failing path falls through without
            producing a value.

        Raises:
            FAIL: When ``some_condition`` is truthy.
        """
        if not some_condition:
            # Do something
            return None

        # TODO : Update the value of run_id so that
        # new value of run_id will be reflected from next retry onwards
        raise FAIL("Retrying with an updated value of run_id")


# Build a flow named "Test" that contains one K8sSparkSubmitTask.
with Flow("Test") as f:
    k = K8sSparkSubmitTask()
    # NOTE(review): the only argument is wrapped in unmapped(), so nothing
    # here is iterated over — confirm Prefect 1.x accepts a map() call like this.
    k.map(run_id=unmapped("123"))


# Execute the flow.
f.run()
I'm using Prefect 1.4.