rania3
09/17/2025, 7:23 AM
How do I add extra attributes per JSON log?

Marvin
09/17/2025, 7:26 AM
In short:
- Use Prefect's JsonFormatter via a custom logging config.
- Pass extra={...} in your log calls (works with get_run_logger()).
- Point Prefect to your config with PREFECT_LOGGING_SETTINGS_PATH.
Step-by-step
1) Create a logging config that uses Prefect’s JsonFormatter
Save this as logging.yml (or .yaml):
version: 1
disable_existing_loggers: false
formatters:
  json:
    '()': prefect.logging.formatters.JsonFormatter
    fmt: default  # or 'pretty' for human-readable JSON
handlers:
  console:
    class: logging.StreamHandler
    level: INFO
    formatter: json
    stream: ext://sys.stdout
  api:
    '()': prefect.logging.handlers.APILogHandler
    level: INFO
    # APILogHandler can still have a formatter; for console JSON only, you can omit this
    formatter: json
loggers:
  prefect:
    level: INFO
    handlers: [console, api]
    propagate: false
root:
  level: INFO
  handlers: [console]
2) Tell Prefect to use your config
Set an environment variable before your flow/worker starts:
PREFECT_LOGGING_SETTINGS_PATH=/path/to/logging.yml
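For example, in a shell session (the flow script and pool name below are placeholders):

export PREFECT_LOGGING_SETTINGS_PATH=/path/to/logging.yml
python my_flow.py
# or, to start a worker from the same shell:
# prefect worker start --pool my-pool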
3) Log with extra attributes
Inside flows/tasks, use get_run_logger() and pass extra={...}. Prefect’s JSON formatter will include these fields in the JSON output.
from prefect import flow, task, get_run_logger

@task
def process_file(path: str, attempt: int = 1):
    logger = get_run_logger()
    logger.info(
        "Processing file",
        extra={"file": path, "attempt": attempt, "component": "ingestion"},
    )

@flow
def pipeline():
    process_file("s3://bucket/key1", attempt=1)
    process_file("s3://bucket/key2", attempt=2)

if __name__ == "__main__":
    pipeline()
4) Add the same extras to every log record (optional)
- LoggerAdapter approach (merges per-call extras with a common context):
import logging
from prefect import task, get_run_logger

class ContextAdapter(logging.LoggerAdapter):
    def process(self, msg, kwargs):
        # merge per-call extras with the adapter's common context
        extra = kwargs.get("extra", {})
        extra.update(self.extra)
        kwargs["extra"] = extra
        return msg, kwargs

@task
def run():
    base = get_run_logger()
    logger = ContextAdapter(base, {"env": "prod", "dataset": "sales"})
    logger.info("Started")                        # has env, dataset
    logger.warning("Bad row", extra={"row": 42})  # merges row with env, dataset
- Filter approach (injects fields on every record via config):
Define a filter:
# my_filters.py
import logging

class ContextFilter(logging.Filter):
    def filter(self, record):
        record.env = "prod"
        record.dataset = "sales"
        return True
Add it to your `logging.yml`:
filters:
  context:
    '()': my_filters.ContextFilter
handlers:
  console:
    ...
    filters: [context]
  api:
    ...
    filters: [context]
What to expect in different destinations
- Console/stdout: You’ll see structured JSON with your extra fields included.
- Prefect Cloud/API: Standard fields (message, level, timestamp, logger name, run IDs, etc.) are persisted. Custom extra fields may not be indexed or displayed in the UI; treat them as best-effort metadata. If you need a field to be searchable in Cloud, include it in the message as well (see the sketch below). You can verify by sending a test log and inspecting it in the UI.
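A minimal sketch of that pattern, inside a task or flow run (the field name and value are illustrative): the value goes into the message text so it stays searchable, and into extra={...} so structured sinks still receive it as a field.

logger = get_run_logger()
object_id = "123"  # illustrative value
logger.info(f"Processing object object_id={object_id}", extra={"object_id": object_id})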
Notes and links
- Prefect 3.x JSON formatter: JsonFormatter API
- Customizing logging in Prefect 3.x: Logging customization
- Env var: PREFECT_LOGGING_SETTINGS_PATH points to your logging config file.
Want me to tailor this for your setup? Let me know:
- Where the logs should go (console only, file, Prefect Cloud, another sink)
- How you’re running flows (local, worker, Docker, K8s)
- Any specific extra fields you want on every record

rania3
09/17/2025, 7:28 AM
How do I wire this up when running on Kubernetes (work pool and worker)?

Marvin
09/17/2025, 7:31 AM
In short:
- Ship a logging.yml that uses Prefect's JsonFormatter and point the run pods at it via PREFECT_LOGGING_SETTINGS_PATH
- Optionally inject common context (flow/task IDs, k8s pod/namespace) on every log line via a logging Filter, and/or pass extra={...} in your code
1) Create a JSON logging config
Save this to a file named `logging.yml`:
version: 1
disable_existing_loggers: false
formatters:
  json:
    '()': prefect.logging.formatters.JsonFormatter
    fmt: default  # 'default' is compact JSON; use 'pretty' for readable output
filters:
  context:
    '()': my_filters.ContextFilter  # optional; see step 2
handlers:
  console:
    class: logging.StreamHandler
    level: INFO
    formatter: json
    stream: ext://sys.stdout
    filters: [context]  # optional
  api:
    '()': prefect.logging.handlers.APILogHandler
    level: INFO
    # APILogHandler ships logs to Prefect Server; it can still have a formatter
    formatter: json
    filters: [context]  # optional
loggers:
  prefect:
    level: INFO
    handlers: [console, api]
    propagate: false
root:
  level: INFO
  handlers: [console]
2) Add an optional filter to stamp every record with runtime/k8s context
Create a my_filters.py in your project/module (importable by your flow code):
# my_filters.py
import logging
import os

class ContextFilter(logging.Filter):
    def filter(self, record):
        # Kubernetes metadata via the Downward API (set in step 4)
        record.k8s_pod = os.getenv("POD_NAME")
        record.k8s_namespace = os.getenv("POD_NAMESPACE")
        # Prefect runtime metadata (available inside flow/task run pods)
        try:
            from prefect.runtime import flow_run, task_run, deployment
            record.flow_run_id = flow_run.id
            record.task_run_id = task_run.id
            record.deployment_name = deployment.name
        except Exception:
            # not in a run context
            record.flow_run_id = None
            record.task_run_id = None
            record.deployment_name = None
        return True
Notes
- The filter is optional — you can skip it and only use extra={...} in your logs.
- The module path in logging.yml must match where this file lives (e.g., my_project.logging.my_filters.ContextFilter); see the snippet below.
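For example, assuming a hypothetical package layout of my_project/logging/my_filters.py, the filter entry in logging.yml would become:

filters:
  context:
    '()': my_project.logging.my_filters.ContextFilter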
3) Use extra={...} in your flow/task code
This works with get_run_logger() and JsonFormatter will include your fields.
from prefect import flow, task, get_run_logger

@task
def process_obj(obj_id: str):
    logger = get_run_logger()
    logger.info("Processing object", extra={"object_id": obj_id, "component": "ingestion"})

@flow
def main():
    process_obj("123")
    process_obj("456")

if __name__ == "__main__":
    main()
4) Wire it up in Kubernetes for flow-run pods (work pool base job template)
- Create a ConfigMap with your `logging.yml`:
apiVersion: v1
kind: ConfigMap
metadata:
  name: prefect-logging-config
data:
  logging.yml: |
    # paste the exact contents from step 1 here
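Assuming you save that manifest as prefect-logging-configmap.yaml (filename and namespace are placeholders), apply it with:

kubectl apply -f prefect-logging-configmap.yaml -n <your-namespace>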
- Export your Kubernetes work pool’s base job template, edit it to:
- Mount the ConfigMap at /etc/prefect-logging
- Set PREFECT_LOGGING_SETTINGS_PATH=/etc/prefect-logging/logging.yml
- Inject POD_NAME and POD_NAMESPACE from the Downward API (for the filter)
Commands to get/update the template:
prefect work-pool get-default-base-job-template --type kubernetes > template.json
# or export your existing pool's template:
# prefect work-pool inspect <POOL_NAME> # to see current config
# edit template.json accordingly, then:
prefect work-pool update "<POOL_NAME>" --base-job-template template.json

Marvin
09/17/2025, 7:31 AM
In template.json, add something like the following under the container spec for the flow-run pod (the exact nesting depends on your current template; if you paste yours here, I can annotate precisely):
{
  "job": {
    "template": {
      "spec": {
        "template": {
          "spec": {
            "volumes": [
              {
                "name": "prefect-logging-config",
                "configMap": {
                  "name": "prefect-logging-config"
                }
              }
            ],
            "containers": [
              {
                "name": "prefect-job",
                "env": [
                  {
                    "name": "PREFECT_LOGGING_SETTINGS_PATH",
                    "value": "/etc/prefect-logging/logging.yml"
                  },
                  {
                    "name": "POD_NAME",
                    "valueFrom": {
                      "fieldRef": {
                        "fieldPath": "metadata.name"
                      }
                    }
                  },
                  {
                    "name": "POD_NAMESPACE",
                    "valueFrom": {
                      "fieldRef": {
                        "fieldPath": "metadata.namespace"
                      }
                    }
                  }
                ],
                "volumeMounts": [
                  {
                    "name": "prefect-logging-config",
                    "mountPath": "/etc/prefect-logging"
                  }
                ]
              }
            ]
          }
        }
      }
    }
  }
}
5) Don’t forget the worker itself (optional but recommended)
If you want the worker’s own logs (not just the flow-run pods) to be JSON too:
- Mount the same ConfigMap into the worker Deployment/Helm values
- Set PREFECT_LOGGING_SETTINGS_PATH=/etc/prefect-logging/logging.yml in the worker container env
- Optionally inject `POD_NAME`/`POD_NAMESPACE` there as well (see the sketch after this list)
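A minimal sketch of those worker-side changes on a plain Deployment, assuming the container is named prefect-worker and the ConfigMap from step 4 exists in the worker's namespace (names are illustrative; with the Prefect Helm chart you would set the equivalent values instead):

# excerpt of a worker Deployment (illustrative names)
spec:
  template:
    spec:
      containers:
        - name: prefect-worker
          env:
            - name: PREFECT_LOGGING_SETTINGS_PATH
              value: /etc/prefect-logging/logging.yml
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: POD_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
          volumeMounts:
            - name: prefect-logging-config
              mountPath: /etc/prefect-logging
      volumes:
        - name: prefect-logging-config
          configMap:
            name: prefect-logging-config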
6) What you’ll see where
- stdout in pods: JSON logs with both your extra and the filter fields (good for log shipping)
- Prefect Server UI: standard fields (message, level, time, logger name, run IDs, etc.) are displayed. Custom extra fields may not be shown/searchable in the UI, but they’re still in stdout for your log pipeline.
References
- Logging customization (Prefect 3): Logging customization
- JsonFormatter API: JsonFormatter API
- Work pool CLI (verify options):
- prefect work-pool get-default-base-job-template --help
- prefect work-pool update --help
Want me to tailor the JSON fields or annotate your current template.json? If you paste your pool’s base job template or Helm values for the worker, I’ll mark the exact spots to add env, volumes, and mounts.