# ask-marvin
@Marvin how can I adjust the prefect logger to format logs into JSON and add `extra` attributes per JSON log?
thought for 201.4 seconds
Short answer:
- Use Prefect’s JSON formatter and a custom logging config to emit JSON.
- Add structured fields with Python logging’s `extra={...}` (works with `get_run_logger()`).
- Point Prefect to your config with `PREFECT_LOGGING_SETTINGS_PATH`.

Step-by-step:

1) Create a logging config that uses Prefect’s JsonFormatter

Save this as `logging.yml` (or `.yaml`):
```yaml
version: 1
disable_existing_loggers: false

formatters:
  json:
    '()': prefect.logging.formatters.JsonFormatter
    fmt: default   # or 'pretty' for human-readable JSON

handlers:
  console:
    class: logging.StreamHandler
    level: INFO
    formatter: json
    stream: ext://sys.stdout
  api:
    '()': prefect.logging.handlers.APILogHandler
    level: INFO
    # APILogHandler can still have a formatter; for console JSON only, you can omit this
    formatter: json

loggers:
  prefect:
    level: INFO
    handlers: [console, api]
    propagate: false

root:
  level: INFO
  handlers: [console]
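```

With `fmt: default`, each record should come out as a single-line JSON object (one record per line), which is the shape log shippers like Fluent Bit expect; `pretty` adds indentation for human reading.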
2) Tell Prefect to use your config

Set an environment variable before your flow/worker starts:
```bash
PREFECT_LOGGING_SETTINGS_PATH=/path/to/logging.yml
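```

To check that the config is picked up, a minimal smoke test (a sketch; assumes `logging.yml` from step 1 is saved at the path below):

```python
# smoke_test.py -- run this and check that stdout shows JSON log lines.
# Setting the variable in-process is just for the demo; in real use,
# export it in the environment before the process starts.
import os
os.environ["PREFECT_LOGGING_SETTINGS_PATH"] = "/path/to/logging.yml"

from prefect import flow, get_run_logger

@flow
def smoke_test():
    get_run_logger().info("hello from JSON logging")

if __name__ == "__main__":
    smoke_test()
```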
3) Log with extra attributes

Inside flows/tasks, use `get_run_logger()` and pass `extra={...}`. Prefect’s JSON formatter will include these fields in the JSON output.
```python
from prefect import flow, task, get_run_logger

@task
def process_file(path: str, attempt: int = 1):
    logger = get_run_logger()
    logger.info(
        "Processing file",
        extra={"file": path, "attempt": attempt, "component": "ingestion"}
    )

@flow
def pipeline():
    process_file("<s3://bucket/key1>", attempt=1)
    process_file("<s3://bucket/key2>", attempt=2)

if __name__ == "__main__":
    pipeline()
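```

If it helps to see why `extra` ends up in the JSON: Python’s logging copies each `extra` key onto the `LogRecord`, so a dict-serializing formatter like Prefect’s sees them as ordinary record attributes. A stdlib-only sketch of that mechanism (illustrative names, no Prefect required):

```python
import logging

class Capture(logging.Handler):
    """Grabs the last LogRecord so we can inspect it."""
    def emit(self, record):
        self.last = record

logger = logging.getLogger("extra-demo")
handler = Capture()
logger.addHandler(handler)

logger.warning("Processing file", extra={"file": "a.csv", "attempt": 2})

# The extra keys are now plain attributes on the record, which is what a
# dict-serializing JSON formatter picks up.
print(handler.last.file, handler.last.attempt)  # -> a.csv 2
```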
4) Add the same extras to every log record (optional)

LoggerAdapter approach (merges per-call extras with a common context):
```python
import logging
from prefect import task, get_run_logger

class ContextAdapter(logging.LoggerAdapter):
    def process(self, msg, kwargs):
        extra = kwargs.get("extra", {})
        extra.update(self.extra)
        kwargs["extra"] = extra
        return msg, kwargs

@task
def run():
    base = get_run_logger()
    logger = ContextAdapter(base, {"env": "prod", "dataset": "sales"})
    logger.info("Started")                          # has env,dataset
    logger.warning("Bad row", extra={"row": 42})    # merges row + env,dataset
Filter approach (injects fields on every record via config). Define a filter:
```python
# my_filters.py
import logging

class ContextFilter(logging.Filter):
    def filter(self, record):
        record.env = "prod"
        record.dataset = "sales"
        return True
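```

A quick local check of the filter before wiring it into the config (a sketch; assumes `my_filters.py` is importable from your working directory):

```python
import logging
from my_filters import ContextFilter

logger = logging.getLogger("filter-demo")
logger.addFilter(ContextFilter())

class Capture(logging.Handler):
    def emit(self, record):
        # Fields stamped by the filter are ordinary record attributes.
        print(record.env, record.dataset)  # -> prod sales

logger.addHandler(Capture())
logger.warning("test")
```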
Add it to your `logging.yml`:
```yaml
filters:
  context:
    '()': my_filters.ContextFilter

handlers:
  console:
    ...
    filters: [context]
  api:
    ...
    filters: [context]
```

What to expect in different destinations:
- Console/stdout: you’ll see structured JSON with your `extra` fields included.
- Prefect Cloud/API: standard fields (message, level, timestamp, logger name, run IDs, etc.) are persisted. Custom `extra` fields may not be indexed or displayed in the UI; treat them as best-effort metadata. If you need a field to be searchable in Cloud, include it in the message as well. You can verify by sending a test log and inspecting it in the UI.

Notes and links:
- Prefect 3.x JSON formatter: JsonFormatter API
- Customizing logging in Prefect 3.x: Logging customization
- Env var: `PREFECT_LOGGING_SETTINGS_PATH` points to your logging config file.

Want me to tailor this for your setup? Let me know:
- Where the logs should go (console only, file, Prefect Cloud, another sink)
- How you’re running flows (local, worker, Docker, K8s)
- Any specific extra fields you want on every record
@Marvin I use OSS in Kubernetes with prefect-server and prefect-worker
thought for 193.4 seconds
Great: for OSS on Kubernetes (Prefect Server + Kubernetes worker), here’s a practical way to emit JSON logs and attach extra fields.

Assumptions:
- Prefect 3.x
- You want JSON on stdout (for Fluent Bit/ELK etc.) and logs sent to Prefect Server as usual
- You run a Kubernetes work pool that creates flow-run pods

Overview:
- Create a logging config that uses Prefect’s JSON formatter
- Point both the worker and the flow-run pods at that config via `PREFECT_LOGGING_SETTINGS_PATH`
- Optionally inject common context (flow/task IDs, k8s pod/namespace) on every log line via a logging Filter, and/or pass `extra={...}` in your code

1) Create a JSON logging config

Save this to a file named `logging.yml`:
```yaml
version: 1
disable_existing_loggers: false

formatters:
  json:
    '()': prefect.logging.formatters.JsonFormatter
    fmt: default  # 'default' is compact JSON; use 'pretty' for readable

filters:
  context:
    '()': my_filters.ContextFilter  # optional; see step 2

handlers:
  console:
    class: logging.StreamHandler
    level: INFO
    formatter: json
    stream: ext://sys.stdout
    filters: [context]  # optional
  api:
    '()': prefect.logging.handlers.APILogHandler
    level: INFO
    # APILogHandler ships logs to Prefect Server; it can still have a formatter
    formatter: json
    filters: [context]  # optional

loggers:
  prefect:
    level: INFO
    handlers: [console, api]
    propagate: false

root:
  level: INFO
  handlers: [console]
```

2) Add an optional filter to stamp every record with runtime/k8s context

Create a `my_filters.py` in your project/module (importable by your flow code):
```python
# my_filters.py
import logging, os

class ContextFilter(logging.Filter):
    def filter(self, record):
        # Kubernetes metadata via the Downward API (set in step 4)
        record.k8s_pod = os.getenv("POD_NAME")
        record.k8s_namespace = os.getenv("POD_NAMESPACE")

        # Prefect runtime metadata (available inside flow/task run pods)
        try:
            from prefect.runtime import flow_run, task_run, deployment
            record.flow_run_id = flow_run.id
            record.task_run_id = task_run.id
            record.deployment_name = deployment.name
        except Exception:
            # not in a run context
            record.flow_run_id = None
            record.task_run_id = None
            record.deployment_name = None

        return True
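```

(In Prefect 3.x the `prefect.runtime` attributes generally resolve to `None` when there is no active run context, so the except branch is mostly a safeguard; verify the behavior on your version.)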
Notes:
- The filter is optional: you can skip it and only use `extra={...}` in your logs.
- The module path in `logging.yml` must match where this file lives (e.g., `my_project.logging.my_filters.ContextFilter`).

3) Use `extra={...}` in your flow/task code

This works with `get_run_logger()`, and JsonFormatter will include your fields.
```python
from prefect import flow, task, get_run_logger

@task
def process_obj(obj_id: str):
    logger = get_run_logger()
    logger.info("Processing object", extra={"object_id": obj_id, "component": "ingestion"})

@flow
def main():
    process_obj("123")
    process_obj("456")

if __name__ == "__main__":
    main()
```

4) Wire it up in Kubernetes for flow-run pods (work pool base job template)

Create a ConfigMap with your `logging.yml`:
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: prefect-logging-config
data:
  logging.yml: |
    # paste the exact contents from step 1 here
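```

Apply it with `kubectl apply -f` in the same namespace where your flow-run pods will be created.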
Then export your Kubernetes work pool’s base job template and edit it to:
- Mount the ConfigMap at `/etc/prefect-logging`
- Set `PREFECT_LOGGING_SETTINGS_PATH=/etc/prefect-logging/logging.yml`
- Inject `POD_NAME` and `POD_NAMESPACE` from the Downward API (for the filter)

Commands to get/update the template:
```bash
prefect work-pool get-default-base-job-template --type kubernetes > template.json
# or export your existing pool's template:
# prefect work-pool inspect <POOL_NAME>  # to see current config

# edit template.json accordingly, then:
prefect work-pool update "<POOL_NAME>" --base-job-template template.json
```

In your `template.json`, add something like the following under the container spec for the flow-run pod (the exact nesting depends on your current template; if you paste yours here, I can annotate precisely):
```json
{
  "job": {
    "template": {
      "spec": {
        "template": {
          "spec": {
            "volumes": [
              {
                "name": "prefect-logging-config",
                "configMap": {
                  "name": "prefect-logging-config"
                }
              }
            ],
            "containers": [
              {
                "name": "prefect-job",
                "env": [
                  {
                    "name": "PREFECT_LOGGING_SETTINGS_PATH",
                    "value": "/etc/prefect-logging/logging.yml"
                  },
                  {
                    "name": "POD_NAME",
                    "valueFrom": {
                      "fieldRef": {
                        "fieldPath": "metadata.name"
                      }
                    }
                  },
                  {
                    "name": "POD_NAMESPACE",
                    "valueFrom": {
                      "fieldRef": {
                        "fieldPath": "metadata.namespace"
                      }
                    }
                  }
                ],
                "volumeMounts": [
                  {
                    "name": "prefect-logging-config",
                    "mountPath": "/etc/prefect-logging"
                  }
                ]
              }
            ]
          }
        }
      }
    }
  }
}
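```

(Depending on how your pool was created, the manifest in the exported `template.json` may live under `job_configuration.job_manifest`; mirror whatever nesting your export shows.)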
5) Don’t forget the worker itself (optional but recommended)

If you want the worker’s own logs (not just the flow-run pods) to be JSON too:
- Mount the same ConfigMap into the worker Deployment/Helm values
- Set `PREFECT_LOGGING_SETTINGS_PATH=/etc/prefect-logging/logging.yml` in the worker container env
- Optionally inject `POD_NAME`/`POD_NAMESPACE` there as well

6) What you’ll see where
- stdout in pods: JSON logs with both your `extra` and the filter fields (good for log shipping)
- Prefect Server UI: standard fields (message, level, time, logger name, run IDs, etc.) are displayed. Custom `extra` fields may not be shown/searchable in the UI, but they’re still in stdout for your log pipeline.

References:
- Logging customization (Prefect 3): Logging customization
- JsonFormatter API: JsonFormatter API
- Work pool CLI (verify options): `prefect work-pool get-default-base-job-template --help` and `prefect work-pool update --help`

Want me to tailor the JSON fields or annotate your current `template.json`? If you paste your pool’s base job template or Helm values for the worker, I’ll mark the exact spots to add env, volumes, and mounts.