Shaun Cutts
09/02/2020, 4:07 PM
… towel doing?) Some explanation as to how the pieces interact (and/or how I should best go about getting them to interact in my scenario) would be great.
Thanks!
Jim Crist-Harif
09/02/2020, 4:18 PM
… FlowRunner runs), while dask runs tasks (where the TaskRunner runs). So whether you want to use the k8s agent or not depends on whether you want your entire flow to run in k8s, or just the tasks. If you're used to dask, you can think of this as where your dask client lives (inside or outside the cluster); the workers managed by dask-gateway are always inside the cluster. I personally would recommend using the k8s agent, as it keeps everything uniformly in k8s.
• To configure your jobs to use dask-gateway, you'd want to use GatewayCluster as the cluster class on your DaskExecutor, passing cluster_kwargs to configure the cluster. Something like:
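from dask_gateway import GatewayCluster
from prefect.engine.executors import DaskExecutor  # Prefect 0.13.x import path

# `flow`, `min_workers` and `max_workers` are assumed to be defined already
executor = DaskExecutor(
    cluster_class=GatewayCluster,
    cluster_kwargs={
        # any args you need to pass to configure your cluster
    },
    adapt_kwargs={"minimum": min_workers, "maximum": max_workers},
)
flow.environment.executor = executor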
• The server stores jobs that should be run and never pushes anything out. Agents run elsewhere and poll the server for any jobs they should run (the server never actively reaches out to agents). Towel runs periodic background jobs that manage server resources; you shouldn't need to worry about it. I agree, we definitely could document this better.
josh
09/02/2020, 4:20 PM
Shaun Cutts
09/02/2020, 4:22 PM
josh
09/02/2020, 4:24 PM
… --api flag, or you can just set the PREFECT__CLOUD__API env var to that endpoint.
Jim Crist-Harif
09/02/2020, 4:25 PM
… gateway.cluster.options and gateway.auth; for prefect you'd want to configure engine.executor.default_class and engine.executor.dask.cluster_class.
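If these settings go into the agent container or the job spec as environment variables, the dotted keys map mechanically onto variable names: Prefect 0.x reads PREFECT__ followed by the key path joined with double underscores, and Dask reads DASK_ followed by the path joined the same way. The helpers below (prefect_env_name, dask_env_name, k8s_env) are hypothetical and only illustrate that naming convention; they do not validate any values, and the GatewayCluster value is the one mentioned later in this thread.

```python
"""Sketch: turn dotted config keys into the env var names Prefect (0.x)
and Dask read, and emit a Kubernetes-style env list. Helper names are
hypothetical; only the naming convention is illustrated."""


def prefect_env_name(key: str) -> str:
    # Prefect: "PREFECT__" prefix, nested sections separated by "__".
    return "PREFECT__" + "__".join(part.upper() for part in key.split("."))


def dask_env_name(key: str) -> str:
    # Dask: "DASK_" prefix (single underscore), nesting separated by "__".
    return "DASK_" + "__".join(part.upper() for part in key.split("."))


def k8s_env(prefect=None, dask=None):
    """Build a list shaped like a container's `env:` entries."""
    entries = []
    for key, value in (prefect or {}).items():
        entries.append({"name": prefect_env_name(key), "value": str(value)})
    for key, value in (dask or {}).items():
        entries.append({"name": dask_env_name(key), "value": str(value)})
    return entries


if __name__ == "__main__":
    for key in ("engine.executor.default_class", "engine.executor.dask.cluster_class"):
        print(prefect_env_name(key))
    for key in ("gateway.cluster.options", "gateway.auth"):
        print(dask_env_name(key))
    print(k8s_env(prefect={"engine.executor.dask.cluster_class": "dask_gateway.GatewayCluster"}))
```

The same convention explains the variables used elsewhere in the thread: server.endpoint becomes PREFECT__SERVER__ENDPOINT and cloud.api becomes PREFECT__CLOUD__API.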
Shaun Cutts
09/02/2020, 4:27 PM
… PREFECT__CLOUD__API was what I was looking for, I think (though this is “server”, not “cloud”). @Jim Crist-Harif: ok. So I can set that up as environment config in the agent container?
Jim Crist-Harif
09/02/2020, 4:29 PM
josh
09/02/2020, 4:30 PM
… PREFECT__BACKEND to server and PREFECT__SERVER__ENDPOINT to your location to also set it. The default backend is cloud, so setting PREFECT__CLOUD__API does the same trick. Setting the backend to server and the server endpoint is more explicit, though, so you might want to go that way.
Shaun Cutts
09/02/2020, 4:31 PM
Jim Crist-Harif
09/02/2020, 4:32 PM
… LocalExecutor. So the prefect config would have to go in your users' environments when they're registering flows. The dask config could still go in the runtime environments.
Shaun Cutts
09/02/2020, 4:34 PM
Jim Crist-Harif
09/02/2020, 4:36 PM
Shaun Cutts
09/02/2020, 4:38 PM
Jim Crist-Harif
09/02/2020, 4:41 PM
… --env option, by adding it to the default job yaml spec, or by setting the config in the images themselves; any of these would work. The only thing you'd need to ensure on the user side is that flow environments that should use dask are configured with a DaskExecutor with cluster_class="dask_gateway.GatewayCluster" before registering.
Shaun Cutts
09/02/2020, 4:44 PM
Jim Crist-Harif
09/02/2020, 4:46 PM
Shaun Cutts
09/03/2020, 5:27 AM
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/urllib3/connection.py", line 160, in _new_conn
    (self._dns_host, self.port), self.timeout, **extra_kw
  File "/usr/local/lib/python3.6/site-packages/urllib3/util/connection.py", line 84, in create_connection
    raise err
  File "/usr/local/lib/python3.6/site-packages/urllib3/util/connection.py", line 74, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 677, in urlopen
    chunked=chunked,
  File "/usr/local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 392, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/usr/local/lib/python3.6/http/client.py", line 1287, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/local/lib/python3.6/http/client.py", line 1333, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.6/http/client.py", line 1282, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.6/http/client.py", line 1042, in _send_output
    self.send(msg)
  File "/usr/local/lib/python3.6/http/client.py", line 980, in send
    self.connect()
  File "/usr/local/lib/python3.6/site-packages/urllib3/connection.py", line 187, in connect
    conn = self._new_conn()
  File "/usr/local/lib/python3.6/site-packages/urllib3/connection.py", line 172, in _new_conn
    self, "Failed to establish a new connection: %s" % e
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7f4b536d77f0>: Failed to establish a new connection: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "/usr/local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 727, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "/usr/local/lib/python3.6/site-packages/urllib3/util/retry.py", line 439, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f4b536d77f0>: Failed to establish a new connection: [Errno 111] Connection refused',))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/bin/prefect", line 8, in <module>
    sys.exit(cli())
  File "/usr/local/lib/python3.6/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/click/decorators.py", line 21, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/prefect/cli/agent.py", line 295, in start
    agent_address=agent_address,
  File "/usr/local/lib/python3.6/site-packages/prefect/agent/kubernetes/agent.py", line 88, in __init__
    no_cloud_logs=no_cloud_logs,
  File "/usr/local/lib/python3.6/site-packages/prefect/agent/agent.py", line 153, in __init__
    self.client = Client(api_server=config.cloud.api, api_token=token)
  File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 124, in __init__
    tenant_info = self.graphql({"query": {"tenant": {"id"}}})
  File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 281, in graphql
    retry_on_api_error=retry_on_api_error,
  File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 237, in post
    retry_on_api_error=retry_on_api_error,
  File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 401, in _request
    session=session, method=method, url=url, params=params, headers=headers
  File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 319, in _send_request
    response = session.post(url, headers=headers, json=params, timeout=30)
  File "/usr/local/lib/python3.6/site-packages/requests/sessions.py", line 578, in post
    return self.request('POST', url, data=data, json=json, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/requests/sessions.py", line 530, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/local/lib/python3.6/site-packages/requests/sessions.py", line 643, in send
    r = adapter.send(request, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/requests/adapters.py", line 516, in send
    raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f4b536d77f0>: Failed to establish a new connection: [Errno 111] Connection refused',))

- name: PREFECT__BACKEND
  value: server
- name: PREFECT__SERVER__API
  value: http://prefect-server-apollo.prefect-server.svc.cluster.local:4200
- name: PREFECT__CLOUD__API
  value: http://prefect-server-apollo.prefect-server.svc.cluster.local:4200

I think this means that PREFECT__BACKEND isn’t always respected…?
Chris White
09/03/2020, 5:43 AM
Shaun Cutts
09/03/2020, 12:35 PM
… PREFECT__SERVER__API and PREFECT__CLOUD__API. If I specify just the former (together with PREFECT__BACKEND=server), it fails.
josh
09/03/2020, 12:38 PM
… PREFECT__SERVER__API
The config looks like this:
backend = "cloud"  # or "server"
[server]
host = "http://localhost"
port = "4200"
endpoint = "${server.host}:${server.port}"
[cloud]
api = "${${backend}.endpoint}"
endpoint = "https://api.prefect.io"
graphql = "${cloud.api}/graphql"
Ultimately, cloud.api is what is used, but its value is interpolated from the endpoint of whichever backend is selected (server.endpoint or cloud.endpoint).
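A minimal model of that interpolation helps explain the behavior reported in this thread. The resolver below is a hypothetical sketch, not Prefect's implementation; it assumes PREFECT__* environment variables replace raw config values before ${...} references are expanded, which is consistent with what Shaun observed. Under that assumption, PREFECT__SERVER__API sets a server.api key that nothing references, so cloud.api still resolves to http://localhost:4200 (the address in the traceback), and any explicit PREFECT__CLOUD__API pins cloud.api regardless of PREFECT__BACKEND. The "leftover" value in the last case is a placeholder.

```python
"""Hypothetical model of the [server]/[cloud] interpolation shown above.

Not Prefect's code: env overrides are ASSUMED to replace raw values
before ${...} references are expanded.
"""
import re

# Matches an innermost reference: ${key} with no "$", "{" or "}" inside.
_REF = re.compile(r"\$\{([^${}]+)\}")

# Flattened version of the default config quoted above.
DEFAULTS = {
    "backend": "cloud",
    "server.host": "http://localhost",
    "server.port": "4200",
    "server.endpoint": "${server.host}:${server.port}",
    "cloud.api": "${${backend}.endpoint}",
    "cloud.endpoint": "https://api.prefect.io",
    "cloud.graphql": "${cloud.api}/graphql",
}


def with_env(config, env):
    """Apply PREFECT__SECTION__KEY=value overrides as literal values."""
    merged = dict(config)
    for name, value in env.items():
        if name.startswith("PREFECT__"):
            key = name[len("PREFECT__"):].lower().replace("__", ".")
            merged[key] = value
    return merged


def resolve(config, key, _depth=0):
    """Expand ${...} references in config[key], innermost first."""
    if _depth > 20:
        raise ValueError(f"reference cycle or nesting too deep at {key!r}")
    value = config[key]
    match = _REF.search(value)
    while match:
        inner = resolve(config, match.group(1), _depth + 1)
        value = value[: match.start()] + inner + value[match.end():]
        match = _REF.search(value)
    return value


if __name__ == "__main__":
    apollo = "http://prefect-server-apollo.prefect-server.svc.cluster.local:4200"
    cases = {
        "defaults": {},
        "backend only": {"PREFECT__BACKEND": "server"},
        "backend + SERVER__API": {"PREFECT__BACKEND": "server", "PREFECT__SERVER__API": apollo},
        "backend + SERVER__ENDPOINT": {"PREFECT__BACKEND": "server", "PREFECT__SERVER__ENDPOINT": apollo},
        "leftover CLOUD__API": {
            "PREFECT__BACKEND": "server",
            "PREFECT__SERVER__ENDPOINT": apollo,
            "PREFECT__CLOUD__API": "http://localhost:4200",
        },
    }
    for label, env in cases.items():
        print(f"{label:28} cloud.api = {resolve(with_env(DEFAULTS, env), 'cloud.api')}")
```

Expanding the innermost reference first is what lets ${${backend}.endpoint} become ${server.endpoint} before that key itself is looked up.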
Shaun Cutts
09/03/2020, 12:49 PM
… PREFECT__SERVER__ENDPOINT, and leave the default PREFECT__CLOUD__API, it doesn’t work, even when backend is set to server. But it works if PREFECT__CLOUD__API is not present.
Ok, so that makes sense in terms of what you wrote, but it is confusing if you are trying to override values from the “default configuration” as I was.
Perhaps better would be to have something like:
[server]
host = "http://localhost"
port = "4200"
endpoint = "${server.host}:${server.port}"
[cloud]
endpoint = "https://api.prefect.io"
[api]
endpoint = "${${backend}.endpoint}"
graphql = "${api.endpoint}/graphql"
and use PREFECT__API__ENDPOINT and PREFECT__API__GRAPHQL as the canonical settings…
josh
09/03/2020, 1:07 PM
… cloud.api for some backwards-compatibility reasons, but I’m starting to think we can alleviate that in the near future, especially because we are already doing a core version check on flows in the agent to determine which version of prefect they were registered on. That is, anything under something like 0.14.0 would use the old config, and so forth.
Shaun Cutts
09/03/2020, 1:25 PM