Billy McMonagle
08/01/2022, 2:29 PMPREFECT_API_URL
and PREFECT_API_KEY
environment variables. I am able to produce the exact error locally that I see in my Kubernetes deployment:
❯ prefect agent start kubernetes --api $PREFECT_API_URL
Starting agent connected to <https://app.prefect.cloud/account/251efcb0-78a9-4556-8783-32c643f1f3ef/workspace/49ff4325-3577-4a6a-9f13-c6cb19f6591e>...
Traceback (most recent call last):
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
return fn(*args, **kwargs)
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 193, in wrapper
return run_async_in_new_loop(async_fn, *args, **kwargs)
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 140, in run_async_in_new_loop
return anyio.run(partial(__fn, *args, **kwargs))
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/site-packages/prefect/cli/agent.py", line 93, in start
async with OrionAgent(
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/site-packages/prefect/agent.py", line 249, in __aenter__
await self.start()
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/site-packages/prefect/agent.py", line 237, in start
await self.default_infrastructure._save(is_anonymous=True)
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/site-packages/prefect/blocks/core.py", line 618, in _save
await self.register_type_and_schema(client=client)
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/site-packages/prefect/blocks/core.py", line 560, in register_type_and_schema
block_type = await client.read_block_type_by_slug(
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/site-packages/prefect/client.py", line 1091, in read_block_type_by_slug
return BlockType.parse_obj(response.json())
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/site-packages/httpx/_models.py", line 743, in json
return jsonlib.loads(self.text, **kwargs)
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/json/__init__.py", line 357, in loads
return _default_decoder.decode(s)
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
Taylor Curran
08/01/2022, 2:56 PMKubernetes Cluster Config
Block?
https://docs.prefect.io/api-ref/prefect/blocks/kubernetes/Kubernetes Job
block.Jeff Hale
08/01/2022, 3:04 PMBilly McMonagle
08/01/2022, 3:06 PMTaylor Curran
08/01/2022, 3:14 PM--- Building Blocks: The Configuration API ---
Blocks are a powerful new way to configure and share business logic. We like them so much that many other Prefect 2.0 features are built directly on top of their API, including flow and result storage, notifications, deployments, and infrastructure configuration.We are continuing to write recipes and examples that show the many different ways blocs can be implemented, so thank you for bearing with us as we get those out!
Billy McMonagle
08/01/2022, 3:31 PM❯ prefect work-queue create -t test test_queue
Traceback (most recent call last):
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
return fn(*args, **kwargs)
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 193, in wrapper
return run_async_in_new_loop(async_fn, *args, **kwargs)
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 140, in run_async_in_new_loop
return anyio.run(partial(__fn, *args, **kwargs))
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/site-packages/prefect/cli/work_queue.py", line 38, in create
result = await client.create_work_queue(
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/site-packages/prefect/client.py", line 817, in create_work_queue
response = await <http://self._client.post|self._client.post>("/work_queues/", json=data)
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/site-packages/httpx/_client.py", line 1842, in post
return await self.request(
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/site-packages/httpx/_client.py", line 1527, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/site-packages/prefect/client.py", line 278, in send
response.raise_for_status()
File "/Users/william.mcmonagle/.pyenv/versions/3.8.10/lib/python3.8/site-packages/prefect/client.py", line 224, in raise_for_status
raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Client error '404 Not Found' for url '<https://app.prefect.cloud/account/xxx/workspace/yyy/work_queues/>'
For more information check: <https://httpstatuses.com/404>
I believe the URL should reference work-queues
rather than work_queues
https://github.com/PrefectHQ/prefect/blob/main/src/prefect/client.py#L819Anna Geller
08/01/2022, 6:06 PMprefect config set PREFECT_API_URL="<http://localhost:4200/api>"
but in your case, what you need is to deploy your agent as deployment in the same K8s namespace that you want your flow runs to be deployedapiVersion: apps/v1
kind: Deployment
metadata:
name: agent
namespace: dev
spec:
selector:
matchLabels:
app: agent
replicas: 1
template:
metadata:
labels:
app: agent
spec:
containers:
- name: agent
image: prefecthq/prefect:2-python3.9
command: ["prefect", "agent", "start", "--tag", "k8s"]
imagePullPolicy: "IfNotPresent"
env:
- name: PREFECT_API_URL
value: "<https://api.prefect.cloud/api/accounts/c5276cbb-62a2-4501-b64a-74d3d900d781/workspaces/aaeffa0e-13fa-460e-a1f9-79b53c05ab36>"
- name: PREFECT_API_KEY
value: xxxx
# valueFrom:
# secretKeyRef:
# name: prefect-secrets
# key: api-key
---
apiVersion: <http://rbac.authorization.k8s.io/v1|rbac.authorization.k8s.io/v1>
kind: Role
metadata:
name: agent
namespace: dev
rules:
- apiGroups: [""]
resources: ["pods", "pods/log", "pods/status"]
verbs: ["get", "watch", "list"]
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: [ "get", "list", "watch", "create", "update", "patch", "delete" ]
---
apiVersion: <http://rbac.authorization.k8s.io/v1|rbac.authorization.k8s.io/v1>
kind: RoleBinding
metadata:
name: agent
namespace: dev
subjects:
- kind: ServiceAccount
name: default
namespace: dev
roleRef:
kind: Role
name: agent
apiGroup: <http://rbac.authorization.k8s.io|rbac.authorization.k8s.io>
Billy McMonagle
08/01/2022, 7:20 PMapiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Release.Name }}-orion
namespace: {{ .Release.Namespace }}
labels:
<http://app.kubernetes.io/name|app.kubernetes.io/name>: {{ .Chart.Name }}
<http://app.kubernetes.io/instance|app.kubernetes.io/instance>: {{ .Release.Name }}
<http://app.kubernetes.io/component|app.kubernetes.io/component>: orion
<http://tags.datadoghq.com/service|tags.datadoghq.com/service>: {{ .Release.Name }}
spec:
replicas: 1
selector:
matchLabels:
<http://app.kubernetes.io/instance|app.kubernetes.io/instance>: {{ .Release.Name }}
<http://app.kubernetes.io/component|app.kubernetes.io/component>: orion
template:
metadata:
labels:
<http://app.kubernetes.io/name|app.kubernetes.io/name>: {{ .Chart.Name }}
<http://app.kubernetes.io/instance|app.kubernetes.io/instance>: {{ .Release.Name }}
<http://app.kubernetes.io/component|app.kubernetes.io/component>: orion
<http://tags.datadoghq.com/service|tags.datadoghq.com/service>: {{ .Release.Name }}
cloudwatch/log_group_name: {{ .Release.Name }}
spec:
containers:
- name: agent
image: {{ .Values.agent.image }}
command: ["prefect", "agent", "start", "kubernetes"]
env:
- name: PREFECT_API_KEY
valueFrom:
secretKeyRef:
name: {{ .Release.Name }}
key: prefect_api_key
- name: PREFECT_API_URL
valueFrom:
secretKeyRef:
name: {{ .Release.Name }}
key: prefect_api_url
imagePullPolicy: Always
resources:
requests:
cpu: {{ .Values.agent.requests.cpu }}
memory: {{ .Values.agent.requests.memory }}
limits:
cpu: {{ .Values.agent.limits.cpu }}
memory: {{ .Values.agent.limits.memory }}
OrionAgent.start
method now takes the arguments default_infrastructure
and default_infrastructure_document_id
. However, the CLI doesn't seem to expose those arguments, so default_infrastructure
is set to a default of Process()
. I think this code is then trying to save the default infrastructure to a block, which it can't do because the agent doesn't have any place to store that information.
# Convert the passed default infrastructure to an id
if self.default_infrastructure and not self.default_infrastructure_document_id:
self.default_infrastructure_document_id = (
await self.default_infrastructure._save(is_anonymous=True)
)
Apologies if I'm getting some of this wrong, I'm still learning the concepts and intended usage.Anna Geller
08/01/2022, 9:03 PMBilly McMonagle
08/01/2022, 9:13 PMcommand: ["prefect", "agent", "start", "--tag", "k8s"]
to command: ["prefect", "agent", "start", "k8s"]
(named queue instead of tagged) I get the original message I posted up top of the thread.Anna Geller
08/01/2022, 9:18 PMBilly McMonagle
08/01/2022, 9:19 PMprefecthq/prefect:2-python3.9
(tried prefecthq/prefect:2.0.1-python3.9
as well but I think they point to the same image)Anna Geller
08/01/2022, 9:20 PMBilly McMonagle
08/01/2022, 9:23 PMdefault
namespace 🙂. I've copied your template and modified only the env vars to point to my workspace and the namespace.Anna Geller
08/01/2022, 9:33 PMBilly McMonagle
08/01/2022, 9:36 PM│ Starting agent connected to ││ <https://app.prefect.cloud/account/251efcb0-78a9-4556-8783-32c643f1f3ef/workspace> ││ /49ff4325-3577-4a6a-9f13-c6cb19f6591e... ││ Traceback (most recent call last): ││ File "/usr/local/lib/python3.9/site-packages/prefect/cli/_utilities.py", line 41, in wrapper ││ return fn(*args, **kwargs) ││ File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 193, in wrapper ││ return run_async_in_new_loop(async_fn, *args, **kwargs) ││ File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 140, in run_async_in_new_loop ││ return anyio.run(partial(__fn, *args, **kwargs)) ││ File "/usr/local/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run ││ return asynclib.run(func, *args, **backend_options) ││ File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run ││ return native_run(wrapper(), debug=debug) ││ File "/usr/local/lib/python3.9/asyncio/runners.py", line 44, in run ││ return loop.run_until_complete(main) ││ File "/usr/local/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete ││ return future.result() ││ File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper ││ return await func(*args) ││ File "/usr/local/lib/python3.9/site-packages/prefect/cli/agent.py", line 93, in start ││ async with OrionAgent( ││ File "/usr/local/lib/python3.9/site-packages/prefect/agent.py", line 249, in __aenter__
│ await self.start()
│ File "/usr/local/lib/python3.9/site-packages/prefect/agent.py", line 237, in start
│ await self.default_infrastructure._save(is_anonymous=True)
│ File "/usr/local/lib/python3.9/site-packages/prefect/blocks/core.py", line 618, in _save
│ await self.register_type_and_schema(client=client)
│ File "/usr/local/lib/python3.9/site-packages/prefect/blocks/core.py", line 560, in register_type_and_schema
│ block_type = await client.read_block_type_by_slug(
│ File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 1091, in read_block_type_by_slug
│ return BlockType.parse_obj(response.json())
│ File "/usr/local/lib/python3.9/site-packages/httpx/_models.py", line 743, in json
│ return jsonlib.loads(self.text, **kwargs)
│ File "/usr/local/lib/python3.9/json/__init__.py", line 346, in loads
│ return _default_decoder.decode(s)
│ File "/usr/local/lib/python3.9/json/decoder.py", line 337, in decode
│ obj, end = self.raw_decode(s, idx=_w(s, 0).end())
│ File "/usr/local/lib/python3.9/json/decoder.py", line 355, in raw_decode
│ raise JSONDecodeError("Expecting value", s, err.value) from None
│ json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
│ An exception occurred.
│ Stream closed EOF for default/agent-f49f6454-f2v8k (agent)
Anna Geller
08/01/2022, 11:36 PMprefect agent start kubernetes --api $PREFECT_API_URL --tag k8s
or rather
kubectl apply -f agentspec.yaml
the latter is how you are supposed to set this up. Not sure if I was clear about this. Sorry if notBilly McMonagle
08/01/2022, 11:37 PMAnna Geller
08/02/2022, 1:27 AMBilly McMonagle
08/02/2022, 1:34 AMAnna Geller
08/02/2022, 1:41 AMBilly McMonagle
08/02/2022, 2:54 AM│ Starting agent connected to │
│ <https://api.prefect.cloud/api/accounts/xxx/work> │
│ spaces/yyy... │
│ │
│ ___ ___ ___ ___ ___ ___ _____ _ ___ ___ _ _ _____ │
│ | _ \ _ \ __| __| __/ __|_ _| /_\ / __| __| \| |_ _| │
│ | _/ / _|| _|| _| (__ | | / _ \ (_ | _|| .` | | | │
│ |_| |_|_\___|_| |___\___| |_| /_/ \_\___|___|_|\_| |_| │
│
│ Agent started! Looking for work from queue 'kubernetes'...
Anna Geller
08/02/2022, 11:18 AM<https://api.prefect.cloud/api/accounts/uuid/workspaces/uuid>
rather than:
<https://app.prefect.cloud/api/accounts/uuid/workspaces/uuid>
terrence
08/03/2022, 5:28 PMBilly McMonagle
08/03/2022, 5:53 PM