# prefect-community
r
can you set it with kwargs on your DaskExecutor initialisation?
n
Hi Rob. Thank you. Forgot to mention my DaskExecutor
executor = DaskExecutor(
    address=cluster.scheduler_address,
    client_kwargs={"timeout": 20},
)
but I'll try the following then:
executor = DaskExecutor(
    cluster_kwargs={"address": "tcp://51.105.126.161:8786", "timeout": 20},
)
cannot use cluster_kwargs timeout because I need to specify an address and I get the error
"Cannot specify `address` and `cluster_class`/`cluster_kwargs`"
other suggestion how to specify the timeout?
reading dask.distributed.Client documentation,
timeout: Timeout duration for initial connection to the scheduler
I believe the problem is during normal calls to the cluster to execute operations, hence I was trying to change the value: `dask.config.set({"distributed.comm.timeouts.connect": 20})`. Don't know how to pass this as an argument to the client though.
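For reference, a minimal sketch of how `dask.config.set` behaves, assuming `dask` is installed. It only changes the config in the current Python process, and it is not an argument to `Client`. As the later replies in this thread explain, the 3 s timeout in question was hardcoded at the time, so this setting alone would not have changed it before the Prefect fix.

```python
import dask

# Used as a context manager, the override applies only inside the block.
# "20s" is an illustrative value; dask also accepts plain numbers of seconds.
with dask.config.set({"distributed.comm.timeouts.connect": "20s"}):
    inside = dask.config.get("distributed.comm.timeouts.connect")
    print(inside)

# Outside the block the previous value (if any) is restored.
# default=None avoids a KeyError when the key is not defined at all.
after = dask.config.get("distributed.comm.timeouts.connect", default=None)
print(after)
```

Calling `dask.config.set(...)` without `with` keeps the value for the rest of the process, but it still has to run in every process that should see it, including the workers.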
j
Where are you seeing the error for the 3s timeout? None of the timeouts in distributed default to 3 seconds, so I'm not sure what that would be from.
Ah, found it, hardcoded in distributed not in the config. Thinking through a solution now, but I know where it's coming from.
Ok, this will require a patch to Prefect (for now), probably will push up a patch to distributed to resolve this for everyone. We usually release on tuesdays, so the fix should hopefully be out the door by EOD today for you to try.
Distributed's config system evolved over time, and not all timeouts are configured by it (currently). The timeout you're seeing is from creating a `Client` on a `Worker`, which is hardcoded in Dask's distributed scheduler as `3s`. This code path is only hit in Prefect if you're using an existing dask cluster (meaning you pass in an `address=` to `DaskExecutor`). If you let the `DaskExecutor` create and manage its own temporary dask cluster this code path won't be hit at all.
I'm going to push a patch to prefect to force this timeout to use the proper distributed config value. You can then adjust this via environment variable or dask config value, rather than using the hardcoded value. The default will also be higher (10s, same as the connect timeout in other places).
m
Hello everyone, i have the same error: Unexpected error: OSError("Timed out trying to connect to 'tcp://192.168.164.82:46297' after 3 s: Timed out trying to connect to 'tcp://192.168.164.82:46297' after 3 s: connect() didn't finish in time") I use DaskKubernetesEnvironment
n
@Jim Crist-Harif thank you for the amazing findings. makes sense. thank you for going that deep. this will make things a lot more resilient when working with existing dask cluster. awesome
j
Prefect 0.13.7 has been released with this fix. The new default timeout is 10 s (matching the default in dask's config file). If you find this still isn't sufficient, the timeout will now respect dask's config. You can change this by setting the following environment variable in your dask worker environments (e.g. if using k8s, add this to the pod spec):
DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT=20s
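To see why that variable name corresponds to the `distributed.comm.timeouts.connect` config key, here is a simplified, standard-library-only sketch of the naming convention dask uses for `DASK_*` environment variables. The helper `dask_env_to_config` is hypothetical and written for illustration; dask's own loader builds a nested dictionary rather than the flat dotted keys shown here.

```python
import ast


def dask_env_to_config(env):
    """Approximate dask's mapping from DASK_* variables to dotted config keys."""
    config = {}
    for name, value in env.items():
        if not name.startswith("DASK_"):
            continue  # variables without the prefix are ignored
        # Drop the prefix, lowercase, and treat "__" as the nesting separator.
        key = name[len("DASK_"):].lower().replace("__", ".")
        try:
            # Values that parse as Python literals (numbers, lists) are converted.
            parsed = ast.literal_eval(value)
        except (SyntaxError, ValueError):
            # Anything else, such as "20s", stays a string.
            parsed = value
        config[key] = parsed
    return config


example = dask_env_to_config(
    {"DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT": "20s", "HOME": "/root"}
)
print(example)
```

The variable has to be present in the environment of the dask worker processes before they start, since that is where the `Client` that hits this timeout gets created.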