<@ULVA73B9P> how to use kubernets job with prefect...
# ask-marvin
b
@Marvin how to use a Kubernetes job with prefect.yaml?
m
Oh, the joy of Kubernetes and Prefect. Here's a bit of information that might help you, or it might not. It's hard to tell with you humans. The `KubernetesJob` infrastructure executes flow runs in a Kubernetes Job. Here are some requirements for `KubernetesJob`:
- `kubectl` must be available.
- You must configure remote Storage. Local storage is not supported for Kubernetes.
- The ephemeral Prefect API won't work with Docker and Kubernetes. You must have a Prefect server or Prefect Cloud API endpoint set in your agent's configuration.
The Prefect CLI command `prefect kubernetes manifest server` automatically generates a Kubernetes manifest with default settings for Prefect deployments. By default, it simply prints out the YAML configuration for a manifest. You can pipe this output to a file of your choice and edit as necessary.
`KubernetesJob` supports the following settings:
- `cluster_config`: An optional Kubernetes cluster config to use for this job.
- `command`: A list of strings specifying the command to run in the container to start the flow run. In most cases you should not override this.
- `customizations`: A list of JSON 6902 patches to apply to the base Job manifest. Alternatively, a valid JSON string is allowed (handy for deployments CLI).
- `env`: Environment variables to set for the container.
- `finished_job_ttl`: The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If None (default), jobs will need to be manually removed.
- `image`: String specifying the tag of a Docker image.
You can find more details in the Prefect documentation here and here. But, of course, you'll probably have to read it a few times before it sinks in. Such is the human condition.
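To make the `customizations` setting a little more concrete, here is a minimal sketch in plain Python. It builds a list of JSON 6902 (JSON Patch) operations and serializes it to the JSON string form mentioned above. The patch path and the memory limit are illustrative assumptions, not values from this thread, so check them against the base Job manifest your Prefect version generates.

```python
import json

# Hypothetical patch list: add a memory limit to the first container of the Job.
# The path assumes the base Job manifest has a container at index 0.
customizations = [
    {
        "op": "add",
        "path": "/spec/template/spec/containers/0/resources",
        "value": {"limits": {"memory": "1Gi"}},
    },
]

# The same patches as a JSON string, the alternative form the setting accepts.
customizations_json = json.dumps(customizations)
print(customizations_json)
```

The list form is easier to read in Python code, while the string form is the one that fits on a command line.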
b
@Marvin how to make each flow run in an individual pod using a worker?
m
Ah, the complexities of Kubernetes and Prefect. It's almost as if you humans enjoy making things difficult for yourselves. But I digress. To have each flow run in an individual pod, you'll want to use Prefect's Kubernetes Agent. The Kubernetes Agent deploys flow runs as Kubernetes Jobs, and each job gets its own pod. Here's a brief overview of how to set it up:
1. Install the Prefect agent in your Kubernetes cluster. You can do this using Helm or manually with `kubectl`. You can find instructions in the Prefect documentation.
2. Configure your flows to use the Kubernetes Agent. You can do this by setting the `executor` attribute of your flow to a `KubernetesExecutor`. Here's an example:
python
from prefect import Flow
from prefect.engine.executors import KubernetesExecutor

with Flow("My Flow", executor=KubernetesExecutor()) as flow:
    # define your flow here
3. Register your flow with Prefect Cloud or Server. You can do this using the `register` method of your flow:
python
flow.register("My Project")
4. Start the Prefect agent in your Kubernetes cluster. The agent will automatically pick up registered flows and run them in individual pods. You can start the agent using the Prefect CLI:
bash
prefect agent kubernetes start
Remember, each flow run will be its own Kubernetes Job, and each job will have its own pod. So, you'll end up with each flow run in its own pod, just like you wanted. Or at least, I think that's what you wanted. It's hard to tell with you humans sometimes.
n
hi @Bruno Nirello - marvin is stuck in the ways of old here. here's a guide for running flows on k8s - once you set up your worker / work pool, you can define deployments in your prefect.yaml that point at that work pool like this
- name: healthcheck-k8s-test
  entrypoint: src/demo_project/healthcheck.py:healthcheck
  schedule: *every_10_minutes
  work_pool: *k8s_work_pool
  build: *docker_build
  push: *docker_push
where those yaml defs are coming from my `definitions` above here
b
@Nate got it, can I deploy the worker without using helm? just kubectl apply values.yaml?
n
you're free to `kubectl apply` your own worker deployment (the `values.yaml` itself is for the helm chart). out of curiosity, why are you interested in doing that?
b
It was a decision by the Infra guy. I'm trying the wizard now
(.venv) bnirello@BRLP7LB8VQ3:~/projects/prefect_mpc$ prefect deploy
? Select a flow to deploy [Use arrows to move; enter to select; n to select none]
┏━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃    ┃ Flow Name            ┃ Location                         ┃
┡━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ >  │ get_repo_info        │ cloud_deploy.py                  │
│    │ slow_flow            │ flows/flow_serve.py              │
│    │ fast_flow            │ flows/flow_serve.py              │
│    │ my_flow              │ flows/first_flow.py              │
│    │ always_succeeds_flow │ flows/branch_flow.py             │
│    │ kube_flow            │ flows/cloud_director_mpc/flow.py │
└────┴──────────────────────┴──────────────────────────────────┘
    Enter a flow entrypoint manually
? Deployment name (default): 
? Would you like to configure a schedule for this deployment? [y/n] (y): n
? Which work pool would you like to deploy this flow to? [Use arrows to move; enter to select]
┏━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃   ┃ Work Pool Name ┃ Infrastructure Type ┃ Description ┃
┡━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ > │ k8s_pod        │ kubernetes          │             │
└───┴────────────────┴─────────────────────┴─────────────┘
? Would you like to build a custom Docker image for this deployment? [y/n] (n): n
Running deployment push steps...
 > Running push_to_s3 step...
Traceback (most recent call last):
  File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
    return fn(*args, **kwargs)
  File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 255, in coroutine_wrapper
    return call()
  File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 382, in __call__
    return self.result()
  File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 282, in result
    return self.future.result(timeout=timeout)
  File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 168, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 345, in _run_async
    result = await coro
  File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/prefect/cli/deploy.py", line 290, in deploy
    await _run_single_deploy(
  File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/prefect/cli/deploy.py", line 596, in _run_single_deploy
    deployment_id = await client.create_deployment(
  File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/prefect/client/orchestration.py", line 1487, in create_deployment
    response = await self._client.post(
  File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/httpx/_client.py", line 1848, in post
    return await self.request(
  File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/httpx/_client.py", line 1530, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/prefect/client/base.py", line 285, in send
    response.raise_for_status()
  File "/home/bnirello/projects/.venv/lib/python3.10/site-packages/prefect/client/base.py", line 138, in raise_for_status
    raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Client error '422 Unprocessable Entity' for url 'http://localhost:8080/api/deployments/'
Response: {'exception_message': 'Invalid request received.', 'exception_detail': [{'loc': ['body', 'enforce_parameter_schema'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}], 'request_body': {'name': 'default', 'flow_id': 'fca5426f-c210-4dca-83a2-0c6b8b0c11b2', 'enforce_parameter_schema': False, 'parameter_openapi_schema': {'title': 'Parameters', 'type': 'object', 'properties': {'repo_name': {'title': 'repo_name', 'default': 'PrefectHQ/prefect', 'position': 0, 'type': 'string'}}}, 'parameters': {}, 'tags': [], 'pull_steps': [{'prefect_aws.deployments.steps.pull_from_s3': {'id': 'pull_code', 'requires': 'prefect-aws>=0.3.4', 'bucket': 'prefect-uat-991568441903', 'folder': 'prefect_mpc', 'credentials': '{{ prefect.blocks.aws-credentials.dev-credentials }}'}}], 'manifest_path': None, 'work_queue_name': None, 'work_pool_name': 'k8s_pod', 'storage_document_id': None, 'infrastructure_document_id': None, 'schedule': None, 'description': None, 'path': None, 'version': None, 'entrypoint': 'cloud_deploy.py:get_repo_info', 'infra_overrides': {}}}
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/422
An exception occurred.
prefect.yaml after prefect init for S3 bucket
# Generic metadata about this project
name: prefect_mpc
prefect-version: 2.13.4

# build section allows you to manage and build docker images
build: null

# push section allows you to manage if and how this project is uploaded to remote locations
push:
- prefect_aws.deployments.steps.push_to_s3:
    id: push_code
    requires: prefect-aws>=0.3.4
    bucket: prefect-uat-234234
    folder: prefect_mpc
    credentials: '{{ prefect.blocks.aws-credentials.dev-credentials }}'

# pull section allows you to provide instructions for cloning this project in remote locations
pull:
- prefect_aws.deployments.steps.pull_from_s3:
    id: pull_code
    requires: prefect-aws>=0.3.4
    bucket: '{{ push_code.bucket }}'
    folder: '{{ push_code.folder }}'
    credentials: '{{ prefect.blocks.aws-credentials.dev-credentials }}'

# the deployments section allows you to provide configuration for deploying flows
deployments:
- name: null
  version: null
  tags: []
  description: null
  schedule: {}
  flow_name: null
  entrypoint: null
  parameters: {}
  work_pool:
    name: null
    work_queue_name: null
    job_variables: {}
n
this looks like a client / server version mismatch maybe?
'exception_detail': [{'loc': ['body', 'enforce_parameter_schema'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}]
this `enforce_parameter_schema` was added in 2.13.2
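The diagnosis above can be sketched in plain Python: pull the rejected field names out of the 422 response body, then compare a server version against the release that introduced the field. The helper names `extra_fields` and `server_supports` are hypothetical, the response body is trimmed from the traceback, and the server version `2.13.0` is an assumption for illustration, since the thread never states what the server was running.

```python
def extra_fields(response_body: dict) -> list:
    """Return the request fields the server rejected as unknown."""
    rejected = []
    for detail in response_body.get("exception_detail", []):
        if detail.get("type") == "value_error.extra":
            # 'loc' looks like ['body', '<field name>']; the last item is the field.
            rejected.append(detail["loc"][-1])
    return rejected


def parse_version(version: str) -> tuple:
    """Turn a plain 'X.Y.Z' string into a tuple of ints for comparison."""
    return tuple(int(part) for part in version.split("."))


def server_supports(server_version: str, field_added_in: str) -> bool:
    """True if the server is at least as new as the release that added the field."""
    return parse_version(server_version) >= parse_version(field_added_in)


# Trimmed copy of the response body from the traceback.
response_body = {
    "exception_message": "Invalid request received.",
    "exception_detail": [
        {
            "loc": ["body", "enforce_parameter_schema"],
            "msg": "extra fields not permitted",
            "type": "value_error.extra",
        }
    ],
}

print(extra_fields(response_body))
# Assumed server version, for illustration only.
print(server_supports("2.13.0", "2.13.2"))
```

A rejected field that a newer client sends but an older server does not know is the usual signature of this kind of mismatch, which is why aligning the two versions made the error go away.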
b
@Nate you are right, the version was different. Now the pod is in pending state and returns this. My understanding of the issue is that I need to set PREFECT_KUBERNETES_CLUSTER_UID = UUID of the namespace kube-system, however the server and the worker already have this ENV
HTTP response headers: HTTPHeaderDict({'Audit-Id': '58c35091-8480-4bfd-9671-6743f70a3295', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'X-Kubernetes-Pf-Flowschema-Uid': 'ca7f253f-6599-4d6c-aa7b-f1ae62740bc7', 'X-Kubernetes-Pf-Prioritylevel-Uid': 'a7a39afa-2505-428c-86e5-0cbf7490544b', 'Date': 'Wed, 04 Oct 2023 20:59:15 GMT', 'Content-Length': '288'})
2023-10-04T20:59:15.813956748Z HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"events is forbidden: User \"system:serviceaccount:prefect:default\" cannot list resource \"events\" in API group \"\" in the namespace \"prefect\"","reason":"Forbidden","details":{"kind":"events"},"code":403}
n
hmm this seems like a service account issue. this is one of the things the helm chart handles for you, but you'll need to make sure that your service account `prefect` has the permissions needed in your desired namespace - let me see if i can find the required ones
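As a rough sketch of what granting the missing permission could look like without the helm chart, the Python below builds a Role and RoleBinding as plain dicts and prints them as JSON. The namespace and service account name come from the error message; the resource name `prefect-worker-events` is made up for illustration. It covers only the `events` permission named in the 403, and a worker will likely need further permissions (for example on jobs and pods) that are not shown here.

```python
import json

NAMESPACE = "prefect"        # namespace from the error message
SERVICE_ACCOUNT = "default"  # service account from the error message
ROLE_NAME = "prefect-worker-events"  # hypothetical name, chosen for this sketch

# Role allowing read access to events in the core ("") API group.
role = {
    "apiVersion": "rbac.authorization.k8s.io/v1",
    "kind": "Role",
    "metadata": {"name": ROLE_NAME, "namespace": NAMESPACE},
    "rules": [
        {"apiGroups": [""], "resources": ["events"], "verbs": ["get", "list", "watch"]},
    ],
}

# Bind the Role to the service account the worker runs as.
role_binding = {
    "apiVersion": "rbac.authorization.k8s.io/v1",
    "kind": "RoleBinding",
    "metadata": {"name": ROLE_NAME, "namespace": NAMESPACE},
    "subjects": [
        {"kind": "ServiceAccount", "name": SERVICE_ACCOUNT, "namespace": NAMESPACE},
    ],
    "roleRef": {
        "apiGroup": "rbac.authorization.k8s.io",
        "kind": "Role",
        "name": ROLE_NAME,
    },
}

manifest = {"apiVersion": "v1", "kind": "List", "items": [role, role_binding]}
print(json.dumps(manifest, indent=2))
```

Since `kubectl apply -f -` accepts JSON on stdin, the printed output can be piped straight into it after review.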
b
It was the serviceaccount: `default`. It's working now. In the identifier `system:serviceaccount:prefect:default`:
• `system:serviceaccount` is a prefix that indicates this is a ServiceAccount in Kubernetes.
• `prefect` is the namespace in which the ServiceAccount resides.
• `default` is the name of the ServiceAccount.
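The breakdown above can be expressed as a small Python helper that splits such an identifier into its namespace and name. `parse_service_account` and `ServiceAccountRef` are hypothetical names introduced for this sketch, which can be handy when reading 403 messages like the one earlier in the thread.

```python
from typing import NamedTuple


class ServiceAccountRef(NamedTuple):
    namespace: str
    name: str


def parse_service_account(username: str) -> ServiceAccountRef:
    """Split 'system:serviceaccount:<namespace>:<name>' into its parts."""
    prefix = "system:serviceaccount:"
    if not username.startswith(prefix):
        raise ValueError(f"not a ServiceAccount identifier: {username!r}")
    namespace, sep, name = username[len(prefix):].partition(":")
    if not sep or not namespace or not name:
        raise ValueError(f"missing namespace or name: {username!r}")
    return ServiceAccountRef(namespace, name)


ref = parse_service_account("system:serviceaccount:prefect:default")
print(ref.namespace, ref.name)
```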