# prefect-community
s
Thanks to help from the prefect team :hattip:, I have a prefect server running in Kubernetes with the experimental helm chart. I also have dask gateway running. To connect up the bits, I need to get the prefect agent running in the cluster, and submitting jobs to dask gateway. I can get the kubernetes agent running locally against the (port-forwarded) server in K8. But:
• What config do I need to point it at the cluster FQDN of the prefect server, when running in a container?
• It would seem that the kubernetes agent might not be what I want anyway, as I want to submit flows to the dask gateway, not as K8 jobs. Or perhaps I can configure the agent container environment as specified here? https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html#next-steps
• I’m a bit unsure about the relation between the agent and the server. Is there any doc explaining? Is the server purely passive, and the agent is the active component? (Then what is `towel` doing?)
Some explanation as to how the pieces interact (and/or how I should best go about getting them to interact in my scenario) would be great. Thanks!
j
Hi Shaun, this is great to hear. A few assorted answers:
• The agent submits your jobs (where the `FlowRunner` runs), while dask runs tasks (where the `TaskRunner` runs). So whether you want to use the k8s agent or not depends on if you want your entire flow to run in k8s, or just the tasks (if you're used to dask, you can think of this as where your dask client lives (inside or outside the cluster), while the workers managed by dask-gateway are always inside the cluster). I personally would recommend using the k8s agent, as it keeps everything uniformly in k8s.
• To configure your jobs to use dask-gateway, you'd want to configure a `GatewayCluster` on your `DaskExecutor`, complete with `cluster_kwargs` to configure the cluster. Something like:
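from dask_gateway import GatewayCluster
from prefect.engine.executors import DaskExecutor  # `prefect.executors` in later releases

# Placeholder worker bounds; pick values that suit your cluster.
min_workers, max_workers = 1, 10

executor = DaskExecutor(
    cluster_class=GatewayCluster,
    cluster_kwargs={
        # any args you need to pass to configure your cluster
    },
    adapt_kwargs={"minimum": min_workers, "maximum": max_workers},
)
# `flow` is the Flow object you are about to register.
flow.environment.executor = executor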
• The server stores jobs that should be run, and never pushes anything out. Agents run elsewhere, and poll the server for any jobs they should run (the server never reaches out actively to agents). Towel is for periodic background jobs to manage server resources, you shouldn't need to worry about that. I agree, we definitely could document better here.
I'll let @josh chime in more with configuring core to work with server on k8s, since he's had it working with k8s in the past.
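To make the pull model described above concrete, here is a toy sketch in plain Python. `ToyServer` and `ToyAgent` are hypothetical names invented for illustration and are not part of Prefect's API. The only point is that the server just stores scheduled runs, and the agent is the side that initiates every exchange.

```python
from collections import deque


class ToyServer:
    """Passive store of scheduled flow runs; it only answers requests."""

    def __init__(self):
        self._scheduled = deque()

    def schedule(self, flow_run_id):
        # Something (a user, a schedule) records a run that should happen.
        self._scheduled.append(flow_run_id)

    def get_ready_runs(self):
        # Called BY the agent; the server never calls out on its own.
        ready = list(self._scheduled)
        self._scheduled.clear()
        return ready


class ToyAgent:
    """Active component: polls the server and submits whatever it finds."""

    def __init__(self, server):
        self.server = server
        self.submitted = []

    def poll_once(self):
        for run_id in self.server.get_ready_runs():
            # A real agent would launch a job (for example a k8s job) here.
            self.submitted.append(run_id)


server = ToyServer()
agent = ToyAgent(server)
server.schedule("run-1")
server.schedule("run-2")
agent.poll_once()
```

A real agent would call `poll_once` in a loop with a sleep between iterations; the sketch runs it once so the effect is easy to inspect.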
j
I actually have it working in k8s as we speak using the helm chart @Shaun Cutts made 😉
s
@Jim Crist-Harif — Thanks for your answer. And thanks for your help, @josh 🙂 — so when the agent gets a scheduled flow from the server, the executor is already “baked in”. Can I configure with env variables rather than have to submit with cloudpickle with every flow?
My goal is to have the agent submit to dask gateway by default if nothing else is specified on flows that have been sent to the server.
j
WRT the agent -> server communication: the only thing you need to ensure for core to work with the server running in k8s is that it can talk to the Apollo endpoint of the server. So you can set the API on your agent either through the `start`/`install` commands with the `--api` flag, or you can just set the `PREFECT__CLOUD__API` env var to that endpoint.
j
You're asking if you can configure defaults for every flow, rather than relying on configuring per-flow? If so, you can currently configure the default executor class and cluster class, but not cluster kwargs. However! Dask-gateway supports configuring default kwargs itself as part of the dask config, so you can tell prefect to use dask-gateway, then configure your defaults via dask's config in your image/environment. https://gateway.dask.org/configuration-user.html#default-configuration
The dask config you'd want to set would be `gateway.cluster.options` and `gateway.auth`; for prefect you'd want to configure `engine.executor.default_class` and `engine.executor.dask.cluster_class`. All can be set via environment variables, or as static config files in your images.
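As a rough illustration of the environment-variable route: both Prefect and Dask derive config keys from variable names by stripping a prefix (`PREFECT__` and `DASK_` respectively), lowercasing, and treating a double underscore as a nesting separator. The helper below is hypothetical (it is not part of either library) and only performs that string mapping, so you can work out which variable name corresponds to a dotted key.

```python
def env_var_for(key: str, prefix: str) -> str:
    """Map a dotted config key to the matching environment variable name.

    Hypothetical helper: it mirrors the naming convention only and does not
    read or write any real configuration.
    """
    return prefix + key.upper().replace(".", "__")


prefect_vars = [
    env_var_for("engine.executor.default_class", "PREFECT__"),
    env_var_for("engine.executor.dask.cluster_class", "PREFECT__"),
]
dask_vars = [
    env_var_for("gateway.cluster.options", "DASK_"),
    env_var_for("gateway.auth", "DASK_"),
]

for name in prefect_vars + dask_vars:
    print(name)
```

The values you assign to these variables still depend on your deployment; see the dask-gateway configuration page linked above for the shape of the gateway settings.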
s
@josh, great: `PREFECT__CLOUD__API` was what I was looking for I think (though this is “server” not “cloud”). @Jim Crist-Harif, ok. So I can set that up as environment config in the agent container?
j
That would go in your flow run images, not in the agent. The agent submits new jobs, which will create executors from their local config.
j
Yeah @Shaun Cutts the config on the API is a bit confusing because it interprets the value based on some settings. So you can set `PREFECT__BACKEND` to `server` and `PREFECT__SERVER__ENDPOINT` to your location to also set it. The default backend is set to cloud, so setting `PREFECT__CLOUD__API` does the same trick. Setting the backend to server and the server endpoint is more explicit, actually, so you might want to go that way.
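A minimal sketch of what that explicit setup could look like when generating the `env` section of an agent container spec. The two function names are hypothetical, the service and namespace values are examples that depend on how the helm chart was installed, and the `svc.cluster.local` suffix assumes the default Kubernetes cluster domain. As comes up later in this thread, this route only takes effect if `PREFECT__CLOUD__API` is not also set in the same environment.

```python
def in_cluster_url(service: str, namespace: str, port: int) -> str:
    """Build the in-cluster DNS URL of a Kubernetes Service (default domain assumed)."""
    return f"http://{service}.{namespace}.svc.cluster.local:{port}"


def agent_env(endpoint: str) -> list:
    """Return container env entries that point a Prefect agent at a server backend."""
    return [
        {"name": "PREFECT__BACKEND", "value": "server"},
        {"name": "PREFECT__SERVER__ENDPOINT", "value": endpoint},
    ]


# Example values only; substitute your own service name, namespace, and port.
endpoint = in_cluster_url("prefect-server-apollo", "prefect-server", 4200)
env = agent_env(endpoint)
for entry in env:
    print(f"{entry['name']}={entry['value']}")
```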
s
@Jim Crist-Harif Ah — so I need to tell the agent to put that into the environment of the flow images it creates. I see it has --env config for that. Hmm…
@josh 👍
j
Oh wait, nvm, it looks like flows always default to configuring an executor themselves when they're created (defaults to `LocalExecutor`). So the prefect config would have to go in your users' environments when they're registering flows. The dask config could still go in the runtime environments.
s
Ah … hmm… would you be interested in a PR to the K8 agent that (with option) allows it to override “LocalExecutor” with another? … I guess I could just write appropriate wrappers in user environment as well …
j
That wouldn't happen on the agent, since the agent has nothing to do with executors. Hmmmm. We're currently rethinking agents and environments, so I'm a bit hesitant to add more options here right now, but we can definitely work to support this use case in the future. Is this a blocker for you currently?
s
[And just to confirm I can get the dask config to be set up in the images using --env options on the k8 agent?] As to being a blocker — I guess not. I don’t want to have to expose dask gateway credentials in the user environment. But that part I could setup using the agent, right? And then I could write a wrapper for the other parts for flow creation.
j
All the dask-gateway stuff can be statically configured by you, yes. You could do it with the `--env` option, by adding it to the default job yaml spec, or by setting the config in the images themselves; any of these would work. The only thing you'd need to ensure on the user side is that they configure flow environments that should use dask with a `DaskExecutor` with `cluster_class="dask_gateway.GatewayCluster"` on their environments before registering.
s
Great — thanks, Jim! I think I’ve got enough to work with (until I get snarled up)!
j
Great! Please reach out if you run into any issues, I'm happy to help. Glad dask-gateway is working for you!
s
@josh -- I think you meant PREFECT__SERVER__API (not …__ENDPOINT) as above, right? With the latter, I can’t connect. Now with the former (set to http://prefect-server-apollo.prefect-server.svc.cluster.local:4200), I’m getting the error below, which is different than previously. Does “PREFECT__CLOUD__AGENT__AGENT_ADDRESS” have to be set? Out of the box it comes out as http//8080. Is this for healthcheck?
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/urllib3/connection.py", line 160, in _new_conn
    (self._dns_host, self.port), self.timeout, **extra_kw
  File "/usr/local/lib/python3.6/site-packages/urllib3/util/connection.py", line 84, in create_connection
    raise err
  File "/usr/local/lib/python3.6/site-packages/urllib3/util/connection.py", line 74, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 677, in urlopen
    chunked=chunked,
  File "/usr/local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 392, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/usr/local/lib/python3.6/http/client.py", line 1287, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/local/lib/python3.6/http/client.py", line 1333, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.6/http/client.py", line 1282, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.6/http/client.py", line 1042, in _send_output
    self.send(msg)
  File "/usr/local/lib/python3.6/http/client.py", line 980, in send
    self.connect()
  File "/usr/local/lib/python3.6/site-packages/urllib3/connection.py", line 187, in connect
    conn = self._new_conn()
  File "/usr/local/lib/python3.6/site-packages/urllib3/connection.py", line 172, in _new_conn
    self, "Failed to establish a new connection: %s" % e
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7f4b536d77f0>: Failed to establish a new connection: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "/usr/local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 727, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "/usr/local/lib/python3.6/site-packages/urllib3/util/retry.py", line 439, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f4b536d77f0>: Failed to establish a new connection: [Errno 111] Connection refused',))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/bin/prefect", line 8, in <module>
    sys.exit(cli())
  File "/usr/local/lib/python3.6/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/click/decorators.py", line 21, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/prefect/cli/agent.py", line 295, in start
    agent_address=agent_address,
  File "/usr/local/lib/python3.6/site-packages/prefect/agent/kubernetes/agent.py", line 88, in __init__
    no_cloud_logs=no_cloud_logs,
  File "/usr/local/lib/python3.6/site-packages/prefect/agent/agent.py", line 153, in __init__
    self.client = Client(api_server=config.cloud.api, api_token=token)
  File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 124, in __init__
    tenant_info = self.graphql({"query": {"tenant": {"id"}}})
  File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 281, in graphql
    retry_on_api_error=retry_on_api_error,
  File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 237, in post
    retry_on_api_error=retry_on_api_error,
  File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 401, in _request
    session=session, method=method, url=url, params=params, headers=headers
  File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 319, in _send_request
    response = session.post(url, headers=headers, json=params, timeout=30)
  File "/usr/local/lib/python3.6/site-packages/requests/sessions.py", line 578, in post
    return self.request('POST', url, data=data, json=json, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/requests/sessions.py", line 530, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/local/lib/python3.6/site-packages/requests/sessions.py", line 643, in send
    r = adapter.send(request, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/requests/adapters.py", line 516, in send
    raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f4b536d77f0>: Failed to establish a new connection: [Errno 111] Connection refused',))
@josh @Chris White, AHA, so I got it working with this config:
- name: PREFECT__BACKEND
  value: server
- name: PREFECT__SERVER__API
  value: http://prefect-server-apollo.prefect-server.svc.cluster.local:4200
- name: PREFECT__CLOUD__API
  value: http://prefect-server-apollo.prefect-server.svc.cluster.local:4200
I think this means that PREFECT__BACKEND isn’t always respected…?
c
Prefect backend needs to be set to server on any client process that needs to talk to server (as opposed to cloud), so it makes sense that you’d need to set it on the agent image 👍
s
@Chris White The issue is that it needs both `PREFECT__SERVER__API` and `PREFECT__CLOUD__API`. If I specify just the former (together with `PREFECT__BACKEND=server`), it fails.
j
That’s because there is no `PREFECT__SERVER__API`.
The config looks like this:
backend = "cloud" or "server"

[server]
host = "http://localhost"
port = "4200"
endpoint = "${server.host}:${server.port}"

[cloud]
api = "${${backend}.endpoint}"
endpoint = "https://api.prefect.io"
graphql = "${cloud.api}/graphql"
Where ultimately `cloud.api` is what is used; however, it is interpreted off of each backend’s respective `endpoint`.
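To see how that chain of references plays out, here is a toy resolver for the `${...}` syntax in the config above. It is a sketch written for this thread, not Prefect's actual config loader, and `example.internal` is a made-up hostname. It shows that `cloud.api` follows `backend` through to the matching `endpoint`, and that giving `cloud.api` a literal value short-circuits the chain regardless of the other settings.

```python
import re

# Matches an innermost "${key}" reference (one with no nested "${" inside it).
INNERMOST = re.compile(r"\$\{([^${}]+)\}")


def resolve(config, key):
    """Expand ${...} references in config[key] until none remain."""
    value = config[key]
    while True:
        match = INNERMOST.search(value)
        if match is None:
            return value
        value = value[:match.start()] + config[match.group(1)] + value[match.end():]


# Flattened copy of the config shown above.
defaults = {
    "backend": "cloud",
    "server.host": "http://localhost",
    "server.port": "4200",
    "server.endpoint": "${server.host}:${server.port}",
    "cloud.api": "${${backend}.endpoint}",
    "cloud.endpoint": "https://api.prefect.io",
    "cloud.graphql": "${cloud.api}/graphql",
}

# Default backend: cloud.api follows cloud.endpoint.
cloud_backend = resolve(defaults, "cloud.api")

# backend=server: cloud.api follows server.endpoint instead.
server_backend = resolve({**defaults, "backend": "server"}, "cloud.api")

# A literal cloud.api wins even if backend and server.endpoint are also set.
pinned = resolve(
    {
        **defaults,
        "backend": "server",
        "server.endpoint": "http://example.internal:4200",  # hypothetical host
        "cloud.api": "http://localhost:4200",
    },
    "cloud.api",
)
```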
s
Hmm… but I tried “ENDPOINT” as well. AH, so if you specify `PREFECT__SERVER__ENDPOINT` and leave the default `PREFECT__CLOUD__API`, it doesn’t work, even when backend is set to server. But it works if `PREFECT__CLOUD__API` is not present. Ok, so that makes sense in terms of what you wrote, but is confusing if you are trying to override values from “default configuration” as I was. Perhaps better would be to have something like:
[server]
host = "http://localhost"
port = "4200"
endpoint = "${server.host}:${server.port}"

[cloud]
endpoint = "https://api.prefect.io"

[api]
endpoint = "${${backend}.endpoint}"
graphql = "${api.endpoint}/graphql"
and use `PREFECT__API__ENDPOINT` and `PREFECT__API__GRAPHQL` as the canonical settings….
j
Yeah IIRC it was kept under `cloud.api` for some backwards compatibility reasons, but I’m starting to think we can alleviate that in the near future, especially because we are already doing a core version check on flows in the agent to determine which version of prefect they were registered on. That is, anything under something like `0.14.0` would use the old config and so forth.
s
Thanks josh!