# ask-marvin
@Marvin I have a general logger built with structlog and integrated with Datadog traces. What is the appropriate way to use my existing logger in Prefect tasks and flows?
thought for 218.9 seconds
Great question. In Prefect 3.x, you have a few good patterns for using an existing structlog + Datadog setup inside flows and tasks while still getting Prefect's run-aware logging. The key idea: Prefect associates logs with the current flow/task run via `get_run_logger()`. You can either:

- wrap Prefect's run logger with structlog so you keep your structlog API, or
- let Prefect's logger propagate to your root logger where your structlog/Datadog handlers live, or
- emit with your structlog logger and add Prefect's API handler so those logs also show up in the Prefect UI.

Below are concrete options. Pick the one that matches your current setup best.

**Option 1 (recommended): Wrap Prefect's run logger with structlog**

- Use the structlog API you already like
- Logs are automatically tied to the current Prefect run
- Your structlog processors (and Datadog correlation) still apply
```python
import structlog
from prefect import flow, task
from prefect.logging import get_run_logger
from prefect.runtime import flow_run, task_run

# Configure structlog once, at import time
structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        # add your ddtrace/structlog correlation processor(s) here
        # structlog.processors.JSONRenderer()
    ],
    logger_factory=structlog.stdlib.LoggerFactory(),  # integrate with stdlib logging
)

@task
def my_task():
    plogger = get_run_logger()
    # Keep structlog API while using the Prefect run logger underneath
    logger = structlog.wrap_logger(plogger).bind(
        flow_run_id=str(flow_run.id),
        task_run_id=str(task_run.id),
    )
    logger.info("task started", extra_field=123)

@flow
def my_flow():
    plogger = get_run_logger()
    flogger = structlog.wrap_logger(plogger).bind(flow_run_id=str(flow_run.id))
    flogger.info("flow started")
    my_task()

if __name__ == "__main__":
    my_flow()
```
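For the commented-out correlation slot above, here is a minimal sketch of what such a processor could look like if you correlate manually via ddtrace's tracer (the name `add_dd_trace_context` is illustrative; if you already use ddtrace's built-in structlog/log injection, keep that instead):

```python
from ddtrace import tracer

def add_dd_trace_context(logger, method_name, event_dict):
    # Attach the active Datadog span's ids so logs correlate with APM traces;
    # tracer.current_span() returns None when no span is active.
    span = tracer.current_span()
    if span is not None:
        event_dict["dd.trace_id"] = str(span.trace_id)
        event_dict["dd.span_id"] = str(span.span_id)
    return event_dict
```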
Why Option 1 works:

- `get_run_logger()` ensures logs are associated with the current run and sent to the Prefect API/UI.
- `structlog.wrap_logger` preserves your processors, formatting, and Datadog trace correlation.

**Option 2: Let Prefect's logger propagate to your root structlog/Datadog handlers**

- Keep calling `get_run_logger()` in tasks/flows
- Configure your root logger (that structlog wraps) with your Datadog/formatting handlers
- Ensure log propagation is on (the default) so Prefect's logger bubbles up
```python
import logging
import structlog
from prefect import flow, task
from prefect.logging import get_run_logger

# Configure root/structlog handlers as you do today (Datadog, JSON, etc.)
structlog.configure(logger_factory=structlog.stdlib.LoggerFactory())

root = logging.getLogger()
root.setLevel(logging.INFO)
# root.addHandler(<your Datadog handler>)  # e.g., ddtrace/logging handler

@task
def t():
    logger = get_run_logger()
    logger.info("hello from task")  # goes to Prefect UI and propagates to your handlers

@flow
def f():
    logger = get_run_logger()
    logger.info("hello from flow")
    t()

if __name__ == "__main__":
    f()
```
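If you want those propagated records rendered through your structlog pipeline rather than a plain stdlib formatter, structlog's `ProcessorFormatter` is one way to do it; a minimal sketch, assuming structlog >= 21.3 and a JSON renderer (swap in your Datadog handler as needed):

```python
import logging

import structlog

# Render all stdlib records (including Prefect's propagated run logs)
# through structlog processors before they reach the handler.
formatter = structlog.stdlib.ProcessorFormatter(
    processors=[
        structlog.stdlib.ProcessorFormatter.remove_processors_meta,
        structlog.processors.JSONRenderer(),
    ],
    # Applied to records that did not originate from structlog (e.g., Prefect's)
    foreign_pre_chain=[
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
    ],
)

handler = logging.StreamHandler()
handler.setFormatter(formatter)
logging.getLogger().addHandler(handler)
```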
**Option 3: Keep using your structlog logger directly and add Prefect's API handler**

- If you'd rather not call `get_run_logger()`, attach Prefect's handler to your stdlib logger so all your structlog logs also reach Prefect's API/UI
- Be careful not to double-log if you also use `get_run_logger()` elsewhere
```python
import logging
import structlog
from prefect.logging.handlers import APILogHandler

# your existing structlog + Datadog setup...
structlog.configure(logger_factory=structlog.stdlib.LoggerFactory())

root = logging.getLogger()
root.setLevel(logging.INFO)
root.addHandler(APILogHandler())  # send all stdlib/structlog logs to Prefect

# now use your structlog logger as usual in tasks/flows
```
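For illustration, usage might then look like the sketch below; attaching `APILogHandler` to a named logger for your app (instead of the root logger) is one way to limit which records get forwarded:

```python
import structlog
from prefect import flow

log = structlog.get_logger("myapp")

@flow
def my_flow():
    # Emitted through stdlib logging, so APILogHandler forwards it to Prefect
    log.info("hello from structlog", user_count=3)

if __name__ == "__main__":
    my_flow()
```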
Tips and notes:

- If you want Datadog to correlate logs with traces, keep your ddtrace logging/structlog correlation processor as you have it today; Prefect doesn't interfere with that.
- If you bind run metadata for better observability in Datadog, `prefect.runtime.flow_run` and `prefect.runtime.task_run` give you run ids and other fields to add to your structlog context (see the sketch below).
- Avoid duplicate logs: pick one pattern (wrap the run logger OR add `APILogHandler` to root). Using both can emit two copies.

Docs:

- Prefect logging how-to: Add logging to flows and tasks
- Logging customization (handlers, formatters, config): Customize logging in Prefect

If you can share a snippet of your current structlog + Datadog setup (how you configure processors/handlers), I can tailor the minimal changes to get both Prefect UI visibility and Datadog trace correlation with no duplicates.
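As a sketch of that run-metadata tip, one approach is to bind fields once per run with structlog's contextvars support; this assumes `structlog.contextvars.merge_contextvars` is in your configured processor chain:

```python
import structlog
from prefect import flow
from prefect.runtime import flow_run

@flow
def my_flow():
    # Bind once; every structlog event in this run then carries these fields
    # (requires structlog.contextvars.merge_contextvars in your processors)
    structlog.contextvars.bind_contextvars(
        flow_run_id=str(flow_run.id),
        flow_run_name=flow_run.name,
    )
    structlog.get_logger().info("flow started")
```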
@Marvin I see you recommend Option 1, but it requires doing the wrapping for each task and flow. Why do you recommend that?
thought for 57.1 seconds
Great point: doing the wrap in every task/flow is a little repetitive.

Why I recommended Option 1:

- Correct run association: `get_run_logger()` returns a logger that's bound to the current flow/task run context. That guarantees logs are attributed to the right run and show up in the Prefect UI.
- Minimal side effects: You don't have to change global handlers or risk double-emitting logs to Prefect and Datadog. You explicitly opt in where you need run-aware logging.
- Keeps your structlog pipeline: `structlog.wrap_logger(...)` preserves your processors/Datadog correlation.

That said, you don't have to repeat the wrapping everywhere. Here are DRY patterns that keep the benefits of Option 1.

**Option A: Custom structlog LoggerFactory (no wrapping in user code)**

Configure structlog so that `structlog.get_logger()` returns the Prefect run logger when a run is active, and a normal stdlib logger otherwise. Then inside tasks/flows you just call `structlog.get_logger()`.
```python
import structlog
from structlog.stdlib import LoggerFactory as StdlibLoggerFactory

class PrefectRunLoggerFactory:
    def __init__(self, fallback=None):
        self.fallback = fallback or StdlibLoggerFactory()

    def __call__(self, *args, **kwargs):
        try:
            # Only returns a run-aware logger when inside a Prefect run
            from prefect.logging import get_run_logger
            return get_run_logger()
        except Exception:
            # Outside any Prefect run (e.g., module import, local scripts)
            return self.fallback(*args, **kwargs)

# Configure once at startup
structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        # your ddtrace/structlog correlation processor(s), JSON renderer, etc.
    ],
    logger_factory=PrefectRunLoggerFactory(),
)

# Usage anywhere (flow/task or not)
from prefect import flow, task
from prefect.runtime import flow_run, task_run

@task
def my_task():
    logger = structlog.get_logger().bind(task_run_id=str(task_run.id))
    logger.info("task started")

@flow
def my_flow():
    logger = structlog.get_logger().bind(flow_run_id=str(flow_run.id))
    logger.info("flow started")
    my_task()

if __name__ == "__main__":
    my_flow()
```
Pros:

- Zero per-function wrapping; you keep using your existing structlog pattern.
- When a Prefect run is active, logs are run-aware and go to the Prefect UI; outside runs, your normal logging applies.

**Option B: Small helper utility**

Centralize the wrapping/binding so tasks/flows call a single helper.
```python
import structlog
from prefect.logging import get_run_logger
from prefect.runtime import flow_run, task_run

def get_structlog_run_logger(**bind):
    logger = structlog.wrap_logger(get_run_logger())
    # Bind run ids when available; prefect.runtime attributes return
    # None outside the corresponding run context, so check truthiness
    if flow_run.id:
        bind.setdefault("flow_run_id", str(flow_run.id))
    if task_run.id:
        bind.setdefault("task_run_id", str(task_run.id))
    return logger.bind(**bind)

# Usage
from prefect import flow, task

@task
def my_task():
    logger = get_structlog_run_logger()
    logger.info("hello from task")

@flow
def my_flow():
    logger = get_structlog_run_logger()
    logger.info("hello from flow")
    my_task()
```
**Option C: Decorator that injects a logger parameter**

If you like explicit dependency injection, a decorator can pass a run-aware structlog logger into your tasks/flows.

```python
import functools

import structlog
from prefect import flow, task
from prefect.logging import get_run_logger

def inject_logger(fn):
    @functools.wraps(fn)
    def wrapper(*args, logger=None, **kwargs):
        logger = logger or structlog.wrap_logger(get_run_logger())
        return fn(*args, logger=logger, **kwargs)
    return wrapper

@task
@inject_logger
def my_task(logger=None):
    logger.info("task says hi")

@flow
@inject_logger
def my_flow(logger=None):
    logger.info("flow says hi")
    my_task()
```

If you truly want zero Prefect-specific code, Option 3 from before (attach `APILogHandler` to your root/stdlib logger) works, but:

- It can capture and forward logs from libraries you didn't intend to send to Prefect.
- It's easier to end up with duplicates if you also use `get_run_logger()` somewhere.
- You lose the explicit "this is a run-aware logger" signal in your code.

Docs for reference:

- Add logging to flows and tasks: How to add logging
- Customize logging (handlers, formatters, config): Logging customization

If you share a snippet of your current structlog + Datadog config, I can adapt the LoggerFactory approach so you don't have to touch every task/flow.
@Marvin Is there a way to make the binding (task_run.id and flow_run.id) implicit when using a logger in a task?
thought for 39.0 seconds
Yes: make it implicit with a structlog processor (or a stdlib logging Filter) that injects Prefect run context on every log call. That way you don't have to bind `task_run_id`/`flow_run_id` in each task or flow.

**Option 1: structlog processor (recommended)**

- Adds run IDs automatically to every log event when running under Prefect
- Works whether you use `get_run_logger()` or your own structlog logger
- Safe outside of Prefect (it just skips adding fields)
```python
import structlog
from prefect.runtime import flow_run, task_run

def add_prefect_context(logger, method, event_dict):
    # Add flow/task run ids if available; prefect.runtime attributes
    # return None outside the corresponding run context
    if flow_run.id:
        event_dict.setdefault("flow_run_id", str(flow_run.id))
    if task_run.id:
        event_dict.setdefault("task_run_id", str(task_run.id))
    return event_dict

# Configure once at startup alongside your existing processors/handlers
structlog.configure(
    processors=[
        add_prefect_context,                          # inject Prefect context
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        # your ddtrace correlation processor(s) here
        # structlog.processors.JSONRenderer(),
    ],
    logger_factory=structlog.stdlib.LoggerFactory(),  # keep stdlib integration
)

# Usage in tasks/flows: just use your logger; no explicit bind needed
# If you prefer Prefect's run-aware logger, you can also wrap it:
# logger = structlog.wrap_logger(get_run_logger())
```
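For illustration, usage then reduces to plain structlog calls (a sketch against the configuration above, assuming you finish the chain with a renderer such as the commented-out `JSONRenderer`):

```python
import structlog
from prefect import flow, task

log = structlog.get_logger()

@task
def work():
    # flow_run_id/task_run_id are added by add_prefect_context automatically
    log.info("working")

@flow
def pipeline():
    log.info("starting")
    work()

if __name__ == "__main__":
    pipeline()
```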
**Option 2: stdlib logging Filter (if you rely on stdlib handlers/formatters)**

If your structlog setup ultimately emits via stdlib logging, attach a Filter that adds attributes to the LogRecord; your formatter or ProcessorFormatter can include them.
```python
import logging
from prefect.runtime import flow_run, task_run

class PrefectContextFilter(logging.Filter):
    def filter(self, record):
        # prefect.runtime attributes return None outside the corresponding
        # run context, so guard against stringifying None
        record.flow_run_id = str(flow_run.id) if flow_run.id else None
        record.task_run_id = str(task_run.id) if task_run.id else None
        return True

root = logging.getLogger()
for h in root.handlers:
    h.addFilter(PrefectContextFilter())
```
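To surface those attributes, you can reference them in a stdlib format string; a sketch (field names match the Filter above):

```python
import logging

# Include the injected attributes in each handler's output
fmt = logging.Formatter(
    "%(asctime)s %(levelname)s %(name)s "
    "flow_run=%(flow_run_id)s task_run=%(task_run_id)s :: %(message)s"
)
for handler in logging.getLogger().handlers:
    handler.setFormatter(fmt)
```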
Notes:

- Prefect's `get_run_logger()` already associates logs with the correct run for the Prefect UI. The processor/filter above is for enriching your Datadog/structlog output automatically.
- You can extend the processor to include more runtime fields (e.g., `prefect.runtime.flow_run.name`, `deployment.id`, tags) the same way; see the sketch below.
- Combine this with a custom LoggerFactory (from the earlier message) if you want `structlog.get_logger()` to return a run-aware logger implicitly; the processor then ensures the IDs are present in every event.

Docs:

- Add logging to flows and tasks: How to add logging
- Customize logging: Logging customization

If you share your current structlog + Datadog config, I can drop in the exact processor placement and make sure you don't get duplicates.
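As a sketch of that extension bullet, an enriched processor might look like this (the event field names are illustrative; `flow_run.name`, `flow_run.tags`, and `prefect.runtime.deployment` are documented runtime attributes):

```python
from prefect.runtime import deployment, flow_run

def add_more_prefect_context(logger, method, event_dict):
    # Enrich events with additional Prefect runtime fields when available
    if flow_run.id:
        event_dict.setdefault("flow_run_name", flow_run.name)
        event_dict.setdefault("flow_run_tags", list(flow_run.tags or []))
    if deployment.id:
        event_dict.setdefault("deployment_id", str(deployment.id))
    return event_dict
```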