Miguel Moncada
05/08/2024, 9:53 AMdef deploy_hello_flow():
hello_flow.deploy(
name="hello_flow_k8s_deployment",
work_pool_name="default-worker-pool", # k8s work pool
image="europe-docker.pkg.dev/cambium-earth/prefect-runners-prod/default:latest",
tags=["prod"],
schedule=CronSchedule(cron="0 12 1 * *", timezone="UTC"),
)
Jeff Hale
05/08/2024, 5:58 PMjob_variables
param. Example code below.Jeff Hale
05/08/2024, 5:59 PM{
"variables": {
"type": "object",
"properties": {
"env": {
"type": "object",
"title": "Environment Variables",
"description": "Environment variables to set when starting a flow run.",
"additionalProperties": {
"type": "string"
}
},
"name": {
"type": "string",
"title": "Name",
"description": "Name given to infrastructure created by a worker."
},
"image": {
"type": "string",
"title": "Image",
"example": "<http://docker.io/prefecthq/prefect:2-latest|docker.io/prefecthq/prefect:2-latest>",
"description": "The image reference of a container image to use for created jobs. If not set, the latest Prefect image will be used."
},
"labels": {
"type": "object",
"title": "Labels",
"description": "Labels applied to infrastructure created by a worker.",
"additionalProperties": {
"type": "string"
}
},
"command": {
"type": "string",
"title": "Command",
"description": "The command to use when starting a flow run. In most cases, this should be left blank and the command will be automatically generated by the worker."
},
"cpu_limit": {
"type": "string",
"title": "CPU limit",
"description": "CPU limit for pod"
},
"mem_limit": {
"type": "string",
"title": "Memory limit",
"description": "Memory limit for pod"
},
"namespace": {
"type": "string",
"title": "Namespace",
"default": "default",
"description": "The Kubernetes namespace to create jobs within."
},
"cpu_request": {
"type": "string",
"title": "CPU request",
"default": "500m",
"description": "CPU request for pod"
},
"mem_request": {
"type": "string",
"title": "Memory request",
"default": "128Mi",
"description": "Memory request for pod"
},
"stream_output": {
"type": "boolean",
"title": "Stream Output",
"default": true,
"description": "If set, output will be streamed from the job to local standard output."
},
"cluster_config": {
"allOf": [
{
"$ref": "#/definitions/KubernetesClusterConfig"
}
],
"title": "Cluster Config",
"description": "The Kubernetes cluster config to use for job creation."
},
"finished_job_ttl": {
"type": "integer",
"title": "Finished Job TTL",
"description": "The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If not set, jobs will be retained indefinitely."
},
"image_pull_policy": {
"enum": [
"IfNotPresent",
"Always",
"Never"
],
"type": "string",
"title": "Image Pull Policy",
"default": "IfNotPresent",
"description": "The Kubernetes image pull policy to use for job containers."
},
"service_account_name": {
"type": "string",
"title": "Service Account Name",
"description": "The Kubernetes service account to use for job creation."
},
"job_watch_timeout_seconds": {
"type": "integer",
"title": "Job Watch Timeout Seconds",
"description": "Number of seconds to wait for each event emitted by a job before timing out. If not set, the worker will wait for each event indefinitely."
},
"pod_watch_timeout_seconds": {
"type": "integer",
"title": "Pod Watch Timeout Seconds",
"default": 60,
"description": "Number of seconds to watch for pod creation before timing out."
}
},
"definitions": {
"KubernetesClusterConfig": {
"type": "object",
"title": "KubernetesClusterConfig",
"required": [
"config",
"context_name"
],
"properties": {
"config": {
"type": "object",
"title": "Config",
"description": "The entire contents of a kubectl config file."
},
"context_name": {
"type": "string",
"title": "Context Name",
"description": "The name of the kubectl context to use."
}
},
"description": "Stores configuration for interaction with Kubernetes clusters.\n\nSee `from_file` for creation.",
"secret_fields": [],
"block_type_slug": "kubernetes-cluster-config",
"block_schema_references": {}
}
},
"description": "Default variables for the Kubernetes worker.\n\nThe schema for this class is used to populate the `variables` section of the default\nbase job template."
},
"job_configuration": {
"env": "{{ env }}",
"name": "{{ name }}",
"labels": "{{ labels }}",
"command": "{{ command }}",
"namespace": "{{ namespace }}",
"job_manifest": {
"kind": "Job",
"spec": {
"template": {
"spec": {
"containers": [
{
"env": "{{ env }}",
"args": "{{ command }}",
"name": "prefect-job",
"image": "{{ image }}",
"resources": {
"limits": {
"cpu": "{{ cpu_limit }}",
"memory": "{{ mem_limit }}"
},
"requests": {
"cpu": "{{ cpu_request }}",
"memory": "{{ mem_request }}"
}
},
"imagePullPolicy": "{{ image_pull_policy }}"
}
],
"completions": 1,
"parallelism": 1,
"restartPolicy": "Never",
"serviceAccountName": "{{ service_account_name }}"
}
},
"backoffLimit": 0,
"ttlSecondsAfterFinished": "{{ finished_job_ttl }}"
},
"metadata": {
"labels": "{{ labels }}",
"namespace": "{{ namespace }}",
"generateName": "{{ name }}-"
},
"apiVersion": "batch/v1"
},
"stream_output": "{{ stream_output }}",
"cluster_config": "{{ cluster_config }}",
"job_watch_timeout_seconds": "{{ job_watch_timeout_seconds }}",
"pod_watch_timeout_seconds": "{{ pod_watch_timeout_seconds }}"
}
}
Jeff Hale
05/08/2024, 6:00 PMfrom prefect import flow
@flow(log_prints=True)
def hi():
print("Hi")
if __name__ == "__main__":
hi.deploy(
job_variables={"cpu_request": "200m", "mem_request": "64Mi"},
push=False,
image="discdiver/k8s:1",
work_pool_name="k8s1",
name="k8s-resources",
)
Jeff Hale
05/08/2024, 6:01 PMMiguel Moncada
05/09/2024, 5:41 AMMiguel Moncada
05/09/2024, 6:01 AMMiguel Moncada
05/09/2024, 6:03 AMJeff Hale
05/09/2024, 12:16 PMprefect work-pool create --type kubernetes --base-job-template path_to_template
If needed in the future, you can get the base job template for the K8s work pool and adjust it with prefect work-pool get-default-base-job-template --type kubernetes
Option 2: use the Terraform Provider see: Use a JSON file to load a base job configuration here.
I didn’t see a Helm chart option.Miguel Moncada
05/09/2024, 1:07 PMMiguel Moncada
05/09/2024, 1:08 PMapiVersion: <http://argoproj.io/v1alpha1|argoproj.io/v1alpha1>
kind: Application
metadata:
name: prefect-worker
namespace: argocd
spec:
project: default
sources:
# Chart from the Prefect chart repo
- chart: prefect-worker
repoURL: <https://prefecthq.github.io/prefect-helm>
targetRevision: 2023.10.5
helm:
valueFiles:
- $values/releases/dev/prefect/worker/values.yaml
# Values from Git repository
- repoURL: <https://github.com/cambium-earth/data-canopy-argocd.git>
targetRevision: HEAD
ref: values
destination:
namespace: prefect
server: <https://kubernetes.default.svc>
syncPolicy:
automated:
selfHeal: true
prune: true
syncOptions:
- CreateNamespace=true
Jeff Hale
05/09/2024, 1:19 PMMiguel Moncada
05/09/2024, 1:35 PMvalues.yaml
there's the configuration for the work pool
## general configuration of the worker
config:
# -- the work pool that your started worker will poll.
workPool: "default-worker-pool"
# -- one or more work queue names for the worker to pull from. if not provided, the worker will pull from all work queues in the work pool
workQueues: []
# -- how often the worker will query for runs
queryInterval: 5
# -- when querying for runs, how many seconds in the future can they be scheduled
prefetchSeconds: 10
# -- connect using HTTP/2 if the server supports it (experimental)
http2: true
# -- Maximum number of flow runs to start simultaneously (default: unlimited)
limit: null
## You can set the worker type here.
## The default image includes only the type "kubernetes".
## Custom workers must be properly registered with the prefect cli.
## See the guide here: <https://docs.prefect.io/2.11.3/guides/deployment/developing-a-new-worker-type/>
# -- specify the worker type
type: kubernetes
Miguel Moncada
05/09/2024, 1:36 PMJeff Hale
05/09/2024, 1:41 PMMiguel Moncada
05/09/2024, 1:52 PMMiguel Moncada
05/09/2024, 1:54 PMMiguel Moncada
05/09/2024, 2:03 PMJeff Hale
05/09/2024, 2:38 PMMiguel Moncada
05/09/2024, 2:40 PMMiguel Moncada
05/09/2024, 2:40 PM