Hendrik
10/02/2025, 8:27 AM

Marvin
10/02/2025, 8:30 AM
1) Export the default base job template for the Kubernetes work pool type:
prefect work-pool get-default-base-job-template --type kubernetes -f k8s-job-template.json
2) Edit k8s-job-template.json and add resources under the container in spec.template.spec.containers[0].resources. For example:
{
  "apiVersion": "batch/v1",
  "kind": "Job",
  "spec": {
    "template": {
      "spec": {
        "containers": [
          {
            "name": "flow-runner",
            "resources": {
              "requests": {
                "cpu": "500m",
                "memory": "1Gi"
              },
              "limits": {
                "cpu": "2",
                "memory": "2Gi"
              }
            }
          }
        ]
      }
    }
  }
}
Notes:
- Keep the rest of the template content intact; just add/modify the resources block on the flow container.
- Units: use “Mi”/“Gi” for memory and millicores (“m”) for CPU where appropriate.
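If you'd rather script the edit than do it by hand, here's a minimal sketch. It assumes the container spec sits at spec.template.spec.containers[0] as in the example above; if your exported file nests the manifest under another key (for example a job_configuration/job_manifest section), adjust the path accordingly:
import json
from pathlib import Path

template_path = Path("k8s-job-template.json")
data = json.loads(template_path.read_text())

# Navigate to the first container and set/overwrite its resources block
container = data["spec"]["template"]["spec"]["containers"][0]
container["resources"] = {
    "requests": {"cpu": "500m", "memory": "1Gi"},
    "limits": {"cpu": "2", "memory": "2Gi"},
}

template_path.write_text(json.dumps(data, indent=2))
print(f"Patched resources for container {container.get('name', '<unnamed>')}")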
3) Update your work pool to use the edited template:
prefect work-pool update "<your-pool-name>" --base-job-template k8s-job-template.json
4) Run a new flow; confirm the spawned Pod has the new requests/limits:
kubectl describe pod <pod-name> -n <ns> | grep -A5 "Limits\|Requests"
If it’s the worker pod (or any Helm-managed Prefect pod) being OOMKilled:
- Increase that Deployment’s container resources in your Helm values and upgrade the release. A typical snippet looks like:
resources:
  requests:
    cpu: "500m"
    memory: "1Gi"
  limits:
    cpu: "2"
    memory: "2Gi"
Apply this under the component section for the worker (or the specific service) in your values file, then:
helm upgrade <release> <chart> -n <ns> -f values.yaml
Helpful docs:
- Kubernetes worker + job template guidance: Prefect 3.x: Kubernetes deployments and workers
- Managing work pools (including base job templates): Prefect 3.x: Manage work pools
If you can share the exact pod names/namespaces that are OOMKilled, I can point you to the precise place to change resources (job template vs Helm values) and provide a tailored example.

Hendrik
10/02/2025, 4:19 PM

Marvin
10/02/2025, 4:20 PM
For example, to give each run a fixed footprint, set requests equal to limits:
"resources": {
  "requests": {
    "cpu": "500m",
    "memory": "1Gi"
  },
  "limits": {
    "cpu": "500m",
    "memory": "1Gi"
  }
}
Or allow some burst while capping memory:
"resources": {
"requests": {
"cpu": "500m",
"memory": "1Gi"
},
"limits": {
"cpu": "2",
"memory": "2Gi"
}
}
Useful checks
- See what your pod actually had:
kubectl describe pod <pod> -n <ns> | grep -A5 "Limits\|Requests"
- Check for namespace defaults:
kubectl get limitrange -n <ns>
kubectl describe limitrange -n <ns>
Docs and how-tos
- Prefect 3.x Kubernetes workers and templates: Kubernetes deployments and workers
- Managing work pools (including base job templates): Manage work pools
If you want, share a kubectl describe of one of the OOMKilled pods and your namespace LimitRange (if any) and I can suggest exact request/limit values.

Hendrik
10/02/2025, 4:23 PM

Marvin
10/02/2025, 4:26 PM
Set a flow-run concurrency limit on the work pool:
prefect work-pool set-concurrency-limit "<your-pool-name>" 10
- Pick a number that fits your cluster capacity and each run’s resource requests. For example, if each run requests ~1Gi and you can comfortably run 10 in parallel, set it to 10 (see the quick sizing sketch below).
- To remove it later:
prefect work-pool clear-concurrency-limit "<your-pool-name>"
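A quick back-of-envelope way to pick that number — the figures below are assumptions for illustration only; substitute your own, and remember CPU, daemonsets, and other workloads also consume node capacity:
# All numbers are illustrative assumptions
allocatable_memory_gi = 32   # memory you are willing to dedicate to flow-run pods
per_run_request_gi = 1       # memory request per flow-run pod (from the job template)
headroom = 0.8               # keep ~20% free for system pods and spikes

limit = int(allocatable_memory_gi * headroom // per_run_request_gi)
print(f"Suggested work pool concurrency limit: {limit}")  # -> 25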
Related notes
- Tag-based concurrency limits (via prefect concurrency-limit ...) throttle task runs by tag, not flow runs. They’re useful inside flows but won’t stop the worker from creating Pods in the first place.
- If all deployments are scheduled for the same minute, consider staggering their schedules slightly to reduce burstiness.
- Still set CPU/memory requests/limits in the Kubernetes Base Job Template so each Pod gets reserved resources and has a memory cap that prevents node-wide pressure.
Docs
- Work pools and concurrency: Manage work pools (Prefect 3.x)
- Kubernetes worker and job templates: Kubernetes deployments and workers (Prefect 3.x)
If you share your cluster size and typical per-run memory needs, I can help estimate a good starting concurrency limit.

Hendrik
10/02/2025, 5:02 PM

Marvin
10/02/2025, 5:04 PM
Bash (macOS/Linux)
- Preview:
prefect deployment ls | awk '$1 ~ /\/kl_/ {print $1}' | xargs -n1 -I {} echo prefect deployment run "{}"
- Execute:
prefect deployment ls | awk '$1 ~ /\/kl_/ {print $1}' | xargs -n1 -I {} prefect deployment run "{}"
This looks for deployment names with “/kl_” (since the NAME column is flow/deployment) and runs each.
PowerShell (Windows)
- Preview:
prefect deployment ls | Select-String '/kl_' | ForEach-Object { ($_ -split '\s+')[0] } | ForEach-Object { "prefect deployment run $($_)" }
- Execute:
prefect deployment ls | Select-String '/kl_' | ForEach-Object { ($_ -split '\s+')[0] } | ForEach-Object { prefect deployment run $_ }
Notes
- Your work pool concurrency limit of 10 will ensure only 10 flow runs execute at once; the rest will queue.
- If you want to stagger starts slightly, you can add a small sleep in the loop (e.g., sh -c 'prefect deployment run "{}"; sleep 0.5' inside xargs).
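If you'd rather drive this from Python than the shell, here's a minimal sketch using the Prefect 3.x async client; it assumes the client's read_deployments/read_flow/create_flow_run_from_deployment methods and filters on a "kl_" deployment-name prefix:
import asyncio

from prefect.client.orchestration import get_client

async def run_deployments_with_prefix(prefix: str = "kl_") -> None:
    async with get_client() as client:
        deployments = await client.read_deployments()
        for deployment in deployments:
            if not deployment.name.startswith(prefix):
                continue
            # Look up the parent flow so we can print the full flow/deployment name
            flow = await client.read_flow(deployment.flow_id)
            flow_run = await client.create_flow_run_from_deployment(deployment.id)
            print(f"Triggered {flow.name}/{deployment.name} -> {flow_run.id}")

if __name__ == "__main__":
    asyncio.run(run_deployments_with_prefix())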
Docs
- Prefect 3.x CLI: deployment
- Run deployments

Hendrik
10/02/2025, 5:11 PM

Marvin
10/02/2025, 5:12 PM
Option A — Prefect Cloud (Automations)
Create an Automation that triggers on flow-run failure and add a webhook action that posts a Teams MessageCard, for example:
{
"@type": "MessageCard",
"@context": "<http://schema.org/extensions>",
"themeColor": "E81123",
"summary": "Prefect flow run failed",
"title": "Flow run failed",
"sections": [
{
"activityTitle": "A flow run failed",
"facts": [
{
"name": "Flow",
"value": "{{ event.payload.flow_run.name }}"
},
{
"name": "Deployment",
"value": "{{ event.payload.deployment.name }}"
},
{
"name": "Run ID",
"value": "{{ event.payload.flow_run.id }}"
},
{
"name": "State",
"value": "{{ event.payload.state.name }}"
},
{
"name": "Time (UTC)",
"value": "{{ event.time }}"
}
]
}
],
"potentialAction": [
{
"@type": "OpenUri",
"name": "Open in Prefect",
"targets": [
{
"os": "default",
"uri": "{{ event.payload.flow_run.url }}"
}
]
}
]
}
Notes:
- The example uses templating placeholders typical for Prefect Cloud automations. If you prefer, you can start with static text and add fields incrementally.
- Docs: Prefect Cloud: Create a webhook (general webhook how-to; the Automation UI uses the same principles)
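If you want to sanity-check the webhook and card format before wiring up the Automation, you can post a static card from a quick local script. This is a sketch: the webhook URL is a placeholder, and the {{ ... }} placeholders above are rendered by Prefect, so use literal text here:
import httpx

webhook_url = "https://example.webhook.office.com/webhookb2/..."  # placeholder; use your own Teams webhook URL

card = {
    "@type": "MessageCard",
    "@context": "http://schema.org/extensions",
    "themeColor": "E81123",
    "summary": "Prefect flow run failed (test)",
    "title": "Flow run failed (test)",
    "sections": [{"text": "Test card sent from a local script."}],
}

httpx.post(webhook_url, json=card, timeout=10).raise_for_status()
print("Card posted")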
Option B — Self-hosted (no Automations)
Post to Teams from your flow when it fails. A simple, reliable pattern is to catch exceptions at the top of your flow, send the Teams webhook, then re-raise so the run is still marked failed.
Example:
import os
import httpx
from prefect import flow
TEAMS_WEBHOOK_URL = os.environ["TEAMS_WEBHOOK_URL"]
def send_teams_card(title: str, text: str, link: str = ""):
card = {
"@type": "MessageCard",
"@context": "<http://schema.org/extensions>",
"themeColor": "E81123",
"summary": title,
"title": title,
"sections": [{ "text": text }],
}
if link:
card["potentialAction"] = [{
"@type": "OpenUri",
"name": "Open in Prefect",
"targets": [{ "os": "default", "uri": link }]
}]
<http://httpx.post|httpx.post>(TEAMS_WEBHOOK_URL, json=card, timeout=10).raise_for_status()
@flow
def my_flow():
try:
# your flow logic here
...
except Exception as exc:
# optionally construct a Prefect URL to the flow run if you store it in context
send_teams_card(
title="Prefect flow run failed",
text=f"my_flow failed with: {exc}"
)
raise
- You can enrich the message with run IDs and links if you capture them (e.g., via the runtime context or logging); see the sketch after these notes.
- Teams webhook doc: Teams incoming webhook
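For example, inside the flow body you can pull the run ID and name from Prefect's runtime context and build a UI link. This is a sketch assuming Prefect 3.x's prefect.runtime module and a PREFECT_UI_URL environment variable that you provide yourself:
import os

from prefect.runtime import flow_run

def failure_details() -> str:
    # Build a text block for the Teams card from the current run's context
    ui_base = os.environ.get("PREFECT_UI_URL", "").rstrip("/")
    run_id = flow_run.id
    link = f"{ui_base}/flow-runs/flow-run/{run_id}" if ui_base and run_id else "n/a"
    return f"Run: {flow_run.name}\nRun ID: {run_id}\nLink: {link}"

# Usage inside the except block above:
#     send_teams_card(title="Prefect flow run failed", text=failure_details())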
Which path do you want to take? If you’re on Cloud, I can give you the exact event fields to include in the Automation payload. If you’re self-hosted, I can help wire up a lightweight helper that posts rich cards (and include a link to the run in your Prefect UI).

Hendrik
10/02/2025, 5:14 PM

Marvin
10/02/2025, 5:18 PM
1) Put the Teams webhook URL in a Kubernetes Secret (the snippet below assumes a Secret named teams-webhook with key url).
2) Inject it, along with your Prefect UI URL, as environment variables on the flow-run container via the work pool's Base Job Template:
"spec": {
"template": {
"spec": {
"containers": [
{
"name": "flow-runner",
"env": [
{
"name": "TEAMS_WEBHOOK_URL",
"valueFrom": {
"secretKeyRef": { "name": "teams-webhook", "key": "url" }
}
},
{
"name": "PREFECT_UI_URL",
"value": "<https://your-prefect-ui.example.com>"
}
]
}
]
}
}
}
3) Add a flow failure hook to post to Teams
- The hook receives (flow, flow_run, state). Attach it with the flow decorator’s on_failure/on_crashed.
import os
import httpx
from prefect import flow
def send_teams_card(title: str, text: str, link: str | None = None):
    webhook = os.environ["TEAMS_WEBHOOK_URL"]
    card = {
        "@type": "MessageCard",
        "@context": "http://schema.org/extensions",
        "themeColor": "E81123",
        "summary": title,
        "title": title,
        "sections": [{"text": text}],
    }
    if link:
        card["potentialAction"] = [{
            "@type": "OpenUri",
            "name": "Open in Prefect",
            "targets": [{"os": "default", "uri": link}],
        }]
    r = httpx.post(webhook, json=card, timeout=10)
    r.raise_for_status()

def notify_teams_on_failure(flow, flow_run, state):
    ui_base = os.environ.get("PREFECT_UI_URL", "").rstrip("/")
    run_url = f"{ui_base}/flow-runs/flow-run/{flow_run.id}" if ui_base else None
    title = f"Flow failed: {flow_run.name}"
    text = (
        f"- Flow: {flow.name}\n"
        f"- Deployment: {getattr(flow_run, 'deployment_id', '')}\n"
        f"- Run ID: {flow_run.id}\n"
        f"- State: {state.name}\n"
        f"- Message: {state.message or ''}"
    )
    send_teams_card(title, text, run_url)

@flow(on_failure=[notify_teams_on_failure], on_crashed=[notify_teams_on_failure])
def my_flow():
    ...
Notes and tips
- You can reuse the same hook function across all flows by importing it and attaching it via the flow decorator.
- If you prefer not to use hooks, you can wrap your flow body in try/except, send the Teams message, then re-raise.
- To reduce noise, consider using retries so you only notify after the final failure (see the sketch after these tips).
- If your Base Job Template doesn’t currently inject env vars, update it and apply to your work pool:
prefect work-pool update "<your-k8s-pool>" --base-job-template k8s-job-template.json
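A minimal sketch of that pattern, reusing the notify_teams_on_failure hook from above: with retries configured, the on_failure hook only fires once the final attempt ends in a Failed state, so transient errors that a retry recovers never reach Teams.
from prefect import flow

@flow(
    retries=2,                 # retry up to twice before marking the run Failed
    retry_delay_seconds=60,
    on_failure=[notify_teams_on_failure],
    on_crashed=[notify_teams_on_failure],
)
def my_flow():
    ...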
Want me to draft a full Base Job Template snippet (with both resources and these env vars) tailored to your work pool? If you share your pool name and any existing template content, I’ll put one together.

Hendrik
10/02/2025, 5:22 PM

Marvin
10/02/2025, 5:24 PM

Hendrik
10/02/2025, 5:27 PM

Marvin
10/02/2025, 5:33 PM
You can also manage Automations from the CLI (prefect automation ls/inspect/pause/resume).
If you prefer the code approach for certain projects (e.g., custom cards), you can still use a flow `on_failure`/`on_crashed` hook and post to the Teams webhook URL from the flow-run pod.
Want me to sanity-check your current Automation trigger/action setup or provide a templated Teams message payload to include extra run details (links, parameters, etc.)?