Syméon del Marmol
08/26/2025, 12:53 PMMarvin
08/26/2025, 12:53 PMMarvin
08/26/2025, 12:57 PMget_run_logger()
. You can either:
- wrap Prefect’s run logger with structlog so you keep your structlog API, or
- let Prefect’s logger propagate to your root logger where your structlog/Datadog handlers live, or
- emit with your structlog logger and add Prefect’s API handler so those logs also show up in the Prefect UI.
Below are concrete options. Pick the one that matches your current setup best.
Option 1 (recommended): Wrap Prefect’s run logger with structlog
- Use the structlog API you already like
- Logs are automatically tied to the current Prefect run
- Your structlog processors (and Datadog correlation) still apply
import structlog
from prefect import flow, task
from prefect.logging import get_run_logger
from prefect.runtime import flow_run, task_run
# Configure structlog once, at import time
structlog.configure(
processors=[
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
# add your ddtrace/structlog correlation processor(s) here
# structlog.processors.JSONRenderer()
],
logger_factory=structlog.stdlib.LoggerFactory(), # integrate with stdlib logging
)
@task
def my_task():
plogger = get_run_logger()
# Keep structlog API while using the Prefect run logger underneath
logger = structlog.wrap_logger(plogger).bind(
flow_run_id=str(flow_run.id),
task_run_id=str(task_run.id),
)
<http://logger.info|logger.info>("task started", extra_field=123)
@flow
def my_flow():
plogger = get_run_logger()
flogger = structlog.wrap_logger(plogger).bind(flow_run_id=str(flow_run.id))
<http://flogger.info|flogger.info>("flow started")
my_task()
if __name__ == "__main__":
my_flow()
Why this works:
- get_run_logger()
ensures logs are associated with the current run and sent to the Prefect API/UI.
- structlog.wrap_logger
preserves your processors, formatting, and Datadog trace correlation.
Option 2: Let Prefect’s logger propagate to your root structlog/Datadog handlers
- Keep calling get_run_logger()
in tasks/flows
- Configure your root logger (that structlog wraps) with your Datadog/formatting handlers
- Ensure log propagation is on (default) so Prefect’s logger bubbles up
import logging
import structlog
from prefect import flow, task
from prefect.logging import get_run_logger
# Configure root/structlog handlers as you do today (Datadog, JSON, etc.)
structlog.configure(logger_factory=structlog.stdlib.LoggerFactory())
root = logging.getLogger()
root.setLevel(<http://logging.INFO|logging.INFO>)
# root.addHandler(<your Datadog handler>) # e.g., ddtrace/logging handler
@task
def t():
logger = get_run_logger()
<http://logger.info|logger.info>("hello from task") # goes to Prefect UI and propagates to your handlers
@flow
def f():
logger = get_run_logger()
<http://logger.info|logger.info>("hello from flow")
t()
if __name__ == "__main__":
f()
Option 3: Keep using your structlog logger directly and add Prefect’s API handler
- If you’d rather not call get_run_logger()
, attach Prefect’s handler to your stdlib logger so all your structlog logs also reach Prefect’s API/UI
- Be careful not to double-log if you also use get_run_logger()
elsewhere
import logging
import structlog
from prefect.logging.handlers import APILogHandler
# your existing structlog + Datadog setup...
structlog.configure(logger_factory=structlog.stdlib.LoggerFactory())
root = logging.getLogger()
root.setLevel(<http://logging.INFO|logging.INFO>)
root.addHandler(APILogHandler()) # send all stdlib/structlog logs to Prefect
# now use your structlog logger as usual in tasks/flows
Tips and notes
- If you want Datadog to correlate logs with traces, keep your ddtrace logging/structlog correlation processor as you have it today; Prefect doesn’t interfere with that.Marvin
08/26/2025, 12:57 PMprefect.runtime.flow_run
and prefect.runtime.task_run
give you run ids and other fields to add to your structlog context.
- Avoid duplicate logs: pick one pattern (wrap run logger OR add APILogHandler to root). Using both can emit two copies.
Docs
- Prefect logging how-to: Add logging to flows and tasks
- Logging customization (handlers, formatters, config): Customize logging in Prefect
If you can share a snippet of your current structlog + Datadog setup (how you configure processors/handlers), I can tailor the minimal changes to get both Prefect UI visibility and Datadog trace correlation with no duplicates.Syméon del Marmol
08/26/2025, 1:23 PMMarvin
08/26/2025, 1:23 PMMarvin
08/26/2025, 1:24 PMget_run_logger()
returns a logger that’s bound to the current flow/task run context. That guarantees logs are attributed to the right run and show up in the Prefect UI.
- Minimal side effects: You don’t have to change global handlers or risk double-emitting logs to Prefect and Datadog. You explicitly opt-in where you need run-aware logging.
- Keeps your structlog pipeline: structlog.wrap_logger(...)
preserves your processors/Datadog correlation.
That said, you don’t have to repeat the wrapping everywhere. Here are DRY patterns that keep the benefits of Option 1:
Option A: Custom structlog LoggerFactory (no wrapping in user code)
Configure structlog so that structlog.get_logger()
returns the Prefect run logger when a run is active, and a normal stdlib logger otherwise. Then inside tasks/flows you just call structlog.get_logger()
.
import structlog
from structlog.stdlib import LoggerFactory as StdlibLoggerFactory
class PrefectRunLoggerFactory:
def __init__(self, fallback=None):
self.fallback = fallback or StdlibLoggerFactory()
def __call__(self, *args, **kwargs):
try:
# Only returns a run-aware logger when inside a Prefect run
from prefect.logging import get_run_logger
return get_run_logger()
except Exception:
# Outside any Prefect run (e.g., module import, local scripts)
return self.fallback(*args, **kwargs)
# Configure once at startup
structlog.configure(
processors=[
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
# your ddtrace/structlog correlation processor(s), JSON renderer, etc.
],
logger_factory=PrefectRunLoggerFactory(),
)
# Usage anywhere (flow/task or not)
from prefect import flow, task
from prefect.runtime import flow_run, task_run
@task
def my_task():
logger = structlog.get_logger().bind(task_run_id=str(task_run.id))
<http://logger.info|logger.info>("task started")
@flow
def my_flow():
logger = structlog.get_logger().bind(flow_run_id=str(flow_run.id))
<http://logger.info|logger.info>("flow started")
my_task()
if __name__ == "__main__":
my_flow()
Pros:
- Zero per-function wrapping; you keep using your existing structlog pattern.
- When a Prefect run is active, logs are run-aware and go to the Prefect UI; outside runs, your normal logging applies.
Option B: Small helper utility
Centralize the wrapping/binding so tasks/flows call a single helper.
import structlog
from prefect.logging import get_run_logger
from prefect.runtime import flow_run, task_run
def get_structlog_run_logger(**bind):
logger = structlog.wrap_logger(get_run_logger())
# Bind run ids when available
try:
bind.setdefault("flow_run_id", str(flow_run.id))
except Exception:
pass
try:
bind.setdefault("task_run_id", str(task_run.id))
except Exception:
pass
return logger.bind(**bind)
# Usage
from prefect import flow, task
@task
def my_task():
logger = get_structlog_run_logger()
<http://logger.info|logger.info>("hello from task")
@flow
def my_flow():
logger = get_structlog_run_logger()
<http://logger.info|logger.info>("hello from flow")
my_task()
Option C: Decorator that injects a logger parameter
If you like explicit dependency injection, a decorator can pass a run-aware structlog logger into your tasks/flows.
```
import functools
import structlog
from prefect.logging import get_run_logger
def inject_logger(fn):
@functools.wraps(fn)
def wrapper(*args, logger=None, **kwargs):
logger = logger or structlog.wrap_logger(get_run_logger())
return fn(*args, logger=logger, **kwargs)
return wrapper
from prefect import flow, task
@task
@inject_logger
def my_task(logger=None):
logger.info("task says hi")
@flow
@inject_logger
def my_flow(logger=None):
logger.info("flow says hi")Marvin
08/26/2025, 1:24 PMAPILogHandler
to your root/stdlib logger) works, but:
- It can capture and forward logs from libraries you didn’t intend to send to Prefect.
- It’s easier to end up with duplicates if you also use get_run_logger()
somewhere.
- You lose the explicit “this is a run-aware logger” signal in your code.
Docs for reference:
- Add logging to flows and tasks: How to add logging
- Customize logging (handlers, formatters, config): Logging customization
If you share a snippet of your current structlog + Datadog config, I can adapt the LoggerFactory approach so you don’t have to touch every task/flow.Syméon del Marmol
08/26/2025, 1:28 PMMarvin
08/26/2025, 1:33 PMMarvin
08/26/2025, 1:33 PMtask_run_id
/ flow_run_id
in each task or flow.
Option 1: structlog processor (recommended)
- Adds run IDs automatically to every log event when running under Prefect
- Works whether you use get_run_logger()
or your own structlog logger
- Safe outside of Prefect (it just skips adding fields)
import structlog
from prefect.runtime import flow_run, task_run
def add_prefect_context(logger, method, event_dict):
# Add flow/task run ids if available
try:
event_dict.setdefault("flow_run_id", str(flow_run.id))
except Exception:
pass
try:
event_dict.setdefault("task_run_id", str(task_run.id))
except Exception:
pass
return event_dict
# Configure once at startup alongside your existing processors/handlers
structlog.configure(
processors=[
add_prefect_context, # inject Prefect context
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
# your ddtrace correlation processor(s) here
# structlog.processors.JSONRenderer(),
],
logger_factory=structlog.stdlib.LoggerFactory(), # keep stdlib integration
)
# Usage in tasks/flows: just use your logger; no explicit bind needed
# If you prefer Prefect's run-aware logger, you can also wrap it:
# logger = structlog.wrap_logger(get_run_logger())
Option 2: stdlib logging Filter (if you rely on stdlib handlers/formatters)
- If your structlog setup ultimately emits via stdlib logging, attach a Filter that adds attributes to the LogRecord; your formatter or ProcessorFormatter can include them.
import logging
from prefect.runtime import flow_run, task_run
class PrefectContextFilter(logging.Filter):
def filter(self, record):
try:
record.flow_run_id = str(flow_run.id)
except Exception:
record.flow_run_id = None
try:
record.task_run_id = str(task_run.id)
except Exception:
record.task_run_id = None
return True
root = logging.getLogger()
for h in root.handlers:
h.addFilter(PrefectContextFilter())
Notes
- Prefect’s get_run_logger()
already associates logs with the correct run for the Prefect UI. The processor/filter above is for enriching your Datadog/structlog output automatically.
- You can extend the processor to include more runtime fields (e.g., prefect.runtime.flow_run.name
, deployment.id
, tags) the same way.
- Combine this with a custom LoggerFactory (from the earlier message) if you want structlog.get_logger()
to return a run-aware logger implicitly; the processor then ensures the IDs are present in every event.
Docs
- Add logging to flows and tasks: How to add logging
- Customize logging: Logging customization
If you share your current structlog + Datadog config, I can drop in the exact processor placement and make sure you don’t get duplicates.