# prefect-community
s
Hi. Is it possible to define a timeout on a flow run's (not a task's!) runtime? Sometimes a flow stalls: the flow is in state RUNNING, but every one of the flow's tasks is stuck in state PENDING. I have a prefect-server setup with a Dask scheduler and some Dask workers doing the heavy lifting. All components run in containers, but Prefect is NOT spawning containers (static server setup). Unfortunately I had to set a lifetime (=auto-restart) for the Dask workers, because after maybe some hundreds of finished tasks the workers' CPU AND RAM consumption keeps increasing; even when no flow is running, the CPU sits at >60%!

The RUNNING flow state with PENDING tasks seems to be persisted in the DB but is never updated, checked, or deleted. After a complete restart of all components (apollo, dask, agent, etc.) the flow run(s) are still in the same state and nothing happens. This stalled flow run blocks every other task because it's started by a scheduler. Could it be helpful to define a ResultHandler to allow the agent to pick up the flow run's crappy state and maybe continue or abort the broken flow run?

I'm using an NFS to share data between the Dask workers and the agent (mounted under `$PREFECT__HOME_DIR`). I see my pickled flows and a lot of pickled results, but Prefect doesn't seem to be aware of them. The last log line is, by the way, `Flow 'session-creation-watchdog': Handling state change from Scheduled to Running`. In a successful run, the next log line would be the start of my first task. I don't know where to start to debug this. What can I do to better understand the problem? Is there a timeout, or can I force a "PING" to the flow to identify its real state?
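To make the "can I force a state change" part concrete, this is roughly what I have in mind. Untested sketch only; the GraphQL field names and the `Client.set_flow_run_state` signature are what I believe Prefect 0.12.x exposes, so please treat them as assumptions:

```python
# Untested sketch: find flow runs stuck in RUNNING and force them to Failed.
# Assumes Prefect Server 0.12.x (Hasura-generated GraphQL schema) and the
# Client.set_flow_run_state method -- please verify both against your server.
import pendulum
from prefect.client import Client
from prefect.engine.state import Failed

client = Client()  # picks up the server endpoint from Prefect config/env

stuck = client.graphql(
    """
    query {
      flow_run(where: {state: {_eq: "Running"}}) {
        id
        name
        version
        start_time
      }
    }
    """
)

cutoff = pendulum.now("UTC").subtract(hours=2)  # my own ad-hoc "timeout"
for run in stuck.data.flow_run:
    if pendulum.parse(run.start_time) < cutoff:
        client.set_flow_run_state(
            flow_run_id=run.id,
            version=run.version,
            state=Failed(message="Force-failed: stalled with all tasks PENDING"),
        )
```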
k
Hey @Sven Teresniak, let me get some more information for you, though this does seem like a very multi-faceted dilemma.
s
Yes, I have to dissect the problem. I have now deactivated the auto-restart of the Dask workers. Now I have to wait until the problem occurs (I will generate some workload to maybe trigger the problem in case everything runs smoothly tonight). I also disabled work stealing in the Dask settings (it was enabled before). In 11h I will be able to investigate this problem further. I hope I'll have stalled flow runs by then…
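(For reference, this is roughly how I disabled work stealing. Just a sketch of my change; the config key is the one I found in the dask.distributed docs, and it has to be in place in the scheduler's process/environment before the scheduler starts:)

```python
# Sketch: disable Dask work stealing. The key "distributed.scheduler.work-stealing"
# comes from the dask.distributed configuration reference; it must be set where
# the scheduler runs (e.g. via config file or the DASK_* env var convention)
# before the scheduler process starts -- setting it in a client has no effect.
import dask

dask.config.set({"distributed.scheduler.work-stealing": False})
```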
k
Alrighty, we're happy to wait on your experiment. Based on this description it seems like the server isn't able to communicate with your Dask worker (relaying state changes to the FlowRunner), as your results are indeed stored as pickles but are not reflected in the UI. We'd love to get more information on your configuration/specifications as a GitHub issue for a deeper dive, whenever you're ready or you've exhausted your options.
s
But apart from that, I'm missing some kind of heartbeat or status request for RUNNING flow runs.
Yes, I will describe my setup in detail (the easy part: Dockerfile + K8s YAML) and then maybe generate a minimal example for you to confirm the problem. Give me some time. Thanks for your help.
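As a starting point, the minimal example will probably look something like this. Sketch only: the scheduler address and project name are placeholders, and I'm assuming the 0.12.x `LocalEnvironment(executor=DaskExecutor(...))` pattern for pointing a registered flow at our static Dask cluster:

```python
# Minimal sketch of our setup for reproduction purposes (Prefect 0.12.x).
# "tcp://dask-scheduler:8786" and the project name are placeholders.
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment


@task
def noop(i):
    return i


with Flow("stall-repro") as flow:
    # Enough mapped tasks to keep the static workers busy for a while.
    results = noop.map(list(range(500)))

# Run on the existing (static) Dask cluster instead of an in-process one.
flow.environment = LocalEnvironment(
    executor=DaskExecutor(address="tcp://dask-scheduler:8786")
)
flow.register(project_name="default")
```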
m
Hi @Sven Teresniak - I couldn't help but notice this thread, as we are facing a very similar issue and have a very similar execution environment. The issue seems to be that, over time, the Dask workers become overloaded: the memory consumption of the workers constantly increases as the flow progresses, until it reaches an 80% or so threshold, at which point Dask stops execution, leaving tasks and the entire flow hanging in a Running state. The problem is exacerbated as the ratio of tasks to workers increases ...
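(For what it's worth, this is roughly how I've been watching the worker memory growth. Just a sketch using the plain dask.distributed client, nothing Prefect-specific; the scheduler address is a placeholder:)

```python
# Rough sketch: poll the Dask scheduler and print per-worker resident memory
# so the growth over time becomes visible. Address is a placeholder.
from distributed import Client

client = Client("tcp://dask-scheduler:8786")

info = client.scheduler_info()
for addr, worker in info["workers"].items():
    # "metrics"/"memory" should be the worker's RSS in bytes; guarded with
    # .get() in case the metrics layout differs between versions.
    mem = worker.get("metrics", {}).get("memory", 0)
    print(f"{addr}: {mem / 1e9:.2f} GB resident")

client.close()
```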
s
@Marwan Sarieddine exactly. I couldn't reproduce this with minimal examples yet. After the holiday season I will upgrade to v0.13.x or later (we are still at 0.12.5). The problem seems to be that the Dask worker processes run indefinitely. I can kill a worker by hand (just for testing), and this solves a lot of stability issues. I can also restart the worker every `n` time units using the `--lifetime` CLI argument (see https://docs.dask.org/en/latest/setup/cli.html#cmdoption-dask-worker-lifetime ). But that's not good because it will cause running tasks to fail.

When you put your boilerplate code (for flows) into separate modules, it's hard to deploy new boilerplate because it's not trivial in Python to "un-import" and re-import (boilerplate) packages. So I need an easy and robust way to (re-)start Dask workers (or to command the nanny process to spawn new worker processes); see the sketch below. To some degree the behaviour of (standalone) Apache Spark should be adapted for Prefect-Dask: every flow spawns its own workers (executors in Spark), uses them for its tasks, and shuts them down after some idle time (or explicitly somehow). I know it's much easier to spawn, manage and isolate tasks and flows using Docker or K8s, but in my company it's not an option for a container to modify the state of its own container management (k8s, docker).

Maybe I'm "holding it wrong", but when Dask workers are stalled, more and more flows end up in status RUNNING (we want to schedule a LOT of work with Prefect). This is clearly a negative data engineering pattern. 😞
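To make the "easy and robust way to (re-)start Dask workers" concrete, something like the following between flow runs would already help. Sketch only; as far as I know `Client.restart()` bounces the worker processes via their nannies, which would also pick up freshly deployed boilerplate modules, but like `--lifetime` it kills anything currently running:

```python
# Sketch: restart all Dask workers between flow runs so memory is released
# and freshly deployed boilerplate modules get re-imported.
# Only safe when no flow run is in flight -- same caveat as --lifetime.
from distributed import Client

client = Client("tcp://dask-scheduler:8786")  # placeholder scheduler address
client.restart()  # asks the nannies to bounce their worker processes
client.close()
```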
m
@Sven Teresniak - I see, yes, it is quite the scalability problem (coming up with a minimal example is a bit of a challenge ...). I'll attempt to do so as well. I am hoping the Prefect team can investigate the Dask worker memory management more closely going forward ...
j
Upvote: we are experiencing something similar, although I haven't yet investigated the Dask worker resources. FWIW, we are running our Dask workers as Fargate containers.