Tom Han
10/12/2025, 6:48 AMMarvin
10/12/2025, 6:48 AMMarvin
10/12/2025, 6:52 AMenv:
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: PREFECT__WORKER_NAME
value: my-k8s-worker-1
- Docker/Process workers: set environment variables in the work pool’s base job template (e.g., WORKER_NAME). For Docker, HOSTNAME
typically maps to the container id automatically.
2) At the start of the flow, read those values and update the flow run’s tags once:
from prefect import flow, get_client
from prefect.client.schemas.actions import FlowRunUpdate
from prefect.runtime.flow_run import id as flow_run_id
import os
import platform
import socket
@flow
async def my_flow():
# build new tags from runtime info
new_info_tags = []
# worker/pod/container identifiers
if pod := os.getenv("POD_NAME"):
new_info_tags.append(f"pod:{pod}")
if worker := os.getenv("PREFECT__WORKER_NAME"):
new_info_tags.append(f"worker:{worker}")
# machine/platform info
new_info_tags.append(f"arch:{platform.machine()}")
new_info_tags.append(f"host:{socket.gethostname()}")
# read + merge with existing tags
async with get_client() as client:
fr = await client.read_flow_run(flow_run_id)
merged = sorted(set((fr.tags or []) + new_info_tags))
await client.update_flow_run(flow_run_id, FlowRunUpdate(tags=merged))
# ...rest of your flow...
Notes
- Do the update once per run (ideally right at the beginning) to minimize API chatter, especially since you’ve seen disconnects under load.
- You can also set default tags at deployment time, then append runtime info as shown above.
- prefect.runtime.flow_run.job_variables()
is available inside the flow but typically won’t include pod-specific details unless you set them via the job template. Docs: Customize job variables
- For reference, the API supports updating tags via `FlowRunUpdate`: Flow runs API
Follow-up question
- Which worker type(s) are you running (Kubernetes, Docker, Process)? I can share a precise job template/env setup to surface the right pod/host/arch details for your environment.