Robert Hales
08/24/2021, 10:36 AM
DaskExecutor

Robert Hales
08/24/2021, 10:36 AM

Robert Hales
08/24/2021, 10:50 AM

Marwan Sarieddine
08/24/2021, 11:38 AM

Robert Hales
08/24/2021, 11:43 AM

Marwan Sarieddine
08/24/2021, 11:54 AM

Marwan Sarieddine
08/24/2021, 12:09 PM
At 95% of memory load (as reported by the OS), terminate and restart the worker
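For context, the 95% figure quoted above is dask's worker memory "terminate" fraction, which the nanny enforces. A minimal sketch of where those thresholds live (the values shown are dask's documented defaults, and the config has to be in place in the worker's environment before it starts):

import dask

# Worker memory thresholds, as fractions of the worker's memory limit.
# The last one is the "terminate and restart the worker" behaviour quoted above.
dask.config.set({
    "distributed.worker.memory.target": 0.60,     # start spilling to disk
    "distributed.worker.memory.spill": 0.70,      # spill more aggressively
    "distributed.worker.memory.pause": 0.80,      # pause running new tasks
    "distributed.worker.memory.terminate": 0.95,  # nanny kills and restarts the worker
})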
Marwan Sarieddine
08/24/2021, 12:10 PM

davzucky
08/24/2021, 12:56 PM

Robert Hales
08/24/2021, 1:25 PM

Robert Hales
08/24/2021, 1:26 PM
2021-08-24T11:19:08.209Z distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
2021-08-24T11:19:08.320Z distributed.nanny - INFO - Worker process 166 was killed by signal 15
2021-08-24T11:19:08.427Z distributed.nanny - WARNING - Restarting worker
Kevin Kho

Robert Hales
08/24/2021, 1:51 PM

Kevin Kho

Robert Hales
08/24/2021, 1:57 PM

Robert Hales
08/24/2021, 1:58 PM

Kevin Kho

Kevin Kho

Robert Hales
08/24/2021, 2:08 PM

Kevin Kho

Robert Hales
08/24/2021, 2:14 PM

Kevin Kho

Marwan Sarieddine
08/24/2021, 3:04 PM

Robert Hales
08/24/2021, 3:07 PM

Robert Hales
08/24/2021, 3:08 PM

Marwan Sarieddine
08/24/2021, 3:09 PM

Robert Hales
08/24/2021, 3:28 PM
LocalDaskExecutor?

Kevin Kho
With the LocalDaskExecutor, if the worker process dies due to memory, there is no nanny to help you restart the worker (because all the workers live in the same place). The flow will likely fail and then have to be restarted, but it should restart at the right place.
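For reference, a rough sketch of how the two executors being contrasted here are attached in Prefect 1.x; the flow name, worker count, and scheduler address are placeholders, not values from this thread:

from prefect import Flow
from prefect.executors import DaskExecutor, LocalDaskExecutor

with Flow("executor-example") as flow:
    pass  # tasks would go here

# LocalDaskExecutor runs threads/processes inside the flow-run process itself,
# so there is no nanny watching memory and an OOM kill takes the flow run down.
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=4)

# DaskExecutor pointed at an external cluster: each dask-worker runs under a
# nanny, which can terminate and restart it at the memory threshold.
flow.executor = DaskExecutor(address="tcp://127.0.0.1:8786")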
Robert Hales
08/24/2021, 5:05 PM

Robert Hales
08/26/2021, 8:50 AM
I set checkpoint=True, result=PrefectResult() on my task, and the result was populated in the UI, but when the downstream task caused a worker reboot (sys.exit()) the checkpointed task reran?
Kevin Kho

Robert Hales
08/26/2021, 3:08 PM

Robert Hales
08/26/2021, 3:32 PM
import sys
import time

import prefect
from prefect import Flow, task
from prefect.engine.results import PrefectResult

@task(checkpoint=True, result=PrefectResult())
def task_to_checkpoint():
    logger = prefect.context.get("logger")
    logger.info("I should be checkpointed!")
    time.sleep(10)
    return [1, 2, 3]

@task
def bad_task(a):
    sys.exit()

with Flow("checkpointing") as flow:
    bad_task(task_to_checkpoint())
Robert Hales
08/26/2021, 3:33 PM
dask-worker 192.168.0.46:8786 --nprocs 1 --nthreads 1
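Presumably the flow from the snippet above is then pointed at the same scheduler this worker joined; a sketch of that wiring, reusing the address from the command above and assuming a dask-scheduler is already running there:

from prefect.executors import DaskExecutor

# Run the flow defined above on the externally started dask-worker, which is
# supervised by a nanny and therefore subject to the 95% restart behaviour.
flow.executor = DaskExecutor(address="tcp://192.168.0.46:8786")
flow.run()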
Robert Hales
08/26/2021, 3:34 PM
I expect "I should be checkpointed!" to be logged once and bad_task to be retried, however this is not the behaviour I see.

Kevin Kho
I don't think you should use sys.exit() calls because there is some exit logic that needs to happen. I would suggest you raise FAIL instead to fail that task.
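For completeness, a minimal sketch of the suggested alternative, raising Prefect's FAIL signal instead of calling sys.exit(); the task is taken from the reproduction above:

from prefect import task
from prefect.engine.signals import FAIL

@task
def bad_task(a):
    # Fail only this task run: Prefect records a Failed state and any retry
    # settings apply, instead of the whole worker process exiting.
    raise FAIL("simulated failure instead of sys.exit()")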
Robert Hales
08/26/2021, 4:04 PM
The sys.exit is to emulate the worker being killed by the nanny for memory reasons, so in that case I can't raise a FAIL.
Robert Hales
08/26/2021, 4:56 PM

Kevin Kho

Robert Hales
08/26/2021, 5:04 PM

Kevin Kho

Kevin Kho

Robert Hales
08/27/2021, 8:15 AM
I get Unexpected error: KilledWorker('bad_task-2c4dec7ec8e142589c415e5d673b6843', <WorkerState 'tcp://10.130.46.68:49660', name: tcp://10.130.46.68:49660, memory: 0, processing: 2>), but again this is just emulating that memory leak. So in the real tasks, the worker would have high unmanaged memory -> gets killed by nanny -> then flow is rerun with more available memory and succeeds. However during this re-run all of the successful tasks are run - even if they have checkpointing on (like in the example).
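As an aside, the KilledWorker above comes from the dask scheduler giving up after the same task has been on a dying worker more times than its allowed-failures budget (3 by default). If the goal is only to keep the emulation running longer, that budget can be raised; a sketch, assuming the setting is applied in the scheduler's environment before it starts:

import dask

# Scheduler-side budget for how many times a task may be on a worker that dies
# before the scheduler abandons it with KilledWorker (the default is 3).
dask.config.set({"distributed.scheduler.allowed-failures": 10})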
Robert Hales
08/27/2021, 8:23 AM

Robert Hales
08/27/2021, 8:23 AM

Robert Hales
08/30/2021, 8:34 AM

Kevin Kho

Kevin Kho

Kevin Kho
You could add a state_handler where the old_state is Success and the new_state is Running. Short-circuit this task from running by directly returning the Success state.
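A minimal sketch of the handler being suggested, using the Prefect 1.x state_handlers hook; the handler name is illustrative and the task mirrors the reproduction above:

from prefect import task
from prefect.engine.results import PrefectResult
from prefect.engine.state import Running, Success

def short_circuit_if_successful(task_obj, old_state, new_state):
    # As suggested: if the task is already Success and is about to move into
    # Running again, return the Success state so the task body is not rerun.
    if isinstance(old_state, Success) and isinstance(new_state, Running):
        return old_state
    return new_state

@task(
    checkpoint=True,
    result=PrefectResult(),
    state_handlers=[short_circuit_if_successful],
)
def task_to_checkpoint():
    return [1, 2, 3]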
Robert Hales
08/30/2021, 6:53 PM