rania3 (09/17/2025, 7:23 AM)
extra attributes per JSON log?

Marvin (09/17/2025, 7:26 AM)
- Pass `extra={...}` in your log calls (works with `get_run_logger()`).
- Point Prefect to your config with `PREFECT_LOGGING_SETTINGS_PATH`.
Step-by-step
1) Create a logging config that uses Prefect’s JsonFormatter
Save this as `logging.yml` (or `.yaml`):
```
version: 1
disable_existing_loggers: false
formatters:
  json:
    '()': prefect.logging.formatters.JsonFormatter
    fmt: default  # or 'pretty' for human-readable JSON
handlers:
  console:
    class: logging.StreamHandler
    level: INFO
    formatter: json
    stream: ext://sys.stdout
  api:
    '()': prefect.logging.handlers.APILogHandler
    level: INFO
    # APILogHandler can still have a formatter; for console JSON only, you can omit this
    formatter: json
loggers:
  prefect:
    level: INFO
    handlers: [console, api]
    propagate: false
root:
  level: INFO
  handlers: [console]
```
2) Tell Prefect to use your config
Set an environment variable before your flow/worker starts:
```
PREFECT_LOGGING_SETTINGS_PATH=/path/to/logging.yml
```
3) Log with extra attributes
Inside flows/tasks, use `get_run_logger()` and pass `extra={...}`. Prefect’s JSON formatter will include these fields in the JSON output.
```
from prefect import flow, task, get_run_logger

@task
def process_file(path: str, attempt: int = 1):
    logger = get_run_logger()
    logger.info(
        "Processing file",
        extra={"file": path, "attempt": attempt, "component": "ingestion"}
    )

@flow
def pipeline():
    process_file("s3://bucket/key1", attempt=1)
    process_file("s3://bucket/key2", attempt=2)

if __name__ == "__main__":
    pipeline()
```
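If it helps to see the mechanism, here is a stdlib-only sketch (no Prefect involved, and not Prefect’s actual output schema): the `extra` kwargs become attributes on the `LogRecord`, which is exactly what a JSON formatter serializes.
```
import json
import logging

class TinyJsonFormatter(logging.Formatter):
    """Toy formatter: serialize a few standard fields plus the custom extras."""
    def format(self, record):
        return json.dumps({
            "message": record.getMessage(),
            "level": record.levelname,
            # attributes injected via extra={...} land directly on the record
            "file": getattr(record, "file", None),
            "attempt": getattr(record, "attempt", None),
        })

handler = logging.StreamHandler()
handler.setFormatter(TinyJsonFormatter())
logger = logging.getLogger("demo")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("Processing file", extra={"file": "s3://bucket/key1", "attempt": 1})
# -> {"message": "Processing file", "level": "INFO", "file": "s3://bucket/key1", "attempt": 1}
```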
4) Add the same extras to every log record (optional)
- LoggerAdapter approach (merges per-call extras with a common context):
```
import logging
from prefect import task, get_run_logger

class ContextAdapter(logging.LoggerAdapter):
    def process(self, msg, kwargs):
        # merge the adapter's shared context into any per-call extra
        extra = kwargs.get("extra", {})
        extra.update(self.extra)
        kwargs["extra"] = extra
        return msg, kwargs

@task
def run():
    base = get_run_logger()
    logger = ContextAdapter(base, {"env": "prod", "dataset": "sales"})
    logger.info("Started")                        # has env, dataset
    logger.warning("Bad row", extra={"row": 42})  # merges row + env, dataset
```
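One detail worth knowing about this adapter: because `process()` applies `self.extra` last, the shared context wins if a per-call `extra` reuses the same key. A quick stdlib check of that merge order (plain logger and an illustrative format string, not Prefect):
```
import logging

class ContextAdapter(logging.LoggerAdapter):
    # same process() as above
    def process(self, msg, kwargs):
        extra = kwargs.get("extra", {})
        extra.update(self.extra)
        kwargs["extra"] = extra
        return msg, kwargs

logging.basicConfig(level=logging.INFO, format="%(message)s env=%(env)s row=%(row)s")
adapter = ContextAdapter(logging.getLogger("demo"), {"env": "prod"})
adapter.warning("Bad row", extra={"row": 42, "env": "dev"})
# -> Bad row env=prod row=42   (the per-call env="dev" is overwritten by the shared context)
```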
- Filter approach (injects fields on every record via config):
Define a filter:
```
# my_filters.py
import logging

class ContextFilter(logging.Filter):
    def filter(self, record):
        record.env = "prod"
        record.dataset = "sales"
        return True
```
Add it to your `logging.yml`:
```
filters:
  context:
    '()': my_filters.ContextFilter
handlers:
  console:
    # ...existing console settings...
    filters: [context]
  api:
    # ...existing api settings...
    filters: [context]
```
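If you want to convince yourself this pattern works before touching the Prefect config, here is a stdlib-only sketch of the same wiring via `dictConfig` (a plain text formatter instead of Prefect’s `JsonFormatter`, and the filter defined inline rather than in `my_filters.py`):
```
import logging
import logging.config

class ContextFilter(logging.Filter):
    def filter(self, record):
        record.env = "prod"
        record.dataset = "sales"
        return True

logging.config.dictConfig({
    "version": 1,
    "disable_existing_loggers": False,
    "filters": {"context": {"()": ContextFilter}},
    "formatters": {"plain": {"format": "%(message)s env=%(env)s dataset=%(dataset)s"}},
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "plain",
            "filters": ["context"],
        }
    },
    "root": {"level": "INFO", "handlers": ["console"]},
})

logging.getLogger(__name__).info("Started")
# -> Started env=prod dataset=sales
```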
What to expect in different destinations
- Console/stdout: you’ll see structured JSON with your `extra` fields included.
- Prefect Cloud/API: standard fields (message, level, timestamp, logger name, run IDs, etc.) are persisted. Custom `extra` fields may not be indexed or displayed in the UI; treat them as best-effort metadata. If you need a field to be searchable in Cloud, include it in the message as well. You can verify by sending a test log and inspecting it in the UI.
Notes and links
- Prefect 3.x JSON formatter: JsonFormatter API
- Customizing logging in Prefect 3.x: Logging customization
- Env var: `PREFECT_LOGGING_SETTINGS_PATH` points to your logging config file.
Want me to tailor this for your setup? Let me know:
- Where the logs should go (console only, file, Prefect Cloud, another sink)
- How you’re running flows (local, worker, Docker, K8s)
- Any specific extra fields you want on every record

rania3 (09/17/2025, 7:28 AM)

Marvin (09/17/2025, 7:31 AM)
- Point Prefect at your JSON logging config with `PREFECT_LOGGING_SETTINGS_PATH`
- Optionally inject common context (flow/task IDs, k8s pod/namespace) on every log line via a logging Filter, and/or pass `extra={...}` in your code
1) Create a JSON logging config
Save this to a file named `logging.yml`:
```
version: 1
disable_existing_loggers: false
formatters:
  json:
    '()': prefect.logging.formatters.JsonFormatter
    fmt: default  # 'default' is compact JSON; use 'pretty' for readable
filters:
  context:
    '()': my_filters.ContextFilter  # optional; see step 3
handlers:
  console:
    class: logging.StreamHandler
    level: INFO
    formatter: json
    stream: ext://sys.stdout
    filters: [context]  # optional
  api:
    '()': prefect.logging.handlers.APILogHandler
    level: INFO
    # APILogHandler ships logs to Prefect Server; it can still have a formatter
    formatter: json
    filters: [context]  # optional
loggers:
  prefect:
    level: INFO
    handlers: [console, api]
    propagate: false
root:
  level: INFO
  handlers: [console]
```
2) Add an optional filter to stamp every record with runtime/k8s context
Create a `my_filters.py` in your project/module (importable by your flow code):
```
# my_filters.py
import logging, os

class ContextFilter(logging.Filter):
    def filter(self, record):
        # Kubernetes metadata via the Downward API (set in step 4)
        record.k8s_pod = os.getenv("POD_NAME")
        record.k8s_namespace = os.getenv("POD_NAMESPACE")
        # Prefect runtime metadata (available inside flow/task run pods)
        try:
            from prefect.runtime import flow_run, task_run, deployment
            record.flow_run_id = flow_run.id
            record.task_run_id = task_run.id
            record.deployment_name = deployment.name
        except Exception:
            # not in a run context
            record.flow_run_id = None
            record.task_run_id = None
            record.deployment_name = None
        return True
```
Notes
- The filter is optional; you can skip it and only use `extra={...}` in your logs.
- The module path in `logging.yml` must match where this file lives (e.g., `my_project.logging.my_filters.ContextFilter`).
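As a quick local sanity check (optional, and assuming `my_filters.py` is importable from where you run this), you can run the filter against a hand-built record outside any Prefect run context; the Prefect fields should simply come back as `None`:
```
import logging
from my_filters import ContextFilter  # the file defined in step 2

record = logging.LogRecord(
    name="demo", level=logging.INFO, pathname=__file__, lineno=0,
    msg="hello", args=(), exc_info=None,
)
ContextFilter().filter(record)
print(record.k8s_pod, record.flow_run_id)  # outside a run/pod: likely None None
```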
3) Use `extra={...}` in your flow/task code
This works with `get_run_logger()`, and JsonFormatter will include your fields.
```
from prefect import flow, task, get_run_logger

@task
def process_obj(obj_id: str):
    logger = get_run_logger()
    logger.info("Processing object", extra={"object_id": obj_id, "component": "ingestion"})

@flow
def main():
    process_obj("123")
    process_obj("456")

if __name__ == "__main__":
    main()
```
4) Wire it up in Kubernetes for flow-run pods (work pool base job template)
- Create a ConfigMap with your `logging.yml`:
```
apiVersion: v1
kind: ConfigMap
metadata:
  name: prefect-logging-config
data:
  logging.yml: |
    # paste the exact contents from step 1 here
```
- Export your Kubernetes work pool’s base job template, edit it to:
- Mount the ConfigMap at /etc/prefect-logging
- Set PREFECT_LOGGING_SETTINGS_PATH=/etc/prefect-logging/logging.yml
- Inject `POD_NAME` and `POD_NAMESPACE` from the Downward API (for the filter)
Commands to get/update the template:
```
prefect work-pool get-default-base-job-template --type kubernetes > template.json
# or, to see your existing pool's current config:
# prefect work-pool inspect "<POOL_NAME>"
# edit template.json accordingly, then:
prefect work-pool update "<POOL_NAME>" --base-job-template template.json
```
Marvin (09/17/2025, 7:31 AM)
In `template.json`, add something like the following under the container spec for the flow-run pod (the exact nesting depends on your current template; if you paste yours here, I can annotate precisely):
```
{
  "job": {
    "template": {
      "spec": {
        "template": {
          "spec": {
            "volumes": [
              {
                "name": "prefect-logging-config",
                "configMap": {
                  "name": "prefect-logging-config"
                }
              }
            ],
            "containers": [
              {
                "name": "prefect-job",
                "env": [
                  {
                    "name": "PREFECT_LOGGING_SETTINGS_PATH",
                    "value": "/etc/prefect-logging/logging.yml"
                  },
                  {
                    "name": "POD_NAME",
                    "valueFrom": {
                      "fieldRef": {
                        "fieldPath": "metadata.name"
                      }
                    }
                  },
                  {
                    "name": "POD_NAMESPACE",
                    "valueFrom": {
                      "fieldRef": {
                        "fieldPath": "metadata.namespace"
                      }
                    }
                  }
                ],
                "volumeMounts": [
                  {
                    "name": "prefect-logging-config",
                    "mountPath": "/etc/prefect-logging"
                  }
                ]
              }
            ]
          }
        }
      }
    }
  }
}
```
5) Don’t forget the worker itself (optional but recommended)
If you want the worker’s own logs (not just the flow-run pods) to be JSON too:
- Mount the same ConfigMap into the worker Deployment/Helm values
- Set `PREFECT_LOGGING_SETTINGS_PATH=/etc/prefect-logging/logging.yml` in the worker container env
- Optionally inject `POD_NAME`/`POD_NAMESPACE` there as well
6) What you’ll see where
- stdout in pods: JSON logs with both your `extra` and the filter fields (good for log shipping)
- Prefect Server UI: standard fields (message, level, time, logger name, run IDs, etc.) are displayed. Custom `extra` fields may not be shown/searchable in the UI, but they’re still in stdout for your log pipeline.
References
- Logging customization (Prefect 3): Logging customization
- JsonFormatter API: JsonFormatter API
- Work pool CLI (verify options):
- prefect work-pool get-default-base-job-template --help
- prefect work-pool update --help
Want me to tailor the JSON fields or annotate your current `template.json`? If you paste your pool’s base job template or Helm values for the worker, I’ll mark the exact spots to add env, volumes, and mounts.