Thread
#prefect-community
    Jie Lou

    2 years ago
    Hi All. I have a question about retries on mapped tasks. A little background: we use Prefect Cloud with Dask workers to run multiple tasks in parallel. Some of the workers may die while the flow is running, and we hope retries can help recover the task run. Here is a simple flow with a cal function:
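    import datetime
    import time
    
    from prefect import Flow, Parameter, task
    
    # s3_result_handler is assumed to be defined elsewhere.
    @task(log_stdout=True, max_retries=1, retry_delay=datetime.timedelta(seconds=10))
    def cal(x):
        print("starting sleep {} seconds".format(x))
        time.sleep(x)
        return x
    
    with Flow("test", result_handler=s3_result_handler) as flow:
        # Named time_param so it does not shadow the time module used in cal.
        time_param = Parameter("time", default=[60])
        results = cal.map(time_param)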
    While the cal task was running, I manually killed the worker and kept watching the logs in the Cloud UI. This is what I found:
    starting sleep 60 seconds    #then I killed the worker...
    Task 'cal[0]': Starting task run...
    Task 'cal[0]': task is already running.
    Task 'cal[0]': finished task run for task with final state: 'Running'
    1 mapped tasks submitted for execution.
    Task 'cal': Handling state change from Mapped to Mapped
    Task 'cal': task has been mapped; ending run.
    Task 'cal': finished task run for task with final state: 'Mapped'
    Flow run RUNNING: terminal tasks are incomplete.
    Marked "Failed" by a Zombie Killer process.
    It seems that the state of the task does not change from Running after I kill the worker, and that is why the flow was finally tagged as a zombie. However, if cal is a regular task without mapping, the retry mechanism works as expected. I just tweaked the flow above a little bit:
    @task(log_stdout=True, max_retries=1, retry_delay=datetime.timedelta(seconds=10))
    def cal(x):
        print("starting sleep {} seconds".format(x[0]))
        time.sleep(x[0])
        return x[0]
    
    with Flow("test", result_handler=s3_result_handler) as flow:
        # Named time_param so it does not shadow the time module used in cal.
        time_param = Parameter("time", default=[60])
        results = cal(time_param)
    Again, I killed the worker while the task was running, and this time retries worked. Here are the logs:
    starting sleep 60 seconds #then I killed the worker...
    Task 'time': Starting task run... 
    Task 'time': Handling state change from Pending to Running
    Task 'time': Calling task.run() method...
    Task 'time': Handling state change from Running to Success
    Task 'time': finished task run for task with final state: 'Success'
    Task 'cal': Starting task run...
    Task 'cal': Handling state change from Pending to Running
    Task 'cal': Calling task.run() method...
    starting sleep 60 seconds
    and then the flow finished successfully. I would expect mapped tasks and regular tasks to handle this situation consistently, but I'm not sure why mapped tasks did not survive a killed worker. Sorry for the long message; any thoughts are welcome. Thanks!!
    Jeremiah

    2 years ago
    Hi @Jie Lou, thanks for the complete description. Here’s what we think is happening. tl;dr: Dask is intervening to rerun your “regular” task without interacting with Prefect at all. Prefect’s default behavior is what you’re seeing in the mapped children tasks: if a task’s node fails while the task is Running, then the system won’t rerun it and it will be marked as a zombie later. However, there is a possibility (as you’re encountering) that an Executor reruns work if its execution environment allows it. When this happens, the rerun task is in a stale Pending state (since that’s the original payload) and therefore the task runner permits the execution. In this case, when a Dask worker dies, the Dask scheduler is aware of clients waiting for in-flight work and attempts to rerun it. When your “regular” task is rerun, that’s what’s happening. However, “child” tasks are submitted from worker_clients on the Dask worker. Therefore, when the Dask worker dies it takes the clients with it, and no work is resubmitted. That’s why you see the zombie behavior in children. The upcoming mapping refactor will actually align both of these by removing the worker client, and we are looking at additional ways to improve transparency of interacting with Dask.
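    The difference between the two cases can be sketched as a toy model. This is only an illustration of the explanation above, not Prefect's or Dask's actual code: State, runner_permits, and simulate_worker_death are hypothetical names, and the only rule assumed is that a task may start when the state it was submitted with is Pending.

```python
from enum import Enum


class State(Enum):
    PENDING = "Pending"
    RUNNING = "Running"


def runner_permits(payload_state: State) -> bool:
    """Toy stand-in for the task runner's check: only a Pending task may start."""
    return payload_state is State.PENDING


def simulate_worker_death(submitted_by_worker_client: bool) -> str:
    """Describe what happens to an in-flight task when its Dask worker dies.

    False models a "regular" task, whose client lives outside the dead worker.
    True models a mapped child task, whose client lived on the dead worker.
    """
    backend_state = State.RUNNING  # what was recorded before the crash
    payload_state = State.PENDING  # state carried by the original submission

    if submitted_by_worker_client:
        # The client died with the worker, so nobody resubmits the work.
        return f"no rerun; backend still says {backend_state.value} -> zombie"

    # The scheduler resubmits the original payload, which still says Pending.
    if runner_permits(payload_state):
        return "rerun permitted from stale Pending payload"
    return "rerun rejected"


print(simulate_worker_death(submitted_by_worker_client=False))
print(simulate_worker_death(submitted_by_worker_client=True))
```

    The first call models the regular task that Dask reruns on its own; the second models the mapped child that stays Running until the Zombie Killer marks it Failed.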
    Jie Lou

    2 years ago
    Thanks for your reply, @Jeremiah! That explains well why they behaved differently. For now, is there any way a “child” task can survive a dead node?
    Jeremiah

    2 years ago
    For the moment no, but the refactor should be in place for an upcoming release of Core to resolve this. Interestingly, “officially” our opinion is that the child behavior is “correct” - a node failed while a task was running and therefore it’s a zombie that shouldn’t run again. Something we’ve discussed (and maybe should advance) is the ability to define “zombie behavior” for each flow. In other words, what should Cloud do with a zombie? Should it kill it? Should it retry it? etc.
    Jie Lou

    2 years ago
    The ability to define “zombie behavior” sounds great to me. Anyway, looking forward to the upcoming release! Thanks for your work!
    Alex Cano

    2 years ago
    If there’s also the possibility to define this behavior per task, that would be fantastic too. I know I have a decent chunk of tasks that are idempotent that can be retried as many times as needed, and some that are very much not. That would be neat if it’s possible!
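    To make the idea concrete, here is a purely hypothetical sketch of a flow-level default with per-task overrides. Nothing here is an existing Prefect or Cloud API: ZombiePolicy, resolve_policy, and the task names are invented for illustration only.

```python
from enum import Enum
from typing import Dict


class ZombiePolicy(Enum):
    """Hypothetical choices for what to do with a zombie task run."""
    FAIL = "fail"    # mark the run Failed, as described in this thread
    RETRY = "retry"  # reschedule the run, suitable for idempotent tasks


def resolve_policy(
    task_name: str,
    flow_default: ZombiePolicy,
    task_overrides: Dict[str, ZombiePolicy],
) -> ZombiePolicy:
    """Pick the per-task policy if one is set, else fall back to the flow default."""
    return task_overrides.get(task_name, flow_default)


# Flow-wide default is to fail zombies; one idempotent task opts into retries.
overrides = {"cal": ZombiePolicy.RETRY}
print(resolve_policy("cal", ZombiePolicy.FAIL, overrides))
print(resolve_policy("send_email", ZombiePolicy.FAIL, overrides))
```

    Under this sketch an idempotent task like cal could opt into retries while a non-idempotent one keeps the flow default.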