
Jie Lou

04/10/2020, 3:20 PM
Hi All. I have a question about retries on mapped tasks. A little background: we use Prefect Cloud with Dask workers to run multiple tasks in parallel. There is a chance that some workers die while the flow is running, and we hope retries can help recover those task runs. Here is a simple flow with a cal function:
import datetime
import time

from prefect import Flow, Parameter, task

# s3_result_handler is defined elsewhere
@task(log_stdout=True, max_retries=1, retry_delay=datetime.timedelta(seconds=10))
def cal(x):
    print("starting sleep {} seconds".format(x))
    time.sleep(x)
    return x

with Flow("test", result_handler=s3_result_handler) as flow:
    time = Parameter("time", default=[60])
    results = cal.map(time)
While the cal task was running, I manually killed the worker and kept watching the logs in the Cloud UI, and this is what I found:
starting sleep 60 seconds    # then I killed the worker...
Task 'cal[0]': Starting task run...
Task 'cal[0]': task is already running.
Task 'cal[0]': finished task run for task with final state: 'Running'
1 mapped tasks submitted for execution.
Task 'cal': Handling state change from Mapped to Mapped
Task 'cal': task has been mapped; ending run.
Task 'cal': finished task run for task with final state: 'Mapped'
Flow run RUNNING: terminal tasks are incomplete.
Marked "Failed" by a Zombie Killer process.
It seems that the task's state never changed from Running after I killed the worker, which is why the flow was eventually marked as a zombie. However, if cal is a regular task without mapping, the retry mechanism worked as expected. I just tweaked the flow above a little bit:
@task(log_stdout=True, max_retries=1, retry_delay=datetime.timedelta(seconds=10))
def cal(x):
    print("starting sleep {} seconds".format(x[0]))
    time.sleep(x[0])
    return x[0]

with Flow("test", result_handler=s3_result_handler) as flow:
    time = Parameter("time", default=[60])
    results = cal(time)
Again, I killed the worker while the task was running, and this time retries worked. Here are the logs:
starting sleep 60 seconds #then I killed the worker...
Task 'time': Starting task run... 
Task 'time': Handling state change from Pending to Running
Task 'time': Calling task.run() method...
Task 'time': Handling state change from Running to Success
Task 'time': finished task run for task with final state: 'Success'
Task 'cal': Starting task run...
Task 'cal': Handling state change from Pending to Running
Task 'cal': Calling task.run() method...
starting sleep 60 seconds
and then the flow finished successfully. I would expect mapped tasks and regular tasks to handle this issue in a consistent way, but I'm not sure why the mapped task did not survive a killed worker. Sorry for this long message; any thoughts are welcome. Thanks!!

Jeremiah

04/10/2020, 6:01 PM
Hi @Jie Lou, thanks for the complete description. Here's what we think is happening. tl;dr: Dask is intervening to rerun your "regular" task without interacting with Prefect at all. Prefect's default behavior is what you're seeing in the mapped children tasks: if a task's node fails while Running, the system won't rerun it, and it will be marked as a zombie later. However, there is a possibility (as you're encountering) that an Executor reruns work if its execution environment allows it. When this happens, the rerun task is in a stale Pending state (since that's the original payload), and therefore the task runner permits the execution. In this case, when a Dask worker dies, the Dask scheduler is aware of clients waiting for in-flight work and attempts to rerun it; that's what's happening when your "regular" task is rerun. However, "child" tasks are submitted from worker_clients on the Dask worker, so when the Dask worker dies it takes those clients with it, and no work is resubmitted. That's why you see the zombie behavior in the children. The upcoming mapping refactor will align both of these by removing the worker client, and we are looking at additional ways to improve the transparency of interacting with Dask.
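For illustration, here is a minimal, self-contained sketch of the two Dask submission paths described above. This is not Prefect's executor code; the scheduler address and function names are assumptions.

import time

from dask.distributed import Client, worker_client


def child_work(x):
    time.sleep(x)
    return x


def parent_task(xs):
    # Submitting from *inside* a worker goes through a worker_client.
    # If this worker dies, the client (and its pending futures) die with it,
    # so the scheduler has no surviving client asking for the lost results.
    with worker_client() as client:
        futures = client.map(child_work, xs)
        return client.gather(futures)


if __name__ == "__main__":
    client = Client("tcp://scheduler:8786")  # assumed scheduler address
    # Work submitted from this long-lived external client is still wanted by
    # the scheduler if a worker dies mid-run, so it is silently rescheduled;
    # that is the behavior observed with the unmapped task.
    future = client.submit(parent_task, [60])
    print(future.result())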

Jie Lou

04/10/2020, 6:20 PM
Thanks for your reply, @Jeremiah! That explains well why they behaved differently. For now, is there any way a "child" task can survive a dead node?

Jeremiah

04/10/2020, 6:23 PM
For the moment no, but the refactor should be in place in an upcoming release of Core to resolve this. Interestingly, our "official" opinion is that the child behavior is "correct" - a node failed while a task was running, and therefore it's a zombie that shouldn't run again. Something we've discussed (and maybe should advance) is the ability to define "zombie behavior" for each flow. In other words, what should Cloud do with a zombie? Should it kill it? Should it retry it? etc.
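Purely as an illustration of that idea, here is a hypothetical sketch of what a per-flow zombie policy might look like; none of these names are an existing Prefect API.

import enum
from dataclasses import dataclass


class ZombieBehavior(enum.Enum):
    FAIL = "fail"      # today's behavior: mark the run Failed
    RETRY = "retry"    # re-submit the task run
    IGNORE = "ignore"  # leave the run alone


@dataclass
class ZombiePolicy:
    behavior: ZombieBehavior = ZombieBehavior.FAIL
    max_retries: int = 0


def handle_zombie(policy: ZombiePolicy, retries_so_far: int) -> str:
    # Decide what a hypothetical Zombie Killer would do with a zombie task run.
    if policy.behavior is ZombieBehavior.RETRY and retries_so_far < policy.max_retries:
        return "retry"
    if policy.behavior is ZombieBehavior.IGNORE:
        return "ignore"
    return "fail"


print(handle_zombie(ZombiePolicy(ZombieBehavior.RETRY, max_retries=1), retries_so_far=0))  # "retry"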

Jie Lou

04/10/2020, 6:26 PM
The ability to define “zombie behavior” sounds great to me. Anyway, looking forward to the upcoming release! Thanks for your work!

Alex Cano

04/10/2020, 8:12 PM
If there's also the possibility to define this behavior per task, that would be fantastic too. I have a decent chunk of tasks that are idempotent and can be retried as many times as needed, and some that very much are not. It would be neat if that's possible!
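For what it's worth, per-task retry granularity already exists through the @task decorator, so a per-task zombie setting could plausibly follow the same pattern. The task names below are made up for illustration; only the retry keywords are existing Prefect arguments.

import datetime

from prefect import task


# Idempotent work: safe to retry aggressively via the existing per-task knobs.
@task(max_retries=5, retry_delay=datetime.timedelta(minutes=1))
def load_partition(path):
    ...


# Non-idempotent work: no automatic retries.
@task(max_retries=0)
def charge_customer(order_id):
    ...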