# ask-community
Tim:
Hi all - I have a really annoying problem that is starting to bite me hard. I have a flow that seems to be running fine, except tasks that reach a Completed() state are, for whatever reason, being restarted - sometimes hours after they were successfully completed. As far as I can tell, there are no errors in my files. I am seeing the correct set of end data products from tasks that complete after the task that is restarted needlessly. Is there something I should be looking into for this? I am a little unsure where to even begin.
Tess:
Hey Tim! A few questions for context. Are you running this on your local system or another platform? Does the entire run restart, or just the task, which then shuts off after that? Do you have some example code you can send over?
Tim:
Hi Tess! Thanks for asking! First off - I am vision impaired / blind. If I miss anything in your question or am unclear in my text, please let me know. I do my best to catch these things, but it is easy to overlook. So, I am running a self-hosted Prefect server with WEB_CONCURRENCY=24, and a postgres database server through a container:
Copy code
# Start the postgres server in a Singularity container, passing credentials
# and the data directory in via SINGULARITYENV_* variables.
SINGULARITYENV_POSTGRES_PASSWORD="$POSTGRES_PASS" SINGULARITYENV_POSTGRES_DB="$POSTGRES_DB" SINGULARITYENV_PGDATA="$POSTGRES_SCRATCH/pgdata" \
        singularity run --cleanenv --bind "$POSTGRES_SCRATCH":/var postgres_latest.sif -c max_connections=4096 -c shared_buffers=4096MB
I can see the workers and connections. The actual code is running on a set of compute nodes managed by SLURM on an HPC cluster. Each job has effectively twice the CPUs and memory that I have spec'd as being required for the workflow. The entire run does not restart. When I look at these restarted tasks, the UI is reporting their run count as 1. I can also see that the prefect message Finished in state Completed() is issued, and then the task restarts some time later on the same page. Other tasks that are still running continue to run without any issue. In my prefect server logs I am getting these messages _very occasionally_:
Copy code
Invalid HTTP request received.
Invalid HTTP request received.
Invalid HTTP request received.
Invalid HTTP request received.
Invalid HTTP request received.
Invalid HTTP request received.
Are there any extra options I can activate to try to track those down and see if they are related? I am completely reinstalling my environment in the hope that there is a version mismatch or some other patched upstream library. I have some memory of an http2-type error in a dependency lurking around. I don't have an MWE that can consistently reproduce the issue; when I try to track it down and rerun my failed flow, it works. It could very well be related to the HPC I am using, but there are no errors reported or known by the technical staff who manage it. The attached screenshot is an example. The particular error is because the data file has been zipped by a later stage, which completed successfully. The only way it could have gotten to this zip stage is if a number of other stages had completed successfully against this correctly preprocessed measurement set. I can even see some figures produced from this data that imply everything worked at least once. The other tasks in my flow that fail have the same basic structure - they complete, I see the finished data products, prefect marks them as Completed(), but some time afterwards they rerun. In the attached screenshot it was some 2 hours later.
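One thing I can do to cross-check the UI is query the API directly, roughly like this (a sketch against the Prefect 2 Python client; the UUID is a placeholder I would copy from the UI):
Copy code
# Sketch: ask the Prefect API what it records for a task run's run count
# and current state, to cross-check the UI. The UUID below is a placeholder.
import asyncio
from uuid import UUID

from prefect.client.orchestration import get_client

TASK_RUN_ID = UUID("00000000-0000-0000-0000-000000000000")  # copied from the UI

async def inspect_task_run() -> None:
    async with get_client() as client:
        task_run = await client.read_task_run(TASK_RUN_ID)
        print(task_run.name, task_run.run_count, task_run.state.type)

asyncio.run(inspect_task_run())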
Tess:
Thanks for providing this context! This doesn't seem to be a Prefect issue. When a Prefect flow runs, it does so in a thread, and the tasks from that flow run in the same thread unless another tool is distributing the runs. Once the flow run completes, the thread ends and there would be no reason for a task to restart. My assumption is that what is happening is that a task runs on a node, completes, and then isn't fully yielded back / relinquished, so for some reason it starts back up again at a later time.
Tim:
I need to think this through a little. I am using dask_jobqueue to establish a set of dask workers. So long as these dask workers have work to do, they stay alive for the duration of the flow. The SLURM script that runs on the compute node purely starts a dask worker that connects to the dask task runner's scheduler. While there are tasks to do, the set of workers remains alive and available for work. When a dask worker does shut down, I would hope it has reported back to the scheduler that it is doing so, and informed the prefect engine as such? Also, I am noticing that this is happening while the flow run is still running. The only other obvious thing I can think of is to turn off the adaptive mode of the dask cluster (which is what the dask task runner is using under the hood). I am unclear on how to go about isolating and testing things further.
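For reference, the wiring is roughly along these lines (a simplified sketch, not the actual flow; the queue name, walltime, job sizes and task body are placeholders, and it assumes the prefect-dask collection):
Copy code
# Simplified sketch of the DaskTaskRunner + dask_jobqueue.SLURMCluster wiring.
from prefect import flow, task
from prefect_dask import DaskTaskRunner

runner = DaskTaskRunner(
    cluster_class="dask_jobqueue.SLURMCluster",
    cluster_kwargs={
        "cores": 16,                # placeholder per-job size
        "memory": "192GB",          # placeholder per-job size
        "walltime": "06:00:00",     # placeholder
        "queue": "some-partition",  # placeholder
    },
    adapt_kwargs={"minimum": 2, "maximum": 36},
)

@task
def preprocess(ms_path: str) -> str:
    # stand-in for the real preprocessing work
    return ms_path

@flow(task_runner=runner)
def pipeline(ms_paths: list[str]) -> None:
    for path in ms_paths:
        preprocess.submit(path)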
Tess:
I think the next best thing to do is to run a simplified version of your code locally, using Prefect but without SLURM and Dask. You'll be able to confirm whether the code runs as planned. Also, are you using the Dask Task Runner?
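Something along these lines is what I have in mind - a stripped-down flow with the default task runner, no Dask or SLURM, just to confirm the task logic itself behaves (the task bodies here are stand-ins):
Copy code
# Minimal local sanity check: same task structure, default task runner,
# no Dask and no SLURM. Function names and logic are stand-ins.
from prefect import flow, task

@task
def stage_one(item: int) -> int:
    return item * 2

@task
def stage_two(value: int) -> int:
    return value + 1

@flow
def simplified_pipeline(n: int = 10) -> list[int]:
    futures = []
    for i in range(n):
        a = stage_one.submit(i)
        futures.append(stage_two.submit(a))
    return [f.result() for f in futures]

if __name__ == "__main__":
    simplified_pipeline()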
Tim:
Yes, I am running the Dask Task Runner.
I can begin to try to put together a more simplified example, but the problem I face is that I really need a large set of resources to get through the workflow in a timely manner.
By timely, I mean within 6 hours I need around 36 large jobs, each with 192GB of memory / 16 cores (roughly the job size in the standalone Dask sketch below).
May I ask, how are tasks distributed to the dask workers when using a Dask Task Runner? Are multiple items given to a dask worker in one go? Or is it the case (since prefect isn't using a DAG) that each worker requests a new unit of compute from the prefect orchestration engine when it becomes available / alive? I am just curious whether this is some weird dask work-stealing type issue. I thought those were transactional, i.e. worker 2 won't take work from worker 1 if those items were already running or completed.
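For completeness, the Dask/SLURM layer on its own looks roughly like the following (a sketch; the queue name, walltime and trivial workload are placeholders), which is what I could try exercising without Prefect in the loop:
Copy code
# Sketch of the Dask/SLURM layer by itself (no Prefect), matching the job
# size mentioned above; queue and walltime are placeholders.
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    cores=16,
    memory="192GB",
    walltime="06:00:00",
    queue="some-partition",  # placeholder
)
cluster.adapt(minimum=2, maximum=36)

client = Client(cluster)

def square(x: int) -> int:
    return x * x

# Submit some trivial work and watch whether any of it gets recomputed
# as workers come and go under adaptive scaling.
futures = client.map(square, range(1000))
print(sum(client.gather(futures)))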
Tess:
To my knowledge, once Prefect sends Dask the code, Dask handles it from there, so Dask would control the distribution of work to the workers.
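Conceptually - and this is a simplification, not the actual prefect-dask internals - each submitted task ends up as something like a Dask future, and from there the Dask scheduler alone decides which worker runs it, whether it gets stolen, and whether a lost result gets recomputed:
Copy code
# Simplified picture only: each submitted Prefect task becomes (roughly) a
# Dask future; placement, work stealing, and recomputation of lost results
# are all decisions made by the Dask scheduler.
from dask.distributed import Client, LocalCluster

def run_task(payload: int) -> int:
    # stand-in for the body of a Prefect task
    return payload + 1

if __name__ == "__main__":
    client = Client(LocalCluster(n_workers=2, threads_per_worker=1))
    futures = [client.submit(run_task, i) for i in range(8)]
    # If a worker holding a finished result dies before the result is fetched
    # or replicated, the scheduler will rerun run_task somewhere else.
    print(client.gather(futures))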
Tim:
This is interesting. I found that after tweaking some of the postgres settings, specifically increasing the shared buffers, the problem seems to have gone away. By problem, I mean the restarting tasks - I still do see the Invalid HTTP request received messages.
Hi Tess - You were entirely right about Dask controlling the tasks. My tweaks to the prefect server and postgres database were not the fix like I thought they were, just a happy coincidence. The problem is related to the use of adaptive scaling, and how quickly work is redistributed among workers. I think there has to be some logic where the dask scheduler elects to recompute small units of work rather than trying to transfer data between workers. I say this based on some older issues I found, and on only seeing these failure modes when workflows are trying to acquire resources in an oversubscribed setting -- typically when the cluster is under heavy load and I am only getting a small number of workers at a time. These settings have helped a little, though not outright solved the problem.
Copy code
# See <https://docs.dask.org/en/latest/configuration.html#distributed-scheduler>
# for more information on these variables.
# Scheduler: avoid queueing extra root tasks on workers (worker-saturation),
# keep work stealing on but infrequent, and be tolerant of slow/flaky workers.
export DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=0.01
export DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING=True
export DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING_INTERVAL="120s"
export DASK_DISTRIBUTED__SCHEDULER__WORKER_TTL="3600s"
export DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES=100
# Reduce worker profiling frequency and overhead.
export DASK_DISTRIBUTED__WORKER__PROFILE__INTERVAL="10000ms"
export DASK_DISTRIBUTED__WORKER__PROFILE__CYCLE="1000000ms"
# Be very forgiving about slow or congested network connections.
export DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="300s"
export DASK_DISTRIBUTED__COMM__SOCKET_BACKLOG="16384"
export DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP="300s"
export DASK_DISTRIBUTED__COMM__RETRY__COUNT=12
and setting the adaptive mode to something like
Copy code
adapt_kwargs:
    minimum: 2
    maximum: 36
    wait_count: 20
    target_interval: "300s"
    interval: "30s"
Would there be any other insights you or team members could share? I am very close to having a proper working solution that I can go all out with. Is it also worth considering putting something together in the docs about this? This seems to be a larger issue for me since I am using the dask_jobqueue.SLURMCluster, which means I am at times slowly acquiring the compute resources.
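For what it's worth, a quick way I can check that those DASK_DISTRIBUTED__* variables are actually picked up is something like this, run in the same environment the scheduler and workers start in:
Copy code
# Confirm the exported DASK_DISTRIBUTED__* variables are reflected in the
# Dask config seen by the process.
import dask

for key in (
    "distributed.scheduler.worker-saturation",
    "distributed.scheduler.work-stealing",
    "distributed.scheduler.worker-ttl",
    "distributed.scheduler.allowed-failures",
    "distributed.comm.timeouts.connect",
):
    print(key, "=", dask.config.get(key, default="<not set>"))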
Tess:
Glad you got that initial issue figured out. Let me chat with some of the team and see what I can find.
Tim:
Well, I have circled back to this being a weird dask issue, with nothing solved. I am one step closer to writing something that attempts to query the prefect server for a task UUID to see if it has completed previously.....
I might have to overload the task decorator / function to confirm that dask is not doing anything silly. Frustrating. But some of these functions have some very frustrating side effects that I might not be able to get around.
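The sort of guard I have in mind is roughly the following - wrap the task body so it refuses to redo work whose output already exists, purely to see whether dask is re-executing things (a sketch; the output-path convention is made up for illustration):
Copy code
# Sketch of an idempotency guard around a task body; the output-path
# convention here is invented purely for illustration.
import functools
from pathlib import Path

from prefect import task

def skip_if_output_exists(output_for):
    """Wrap a function so it is skipped when its expected output file exists.

    `output_for` maps the call arguments to the expected output path.
    """
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            out = Path(output_for(*args, **kwargs))
            if out.exists():
                print(f"{fn.__name__}: {out} already exists, skipping rerun")
                return str(out)
            return fn(*args, **kwargs)
        return wrapper
    return decorator

@task
@skip_if_output_exists(lambda ms_path: f"{ms_path}.zip")
def zip_measurement_set(ms_path: str) -> str:
    # ... real zipping would happen here ...
    return f"{ms_path}.zip"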
Well, to dig this one up again: it appears as though I have been able to sidestep the issue by using persistent task results in prefect, enabled via the appropriate environment variable. Since activating this option I have not seen the error come back. I am very pleased with this. Still, I kind of feel like there is something up on the dask scheduler side of things. I am not sure why these tasks were rerunning - whether it was because a dask worker did not transmit the result to other workers before it was shut down, or whether dask thought it would be quicker to recompute.
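For anyone finding this later, by persistent task results I mean result persistence on the tasks themselves, roughly like this per task (globally I believe the equivalent toggle is the PREFECT_RESULTS_PERSIST_BY_DEFAULT setting, but double-check the docs for your Prefect version):
Copy code
# persist_result=True writes the task's return value to result storage rather
# than keeping it only in memory; where it is stored can be pointed at shared
# storage via result_storage or the Prefect results settings.
from prefect import flow, task

@task(persist_result=True)
def preprocess(ms_path: str) -> str:
    # stand-in for the real work
    return ms_path

@flow
def pipeline(ms_paths: list[str]) -> None:
    for path in ms_paths:
        preprocess.submit(path)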