
Marwan Sarieddine

10/15/2020, 1:19 PM
Hi dask folks - is there an environment variable I can set for dask-kubernetes not to use a nanny?
I am asking because I am facing this issue running prefect with a kubernetes agent and a dask-kubernetes execution environment:
/usr/local/lib/python3.8/site-packages/distributed/cli/dask_worker.py:277: UserWarning: The --bokeh/--no-bokeh flag has been renamed to --dashboard/--no-dashboard. 
  warnings.warn(
distributed.nanny - INFO -         Start Nanny at: 'tcp://192.168.83.227:45857'
distributed.worker - INFO -       Start worker at: tcp://192.168.83.227:34167
distributed.worker - INFO -          Listening to: tcp://192.168.83.227:34167
distributed.worker - INFO -          dashboard at:       192.168.83.227:42701
distributed.worker - INFO - Waiting to connect to: tcp://192.168.66.175:37303
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          2
distributed.worker - INFO -                Memory:                    7.90 GB
distributed.worker - INFO -       Local Directory: /dask-worker-space/dask-worker-space/worker-oytfsgjy
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Waiting to connect to: tcp://192.168.66.175:37303
distributed.nanny - INFO - Closing Nanny at 'tcp://192.168.83.227:45857'
distributed.worker - INFO - Stopping worker at tcp://192.168.83.227:34167
distributed.worker - INFO - Closed worker has not yet started: Status.undefined
distributed.dask_worker - INFO - End worker
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/distributed/core.py", line 295, in _
    await asyncio.wait_for(self.start(), timeout=timeout)
  File "/usr/local/lib/python3.8/asyncio/tasks.py", line 490, in wait_for
    raise exceptions.TimeoutError()
asyncio.exceptions.TimeoutError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/bin/dask-worker", line 8, in <module>
    sys.exit(go())
  File "/usr/local/lib/python3.8/site-packages/distributed/cli/dask_worker.py", line 456, in go
    main()
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/distributed/cli/dask_worker.py", line 442, in main
    loop.run_sync(run)
  File "/usr/local/lib/python3.8/site-packages/tornado/ioloop.py", line 532, in run_sync
    return future_cell[0].result()
  File "/usr/local/lib/python3.8/site-packages/distributed/cli/dask_worker.py", line 436, in run
    await asyncio.gather(*nannies)
  File "/usr/local/lib/python3.8/asyncio/tasks.py", line 684, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/local/lib/python3.8/site-packages/distributed/core.py", line 299, in _
    raise TimeoutError(
asyncio.exceptions.TimeoutError: Nanny failed to start in 60 seconds

Zanie

10/15/2020, 1:31 PM
Hi @Marwan Sarieddine, I’m not an expert but I think the nanny is needed for distributed dask. From https://github.com/dask/dask-jobqueue/issues/391 it seems likely that the compute node is unable to contact the scheduler, so this is probably a network issue?

Marwan Sarieddine

10/15/2020, 1:37 PM
Hi @Zanie - thank you for your response ... Hmm ... with a static dask cluster setup I was able to get away without using a nanny. From the dask distributed docs https://distributed.dask.org/en/latest/_modules/distributed/nanny.html
class Nanny(ServerNode):
    """A process to manage worker processes

    The nanny spins up Worker processes, watches them, and kills or restarts
    them as necessary. It is necessary if you want to use the
    ``Client.restart`` method, or to restart the worker automatically if
    it gets to the terminate fraction of its memory limit.
so I suppose you are saying that in dask-k8s, Client.restart needs to be called to restart workers (hence the need for a Nanny)?
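For context, this is roughly what I mean by running without a nanny - a minimal sketch using dask_kubernetes directly rather than our actual prefect setup; the image, nthreads and memory values are placeholders, and I'm assuming the classic KubeCluster.from_dict API plus the worker command shown in the dask-kubernetes docs:

# Sketch: a worker pod spec whose dask-worker command skips the nanny entirely.
# The --no-nanny flag starts the worker process directly instead of wrapping it
# in a supervising Nanny (so Client.restart would no longer work).
from dask_kubernetes import KubeCluster

pod_spec = {
    "kind": "Pod",
    "spec": {
        "restartPolicy": "Never",
        "containers": [
            {
                "name": "dask-worker",
                "image": "daskdev/dask:latest",  # placeholder image
                "args": [
                    "dask-worker",
                    "--nthreads", "2",          # placeholder sizing
                    "--no-dashboard",
                    "--memory-limit", "6GB",    # placeholder sizing
                    "--no-nanny",               # run the worker without a nanny
                ],
            }
        ],
    },
}

cluster = KubeCluster.from_dict(pod_spec)
cluster.scale(4)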

Zanie

10/15/2020, 1:41 PM
From that docstring it seems like you may be able to get away with not having the nanny! But I think that the nanny not starting is an issue with compute <-> scheduler communication which will make it so you can’t run jobs on your cluster anyway. I’m not confident about this though and it may be worth waiting for someone (@Jim Crist-Harif) with more dask knowledge to chime in. Did you set up dask-kubernetes with the provided helm chart? Are you sure the network interface being used is allowing communication?

Marwan Sarieddine

10/15/2020, 1:50 PM
my current dask-kubernetes setup works fine for small workloads - but it starts throwing these errors when the ratio of mapped tasks to available workers gets large ...
trying to throw more workers at the problem, but I am still facing the same issue:
asyncio.exceptions.TimeoutError: Nanny failed to start in 60 seconds
I should also note that occasionally the mapped child tasks are never spawned and the flow hangs in a running state - please see the screenshot below
inspecting the worker logs when this issue of mapped children not spawning occurs:
distributed.worker - WARNING -  Compute Failed
Function:  _maybe_run
args:      ('prefect-99690e79d0b849ed8f1e05792eb4818d', <function run_task at 0x7f4f5bc54940>)
kwargs:    {'task': <Task: do_repartition>, 'state': <Pending: "Task run created">, 'upstream_states': {<Edge(key=test, mapped=False, flattened=False): test to do_repartition>: <Success: "Task run succeeded.">, <Edge(key=output_location, mapped=False, flattened=False): output_location to do_repartition>: <Success: "Task run succeeded.">, <Edge(key=input_location, mapped=False, flattened=False): input_location to do_repartition>: <Success: "Task run succeeded.">, <Edge(key=part_vals_list, mapped=True, flattened=False): get_dataset_partitions to do_repartition>: <Success: "Task run succeeded.">}, 'context': {'image': '154279524260.dkr.ecr.us-west-2.amazonaws.com/pyinfima:0.33.1', 'namespace': 'prefect', 'flow_run_id': '674e5f6c-7410-4845-a60d-2b7ec9d4f619', 'config': <Box: {'debug': True, 'home_dir': '/root/.prefect', 'backend': 'cloud', 'server': {'host': 'http://localhost', 'port': 4200, 'host_port': 4200, 'endpoint': 'http://localhost:4200', 'database': {'host': 'localhost', 'port': 5432, 'host_
Exception: OSError("Timed out trying to connect to 'tcp://192.168.79.8:38019' after 30 s: Timed out trying to connect to 'tcp://192.168.79.8:38019' after 30 s: connect() didn't finish in time")
ok - so increasing DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT from 30s to 300s helped resolve this issue of mapped children not spawning ... But I am still facing the Nanny timeout error in some of the worker pods - is there an environment variable I can set to increase the Nanny timeout?
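To be concrete, this is roughly how that variable gets onto the workers - a sketch using dask_kubernetes's make_pod_spec rather than our actual spec, with a placeholder image and placeholder sizing (distributed picks DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT up as the distributed.comm.timeouts.connect config value):

# Sketch: raising the comm connect timeout by exporting the env var on each worker pod.
from dask_kubernetes import KubeCluster, make_pod_spec

pod_spec = make_pod_spec(
    image="daskdev/dask:latest",            # placeholder image
    memory_limit="8G",
    memory_request="8G",
    cpu_limit="2",
    cpu_request="2",
    env={"DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT": "300s"},  # was timing out at 30s
)

cluster = KubeCluster(pod_spec)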

Zanie

10/15/2020, 3:07 PM
dask-worker has a --death-timeout option which appears to be passed down to the Nanny
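If it helps, a sketch of where that flag would sit - in a dask-kubernetes worker spec it's just another entry in the container's dask-worker args (the other values here are placeholders):

# Sketch only: worker container args with a more forgiving death timeout.
worker_args = [
    "dask-worker",
    "--nthreads", "2",           # placeholder sizing
    "--memory-limit", "8GB",     # placeholder sizing
    "--death-timeout", "300",    # seconds to wait for the scheduler before shutting down
]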

Marwan Sarieddine

10/15/2020, 3:18 PM
@Zanie - thanks, I noticed - I will modify the args accordingly and see if it helps
@Zanie - increasing death-timeout helped me resolve the Nanny timeout error - thanks again for the support

Zanie

10/15/2020, 5:36 PM
Great! Glad to hear it. If you post a bit of the code you used to resolve this I’ll try to get it included in the docs somewhere.

Jim Crist-Harif

10/15/2020, 9:39 PM
Nannies aren't required for operation, but they are required if you ever want client.restart() to work (which shouldn't be necessary for prefect). The --death-timeout flag can be useful for ensuring stranded dask-worker pods don't linger around after the cluster has shut down, but a long death-timeout shouldn't be necessary for the cluster to run. dask-kubernetes should ensure that your scheduler is up and running before starting workers, so this sounds like a symptom of a deeper network-connectivity or resource-starvation issue.
• Does your flow-runner (scheduler) pod have enough CPU/memory to operate properly? For large flow runs I'd give no less than 1 core to the flow-runner pod (making it much larger than 1 core shouldn't be needed, but if it's much less you may run into problems like dropped connections). You probably want at least 512 MiB for this pod as well, but maybe more depending on your flow (see the sketch after this list).
• Do you have any custom networking configs in your cluster that may be affecting things here? If you don't know, the answer is likely no.
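To make the resource point concrete, here's a minimal sketch of the requests I have in mind for the flow-runner pod, written with the official kubernetes Python client; the numbers are the rough floor discussed above, not tuned values, and how you attach them depends on how you template that pod:

# Sketch: baseline resource requests for the flow-runner (scheduler) pod.
from kubernetes import client

flow_runner_resources = client.V1ResourceRequirements(
    requests={"cpu": "1", "memory": "512Mi"},  # rough floor for large flow runs
    limits={"cpu": "1", "memory": "1Gi"},      # optional; placeholder values
)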

Marwan Sarieddine

10/15/2020, 11:22 PM
Hi @Jim Crist-Harif - thank you for the explanation - it helps clear things up. It seems my issue is resource-starvation related, given I am not changing the default resource allocations for the flow-runner pod, which are less than adequate from what you are describing ... (I am not using any custom networking configs ...)