Ritesh
07/24/2025, 11:59 AM

Marvin
07/24/2025, 11:59 AM

Marvin
07/24/2025, 12:00 PM

```python
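# Prefect 2.x-style infrastructure block, shown as a sketch: the goal is to pin
# flow-run pods to a node group via nodeSelector or node affinity.
# "node-group" and its values are placeholders for your cluster's labels.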
from prefect.infrastructure.kubernetes import KubernetesJob
kubernetes_job = KubernetesJob(
    job_configuration={
        "spec": {
            "template": {
                "spec": {
                    # Option 1: Using nodeSelector
                    "nodeSelector": {
                        "node-group": "your-node-group-label"  # Use your actual node group label
                    },
                    
                    # Option 2: Using node affinity (more flexible)
                    "affinity": {
                        "nodeAffinity": {
                            "requiredDuringSchedulingIgnoredDuringExecution": {
                                "nodeSelectorTerms": [{
                                    "matchExpressions": [{
                                        "key": "node-group",
                                        "operator": "In",
                                        "values": ["your-node-group-name"]
                                    }]
                                }]
                            }
                        }
                    }
                }
            }
        }
    }
)
```

```python
from prefect import flow
from prefect.deployments import Deployment
from prefect.infrastructure.kubernetes import KubernetesJob
@flow
def my_flow():
    # your flow code here
    pass
kubernetes_job = KubernetesJob(
    job_configuration={
        "spec": {
            "template": {
                "metadata": {
                    "labels": {
                        "app": "my-flow",
                        "node-group": "specific-group"
                    }
                },
                "spec": {
                    "tolerations": [{
                        "key": "dedicated",
                        "value": "ml-workload",
                        "effect": "NoSchedule"
                    }],
                    "nodeSelector": {
                        "node-group": "ml-nodes"  # Your specific node group
                    }
                }
            }
        }
    }
)
deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="my-flow-deployment",
    infrastructure=kubernetes_job
)
```

```bash
kubectl get pods -n your-namespace -o wide
```

Ritesh
07/28/2025, 7:44 AM

Marvin
07/28/2025, 7:45 AM

Marvin
07/28/2025, 7:46 AM

```bash
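   # Create the Kubernetes work pool; the JSON below goes into its base job
   # template to set default container resources.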
   prefect work-pool create "my-k8s-pool" --type kubernetesjson
   {
     "spec": {
       "template": {
         "spec": {
           "containers": [{
             "resources": {
               "requests": {
                 "memory": "512Mi",
                 "cpu": "500m"
               },
               "limits": {
                 "memory": "1Gi",
                 "cpu": "1"
               }
             }
           }]
         }
       }
     }
   }
```

Ritesh
07/28/2025, 7:49 AM

Marvin
07/28/2025, 7:50 AM

Marvin
07/28/2025, 7:51 AM

```python
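# Sketch of the pattern: two flows deployed to differently sized work pools,
# with an orchestrator flow running them in sequence. Pool and deployment
# names are placeholders.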
from prefect import flow
from prefect.deployments import run_deployment
from prefect.events import Event
@flow(name="high_resource_flow")
def flow1():
    # Your high-resource computation here
    pass
@flow(name="low_resource_flow")
def flow2():
    # Your low-resource computation here
    pass
# Deploy both flows to different work pools based on resource requirements
flow1_deployment = flow1.from_source(
    source="your_repo",
    entrypoint="path/to/file.py:flow1"
).deploy(
    name="high-resource-deployment",
    work_pool_name="high-resource-pool"  # Work pool configured with higher resources
)
flow2_deployment = flow2.from_source(
    source="your_repo",
    entrypoint="path/to/file.py:flow2"
).deploy(
    name="low-resource-deployment",
    work_pool_name="low-resource-pool"  # Work pool configured with lower resources
)
@flow
def orchestrator_flow():
    # Run flow1 and wait for completion
    flow1_run = run_deployment(
        name="high-resource-deployment/flow1",
        timeout=3600  # Adjust timeout as needed
    )
    
    # After flow1 completes, run flow2
    if flow1_run.state.is_completed():
        flow2_run = run_deployment(
            name="low-resource-deployment/flow2",
            timeout=3600
        )
```

```python
from prefect.deployments import DeploymentEventTrigger
from prefect.server.schemas.triggers import DeploymentTriggerTypes
# Create a trigger for flow2 that depends on flow1's completion
trigger = DeploymentEventTrigger(
    name="flow2_trigger",
    deployment_name="low-resource-deployment/flow2",  # Target deployment to run
    event_type="prefect.flow-run.Completed",  # Trigger on completion
    source_deployment_name="high-resource-deployment/flow1"  # Source deployment to watch
)
```

```bash
# Create work pool for high-resource tasks
prefect work-pool create "high-resource-pool" --type kubernetes
# Create work pool for low-resource tasks
prefect work-pool create "low-resource-pool" --type kubernetesjson
{
  "spec": {
    "template": {
      "spec": {
        "containers": [{
          "resources": {
            "requests": {
              "memory": "8Gi",
              "cpu": "2"
            },
            "limits": {
              "memory": "16Gi",
              "cpu": "4"
            }
          }
        }]
      }
    }
  }
}
```

```json
{
  "spec": {
    "template": {
      "spec": {
        "containers": [{
          "resources": {
            "requests": {
              "memory": "512Mi",
              "cpu": "500m"
            },
            "limits": {
              "memory": "1Gi",
              "cpu": "1"
            }
          }
        }]
      }
    }
  }
}
```

Ritesh
08/06/2025, 8:19 AM

Marvin
08/06/2025, 8:19 AM

Marvin
08/06/2025, 8:20 AM

```bash
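# Create a work pool whose base job template (below) pins runs to specific nodes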
prefect work-pool create "k8s-specific-nodes" --type kubernetesyaml
# job-template.yaml
job_configuration:
  spec:
    template:
      spec:
        nodeSelector:
          node-group: your-node-group-label  # Replace with your actual node group label
```

```yaml
# job-template.yaml
job_configuration:
  spec:
    template:
      spec:
        affinity:
          nodeAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
                - matchExpressions:
                    - key: node-group
                      operator: In
                      values:
                        - your-node-group-name
```

```bash
prefect work-pool create "k8s-specific-nodes" --base-job-template job-template.yaml --overwritepython
from prefect import flow
@flow
def my_flow():
    # your flow code here
    pass
if __name__ == "__main__":
    my_flow.deploy(
        name="my-deployment",
        work_pool_name="k8s-specific-nodes"
    )
```

```bash
kubectl get nodes --show-labels
```

Ritesh
08/06/2025, 1:38 PM

Marvin
08/06/2025, 1:38 PM

Marvin
08/06/2025, 1:39 PM

Ritesh
08/06/2025, 1:40 PM

Marvin
08/06/2025, 1:40 PM

Marvin
08/06/2025, 1:40 PM

Marvin
08/06/2025, 1:40 PM

```dockerfile
FROM python:3.9
RUN pip install prefect
# Add any other dependencies you need
COPY your_flows /flows
```

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: prefect-worker
spec:
  template:
    spec:
      containers:
      - name: worker
        image: your-prefect-image:tag
        command: ["prefect", "worker", "start", "--pool", "your-pool"]yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: prefect-worker
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: prefect-worker
rules:
- apiGroups: ["batch"]
  resources: ["jobs"]
  verbs: ["create", "get", "list", "watch", "delete"]Marvin
08/06/2025, 1:41 PM

```yaml
# Example Kubernetes deployment manifest
apiVersion: apps/v1
kind: Deployment
metadata:
  name: prefect-worker
spec:
  template:
    spec:
      containers:
      - name: prefect-worker
        image: prefecthq/prefect:2-python3.9
        command: ["prefect", "worker", "start", "--pool", "your-work-pool"]
        env:
        - name: PREFECT_API_KEY
          valueFrom:
            secretKeyRef:
              name: prefect-api-key
              key: key
        - name: PREFECT_API_URL
          value: "<https://api.prefect.cloud/api/accounts/[YOUR-ACCOUNT-ID]/workspaces/[YOUR-WORKSPACE-ID]>"Ritesh
08/08/2025, 8:02 AM

Marvin
08/08/2025, 8:02 AM

Marvin
08/08/2025, 8:14 AM

Marvin
08/08/2025, 8:14 AM

Ritesh
08/12/2025, 7:09 AM

Marvin
08/12/2025, 7:09 AM

Marvin
08/12/2025, 7:12 AM
You can set container resources per deployment with `job_variables`, or set pool-wide defaults in the base job template. Export the default template with `prefect work-pool get-default-base-job-template --type kubernetes -f k8s_base_job_template.json`, then edit `k8s_base_job_template.json` and add a `resources` block to the container:

```json
{
  "...": "...",
  "job_configuration": {
    "job": {
      "apiVersion": "batch/v1",
      "kind": "Job",
      "spec": {
        "template": {
          "spec": {
            "containers": [
              {
                "name": "flow",
                "image": "{{ image }}",
                "args": [...],
                "resources": {
                  "limits": { "cpu": "1000m", "memory": "1Gi" },
                  "requests": { "cpu": "500m", "memory": "512Mi" }
                }
              }
            ],
            "restartPolicy": "Never"
          }
        }
      }
    }
  }
}
```

```bash
# create (or update with --overwrite) a pool using this template
prefect work-pool create my-k8s-pool --type kubernetes --base-job-template k8s_base_job_template.json
# or update an existing pool
prefect work-pool update my-k8s-pool --base-job-template k8s_base_job_template.json
```

To make `resources` overridable per deployment, expose it as a variable in the template:

```json
{
  "variables": {
    "image": {"type": "string"},
    "resources": {"type": "object", "default": {}}
  },
  "job_configuration": {
    "job": {
      "spec": {
        "template": {
          "spec": {
            "containers": [
              {
                "name": "flow",
                "image": "{{ image }}",
                "resources": {{ resources | tojson }}
              }
            ]
          }
        }
      }
    }
  }
}
```

```python
from prefect import flow
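# Per-deployment resource overrides via job_variables; this relies on the
# "resources" variable exposed in the pool's base job template above.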
@flow
def my_flow():
    ...
my_flow.from_source("<https://github.com/your/repo>").deploy(
    name="k8s-deploy",
    work_pool_name="my-k8s-pool",
    job_variables={
        "resources": {
            "limits": {"cpu": "2000m", "memory": "2Gi"},
            "requests": {"cpu": "1000m", "memory": "1Gi"}
        }
    }
)
```

Or with `prefect deploy` and `prefect.yaml`:

```yaml
deployments:
  - name: k8s-deploy
    entrypoint: flows/my_flow.py:my_flow
    work_pool:
      name: my-k8s-pool
      job_variables:
        resources:
          limits:
            cpu: "2000m"
            memory: "2Gi"
          requests:
            cpu: "1000m"
            memory: "1Gi"flowresourcesDeployment.build_from_flow()prefect deployment buildflow.from_source(...).deploy(...)prefect deployRitesh
08/12/2025, 7:14 AM

Marvin
08/12/2025, 7:14 AM

Marvin
08/12/2025, 7:15 AM
Kubernetes work pools need the `prefect[kubernetes]` extra installed where the worker runs: `pip install "prefect[kubernetes]"`.

Ritesh
08/12/2025, 7:29 AM

Marvin
08/12/2025, 7:29 AM

Marvin
08/12/2025, 7:31 AM

```bash
prefect work-pool get-default-base-job-template --type kubernetes -f k8s_base.json
# edit k8s_base.json and add metadata.namespace (and optionally a variable)
```

```json
{
  "variables": {
    "image": {
      "type": "string"
    },
    "namespace": {
      "type": "string",
      "default": "team-a"
    }
  },
  "job_configuration": {
    "job": {
      "apiVersion": "batch/v1",
      "kind": "Job",
      "metadata": {
        "namespace": "{{ namespace }}"
      },
      "spec": {
        "template": {
          "spec": {
            "containers": [
              {
                "name": "flow",
                "image": "{{ image }}"
              }
            ],
            "restartPolicy": "Never"
          }
        }
      }
    }
  }
}
```

Apply it with `prefect work-pool update my-k8s-pool --base-job-template k8s_base.json`. Deployments can then override `namespace`:

```python
from prefect import flow
@flow
def my_flow(): ...
my_flow.from_source("<https://github.com/you/repo>").deploy(
    name="k8s-deploy",
    work_pool_name="my-k8s-pool",
    job_variables={"namespace": "team-b"}
)
```

Or via `prefect deploy` with `prefect.yaml`:

```yaml
deployments:
  - name: k8s-deploy
    entrypoint: flows/my_flow.py:my_flow
    work_pool:
      name: my-k8s-pool
      job_variables:
        namespace: team-b
```

Because the job's `metadata.namespace` is templated on the `namespace` variable, each deployment can target its own namespace. If the worker is installed with the Prefect Helm chart, the same template can live in the chart values:

```yaml
worker:
  enabled: true
  workPool:
    name: my-k8s-pool
    type: kubernetes
  baseJobTemplate:
    variables:
      namespace:
        type: string
        default: team-a
    job_configuration:
      job:
        apiVersion: batch/v1
        kind: Job
        metadata:
          namespace: "{{ namespace }}"
        spec:
          template:
            spec:
              serviceAccountName: prefect-worker
              containers:
                - name: flow
                  image: "{{ image }}"
              restartPolicy: Never
```

The key piece is the templated `metadata.namespace`.

Ritesh
08/12/2025, 11:14 AM

Marvin
08/12/2025, 11:14 AM

Marvin
08/12/2025, 11:15 AM

Marvin
08/12/2025, 11:15 AM
In Prefect 3.x Kubernetes work pools, `job_variables` can only override variables that the pool's `job_manifest` template exposes (e.g. `image`, `env`, `namespace`); there is no `job_spec` / `job_variables.job_spec` passthrough. To pin runs to an instance type (e.g. `node.kubernetes.io/instance-type`) and set resources, edit the base job template: export it with `prefect work-pool get-default-base-job-template --type kubernetes -f base.json`, then edit `base.json` under `job_configuration.job_manifest.spec.template.spec`:

```json
{
  "job_configuration": {
    "job_manifest": {
      "apiVersion": "batch/v1",
      "kind": "Job",
      "metadata": {
        "generateName": "{{ name }}-",
        "namespace": "{{ namespace }}"
      },
      "spec": {
        "template": {
          "spec": {
            "nodeSelector": {
              "<http://node.kubernetes.io/instance-type|node.kubernetes.io/instance-type>": "g4dn.xlarge"
            },
            "containers": [
              {
                "name": "prefect-job",
                "image": "{{ image }}",
                "args": "{{ command }}",
                "env": "{{ env }}",
                "resources": {
                  "requests": { "cpu": "2", "memory": "8Gi" },
                  "limits":   { "cpu": "3", "memory": "14Gi" }
                }
              }
            ],
            "restartPolicy": "Never",
            "serviceAccountName": "{{ service_account_name }}"
          }
        }
      }
    }
  },
  "variables": { ... keep existing ... }
}
```

```bash
prefect work-pool update kube-test --base-job-template base.json
```

Or, to keep these values overridable per deployment, expose them as variables instead:

```json
{
  "variables": {
    "... keep existing ...": "...",
    "node_selector": { "type": "object", "default": {} },
    "container_resources": { "type": "object", "default": {} }
  },
  "job_configuration": {
    "job_manifest": {
      "spec": {
        "template": {
          "spec": {
            "nodeSelector": {{ node_selector | tojson }},
            "containers": [
              {
                "name": "prefect-job",
                "image": "{{ image }}",
                "args": "{{ command }}",
                "env": "{{ env }}",
                "resources": {{ container_resources | tojson }}
              }
            ],
            "restartPolicy": "Never"
          }
        }
      }
    }
  }
}
```

Then in `prefect.yaml`:

```yaml
work_pool:
  name: "kube-test"
  job_variables:
    namespace: "prefect3"
    node_selector:
      "<http://node.kubernetes.io/instance-type|node.kubernetes.io/instance-type>": "g4dn.xlarge"
    container_resources:
      requests:
        cpu: "2"
        memory: "8Gi"
      limits:
        cpu: "3"
        memory: "14Gi"kubectl get job -n prefect3 -l <http://prefect.io/work-queue-name=<your-queue|prefect.io/work-queue-name=<your-queue>> -o yamlresources:
  limits:
    nvidia.com/gpu: "1"
```

Ritesh
08/12/2025, 11:57 AM

Marvin
08/12/2025, 11:57 AM

Marvin
08/12/2025, 11:59 AM
Export the current template with `prefect work-pool get-default-base-job-template --type kubernetes -f base.json`, then edit it:

```json
{
  "job_configuration": {
    "job_manifest": {
      "apiVersion": "batch/v1",
      "kind": "Job",
      "metadata": {
        "generateName": "{{ name }}-",
        "labels": "{{ labels }}",
        "namespace": "{{ namespace }}"
      },
      "spec": {
        "backoffLimit": 0,
        "template": {
          "spec": {
            "nodeSelector": {
              "<http://node.kubernetes.io/instance-type|node.kubernetes.io/instance-type>": "g4dn.xlarge"
            },
            "containers": [
              {
                "name": "prefect-job",
                "args": "{{ command }}",
                "env": "{{ env }}",
                "image": "{{ image }}",
                "imagePullPolicy": "{{ image_pull_policy }}",
                "resources": {
                  "requests": { "cpu": "2", "memory": "8Gi" },
                  "limits":   { "cpu": "3", "memory": "14Gi" }
                }
              }
            ],
            "parallelism": 1,
            "completions": 1,
            "restartPolicy": "Never",
            "serviceAccountName": "{{ service_account_name }}"
          }
        },
        "ttlSecondsAfterFinished": "{{ finished_job_ttl }}"
      }
    },
    "command": "{{ command }}",
    "env": "{{ env }}",
    "labels": "{{ labels }}",
    "name": "{{ name }}",
    "namespace": "{{ namespace }}",
    "cluster_config": "{{ cluster_config }}",
    "job_watch_timeout_seconds": "{{ job_watch_timeout_seconds }}",
    "pod_watch_timeout_seconds": "{{ pod_watch_timeout_seconds }}",
    "stream_output": "{{ stream_output }}"
  },
  "variables": { ... leave the existing variables as-is ... }
}
```

```bash
prefect work-pool update kube-test --base-job-template base.json
```

Then in `prefect.yaml`:

```yaml
work_pool:
  name: "kube-test"
  job_variables:
    namespace: "prefect3"prefect deployment run <FLOW_NAME>/<DEPLOYMENT_NAME> --watchkubectl -n prefect3 get pod -l job-name -o name
kubectl -n prefect3 get pod <pod-name> -o jsonpath='{.spec.nodeName}{"\n"}{.spec.nodeSelector}{"\n"}'
kubectl -n prefect3 get pod <pod-name> -o jsonpath="{.spec.nodeSelector['<http://node.kubernetes.io/instance-type|node.kubernetes.io/instance-type>']}{'\n'}"{
  "variables": {
    "... keep existing ...": "...",
    "node_selector": { "type": "object", "default": {} },
    "container_resources": { "type": "object", "default": {} }
  },
  "job_configuration": {
    "... keep existing fields ...": "...",
    "job_manifest": {
      "spec": {
        "template": {
          "spec": {
            "nodeSelector": {{ node_selector | tojson }},
            "containers": [
              {
                "name": "prefect-job",
                "image": "{{ image }}",
                "args": "{{ command }}",
                "env": "{{ env }}",
                "resources": {{ container_resources | tojson }}
              }
            ],
            "restartPolicy": "Never",
            "serviceAccountName": "{{ service_account_name }}"
          }
        }
      }
    }
  }
}
```

Marvin
08/12/2025, 11:59 AM
Update the pool with `prefect work-pool update kube-test --base-job-template base.json`, then set the values per deployment in `prefect.yaml`:

```yaml
work_pool:
  name: "kube-test"
  job_variables:
    namespace: "prefect3"
    node_selector:
      "<http://node.kubernetes.io/instance-type|node.kubernetes.io/instance-type>": "g4dn.xlarge"
    container_resources:
      requests:
        cpu: "2"
        memory: "8Gi"
      limits:
        cpu: "3"
        memory: "14Gi"prefect deploy
prefect deployment run <FLOW_NAME>/<DEPLOYMENT_NAME> --watchjob_variables.job_specjob_manifestresources:
  limits:
    nvidia.com/gpu: "1"
```

Ritesh
08/12/2025, 12:30 PM

Marvin
08/12/2025, 12:30 PM

Marvin
08/12/2025, 12:31 PM
Start a worker locally with `prefect worker start --pool kube-test --name kube-test-local`, or run it in-cluster via the Prefect Helm chart with values like:

```yaml
worker:
  enabled: true
  name: kube-test-worker
  workPool:
    name: kube-test
    type: kubernetes
  # Optionally bake in your base job template or leave it to the pool config
  baseJobTemplate: {}  # or paste your customized template here
  # Point to Prefect Cloud workspace or self-hosted server
  prefectApiUrl: "https://api.prefect.cloud/api/accounts/<acct>/workspaces/<ws>"  # or your server URL
  prefectApiKeySecret:
    create: true
    name: prefect-api-key
    key: key
    value: "<YOUR_API_KEY>"helm repo add prefect <https://prefecthq.github.io/prefect-helm>
helm repo update
helm upgrade --install kube-test-worker prefect/prefect-worker -f values.yaml -n prefect3 --create-namespace
```

Make sure the worker's `--pool` (and optional `--work-queue`) match the deployment, resume the pool if it is paused with `prefect work-pool resume kube-test`, then test with `prefect deployment run <FLOW>/<DEPLOYMENT> --watch`.

Ritesh
08/13/2025, 5:51 AM

Marvin
08/13/2025, 5:52 AM

Marvin
08/13/2025, 5:53 AM
You can pass these per deployment via `job_variables`, for example in `prefect.yaml`:

```yaml
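# prefect.yaml — keys under job_variables must match variables exposed by the
# work pool's base job template (image, cpu, memory, env here are examples)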
- name: my-flow
  work_pool:
    name: my-k8s-pool
  job_variables:
    image: myrepo/myflow:latest
    cpu: "1"
    memory: "1Gi"
    env:
      PREFECT_LOGGING_LEVEL: INFO
```

Ritesh
08/13/2025, 6:53 AM

Marvin
08/13/2025, 6:53 AM

Marvin
08/13/2025, 6:54 AM

Ritesh
08/13/2025, 9:14 AM

Marvin
08/13/2025, 9:14 AM

Marvin
08/13/2025, 9:18 AM
Use the worker's `--limit` flag to cap how many flow runs it executes concurrently:

```bash
# Local
prefect worker start -p my-k8s-pool --limit 4
# If you need to specify the type explicitly
prefect worker start -p my-k8s-pool --worker-type kubernetes --limit 4
```

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: prefect-k8s-worker
spec:
  replicas: 1
  template:
    spec:
      containers:
      - name: worker
        image: prefecthq/prefect:3-latest
        command: ["prefect", "worker", "start"]
        args: ["-p", "my-k8s-pool", "--limit", "4"]prefect work-pool set-concurrency-limit "my-k8s-pool" 20prefect work-queue set-concurrency-limit "queue-a" 10job_variablesRitesh
08/13/2025, 1:14 PM

Marvin
08/13/2025, 1:14 PM

Marvin
08/13/2025, 1:17 PM
Give each deployment its own image via `job_variables.image`:

```yaml
- name: project-a
  work_pool:
    name: my-k8s-pool
  job_variables:
    image: ghcr.io/acme/project-a:2025-08-12
- name: project-b
  work_pool:
    name: my-k8s-pool
  job_variables:
    image: ghcr.io/acme/project-b:2025-08-12
```

The base job template can set `imagePullPolicy: IfNotPresent`; per deployment, `job_variables` such as `image`, `imagePullPolicy`, `env`, and `pod_overrides` (if exposed by your template — see `prefect work-pool get-default-base-job-template --type kubernetes`) can override it. Images can be built with `prefect dev build-image` or referenced from `prefect.yaml`.

Ritesh
08/14/2025, 5:32 AM

Marvin
08/14/2025, 5:32 AM

Marvin
08/14/2025, 5:35 AM
Build an image with your conda environment and `prefect` installed into it:

```dockerfile
FROM mambaorg/micromamba:1.5.8
ARG MAMBA_DOCKERFILE_ACTIVATE=1
SHELL ["/bin/bash", "-lc"]
# Add env spec
COPY environment.yml /tmp/environment.yml
# Create env and install Prefect + deps
RUN micromamba create -y -n flow -f /tmp/environment.yml && \
    micromamba run -n flow pip install --no-cache-dir prefect && \
    micromamba clean --all --yes
# Use the env by default
ENV MAMBA_DEFAULT_ENV=flow
ENV PATH=/opt/conda/envs/flow/bin:$PATH
WORKDIR /opt/prefect
# No special CMD needed; Prefect will override with the flow run command
```

```yaml
name: flow
channels:
  - conda-forge
dependencies:
  - python=3.11
  - pip
  - pip:
      - your-package
      - another-dep
```

```python
from prefect import flow
# your flow(s) defined in repo...
flow.from_source(
    source="<https://github.com/your-org/your-repo>",
    entrypoint="path/to/flow.py:my_flow",
).deploy(
    name="prod",
    work_pool_name="docker-pool",
    image="<http://ghcr.io/your-org/your-image:latest|ghcr.io/your-org/your-image:latest>",
)
```

Because the deployment uses `from_source(...)`, the worker pulls the flow code from the repo at run time; the image only needs to provide the environment.

Ritesh
08/17/2025, 1:24 PM

Marvin
08/17/2025, 1:25 PM

Marvin
08/17/2025, 1:27 PM
In Prefect 3.x, `Deployment` is a client-side schema (`prefect.client.schemas.objects`) rather than something you construct yourself; create deployments with `Flow.from_source(...).deploy(...)` or `prefect deploy`. To read an existing deployment:

```python
from prefect.client.orchestration import get_client
async def get_dep():
    async with get_client() as client:
        dep = await client.read_deployment_by_name("flow-name/deployment-name")
        return dep
```

`Deployment.build_from_flow()` and `prefect deployment build` were removed; use `flow.from_source(...).deploy(...)` or `prefect deploy` instead.

Ritesh
08/17/2025, 2:22 PM

Marvin
08/17/2025, 2:22 PM

Marvin
08/17/2025, 2:23 PM
Remove the settings from your active profile with `prefect config unset PREFECT_API_AUTH_STRING PREFECT_API_DATABASE_CONNECTION_URL -y` and confirm with `prefect profile inspect`. To check or switch profiles:

```bash
prefect profile ls
prefect profile use <name>
```

If the values are also set as environment variables, clear them as well (bash and PowerShell respectively):

```bash
unset PREFECT_API_AUTH_STRING PREFECT_API_DATABASE_CONNECTION_URL
```

```powershell
Remove-Item Env:PREFECT_API_AUTH_STRING, Env:PREFECT_API_DATABASE_CONNECTION_URL
```
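A quick way to confirm where each remaining value is coming from (profile, environment variable, or default), assuming a recent Prefect 3.x CLI:

```bash
# Show every setting along with the source it was resolved from
prefect config view --show-sources
```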