<@ULVA73B9P> In a deployment/flow, can the worker ...
# ask-marvin
t
@Marvin In a deployment/flow, can the worker tag the flow run that it is currently executing? E.g. I want a way to categorize flow runs based on the worker that executed them (e.g. their pod ID and/or arch).
m
thought for 270.8 seconds
Short answer: there isn’t a built-in “worker auto-tagging” feature in Prefect 3.x. However, you can have the flow tag itself at runtime based on information the worker injects (env vars, job variables, etc.). This gives you searchable/filterable tags like pod ID or architecture on each flow run.

How to do it

1) Expose worker/pod info to the flow:

- Kubernetes worker: inject env vars via the Downward API in your work pool’s job template:
env:
  - name: POD_NAME
    valueFrom:
      fieldRef:
        fieldPath: metadata.name
  - name: PREFECT__WORKER_NAME
    value: my-k8s-worker-1
- Docker/Process workers: set environment variables in the work pool’s base job template (e.g., `PREFECT__WORKER_NAME`). For Docker, `HOSTNAME` typically maps to the container ID automatically.

2) At the start of the flow, read those values and update the flow run’s tags once:
import os
import platform
import socket

from prefect import flow, get_client
from prefect.runtime import flow_run


@flow
async def my_flow():
    # Build new tags from runtime info.
    new_info_tags = []

    # Worker/pod/container identifiers (only present if the job template sets them).
    if pod := os.getenv("POD_NAME"):
        new_info_tags.append(f"pod:{pod}")
    if worker := os.getenv("PREFECT__WORKER_NAME"):
        new_info_tags.append(f"worker:{worker}")

    # Machine/platform info.
    new_info_tags.append(f"arch:{platform.machine()}")
    new_info_tags.append(f"host:{socket.gethostname()}")

    # Read the current tags and merge, so existing tags are preserved.
    # flow_run.id is looked up here, inside the run, rather than at import time.
    async with get_client() as client:
        fr = await client.read_flow_run(flow_run.id)
        merged = sorted(set((fr.tags or []) + new_info_tags))
        # update_flow_run takes the fields to change as keyword arguments.
        await client.update_flow_run(flow_run.id, tags=merged)

    # ...rest of your flow...
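If you want to unit-test the tagging logic without a Prefect API, one option is to pull the tag-building and merging steps out of the flow into plain functions. This is only a sketch: `build_worker_tags` and `merge_tags` are hypothetical helper names (not part of Prefect), and the environment variable names are simply the ones chosen in the job template above.

```python
import os
import platform
import socket
from typing import Iterable, Mapping, Optional


def build_worker_tags(
    env: Optional[Mapping[str, str]] = None,
    arch: Optional[str] = None,
    host: Optional[str] = None,
) -> list[str]:
    """Build "key:value" tags describing where the flow run is executing."""
    env = os.environ if env is None else env
    tags = []
    # These variable names are assumptions taken from the job template above.
    if pod := env.get("POD_NAME"):
        tags.append(f"pod:{pod}")
    if worker := env.get("PREFECT__WORKER_NAME"):
        tags.append(f"worker:{worker}")
    # Fall back to the local machine's values when none are passed in.
    tags.append(f"arch:{arch or platform.machine()}")
    tags.append(f"host:{host or socket.gethostname()}")
    return tags


def merge_tags(existing: Optional[Iterable[str]], new: Iterable[str]) -> list[str]:
    """Return the sorted, de-duplicated union of existing and new tags."""
    return sorted(set(existing or []) | set(new))
```

Inside the flow you would then compute `merged = merge_tags(fr.tags, build_worker_tags())`, and could skip the update call entirely when `set(merged) == set(fr.tags or [])`, which keeps retries from issuing redundant API requests.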
Notes

- Do the update once per run (ideally right at the beginning) to minimize API chatter, especially since you’ve seen disconnects under load.
- You can also set default tags at deployment time, then append runtime info as shown above.
- `prefect.runtime.flow_run.job_variables` (an attribute, not a function call) is available inside the flow but typically won’t include pod-specific details unless you set them via the job template. Docs: Customize job variables
- For reference, the API supports updating tags via the `FlowRunUpdate` schema: Flow runs API

Follow-up question

- Which worker type(s) are you running (Kubernetes, Docker, Process)? I can share a precise job template/env setup to surface the right pod/host/arch details for your environment.