Jie Lou
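04/10/2020, 3:20 PM
```python
import datetime
import time
from prefect import Flow, Parameter, task

@task(log_stdout=True, max_retries=1, retry_delay=datetime.timedelta(seconds=10))
def cal(x):
    print("starting sleep {} seconds".format(x))
    time.sleep(x)
    return x

# s3_result_handler is configured elsewhere (not shown in the original snippet)
with Flow("test", result_handler=s3_result_handler) as flow:
    # named sleep_times so it does not shadow the `time` module used in cal
    sleep_times = Parameter("time", default=[60])
    results = cal.map(sleep_times)
```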
While the `cal` task was running, I manually killed the worker and kept watching the logs in the Cloud UI. This is what I saw:
```
starting sleep 60 seconds . #then I killed the worker...
Task 'cal[0]': Starting task run...
Task 'cal[0]': task is already running.
Task 'cal[0]': finished task run for task with final state: 'Running'
1 mapped tasks submitted for execution.
Task 'cal': Handling state change from Mapped to Mapped
Task 'cal': task has been mapped; ending run.
Task 'cal': finished task run for task with final state: 'Mapped'
Flow run RUNNING: terminal tasks are incomplete.
Marked "Failed" by a Zombie Killer process.
```
It seems the task's state never changes from `Running` after I kill the worker, which is why the flow run was eventually marked as a zombie.
However, if `cal` is a regular task without mapping, the retry mechanism works as expected. Here is the same flow, slightly tweaked:
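```python
import datetime
import time
from prefect import Flow, Parameter, task

@task(log_stdout=True, max_retries=1, retry_delay=datetime.timedelta(seconds=10))
def cal(x):
    # x is the whole Parameter value (a list), so use its first element
    print("starting sleep {} seconds".format(x[0]))
    time.sleep(x[0])
    return x[0]

# s3_result_handler is configured elsewhere; sleep_times avoids shadowing `time`
with Flow("test", result_handler=s3_result_handler) as flow:
    sleep_times = Parameter("time", default=[60])
    results = cal(sleep_times)
```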
Again, I killed the worker while the task was running, and this time the retry worked. Here are the logs:
```
starting sleep 60 seconds #then I killed the worker...
Task 'time': Starting task run...
Task 'time': Handling state change from Pending to Running
Task 'time': Calling task.run() method...
Task 'time': Handling state change from Running to Success
Task 'time': finished task run for task with final state: 'Success'
Task 'cal': Starting task run...
Task 'cal': Handling state change from Pending to Running
Task 'cal': Calling task.run() method...
starting sleep 60 seconds
```
After that, the flow finished successfully. I would expect mapped and regular tasks to handle this case consistently, but I'm not sure why mapped tasks don't survive a killed worker. Sorry for the long message; any thoughts are welcome. Thanks!!

Jeremiah
04/10/2020, 6:01 PM
`Running`, then the system won't rerun it, and it will be marked as a zombie later.
However, as you're encountering, an Executor may rerun work if its execution environment allows it. When this happens, the rerun task carries a stale `Pending` state (since that's what the original payload contained), so the task runner permits the execution.
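To make the stale-state point concrete, here is a minimal, stdlib-only model of the readiness check described above. It is not Prefect's task runner: `State` and `run_if_ready` are hypothetical names, and the real runner handles many more states. The assumption it illustrates is that a run proceeds only when the state it is handed is `Pending`, while a `Running` state makes it stop immediately, as in the "task is already running" line of the first log.

```python
from enum import Enum


class State(Enum):
    # Hypothetical, simplified task states (not Prefect's State classes)
    PENDING = "Pending"
    RUNNING = "Running"
    SUCCESS = "Success"


def run_if_ready(state: State, log: list) -> State:
    """Toy readiness check: only a Pending task is allowed to execute."""
    if state is State.RUNNING:
        # Another run already claimed this task, so refuse to start it again
        log.append("task is already running.")
        return state
    if state is State.PENDING:
        log.append("Calling task.run() method...")
        return State.SUCCESS
    return state


# The worker died mid-run, so the backend still records the task as Running.
backend_state = State.RUNNING

# Case 1: the Executor resubmits the original payload, which still says Pending.
resubmitted_log: list = []
resubmitted_result = run_if_ready(State.PENDING, resubmitted_log)

# Case 2: a run that sees the recorded Running state bails out and leaves it
# Running, which is what the zombie killer later marks as Failed.
live_log: list = []
live_result = run_if_ready(backend_state, live_log)

print(resubmitted_result.value, resubmitted_log)
print(live_result.value, live_log)
```

The only input that differs between the two cases is which state the runner is handed, which is why a resubmitted stale payload can execute while a run reading the current state cannot.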
In this case, when a Dask worker dies, the Dask scheduler knows that clients are still waiting on its in-flight work and attempts to rerun it; that's what happens when your "regular" task is rerun. Mapped "child" tasks, however, are submitted from `worker_client`s running on the Dask worker itself. When that worker dies, it takes those clients with it, so no work is resubmitted. That's why you see the zombie behavior only in the children.
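The difference between the two cases can be sketched with a toy scheduler. This is a stdlib-only simulation, not Dask's actual scheduler logic: `Client`, `ToyScheduler`, `submit`, `worker_died`, and the host names are all hypothetical. The one rule it encodes is the one stated above: in-flight work is resubmitted only if the client waiting on it is still alive, and a client created on the worker (as a `worker_client` is for mapped children) dies together with that worker.

```python
from dataclasses import dataclass, field


@dataclass
class Client:
    name: str
    host: str  # hypothetical process/machine the client lives on


@dataclass
class ToyScheduler:
    # task key -> (worker host running it, client waiting on the result)
    in_flight: dict = field(default_factory=dict)

    def submit(self, key: str, worker: str, client: Client) -> None:
        self.in_flight[key] = (worker, client)

    def worker_died(self, dead_host: str, fallback_worker: str) -> list:
        """Return the keys that get resubmitted after `dead_host` dies."""
        resubmitted = []
        for key, (worker, client) in list(self.in_flight.items()):
            if worker != dead_host:
                continue  # work on other workers is unaffected
            if client.host == dead_host:
                # The waiting client died with the worker: nobody resubmits it.
                del self.in_flight[key]
            else:
                # A live client is still waiting, so the work is retried elsewhere.
                self.in_flight[key] = (fallback_worker, client)
                resubmitted.append(key)
        return resubmitted


scheduler = ToyScheduler()
flow_runner_client = Client("flow-runner", host="flow-run-process")
child_client = Client("worker-side client", host="worker-1")

scheduler.submit("cal", worker="worker-1", client=flow_runner_client)  # regular task
scheduler.submit("cal[0]", worker="worker-1", client=child_client)     # mapped child

resubmitted = scheduler.worker_died("worker-1", fallback_worker="worker-2")
print(resubmitted)
```

Both tasks run on the same worker; the only difference is where the waiting client lives, and that alone decides whether the work is retried or silently lost.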
The upcoming mapping refactor will align both of these behaviors by removing the worker client, and we're looking at additional ways to make interactions with Dask more transparent.

Jie Lou
04/10/2020, 6:20 PM

Jeremiah
04/10/2020, 6:23 PM

Jie Lou
04/10/2020, 6:26 PM

Alex Cano
04/10/2020, 8:12 PM