# prefect-community
d
Dask-based question this time: does anyone know what the underlying issue might be if I’m getting a Dask `KilledWorker` error? The error occurred while running a flow with 20 mapped tasks, where each one triggers an instance of another flow.
File "/usr/local/lib/python3.6/site-packages/distributed/client.py", line 1841, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ('trigger_subflow-10-e6f77a46d8da4b4ca8643d4fee63d6cb', <Worker 'tcp://127.0.0.1:39261', name: 1, memory: 0, processing: 2>)
j
Hi Darragh. There are a few reasons why a dask worker could fail; unfortunately the above log doesn't give me enough information to help further. Usually a worker fails if:
• It exceeds its memory limit
• It's killed by the process manager (e.g. you're running on Kubernetes, and Kubernetes decides to kill that process)
• Your code hits an irrecoverable error (e.g. a segfault)
How did you start your dask cluster? Do you have access to the logs from that worker, or from the scheduler?
d
Hey @Jim Crist-Harif, I was afraid you were going to say that 🙂 I don’t have access to any of it, as I’m using the `DaskExecutor`, so it’s just using the local/temp cluster that gets created. The only input I feed to the `DaskExecutor` right now is the number of workers (`n_workers=10`), so I don’t have any logs other than the line above… Is there any way to pass config to the `DaskExecutor` to get more debug or logging output out of it?
j
No worries. So this is all running locally for you, using a local cluster. You can increase the log verbosity locally by passing `debug=True` to `DaskExecutor`. A few more questions:
• Are you running inside of any process-management software? Your OS normally won't kill processes that exceed a memory limit, but if you're running inside, say, a Docker container, there may be some external process monitoring resource usage. Just trying to determine whether the worker died because of an external kill signal.
• Is your flow particularly resource-intensive in any way?
d
No, not overly intensive at all. Also, something I should’ve mentioned: it’s not running locally. Each flow is running on Fargate, and I don’t provision any Dask infrastructure at any point; as far as I know the executor does that? Or am I making an assumption about the `DaskExecutor` that I shouldn’t be?
Background: we have a flow on Fargate that creates up to 20 mapped tasks. This flow uses the `DaskExecutor` to achieve parallel mapping. Each mapped task that gets run triggers the running of another flow, and all of these run on Fargate. The mapped subflows all start with no problems, but the parent flow that’s tracking them is the one that then throws the `KilledWorker` exception. We don’t have any access to whatever the underlying Dask stuff is doing, and we don’t have a local cluster that we specifically set up, just whatever gets started as part of the executor.
Bit long-winded, but hopefully that’s useful for getting some detail! The parent flow, the one that does the mapping/triggering of subflows, does run in a Docker container, so is it possible that it’s having memory problems when run with the `DaskExecutor`? And as an aside, are we right in using the `DaskExecutor` this way, or is there a better approach for the case I’ve described?
j
That's actually quite useful. I'm not very familiar with Fargate, so I'm not sure if there's any resource monitoring or process management there that could be causing the issue. Given the flow you described, though, what you actually want is to run with multiple threads, not processes (since you're mostly doing network-y things, threads should be fine). You can do this with the `DaskExecutor` by passing in `cluster_kwargs={"processes": False}`. You might also add `debug=True`, so if you run into a problem like this again, you'll get more log information that we can work with.
Alternatively, you could use the `LocalDaskExecutor`, which will use an alternative dask scheduler to run your workload (by default this one uses threads). In this case the `LocalDaskExecutor` would probably work fine for you, but for most prefect workloads the `DaskExecutor` (which uses the distributed scheduler) will be more performant.
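The threads-versus-processes point can be illustrated with the standard library alone. In this minimal sketch, `fake_network_call` and `run_with_threads` are hypothetical names, not Prefect or Dask APIs, and `time.sleep` stands in for time spent waiting on the network; like blocking socket I/O, it releases the GIL, so waits on separate threads overlap.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple


def fake_network_call(i: int) -> int:
    # Hypothetical stand-in for an HTTP request: the thread mostly waits.
    time.sleep(0.2)
    return i * 2


def run_with_threads(n_tasks: int, n_threads: int) -> Tuple[List[int], float]:
    """Run n_tasks fake network calls on a pool of n_threads threads."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        # pool.map returns results in submission order.
        results = list(pool.map(fake_network_call, range(n_tasks)))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = run_with_threads(n_tasks=10, n_threads=10)
    print(results, f"took {elapsed:.2f}s")
```

With ten threads the run takes roughly 0.2 s instead of the 2 s a sequential loop would need. CPU-bound work would not overlap like this because of the GIL, which is the case where separate processes earn their overhead.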
d
Ah, interesting, thanks! OK, I’m going to test that now; I should get an answer on it pretty soon. Much appreciated 👍
Follow-up question: the change to threads instead of processes seems to have done the trick (many thanks!), but the downstream task, the one that takes all the mapped outputs and reduces/collates them, is now sitting in “Pending” and shows no sign of running. Related to the Dask stuff maybe? No idea…
j
Hmmm, that's odd. Given only that I'm not sure what could be the cause.
d
Yeah, I couldn’t see anything obvious either. My only guess is that the threads are tied up and the final task can’t run?
j
I would be surprised if there are deadlocks in the dask executor (no other tasks running, pending task sitting idle). If you are seeing this behavior, getting logs would be useful, and a reproducible example even better.
If other tasks are still running, but a task you think should run hasn't yet, that might just be because dask has no free threads to run it.
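The "no free threads" case can be reproduced with a plain thread pool: two long-running tasks occupy both threads of a two-thread pool, so a third submitted task stays queued (neither running nor done) until a thread frees up. This is a standard-library illustration of the idea with hypothetical task names, not a model of Dask's scheduler internals.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Tuple


def demo_saturated_pool() -> Tuple[bool, str]:
    """Show a task waiting because every pool thread is busy."""
    release = threading.Event()                 # lets the blockers finish
    started = threading.Barrier(3, timeout=5)   # 2 blockers + main thread

    def long_running_task() -> None:
        started.wait()            # this task now occupies a worker thread
        release.wait(timeout=5)   # hold the thread until released

    def final_task() -> str:
        return "reduced"

    with ThreadPoolExecutor(max_workers=2) as pool:
        blockers = [pool.submit(long_running_task) for _ in range(2)]
        started.wait()                    # both threads are now busy
        final = pool.submit(final_task)
        # No free thread, so the task is queued: neither running nor done.
        was_waiting = not final.running() and not final.done()
        release.set()                     # free the threads
        for blocker in blockers:
            blocker.result()
        return was_waiting, final.result(timeout=5)


if __name__ == "__main__":
    print(demo_saturated_pool())
```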
d
Trying to reproduce it now, but there don’t seem to be any logs in the UI; it literally just says Pending. And I don’t think it’s due to other tasks: this one is the last in the DAG, and all preceding tasks have completed.
j
That is odd. Are you using a `DaskExecutor` or `LocalDaskExecutor`?
d
DaskExecutor
j
The log output to the terminal when running with `debug=True` passed to `DaskExecutor` may be useful here.
d
👍 Need a few minutes to clean out running flows and pull the log
OK, back again. The issue is proving to be consistently repeatable, which is always good and bad 🙂 However, even with `debug=True` I’m still getting no extra output, either in the agent logs or in the flow logs in the UI. Just to verify, this is how I’m setting the debug flag for Dask:
```python
flow.environment = LocalEnvironment(executor=DaskExecutor(n_workers=20, cluster_kwargs={"processes": False}, debug=True))
```
And now that I look closely, I’m actually not even getting DEBUG logs from the agent. What’s the flag to get the agent to do debug logging? I thought having an env var of `PREFECT___LOGGING___LEVEL=DEBUG` would do it?
j
Hmmm, looks like that setting doesn't set the logs low enough to get something useful. Try:
```python
flow.environment = LocalEnvironment(executor=DaskExecutor(n_workers=20, cluster_kwargs={"processes": False, "silence_logs": 10}))
```
Just tested that locally, and I do get logs from dask then. We might want to change that flag, thanks for bringing that up.
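The `10` passed as `silence_logs` is a standard Python logging level: it equals `logging.DEBUG`. A minimal sketch of the same `cluster_kwargs` written with the named constant, which is easier to read later; it only builds the dict and assumes, as in the thread, that it is then passed unchanged to `DaskExecutor`.

```python
import logging

# silence_logs takes a standard logging level; 10 is logging.DEBUG.
cluster_kwargs = {"processes": False, "silence_logs": logging.DEBUG}

print(logging.getLevelName(cluster_kwargs["silence_logs"]))  # prints "DEBUG"
```

The dict would then be used as in the message above, e.g. `DaskExecutor(n_workers=20, cluster_kwargs=cluster_kwargs)`.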
I think you want `PREFECT__CLOUD__AGENT__LEVEL=DEBUG`
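Prefect's environment-variable settings follow a `PREFECT__SECTION__KEY` naming pattern, as in the `PREFECT__CLOUD__AGENT__LEVEL` suggestion. The function below is a deliberately simplified, hypothetical parser written only to show how such names map onto nested config keys; it is not Prefect's configuration loader, which may treat edge cases differently.

```python
from typing import Dict, Tuple


def parse_prefect_style_env(
    environ: Dict[str, str], prefix: str = "PREFECT"
) -> Dict[Tuple[str, ...], str]:
    """Map PREFECT__SECTION__KEY=value entries to ("section", "key") tuples.

    Hypothetical illustration of the double-underscore convention only;
    NOT Prefect's actual configuration loader.
    """
    parsed = {}
    marker = prefix + "__"
    for name, value in environ.items():
        if not name.startswith(marker):
            continue
        # Everything after the prefix is split on "__" into nested keys.
        parts = tuple(part.lower() for part in name[len(marker):].split("__"))
        parsed[parts] = value
    return parsed


env = {
    "PREFECT__LOGGING__LEVEL": "DEBUG",
    "PREFECT__CLOUD__AGENT__LEVEL": "DEBUG",
    "PREFECT___LOGGING___LEVEL": "DEBUG",  # three underscores between parts
}
for key, value in parse_prefect_style_env(env).items():
    print(key, value)
```

Under this simplified rule, a name with three underscores between parts yields keys such as `_logging` and `_level` rather than `logging` and `level`, so it is worth double-checking the exact spelling of these variables.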
d
Aha, OK, now I’m getting a whole bunch of warnings in the CloudWatch logs. Nothing specific to the final task not running, but definitely indicative of a problem:
2020-06-30T17:59:21.826+01:00 distributed.utils_perf - WARNING - full garbage collections took 96% CPU time recently (threshold: 10%)
2020-06-30T17:59:21.826+01:00 distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 189.50 MB -- Worker memory limit: 196.89 MB
Step 1 will be to boost memory for the flows, but at the same time it doesn’t really tell me why the last task isn’t running.
Is there any way to get the workers cleaned up after their task is complete?
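As a quick worked check on the second warning: 189.50 MB out of a 196.89 MB limit is about 96% of the per-worker limit. The threshold fractions in the sketch are what I understand to be distributed's default `distributed.worker.memory` settings (target 0.60, spill 0.70, pause 0.80, terminate 0.95); treat them as assumptions and check the config of the installed version.

```python
# Figures copied from the distributed.worker warning above, in MB.
process_memory_mb = 189.50
worker_limit_mb = 196.89

fraction = process_memory_mb / worker_limit_mb
print(f"{fraction:.1%} of the worker memory limit")  # prints "96.2% of the worker memory limit"

# Assumed defaults for distributed.worker.memory.*; verify for your version.
thresholds = {"target": 0.60, "spill": 0.70, "pause": 0.80, "terminate": 0.95}
exceeded = [name for name, limit in thresholds.items() if fraction >= limit]
print(exceeded)  # prints ['target', 'spill', 'pause', 'terminate']
```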
j
The worker threads rely on Python's gc to clean up: if an intermediate result isn't used anymore, it's garbage collected. You're doing network-y tasks, right? Are you cleaning up your HTTP clients properly? Python objects that aren't properly cleaned up may be one reason here.
If there's no obvious memory leak, give the `LocalDaskExecutor` a try. It's much lighter weight, and fits your problem well.
```python
from prefect.engine.executors import LocalDaskExecutor
executor = LocalDaskExecutor(num_workers=10)  # defaults to number of cores
```
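On cleaning up clients: an object that holds network resources can be closed deterministically instead of being left to the garbage collector. A minimal sketch using `contextlib.closing`; `HypotheticalHttpClient` and `poll_subflow` are made-up names, and whether a particular real client (for example `prefect.client.Client`) exposes a `close()` method is not something this thread establishes.

```python
import contextlib
from typing import Tuple


class HypotheticalHttpClient:
    """Stand-in for any client that keeps sockets or sessions open."""

    def __init__(self) -> None:
        self.closed = False

    def get(self, url: str) -> str:
        if self.closed:
            raise RuntimeError("client is closed")
        return f"response from {url}"  # no real network call is made

    def close(self) -> None:
        # A real client would release its connection pool here.
        self.closed = True


def poll_subflow(url: str) -> Tuple[str, bool]:
    # contextlib.closing guarantees close() runs, even if get() raises.
    with contextlib.closing(HypotheticalHttpClient()) as client:
        body = client.get(url)
    return body, client.closed
```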
d
I am, but not in that particular case: that log sample comes from the parent flow, the one that triggers the subflows and polls for updates. Although, having said that, there are numerous creations of `prefect.client.Client`; do they need to be cleaned up? I’ll try switching to the local version now as well 👍
Also, would the use of `result=LocalResult()` have any implications?
j
Hmmm, the client code could be improved to better use a shared session, but I don't see any obvious resource leaks in our client.
The only implication I can think of from using a `LocalResult` is disk write speed, since it will be writing to disk in your Fargate task. Seems unlikely to be the culprit here.
I have a PR up now that should make it easier for code like yours to reuse the same client everywhere, so that would be more efficient than creating a new client in each task (see https://github.com/PrefectHQ/prefect/pull/2891).
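The shared-client idea can be sketched as a lazily created, lock-guarded instance that every task reuses. This is a generic Python pattern with hypothetical names (`HypotheticalClient`, `get_shared_client`, `run_mapped_tasks`), not the API introduced in the linked PR.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional


class HypotheticalClient:
    """Stand-in for an API client that is costly to create."""

    instances_created = 0

    def __init__(self) -> None:
        HypotheticalClient.instances_created += 1


_client: Optional[HypotheticalClient] = None
_client_lock = threading.Lock()


def get_shared_client() -> HypotheticalClient:
    """Create the client on first use, then return the same object."""
    global _client
    with _client_lock:  # stops two threads from both creating one
        if _client is None:
            _client = HypotheticalClient()
        return _client


def run_mapped_tasks(n_tasks: int) -> List[HypotheticalClient]:
    # Simulates mapped tasks running on threads, each asking for a client.
    with ThreadPoolExecutor(max_workers=n_tasks) as pool:
        return list(pool.map(lambda _: get_shared_client(), range(n_tasks)))
```

The lock matters with a threaded executor: without it, two tasks starting at the same moment could each see `_client is None` and build their own client.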
d
Thanks for that, Jim. I’ve switched to the `LocalDaskExecutor`, but it seems to be ignoring the `n_workers` setting and only running 2 tasks in parallel:
```python
flow.environment = LocalEnvironment(executor=LocalDaskExecutor(n_workers=10))
```
j
`n_workers` -> `num_workers`
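One plausible reason a misspelled keyword fails quietly instead of raising an error: if an entry point accepts `**kwargs` and forwards or drops them, `n_workers=10` is accepted while the real `num_workers` setting keeps its default. The function below is hypothetical and only shows that general Python pattern; it is not a claim about exactly how `LocalDaskExecutor` processes its arguments.

```python
import os
from typing import Any, Optional


def hypothetical_run(num_workers: Optional[int] = None, **kwargs: Any) -> int:
    """Return the thread-pool size a hypothetical executor would use.

    Any keyword it does not recognise, such as a misspelled ``n_workers``,
    is collected in **kwargs and ignored instead of raising a TypeError.
    """
    if num_workers is None:
        return os.cpu_count() or 1  # fall back to the number of cores
    return num_workers


print(hypothetical_run(n_workers=10))    # silently uses the core count
print(hypothetical_run(num_workers=10))  # prints 10
```

If the default pool size is the number of cores, as the comment on `num_workers` in Jim's earlier snippet says, a small container would run only a few tasks at once. That is consistent with, but does not prove, the 2-at-a-time behavior reported.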
d
Tried that - doesn’t run at all 🙂
j
Hmmm, one sec
d
With `num_workers` I got a parent flow that just sat in Submitted and never started
Offline for a bit with kids in case of slow replies 👍
j
Hmmm, `num_workers` works fine for me. The following flow runs 10 tasks in true parallel:
```python
import time

from prefect import task, Flow
from prefect.engine.executors import LocalDaskExecutor


@task
def inc(x):
    time.sleep(10)
    return x + 1


with Flow("test") as flow:
    inc.map(range(10))


state = flow.run(executor=LocalDaskExecutor(num_workers=10))
```
`num_workers` is definitely the proper keyword argument. If it doesn't work for you, that's odd. Can you try running the above flow on your infrastructure to see if that works as intended?
d
Yeah, I'll give it a try shortly, thanks Jim. Bedtime waits for no man 🤣 I'll rerun the one I had that stalled as well; there might be something buried in CloudWatch that I missed
In the true spirit of the perverse, I tried `num_workers` again and this time it worked! I think what I saw last time was a separate issue that I’ve raised previously: on the first run after registering a new version of a flow, it intermittently doesn’t start. According to Braun, I believe this is related to `registerTaskDefinition` and `runTask` being tightly linked. Either way, it seems to be running and completing all the way through. I have some scale tests to run to prove I can get up to 20 workers/threads, but hopefully that’s it solved.
j
Glad you got it working!
d
Thanks! It seems to be scaling up to 20 workers as well, which is great. Really appreciate the help! 👍