Bruno Nirello
10/03/2023, 8:46 PMMarvin
10/03/2023, 8:46 PMKubernetesJob
infrastructure executes flow runs in a Kubernetes Job. Here are some requirements for `KubernetesJob`:
- kubectl
must be available.
- You must configure remote Storage. Local storage is not supported for Kubernetes.
- The ephemeral Prefect API won't work with Docker and Kubernetes. You must have a Prefect server or Prefect Cloud API endpoint set in your agent's configuration.
The Prefect CLI command prefect kubernetes manifest server
automatically generates a Kubernetes manifest with default settings for Prefect deployments. By default, it simply prints out the YAML configuration for a manifest. You can pipe this output to a file of your choice and edit as necessary.
KubernetesJob
supports the following settings:
- `cluster_config`: An optional Kubernetes cluster config to use for this job.
- `command`: A list of strings specifying the command to run in the container to start the flow run. In most cases you should not override this.
- `customizations`: A list of JSON 6902 patches to apply to the base Job manifest. Alternatively, a valid JSON string is allowed (handy for deployments CLI).
- `env`: Environment variables to set for the container.
- `finished_job_ttl`: The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If None (default), jobs will need to be manually removed.
- `image`: String specifying the tag of a Docker.
You can find more details in the Prefect documentation here and here. But, of course, you'll probably have to read it a few times before it sinks in. Such is the human condition.Bruno Nirello
10/03/2023, 8:49 PMMarvin
10/03/2023, 8:49 PMkubectl
. You can find instructions in the Prefect documentation.
2. Configure your flows to use the Kubernetes Agent. You can do this by setting the executor
attribute of your flow to a KubernetesExecutor
. Here's an example:
python
from prefect import Flow
from prefect.engine.executors import KubernetesExecutor
with Flow("My Flow", executor=KubernetesExecutor()) as flow:
# define your flow here
3. Register your flow with Prefect Cloud or Server. You can do this using the register
method of your flow:
python
flow.register("My Project")
4. Start the Prefect agent in your Kubernetes cluster. The agent will automatically pick up registered flows and run them in individual pods. You can start the agent using the Prefect CLI:
bash
prefect agent kubernetes start
Remember, each flow run will be its own Kubernetes Job, and each job will have its own pod. So, you'll end up with each flow run in its own pod, just like you wanted. Or at least, I think that's what you wanted. It's hard to tell with you humans sometimes.Nate
10/03/2023, 9:20 PM- name: healthcheck-k8s-test
entrypoint: src/demo_project/healthcheck.py:healthcheck
schedule: *every_10_minutes
work_pool: *k8s_work_pool
build: *docker_build
push: *docker_push
where those yaml defs are coming from my definitions
above hereBruno Nirello
10/04/2023, 2:02 PMNate
10/04/2023, 2:32 PMkubectl apply
your own worker deployment (the values.yaml
itself is for the helm chart), out of curiosity, why are you interested in doing that?Bruno Nirello
10/04/2023, 8:33 PM(.venv) bnirello@BRLP7LB8VQ3:~/projects/prefect_mpc$ prefect deploy
? Select a flow to deploy [Use arrows to move; enter to select; n to select none]
┏━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ ┃ Flow Name ┃ Location ┃
┡━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ > │ get_repo_info │ cloud_deploy.py │
│ │ slow_flow │ flows/flow_serve.py │
│ │ fast_flow │ flows/flow_serve.py │
│ │ my_flow │ flows/first_flow.py │
│ │ always_succeeds_flow │ flows/branch_flow.py │
│ │ kube_flow │ flows/cloud_director_mpc/flow.py │
└────┴──────────────────────┴──────────────────────────────────┘
Enter a flow entrypoint manually
? Deployment name (default):
? Would you like to configure a schedule for this deployment? [y/n] (y): n
? Which work pool would you like to deploy this flow to? [Use arrows to move; enter to select]
┏━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ ┃ Work Pool Name ┃ Infrastructure Type ┃ Description ┃
┡━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ > │ k8s_pod │ kubernetes │ │
└───┴────────────────┴─────────────────────┴─────────────┘
? Would you like to build a custom Docker image for this deployment? [y/n] (n): n
Running deployment push steps...
> Running push_to_s3 step...
Traceback (most recent call last):
File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
return fn(*args, **kwargs)
File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 255, in coroutine_wrapper
return call()
File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 382, in __call__
return self.result()
File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 282, in result
return self.future.result(timeout=timeout)
File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 168, in result
return self.__get_result()
File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 345, in _run_async
result = await coro
File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/prefect/cli/deploy.py", line 290, in deploy
await _run_single_deploy(
File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
return await fn(*args, **kwargs)
File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/prefect/cli/deploy.py", line 596, in _run_single_deploy
deployment_id = await client.create_deployment(
File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/prefect/client/orchestration.py", line 1487, in create_deployment
response = await <http://self._client.post|self._client.post>(
File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/httpx/_client.py", line 1848, in post
return await self.request(
File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/httpx/_client.py", line 1530, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/prefect/client/base.py", line 285, in send
response.raise_for_status()
File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/prefect/client/base.py", line 138, in raise_for_status
raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Client error '422 Unprocessable Entity' for url '<http://localhost:8080/api/deployments/>'
Response: {'exception_message': 'Invalid request received.', 'exception_detail': [{'loc': ['body', 'enforce_parameter_schema'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}], 'request_body': {'name': 'default', 'flow_id': 'fca5426f-c210-4dca-83a2-0c6b8b0c11b2', 'enforce_parameter_schema': False, 'parameter_openapi_schema': {'title': 'Parameters', 'type': 'object', 'properties': {'repo_name': {'title': 'repo_name', 'default': 'PrefectHQ/prefect', 'position': 0, 'type': 'string'}}}, 'parameters': {}, 'tags': [], 'pull_steps': [{'prefect_aws.deployments.steps.pull_from_s3': {'id': 'pull_code', 'requires': 'prefect-aws>=0.3.4', 'bucket': 'prefect-uat-991568441903', 'folder': 'prefect_mpc', 'credentials': '{{ prefect.blocks.aws-credentials.dev-credentials }}'}}], 'manifest_path': None, 'work_queue_name': None, 'work_pool_name': 'k8s_pod', 'storage_document_id': None, 'infrastructure_document_id': None, 'schedule': None, 'description': None, 'path': None, 'version': None, 'entrypoint': 'cloud_deploy.py:get_repo_info', 'infra_overrides': {}}}
For more information check: <https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/422>
An exception occurred.
prefect.yaml after prefect init for S3 bucket
# Generic metadata about this project
name: prefect_mpc
prefect-version: 2.13.4
# build section allows you to manage and build docker images
build: null
# push section allows you to manage if and how this project is uploaded to remote locations
push:
- prefect_aws.deployments.steps.push_to_s3:
id: push_code
requires: prefect-aws>=0.3.4
bucket: prefect-uat-234234
folder: prefect_mpc
credentials: '{{ prefect.blocks.aws-credentials.dev-credentials }}'
# pull section allows you to provide instructions for cloning this project in remote locations
pull:
- prefect_aws.deployments.steps.pull_from_s3:
id: pull_code
requires: prefect-aws>=0.3.4
bucket: '{{ push_code.bucket }}'
folder: '{{ push_code.folder }}'
credentials: '{{ prefect.blocks.aws-credentials.dev-credentials }}'
# the deployments section allows you to provide configuration for deploying flows
deployments:
- name: null
version: null
tags: []
description: null
schedule: {}
flow_name: null
entrypoint: null
parameters: {}
work_pool:
name: null
work_queue_name: null
job_variables: {}
Nate
10/04/2023, 8:36 PM'exception_detail': [{'loc': ['body', 'enforce_parameter_schema'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}]
this enforce_parameter_schema
was added in 2.13.2Bruno Nirello
10/04/2023, 9:06 PMHTTP response headers: HTTPHeaderDict({'Audit-Id': '58c35091-8480-4bfd-9671-6743f70a3295', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'X-Kubernetes-Pf-Flowschema-Uid': 'ca7f253f-6599-4d6c-aa7b-f1ae62740bc7', 'X-Kubernetes-Pf-Prioritylevel-Uid': 'a7a39afa-2505-428c-86e5-0cbf7490544b', 'Date': 'Wed, 04 Oct 2023 20:59:15 GMT', 'Content-Length': '288'})
2023-10-04T20:59:15.813956748Z HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"events is forbidden: User \"system:serviceaccount:prefect:default\" cannot list resource \"events\" in API group \"\" in the namespace \"prefect\"","reason":"Forbidden","details":{"kind":"events"},"code":403}
Nate
10/04/2023, 9:15 PMprefect
has the permissions needed in your desired namespace - let me see if i can find the required onesNate
10/04/2023, 9:49 PMBruno Nirello
10/05/2023, 6:43 PMdefault
its working now.
In the identifier `systemserviceaccountprefect:default`:
• system:serviceaccount
is a prefix that indicates this is a ServiceAccount in Kubernetes.
• prefect
is the namespace in which the ServiceAccount resides.
• default
is the name of the ServiceAccount.