# ask-community
r
Getting some odd behaviour with a task: it runs fine and finishes successfully - but is then started again? The task's duration timer stops but it is still printing to logs. Using DaskExecutor. It is a long-running task (57 mins).
Looking at my dask worker logs, after this task had finished and the flow had moved on to other tasks, the worker ran out of memory. The worker was then restarted and it seems to have re-run all of the tasks in the flow.
m
@Robert Hales FWIW - we have also experienced the same thing when tasks run out of memory and then occasionally restart. We thought at first that enabling Prefect Cloud's task version locking would prevent the task from running again, but that wasn't always the case. The workaround we ended up adopting for critical tasks is to save the task run parameters to a table and then query that table at the start of the task to check if the task has already been run - if it has, we short-circuit the task run.
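A minimal sketch of that short-circuit pattern, assuming a hypothetical sqlite tracking table; the table name and the already_ran / record_run helpers are placeholders, not Prefect APIs:

import sqlite3

import prefect
from prefect import task

DB = "task_runs.db"  # hypothetical tracking store


def already_ran(task_name, params):
    # True if this (task, params) combination was recorded as completed before.
    conn = sqlite3.connect(DB)
    try:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS task_runs (task_name TEXT, params TEXT)"
        )
        row = conn.execute(
            "SELECT 1 FROM task_runs WHERE task_name = ? AND params = ?",
            (task_name, repr(params)),
        ).fetchone()
        return row is not None
    finally:
        conn.close()


def record_run(task_name, params):
    # Record a completed (task, params) combination.
    conn = sqlite3.connect(DB)
    try:
        conn.execute(
            "INSERT INTO task_runs (task_name, params) VALUES (?, ?)",
            (task_name, repr(params)),
        )
        conn.commit()
    finally:
        conn.close()


@task
def critical_task(params):
    logger = prefect.context.get("logger")
    if already_ran("critical_task", params):
        logger.info("Already ran with these parameters, short-circuiting")
        return None
    # ... the expensive work would happen here ...
    record_run("critical_task", params)
    return params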
r
Hmmmm interesting, thanks for the info. Is this the expected behaviour from prefect or is it a limitation on the dask side?
m
good question - I defer to the prefect team to answer this because I am not entirely sure
I guess for some additional info - a worker restart is mentioned in dask's memory management docs (https://distributed.dask.org/en/latest/worker.html#memory-management):
At 95% of memory load (as reported by the OS), terminate and restart the worker
There is an option when configuring dask to turn off memory management - but I wonder if that is recommended by prefect (this is done by setting memory_limit=0)
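For reference, a rough sketch of what that could look like when the flow spins up its own cluster through Prefect's DaskExecutor (the flow, task, and worker counts here are illustrative); memory_limit=0 disables dask's per-worker memory management entirely, so the nanny never kills and restarts a worker for memory:

from prefect import Flow, task
from prefect.executors import DaskExecutor


@task
def hello():
    return "hi"


with Flow("no-memory-limit") as flow:
    hello()

# memory_limit=0 turns off the per-worker memory limit, so workers are not
# terminated at the 95% threshold mentioned in the Dask docs above.
flow.executor = DaskExecutor(
    cluster_class="dask.distributed.LocalCluster",
    cluster_kwargs={"n_workers": 2, "threads_per_worker": 1, "memory_limit": 0},
)

For a standalone worker like the one started later in this thread, the equivalent is the dask-worker --memory-limit 0 flag.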
d
Can you check whether the heartbeat is the problem? I had a problem like that and disabling the heartbeat on the flow fixed it
r
I like the worker restart behaviour as we use libpostal, which eats a tonne of memory that doesn't seem to be freed. However, I would expect prefect to pick up where it left off.
There was no mention of heartbeat in the logs, and there were these three lines in the dask worker logs:
2021-08-24T11:19:08.209Z	distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting

2021-08-24T11:19:08.320Z	distributed.nanny - INFO - Worker process 166 was killed by signal 15

2021-08-24T11:19:08.427Z	distributed.nanny - WARNING - Restarting worker
k
Hey @Robert Hales, what do your Prefect logs look like? Anything with Lazarus?
r
Hey @Kevin Kho, no, nothing with Lazarus - all the expected logging output from my tasks.
k
When you say all the tasks re-ran, do you see other evidence besides logs (files produced? memory used? etc.)? Does it take the same amount of time?
r
Yep, dask worker is clearly working on the same task again - same memory usage, same length of time
The task ended up running for a third time
k
Ok will ask the team about this
In the meantime though, maybe explicit caching would help you?
r
Yes, have been looking into that, however I really want to re-run the tasks whenever the flow is re-run. My understanding is that caching would persist across flow runs?
k
Yes but you can invalidate it or use a lower duration
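A small sketch of that kind of caching, assuming Prefect's cache_for / cache_validator task options (the one-hour duration and the all_inputs validator are just illustrative choices):

import datetime

from prefect import task
from prefect.engine.cache_validators import all_inputs


# Cache the output for one hour across flow runs; the cached state is
# treated as invalid early if the task's inputs change.
@task(cache_for=datetime.timedelta(hours=1), cache_validator=all_inputs)
def expensive_task(x):
    return x * 2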
r
Okay, cheers will keep looking into it
k
Hey, so this is expected behavior because Prefect submits the tasks to Dask and Dask builds the computation graph. If something crashes, upstream tasks will have to be recomputed from Dask's point of view. In order to run only once, you can try turning on Version Locking for the flow in the Flow settings so that a result will be loaded.
m
@Robert Hales apologies for the selfish request, but I would appreciate it if you gave version locking a try, because I had tried it a while back and I am curious whether it is more reliable by now …
r
Interesting, okay - as this is expected, I think it could be a little clearer what is going on; it kind of looks like a bug, with tasks running but durations stopped etc.
If version locking is Cloud-only I can't give it a try unfortunately @Marwan Sarieddine
m
I see - thanks for letting me know …
r
@Kevin Kho Is this behaviour the same on the LocalDaskExecutor?
k
I am not sure, but I suspect that if you crash on LocalDaskExecutor due to memory, there is no nanny to help you restart the worker (because all the workers live in the same place). The flow will likely fail and then have to be restarted, but it should restart at the right place.
r
Interesting, will continue to look into this and let you know if anything else comes up. Cheers as always @Kevin Kho!
Hey @Kevin Kho, trying to understand if checkpointing can help me here? I set checkpoint=True, result=PrefectResult() on my task, and the result was populated in the UI, but when the downstream task caused a worker reboot (sys.exit()) the checkpointed task reran?
k
Hey, so there is a difference between checkpointing and caching (and I think both can help). Checkpointing is about persisting the results of a task: when the task runs, the output is saved to the Result, so when you restart a flow, it will load the results of the tasks that already succeeded if they are needed. Caching, on the other hand, applies to future flow runs. For example, if you cache a task for 24 hours, it won't run again across other flow runs for 24 hours. In your case, I think you just want to restart the same flow run, so you would just checkpoint. Upon flow restart, it will fetch the results of already SUCCESSFUL tasks. Are you seeing it running again entirely?
r
Okay, thanks, that's what I interpreted as the expected behaviour too. I was seeing Running -> Successful -> Running -> Successful. Will try to knock up an MRE for you
import sys
import time

import prefect
from prefect import Flow, task
from prefect.engine.results import PrefectResult


@task(checkpoint=True, result=PrefectResult())
def task_to_checkpoint():
    logger = prefect.context.get("logger")
    logger.info("I should be checkpointed!")
    time.sleep(10)
    return [1, 2, 3]


@task
def bad_task(a):
    sys.exit()


with Flow("checkpointing") as flow:
    bad_task(task_to_checkpoint())
the worker being run with
dask-worker 192.168.0.46:8786 --nprocs 1 --nthreads 1
I would expect to only see "I should be checkpointed!" once and bad_task to be retried, however this is not the behaviour I see.
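For completeness, a sketch of how the MRE above might be pointed at that scheduler, assuming it is listening on 192.168.0.46:8786 as implied by the dask-worker command:

from prefect.executors import DaskExecutor

# Run the flow defined above against the already-running scheduler that the
# dask-worker process is connected to.
flow.run(executor=DaskExecutor(address="tcp://192.168.0.46:8786"))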
k
Oh I see. So Prefect in general does not play nicely with sys.exit() calls because there is some exit logic that needs to happen. I would suggest you raise FAIL instead to fail that task.
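A minimal sketch of what Kevin is suggesting, using Prefect's FAIL signal in place of the process exit:

from prefect import task
from prefect.engine.signals import FAIL


@task
def bad_task(a):
    # Raising FAIL marks the task as Failed through Prefect's normal exit
    # logic instead of killing the worker process outright.
    raise FAIL("simulated failure")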
r
The sys.exit is to emulate the worker being killed by the nanny for memory reasons, so in that case I can't raise a FAIL.
The state is correctly updated to Success and the result populated before the sys.exit, so prefect should be able to recover, despite the exit logic not running?
k
Oh I see what you’re going for. I think it should be, but I’ll give this a test in a bit
r
Cheers! Clocking off time this side of the pond, will catch up tomorrow
k
Did this task fail for you? It doesn’t even fail for me. Just running indefinitely.
Are you restarting the same flow run or creating a new flow run?
r
Yeah, it failed eventually with Unexpected error: KilledWorker('bad_task-2c4dec7ec8e142589c415e5d673b6843', <WorkerState '<tcp://10.130.46.68:49660>', name: <tcp://10.130.46.68:49660>, memory: 0, processing: 2>), but again this is just emulating that memory leak. So in the real tasks, the worker would have high unmanaged memory -> gets killed by the nanny -> then the flow is rerun with more available memory and succeeds. However, during this re-run all of the successful tasks are run again - even if they have checkpointing on (like in the example).
Obviously the best-case scenario would be not having memory leaks, but that's down to some libs we use.
I am not restarting anything through the UI, this is all done by dask/prefect in the background on worker loss
@Kevin Kho any ideas on this???
k
Hey sorry @Robert Hales, this slipped me. Will test now
Hey, I'm not sure there is anything we can do for you on the Prefect side because the successful tasks are restarted due to Dask's computation graph. From Dask's point of view, it needs to recompute all the upstream dependencies. This is why we have version locking on Cloud to address this. I am not seeing a way to get this to exit more gracefully either. If it could, then I believe the upstream tasks would be respected. Maybe you've seen this already, but the best suggestion I have would be to potentially reduce unmanaged memory with this. There is an env variable he shows. I have also seen people suggest upgrading to 2021.6.0 for better memory management.
I chatted with the team and maybe you can use a state_handler where the old_state is Success and the new_state is Running. Short-circuit this task from running by directly returning the Success state.
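A rough sketch of that state handler, assuming the Prefect 1.x handler signature of (task, old_state, new_state) and that a state returned from the handler is used as the new state; the handler and task names here are made up:

from prefect import task


def skip_rerun_if_successful(task, old_state, new_state):
    # If an already-successful task is asked to move back to Running
    # (e.g. after a worker restart), hand back the existing Success state
    # so the task body is not executed again.
    if old_state.is_successful() and new_state.is_running():
        return old_state
    return new_state


@task(state_handlers=[skip_rerun_if_successful])
def long_running_task():
    ...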
r
Thanks for this, will look at the provided link and the state handler. Cheers!