how should I handle logging in a prefect-compatible way in a library that uses multiprocessing?
# ask-community
a
how should I handle logging in a prefect-compatible way in a library that uses multiprocessing? no matter what I try, I keep getting
UserWarning: Logger '<redacted>' attempted to send logs to the API without a flow run id. The API log handler can only send logs within flow run contexts unless the flow run id is manually provided.
when my code that uses prefect uses my library. In every single process I'm spawning in my library, I'm configuring the logger like the following (where `log_queue` is a multiprocessing queue):
Copy code
queue_handler = QueueHandler(log_queue)
logger.addHandler(queue_handler)
logger.setLevel(logging.DEBUG)
and then in the library, I'm spawning a thread to listen to the logs sent down the multiprocessing queue:
Copy code
logging_q_consumer_thread = threading.Thread(target=logger_consumer_thread, args=(log_queue,))
and this thread just runs this function:
Copy code
def logger_consumer_thread(logging_q: LoggingQueue) -> None:
    logger = logging.getLogger("<redacted>")

    while True:
        record = logging_q.get()

        if record is None:
            break

        logger.log(level=record.levelno, msg=record.msg)
my idea was: if every process I spawn sends its logs down the multiprocessing queue, and I spawn a thread in the main process that consumes from the queue, then all my logging effectively happens in the main process -- and once all the logging happens in the main process, the modifications prefect makes to my logger named `<redacted>` should work properly. (I assumed the reason logging wasn't working before was that I was logging from processes other than the main process. I think I've now solved that issue, since all my logs are now consumed in a thread inside the main process, but it doesn't seem to solve the problem.) (I've set the `PREFECT_LOGGING_EXTRA_LOGGERS=<redacted>` environment variable, so prefect should be setting up any logger named `<redacted>` to log to prefect's API.) I'm thoroughly stuck and have no idea what I could be doing wrong.
c
Hi Alexandra! Where this warning is coming from is in Prefect's logging `Handler`: https://github.com/PrefectHQ/prefect/blob/main/src/prefect/logging/handlers.py#L189-L200 And what's going on here is that we pick up the current flow run (and optionally task run) from a context object that is active around a given flow run. That object is fundamentally stored in a standard library `ContextVar`. I think what's happening here is that by using `multiprocessing`, you're moving the log records from within that context to outside that context. I'm not seeing where you're enqueuing each log record, but if you were to do something like this before enqueuing the record:
Copy code
import logging

from prefect.context import FlowRunContext, TaskRunContext

log_record: logging.LogRecord = ...  # wherever it comes from

flow_run_context = FlowRunContext.get()
task_run_context = TaskRunContext.get()

if flow_run_context:
    log_record.flow_run_id = flow_run_context.flow_run.id
if task_run_context:
    log_record.task_run_id = task_run_context.task_run.id

# ...enqueue the log_record...
a
unfortunately, i'm writing a library that's being used by a program that uses prefect, and i don't want to make prefect a dependency of the library, so i can't just get the flow run context inside the library
c
Ah okay I see, so do you have any control of the program that uses prefect?
a
yeah
i've just discovered that if I pass through a logger created by `get_run_logger()`, and then listen to the multiprocessing log queue and log to that logger, then things work, but this seems like a very janky solution
c
Okay sweet, I wonder if there's a way you can make the `flow_run_id` and/or `task_run_id` available on each log record from the outer program....
Yeah, that would work because the logger returned from `get_run_logger` is set up to know about prefect contexts and to add the `extra` values already: https://github.com/PrefectHQ/prefect/blob/main/src/prefect/logging/loggers.py#L160-L183
This is fundamentally what you'd need to do on the outer program: make sure that every log record has the `flow_run_id` and/or `task_run_id` attached to it. A `LoggerAdapter` could do that for you.
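Roughly something like this in the outer program (just a sketch, assuming the `<redacted>` logger name and that the prefect contexts are importable there):
Copy code
import logging

from prefect.context import FlowRunContext, TaskRunContext


class RunIdAdapter(logging.LoggerAdapter):
    """Stamp the current prefect run IDs onto every record via `extra`."""

    def process(self, msg, kwargs):
        extra = kwargs.setdefault("extra", {})
        flow_run_context = FlowRunContext.get()
        task_run_context = TaskRunContext.get()
        if flow_run_context:
            extra["flow_run_id"] = flow_run_context.flow_run.id
        if task_run_context:
            extra["task_run_id"] = task_run_context.task_run.id
        return msg, kwargs


# hypothetical usage wherever the outer program logs
logger = RunIdAdapter(logging.getLogger("<redacted>"), {})
logger.info("this record now carries flow_run_id / task_run_id")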
One question I have: is the outer program using `get_run_logger` for all of its logs?
a
yup
c
Interesting, but the log records aren't coming through to your library with the flow run ID already 🤔
This may be hard to do, but are you able to share a small-scale version of your program and library with us as a gist? Like boil it down to the essentials?
Another question: is the outer program always running one flow run at a time, or could it be running any number at a time?
a
i assume that, if i'm seeing the `attempted to send logs to the API without a flow run id` error message, this means the prefect handler is being correctly inherited by the logger i'm getting with `logging.getLogger()` inside my library, but somehow the contextual information about the flow run ID isn't being inherited?
c
I believe that's right, because you're on a different thread
a
oh, that's expected behaviour?
i thought being on a different thread would be fine, as long as i'm on the same process
c
Right, the flow run context is in a python stdlib `ContextVar`, and I believe a thread counts as a separate context
a
ohhhhhhhhh
c
Let me double-check that
a
yeah, looks like it's thread-local
c
Copy code
import contextvars
import threading

var = contextvars.ContextVar('var')

def main():
    var.set(10)
    print('main:', var.get())
    t = threading.Thread(target=thread)
    t.start()
    t.join()

def thread():
    print('thread:', var.get())

if __name__ == '__main__':
    main()
Copy code
$ python cv.py
main: 10
Exception in thread Thread-1 (thread):
Traceback (most recent call last):
  File "/home/chris/.pyenv/versions/3.12.2/lib/python3.12/threading.py", line 1073, in _bootstrap_inner
    self.run()
  File "/home/chris/.pyenv/versions/3.12.2/lib/python3.12/threading.py", line 1010, in run
    self._target(*self._args, **self._kwargs)
  File "/home/chris/tmp/cv.py", line 14, in thread
    print('thread:', var.get())
                     ^^^^^^^^^
LookupError: <ContextVar name='var' at 0x73a5bc9ea390>
Yep, I think if you can manage to get the `flow_run_id` copied onto the `LogRecord` before you leave the context, that should work out
Without knowing how your outer program works, I'm not exactly sure how that would play out
Oh wait wait
a
yeah, i think I can sort something out
c
Copy code
def logger_consumer_thread(logging_q: LoggingQueue) -> None:
    logger = logging.getLogger("<redacted>")

    while True:
        record = logging_q.get()

        if record is None:
            break

        logger.log(level=record.levelno, msg=record.msg)
a
i at least understand what exactly is going wrong now!
c
so you're pulling log records off the queue here and then re-logging them
you should theoretically have all the info you need on the `record` already
I wonder if instead of doing `logger.log` from your thread, you can just use a handler and do `handler.emit`
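e.g. something like this in your consumer thread (a sketch; using `logger.handle()` here, which re-dispatches the original `LogRecord` to the handlers with whatever attributes it already carries, so it only helps once `flow_run_id` is actually on the record):
Copy code
import logging


def logger_consumer_thread(logging_q: "LoggingQueue") -> None:
    logger = logging.getLogger("<redacted>")

    while True:
        record = logging_q.get()

        if record is None:
            break

        # hand the queued record straight to the logger's handlers,
        # preserving every attribute that was set before it was enqueued
        logger.handle(record)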
If you're comfortable sharing a little more detail, is the idea here that your library streams logs to another system?
a
we're using the library to analyze chess games, and it abstracts over various ways of doing that, but the logs are important to track progress and errors when analyzing the games
and we're flying a bit blind not being able to see them lol
c
😄 totally feel that!
So is the root problem that your analysis is happening in subprocesses (via multiprocessing) and those logs aren't making it back to the prefect flows that are running them (and thus not back up to the API so you can see those logs)?
a
yup, exactly
c
aha! okay gotcha gotcha, so the log records won't have the prefect `flow_run_id` on them. I misunderstood the direction of the flow here. Okay, so really the whole problem boils down to the context not being there in your thread
Take a look at https://docs.python.org/3/library/contextvars.html and there might be something there that helps, especially `copy_context`
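e.g. the toy example from above works if the thread is started inside a copied context (a minimal sketch):
Copy code
import contextvars
import threading

var = contextvars.ContextVar('var')

def main():
    var.set(10)
    print('main:', var.get())
    # run the thread body inside a copy of the current context,
    # so var.get() in the thread sees the value set above
    ctx = contextvars.copy_context()
    t = threading.Thread(target=ctx.run, args=(thread,))
    t.start()
    t.join()

def thread():
    print('thread:', var.get())

if __name__ == '__main__':
    main()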
a
taking a look
thanks so much 🙏
this was bamboozling me so hard lol
this was a huge help
c
😄 my pleasure! keep me posted on how it's going
a
aha! if, when I spawn the thread to do logging in the main process, I do:
Copy code
current_thread_context = contextvars.copy_context()
logging_q_consumer_thread = threading.Thread(
    target=current_thread_context.run,
    args=(lambda: logger_consumer_thread(logging_q),),
    daemon=True,
)
logging_q_consumer_thread.start()
and then I change the function run by that thread to:
Copy code
def logger_consumer_thread(logging_q: LoggingQueue) -> None:
    current_thread_context = cast(
        Mapping[str, object],
        dict(contextvars.copy_context().items()),
    )

    logger = logging.getLogger("<redacted>")

    while True:
        record = logging_q.get()

        if record is None:
            break

        logger.log(level=record.levelno, msg=record.msg, extra=current_thread_context)
then everything works
c
YESSSS!
So one question: your thread will only exist for the duration of one Prefect flow run and/or task run, right?
a
yup
c
If that's the case, I think you're golden
a
thanks for all the help
really appreciated