Morten Hauge
12/02/2024, 11:56 AM
log_prints=True, because then all messages will be logged as INFO. I want to use Loguru instead of get_run_logger because my code is calling out to internal libraries that log messages with Loguru and provide contextual information using .bind, and I need these messages to appear as well in the Prefect UI.
Bianca Hoch
12/02/2024, 6:59 PM
Morten Hauge
12/02/2024, 7:03 PM
Bianca Hoch
12/02/2024, 7:39 PM
Morten Hauge
12/02/2024, 8:19 PM
Morten Hauge
12/02/2024, 8:42 PM
@forward_logs
def task_1():
    logger.bind(key="value").info("This was logged in Task 1 (normal function)")


@task
@forward_logs
def task_2():
    logger.critical("This was logged in Task 2 (prefect task)")


@forward_logs
def task_3():
    logger.warning("This was logged in Task 3 (normal function)")


@task
@forward_logs
def task_4():
    logger.error("This was logged in Task 4 inside a subflow (prefect task)")


@task
@forward_logs
def task_5():
    logger.info("This was logged in Task 5 inside a subflow (prefect task)")


@flow
def example_flow():
    subflow()
    task_1()
    task_2()
    task_3()


@flow
def subflow():
    task_4()
    task_5()


if __name__ == "__main__":
    serve(example_flow.to_deployment(name="example-flow"))
With the following decorator:
import functools

from loguru import logger
from prefect import flow, get_run_logger, serve, task

# IDs of the Loguru handlers added by the most recent decorated call.
handler_ids = []


def forward_logs(func):
    """
    Loguru uses an anonymous logger, which prohibits us from using the
    PREFECT_LOGGING_EXTRA_LOGGERS variable to capture logs and forward them to the
    Prefect UI.

    Decorating functions at the flow-level will forward logs for all subflows, tasks and
    ordinary python functions called by that flow automatically to the Flow Run in the
    Prefect UI.

    Note that in order to have logs show up in the Task run view in the Prefect UI,
    the Task itself must be decorated with this decorator.
    """

    # Reference:
    # https://discourse.prefect.io/t/can-i-use-loguru-logs-in-prefect-flows/140
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        global handler_ids
        try:
            # Passthrough in case we are outside of a task / flow context, e.g. `.fn`
            run_logger = get_run_logger()
        except Exception:
            return func(*args, **kwargs)

        # Remove the handlers added by a previous call before adding new ones.
        for handler_id in handler_ids:
            try:
                logger.remove(handler_id)
            except ValueError:
                pass

        log_format = "{message}"
        # One handler per level, each forwarding to the matching run logger method.
        handler_ids.append(
            logger.add(
                run_logger.debug,
                filter=lambda record: record["level"].name == "DEBUG",
                level="TRACE",
                format=log_format,
            )
        )
        handler_ids.append(
            logger.add(
                run_logger.warning,
                filter=lambda record: record["level"].name == "WARNING",
                level="TRACE",
                format=log_format,
            )
        )
        handler_ids.append(
            logger.add(
                run_logger.error,
                filter=lambda record: record["level"].name == "ERROR",
                level="TRACE",
                format=log_format,
            )
        )
        handler_ids.append(
            logger.add(
                run_logger.critical,
                filter=lambda record: record["level"].name == "CRITICAL",
                level="TRACE",
                format=log_format,
            )
        )
        # Every other level (e.g. TRACE, INFO, SUCCESS) is forwarded as INFO.
        handler_ids.append(
            logger.add(
                run_logger.info,
                filter=lambda record: record["level"].name
                not in ["DEBUG", "WARNING", "ERROR", "CRITICAL"],
                level="TRACE",
                format=log_format,
            )
        )
        return func(*args, **kwargs)

    return wrapper
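The five near-identical handlers above could arguably be collapsed into a single sink. Below is a minimal sketch, not a tested replacement. It assumes that a callable Loguru sink receives a message object whose .record dict carries "level", "message" and "extra", and that the object returned by get_run_logger() offers a stdlib-style .log(level, msg) method. The names make_forwarding_sink and LEVEL_MAP are hypothetical and not part of Loguru or Prefect. Levels without an entry in the map (e.g. TRACE, SUCCESS) fall back to INFO, mirroring the decorator, and values attached with .bind are appended to the text so they are not dropped by the "{message}" format.

```python
import logging

# Loguru level name -> stdlib level. Anything not listed is treated as INFO,
# matching the catch-all handler in the decorator above.
LEVEL_MAP = {
    "DEBUG": logging.DEBUG,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
    "CRITICAL": logging.CRITICAL,
}


def make_forwarding_sink(target):
    """Return a callable, usable as a Loguru sink, that forwards to `target`.

    `target` is assumed to be any object with a stdlib-style
    `log(level, msg)` method (hypothetical helper, for illustration only).
    """

    def sink(message):
        record = message.record
        level = LEVEL_MAP.get(record["level"].name, logging.INFO)
        text = record["message"]
        extra = record.get("extra") or {}
        if extra:
            # Append values attached via .bind(...) so the context survives.
            bound = ", ".join(f"{key}={value!r}" for key, value in extra.items())
            text = f"{text} [{bound}]"
        target.log(level, text)

    return sink
```

Inside the wrapper, this would presumably replace the five logger.add calls with one: `logger.add(make_forwarding_sink(run_logger), level="TRACE", format="{message}")`.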
We get the following results, with messages logged in normal functions, tasks, subflows, and task runs (in the Prefect UI). So I guess this answers my own question. No idea how, why or what crashed earlier this morning.
Bianca Hoch
12/02/2024, 9:13 PM
Morten Hauge
12/03/2024, 8:36 AM
configure_logging and forward_logs we can combine both approaches as a workaround, i.e.:
def forward_logs(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # (Re)attach the Loguru handlers for the current run context.
        configure_logger()
        return func(*args, **kwargs)

    return wrapper


handler_ids: list[int] = []


def configure_logger():
    global handler_ids
    try:
        # Passthrough in case we are outside of a task / flow context, e.g. `.fn`
        run_logger = get_run_logger()
    except Exception:
        return

    # Remove the handlers added by a previous call before adding new ones.
    for handler_id in handler_ids:
        try:
            logger.remove(handler_id)
        except ValueError:
            pass

    log_format = "{message}"
    handler_ids.append(
        logger.add(
            run_logger.debug,
            filter=lambda record: record["level"].name == "DEBUG",
            level="TRACE",
            format=log_format,
        )
    )
    ...


@task
@forward_logs
def task_1():
    ...


@flow
def example_flow():
    configure_logger()
    task_1()
    ...
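One way to avoid the module-level handler_ids list is to attach the sink only for the duration of the call and remove it in a finally block. The sketch below is an illustration under stated assumptions, not a verified fix for the behaviour discussed here. forward_during_call is a hypothetical helper. loguru_logger is assumed to expose add(sink, **options) returning a handler id and remove(handler_id), as Loguru's logger does. get_sink is a hypothetical zero-argument callable that returns a sink, or None when there is no run context (for example by wrapping get_run_logger() in try/except).

```python
import functools


def forward_during_call(loguru_logger, get_sink):
    """Build a decorator that adds a sink before the call and removes it after."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            sink = get_sink()
            if sink is None:
                # No run context (e.g. calling `.fn` directly): plain passthrough.
                return func(*args, **kwargs)
            handler_id = loguru_logger.add(sink, level="TRACE", format="{message}")
            try:
                return func(*args, **kwargs)
            finally:
                # Always detach the handler, even if the wrapped function raises.
                loguru_logger.remove(handler_id)

        return wrapper

    return decorator
```

Note one difference from the approach above: when decorated functions are nested, both the outer and the inner sink stay attached during the inner call, so a task's messages would also reach the enclosing flow's sink. Whether that is desirable depends on how the logs should appear in the UI.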
I'm not sure why the decorator would have this effect; it might be due to some of the magic happening in @flow?
Morten Hauge
12/03/2024, 8:36 AM
Morten Hauge
12/03/2024, 9:11 AM