itay livni
05/24/2020, 4:09 AM
[2020-05-24 03:45:38] INFO - prefect.FlowRunner | Beginning Flow run for 'Dask Cloud Provider Test'
[2020-05-24 03:45:38] INFO - prefect.FlowRunner | Starting flow run.
[2020-05-24 03:45:48] ERROR - prefect.FlowRunner | Unexpected error: OSError("Timed out trying to connect to 'tcp://172.31.44.64:8786' after 10 s: Timed out trying to connect to 'tcp://172.31.44.64:8786' after 10 s: connect() didn't finish in time")
Traceback (most recent call last):
  File "miniconda3/envs/py37moc/lib/python3.7/site-packages/distributed/comm/core.py", line 232, in connect
    _raise(error)
  File "/miniconda3/envs/py37moc/lib/python3.7/site-packages/distributed/comm/core.py", line 213, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://172.31.44.64:8786' after 10 s: connect() didn't finish in time
Is there a port to open? In the ECS console I do see a cluster generated and closed.
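One way to narrow down the timeout in the log is to check, from the machine running the flow, whether a plain TCP connection to the scheduler address can be opened at all. The following is only a rough sketch using the Python standard library; the helper name can_connect is made up for illustration and is not part of Prefect or Dask.

```python
import socket
from urllib.parse import urlparse


def can_connect(address: str, timeout: float = 10.0) -> bool:
    """Return True if a TCP connection to a tcp://host:port address can be opened.

    Hypothetical helper for illustration; not part of Prefect or Dask.
    """
    parsed = urlparse(address)
    if parsed.hostname is None or parsed.port is None:
        raise ValueError(f"expected an address like tcp://host:port, got {address!r}")
    try:
        # create_connection raises OSError on refusal, unreachable host, or timeout
        with socket.create_connection((parsed.hostname, parsed.port), timeout=timeout):
            return True
    except OSError:
        return False


# Example call (address taken from the log above):
# can_connect("tcp://172.31.44.64:8786")
```

If this returns False while the ECS cluster is up, the scheduler is most likely not reachable from this machine at that address, which would point at networking (security group rules, or a private IP that is only routable inside the VPC) rather than at the flow itself.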
https://docs.prefect.io/orchestration/execution/dask_cloud_provider_environment.html#process
Kingsley Blatter
05/28/2020, 10:09 PM
cluster.scheduler.address is the internal IP. If you switch to cluster.scheduler_address in the flow.run, that will connect on the external IP.
max
09/20/2020, 8:16 PM
if you create a FargateCluster directly and run flow.execute(...) as described, you can use the cluster's public IP via cluster.scheduler_address. if you are trying to use the DaskCloudProviderEnvironment, the cluster is instantiated dynamically, so i don't know that you can actually control that (i.e., trying to access dask_cloud_provider_instance.cluster will just return None until execution begins and _create_dask_cluster is called). as far as i can tell, when using this environment, you need to have a machine in your VPC that can communicate with the cluster via the private IP.
if you're just testing things locally and want to confirm this is your issue, you can go into the source code for cloud_provider.py and change this line in the _create_dask_cluster method
self.executor_kwargs["address"] = self.cluster.scheduler.address
to
self.executor_kwargs["address"] = self.cluster.scheduler_address
and your flow should execute without issue. i'm a new prefect user, so i'm still a little shaky on the environment/executor/agent distinction and some of the above may be wrong, but hopefully this helps prevent others from wasting a few hours of their time like i did : )
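To confirm that the address handed to the executor really is a VPC-internal one, a small standard-library check can classify it. This is a minimal sketch, not something from Prefect or Dask; the function name is_private_scheduler_address is hypothetical, and it assumes the address contains a literal IP rather than a DNS name.

```python
import ipaddress
from urllib.parse import urlparse


def is_private_scheduler_address(address: str) -> bool:
    """Return True if the host in a tcp://host:port address is a private IP.

    Hypothetical helper for illustration. Assumes the host is a literal IP;
    ipaddress.ip_address raises ValueError for DNS names.
    """
    host = urlparse(address).hostname
    if host is None:
        raise ValueError(f"expected an address like tcp://host:port, got {address!r}")
    return ipaddress.ip_address(host).is_private


# The address from the error log falls in 172.16.0.0/12, a private range.
print(is_private_scheduler_address("tcp://172.31.44.64:8786"))  # True
```

A True result for the address in the timeout message is consistent with the explanation above: the flow was pointed at the scheduler's private IP, which a machine outside the VPC cannot reach.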