Does anyone have know how to integrate Loguru with...
# ask-community
m
Does anyone have know how to integrate Loguru with Prefect? I don't want to use
log_prints=True
, because then all messages will be logged as
INFO
. I want to use Loguru instead of
get_run_logger
because my code is calling out to internal libraries that log messages with Loguru and provide contextual information using
.bind
, and I need these messages to appear as well in the Prefect UI.
b
Hi Morten, this may help you out: • a custom function for enabling loguru logs. • an example of using that function in the context of a flow task, and custom functions.
m
I did look into this, but quickly ran into issues with adding and removing handlers on the fly in the context of subflows :/
b
What were the issues, if you don't mind sharing? šŸ‘€
m
Sure, let me grab the example I wrote quickly! šŸ™‚
Welp, naturally everything works when I rewrite the example.. šŸ‘€ Given the following set of tasks, flows and subflows:
Copy code
@forward_logs
def task_1():
    logger.bind(key="value").info("This was logged in Task 1 (normal function)")

@task
@forward_logs
def task_2():
    logger.critical("This was logged in Task 2 (prefect task)")

@forward_logs
def task_3():
    logger.warning("This was logged in Task 3 (normal function)")

@task
@forward_logs
def task_4():
    logger.error("This was logged in Task 4 inside a subflow (prefect task)")

@task
@forward_logs
def task_5():
    <http://logger.info|logger.info>("This was logged in Task 5 inside a subflow (prefect task)")

@flow
def example_flow():
    subflow()
    task_1()
    task_2()
    task_3()

@flow
def subflow():
    task_4()
    task_5()

if __name__ == "__main__":
    serve(example_flow.to_deployment(name="example-flow"))
With the following decorator:
Copy code
import functools

from loguru import logger
from prefect import flow, get_run_logger, serve, task

handler_ids = []


def forward_logs(func):
    """
    Loguru uses an anonymous logger, which prohibits us from using the
    PREFECT_LOGGING_EXTRA_LOGGERS variable to capture logs and forward them to the
    Prefect UI.

    Decorating functions at the flow-level will forward logs for all subflows, tasks and
    ordinary python functions called by that flow automatically to the Flow Run in the
    Prefect UI.

    Note that in order to have logs show up in the Task run view in the Prefect UI, that
    the Task itself must be decorated with this decorator.
    """

    # Reference:
    # <https://discourse.prefect.io/t/can-i-use-loguru-logs-in-prefect-flows/140>
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        global handler_ids

        try:
            # Passthrough in case we are outside of a task / flow context, e.g. `.fn`
            run_logger = get_run_logger()
        except Exception:
            return func(*args, **kwargs)

        for handler_id in handler_ids:
            try:
                logger.remove(handler_id)
            except ValueError:
                pass

        log_format = "{message}"

        handler_ids.append(
            logger.add(
                run_logger.debug,
                filter=lambda record: record["level"].name == "DEBUG",
                level="TRACE",
                format=log_format,
            )
        )

        handler_ids.append(
            logger.add(
                run_logger.warning,
                filter=lambda record: record["level"].name == "WARNING",
                level="TRACE",
                format=log_format,
            )
        )

        handler_ids.append(
            logger.add(
                run_logger.error,
                filter=lambda record: record["level"].name == "ERROR",
                level="TRACE",
                format=log_format,
            )
        )

        handler_ids.append(
            logger.add(
                run_logger.critical,
                filter=lambda record: record["level"].name == "CRITICAL",
                level="TRACE",
                format=log_format,
            )
        )

        handler_ids.append(
            logger.add(
                <http://run_logger.info|run_logger.info>,
                filter=lambda record: record["level"].name
                not in ["DEBUG", "WARNING", "ERROR", "CRITICAL"],
                level="TRACE",
                format=log_format,
            )
        )

        return func(*args, **kwargs)

    return wrapper
We produce the following results with messages logged both in normal functions, tasks, subflows, and task runs (in the Prefect UI). So I guess this answers my own question. No idea how, why or what crashed earlier this morning.
šŸ‘ 1
gratitude thank you 1
b
It's like that sometimes. I appreciate you sharing that here for others to take a look at!
šŸ™Œ 1
m
As it turns out, there is an issue here when decorating `@flow`s like this. Doing so results in "This flow run did not generate any task or subflow runs" in the flow run graph, and no logs at all are sent to the Prefect server. If we split the decorator into two functions
configure_logging
and
forward_logs
we can combine both approaches as a workaround, i.e.:
Copy code
def forward_logs(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        configure_logger()
        return func(*args, **kwargs)

    return wrapper

handler_ids: list[int] = []
 

def configure_logger():
    global handler_ids

    try:
        # Passthrough in case we are outside of a task / flow context, e.g. `.fn`
        run_logger = get_run_logger()
    except Exception:
        return

    for handler_id in handler_ids:
        try:
            logger.remove(handler_id)
        except ValueError:
            pass

    log_format = "{message}"

    handler_ids.append(
        logger.add(
            run_logger.debug,
            filter=lambda record: record["level"].name == "DEBUG",
            level="TRACE",
            format=log_format,
        )
    )
    ...

@task
@forward_logs()
def task_1():
    ...

@flow
def example_flow():
    configure_loggingU)
    task_1()
    ...
I'm not sure why the decorator would have this effect, might be due to some of the magic happening in
@flow
?
šŸ¤” 1
This behavior was not present in Prefect 2., and only started appearing after upgrading to Prefect 3.. This behaviour was tested on the most recent version, 3.1.5.
I created the following minimal reproducible example and bug report: https://github.com/PrefectHQ/prefect/issues/16182
gratitude thank you 1