# prefect-community
Michelle Wu
Hi, I want to run a flow on a remote dask-worker, using a `dask-resource` in `tags` as an argument for `@task`:
```python
from prefect import task

running_server = "name_of_remote_server"
resource_tag = "dask-resource:{}=1".format(running_server)

# email_on_failure is a state handler defined elsewhere in my project
@task(log_stdout=True, state_handlers=[email_on_failure], tags=[resource_tag])
def test_task():
    print(1 / 0)  # raises ZeroDivisionError
```
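For context, the flow is pointed at the existing dask-scheduler roughly like this (a simplified sketch; the flow name is a placeholder and the import paths assume Prefect 0.13.x):
```python
from prefect import Flow
from prefect.engine.executors import DaskExecutor  # import path as of Prefect 0.13.x
from prefect.environments import LocalEnvironment

with Flow("dask-resource-test") as flow:  # flow name is a placeholder
    test_task()

# Run against the existing dask-scheduler so the tagged task is scheduled on
# the remote worker that advertises the matching resource.
flow.environment = LocalEnvironment(
    executor=DaskExecutor(address="tcp://<address of local dask-scheduler>:8786")
)
```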
When I started the remote dask-worker, I used a command like this:
```
dask-worker tcp://<address of local dask-scheduler>:8786 --nprocs 4 --nthreads 1 --worker-port xxx --resources "name_of_remote_server=1"
```
This connected the local scheduler with the remote worker perfectly. However, when I actually started the flow on the local machine, it first failed on the remote worker with `ModuleNotFoundError: No module named 'prefect'`. After I installed prefect on the remote worker, another error occurred there:
```
[2020-11-24 17:02:05+0800] ERROR - prefect.CloudTaskRunner | Failed to set task state with error: ConnectionError(MaxRetryError("HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6099fcf10>: Failed to establish a new connection: [Errno 111] Connection refused'))"))
Traceback (most recent call last):
  File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/urllib3/connection.py", line 160, in _new_conn
    (self._dns_host, self.port), self.timeout, **extra_kw
  File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/urllib3/util/connection.py", line 84, in create_connection
    raise err
  File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/urllib3/util/connection.py", line 74, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused
...
[2020-11-24 17:02:05+0800] INFO - prefect.CloudTaskRunner | Task 'test_task': Finished task run for task with final state: 'ClientFailed'
[2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Failed to write log with error: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6097590d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
[2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Failed to write log with error: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6097590d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
[2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Failed to write log with error: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6097590d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
[2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Unable to write logs to Prefect Cloud
[2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Unable to write logs to Prefect Cloud
[2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Unable to write logs to Prefect Cloud
```
I tried opening port 4200 of the local scheduler for the remote worker, but the same error occurred anyway. Wondering what I’ve been doing wrong? 😶
Kyle Moon-Wright
Hey @Michelle Wu, typically when I see this error there is a discrepancy in the backend API the CloudTaskRunner is relaying to. Can you confirm that the backend on your Dask worker is set to your Cloud backend API, which is typically done with the CLI command `prefect backend cloud`? Just want to cross this off the list first, as it looks like the worker is pointing at a localhost API.
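If it helps, one quick way to sanity-check what the worker will actually call (the config keys below assume the Prefect 0.x config layout and may differ in your version) is:
```python
import prefect

# Run this on the remote worker host. Both values should point at your
# Cloud/Server API, not localhost (key names assume the Prefect 0.x layout).
print(prefect.config.backend)    # expected: "cloud" (or "server" for a self-hosted Server)
print(prefect.config.cloud.api)  # expected: the API URL the worker calls for /graphql
```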
Michelle Wu
@Kyle Moon-Wright I’ll try this, thank you. But I want to confirm: if I want to run a flow remotely using Dask, do I need to install the whole Prefect stack on the worker machine (including Docker)?
Kyle Moon-Wright
Hey Michelle, hmm, I think this depends on your cluster setup, but speaking in terms of Kubernetes: using containers to provide Prefect across the Dask client and workers will certainly make things easier. Take these Dask Cluster on Kubernetes YAML files as an example: the workers/replicas are built from a base `prefecthq/prefect:latest` image and, more importantly, the dependencies match between the client and the workers. I’m sure there are exceptions to this, but this is the recommended setup for a Dask cluster on K8s.
Michelle Wu
I’ll look into this. Thank you!