Alexandra Valentine-Ketchum
05/14/2024, 11:46 AM
UserWarning: Logger '<redacted>' attempted to send logs to the API without a flow run id. The API log handler can only send logs within flow run contexts unless the flow run id is manually provided.
I get this warning when my code that uses Prefect uses my library.
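In every single process I'm spawning in my library, I'm configuring the logger like this (where `log_queue` is a multiprocessing queue):
```python
import logging
from logging.handlers import QueueHandler
logger = logging.getLogger("<redacted>")  # assumed: the library's logger
queue_handler = QueueHandler(log_queue)
logger.addHandler(queue_handler)
logger.setLevel(logging.DEBUG)
```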
And then, in the library, I'm spawning a thread to listen to the logs sent down the multiprocessing queue:
```python
import threading
logging_q_consumer_thread = threading.Thread(target=logger_consumer_thread, args=(log_queue,))
```
And this thread just runs this function:
```python
import logging

def logger_consumer_thread(logging_q: LoggingQueue) -> None:
    logger = logging.getLogger("<redacted>")
    while True:
        record = logging_q.get()
        if record is None:  # sentinel value tells the thread to stop
            break
        logger.log(level=record.levelno, msg=record.msg)
```
My idea was that if every process I spawn sends its logs down the multiprocessing queue, and I spawn a thread in the main process that consumes from that queue, then all my logging effectively happens in the main process. And once all the logging happens in the main process, the modifications Prefect makes to my logger named <redacted> will work properly.
(I assumed logging wasn't working before because I was logging from processes other than the main process. I think I've now solved that, since all my logs are now consumed by a thread inside the main process, but it doesn't fix the problem.)
(I've set the `PREFECT_LOGGING_EXTRA_LOGGERS=<redacted>` environment variable, so Prefect is meant to set up any logger named <redacted> to log to Prefect's API.)
I'm thoroughly stuck and have no idea what I could be doing wrong.
Chris Guidry
05/14/2024, 1:49 PM
...`ContextVar`.
I think what's happening here is that by using `multiprocessing`, you're moving the log records from inside that context to outside of it. I'm not seeing where you're enqueuing each log record, but you could do something like this before enqueuing the record:
```python
import logging

from prefect.context import FlowRunContext, TaskRunContext

log_record: logging.LogRecord = ...  # wherever it comes from
flow_run_context = FlowRunContext.get()
task_run_context = TaskRunContext.get()
if flow_run_context:
    log_record.flow_run_id = flow_run_context.flow_run.id
if task_run_context:
    log_record.task_run_id = task_run_context.task_run.id
# ...enqueue the log_record...
```
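A minimal, Prefect-free sketch of stamping the id onto records before they are enqueued. It assumes a hypothetical `current_flow_run_id` ContextVar standing in for Prefect's `FlowRunContext`, and a hypothetical `StampRunIdFilter` attached to the `QueueHandler`; the `flow_run_id` attribute name mirrors the snippet above. Because the filter runs while the record is being handled in the caller's context, the id travels with the record through the queue.

```python
import contextvars
import logging
import queue
from logging.handlers import QueueHandler

# Hypothetical stand-in for Prefect's FlowRunContext (not Prefect's API).
current_flow_run_id = contextvars.ContextVar("current_flow_run_id", default=None)


class StampRunIdFilter(logging.Filter):
    """Copy the run id from the current context onto each record before it is enqueued."""

    def filter(self, record: logging.LogRecord) -> bool:
        run_id = current_flow_run_id.get()
        if run_id is not None:
            record.flow_run_id = run_id
        return True  # never drop records, only annotate them


log_queue = queue.Queue()
queue_handler = QueueHandler(log_queue)
queue_handler.addFilter(StampRunIdFilter())

logger = logging.getLogger("stamp-demo")
logger.setLevel(logging.DEBUG)
logger.addHandler(queue_handler)
logger.propagate = False

current_flow_run_id.set("run-123")  # as if we were inside a flow run
logger.info("hello from inside the run")

record = log_queue.get_nowait()
print(record.flow_run_id, record.getMessage())
```

The filter has to run where the context exists; records created somewhere without it pass through unstamped.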
Alexandra Valentine-Ketchum
05/14/2024, 1:51 PM
Chris Guidry
05/14/2024, 1:53 PM
Alexandra Valentine-Ketchum
05/14/2024, 1:53 PM
Alexandra Valentine-Ketchum
05/14/2024, 1:54 PM
...`get_run_logger()`, and then listen to the multiprocessing log queue and log to that logger, then things work.
But this seems like a very janky solution.
Chris Guidry
05/14/2024, 1:54 PM
...`flow_run_id` and/or `task_run_id` available on each log record from the outer program...
Chris Guidry
05/14/2024, 1:56 PM
`get_run_logger` is set up to know about Prefect contexts and to add the `extra` values already:
https://github.com/PrefectHQ/prefect/blob/main/src/prefect/logging/loggers.py#L160-L183
Chris Guidry
05/14/2024, 1:56 PM
...`flow_run_id` and/or `task_run_id` attached to it.
Chris Guidry
05/14/2024, 1:56 PM
...`LoggerAdapter` could do that for you.
Chris Guidry
05/14/2024, 1:57 PM
...`get_run_logger` for all of its logs?
Alexandra Valentine-Ketchum
05/14/2024, 1:57 PM
Chris Guidry
05/14/2024, 1:58 PM
Chris Guidry
05/14/2024, 2:00 PM
Chris Guidry
05/14/2024, 2:02 PM
Alexandra Valentine-Ketchum
05/14/2024, 2:03 PM
...`attempted to send logs to the API without a flow run id` error message, this means the Prefect handler is correctly being inherited by the logger I'm getting with `logging.getLogger()` inside my library,
but somehow the contextual information about the flow run ID isn't being inherited?
Chris Guidry
05/14/2024, 2:03 PM
Alexandra Valentine-Ketchum
05/14/2024, 2:03 PM
Alexandra Valentine-Ketchum
05/14/2024, 2:04 PM
Chris Guidry
05/14/2024, 2:04 PM
...`ContextVar`, and I believe a thread counts as a separate context.
Alexandra Valentine-Ketchum
05/14/2024, 2:04 PM
Chris Guidry
05/14/2024, 2:04 PM
Alexandra Valentine-Ketchum
05/14/2024, 2:06 PM
Chris Guidry
05/14/2024, 2:07 PM
```python
import contextvars
import threading

var = contextvars.ContextVar('var')

def main():
    var.set(10)
    print('main:', var.get())
    t = threading.Thread(target=thread)
    t.start()
    t.join()

def thread():
    print('thread:', var.get())

if __name__ == '__main__':
    main()
```
```
$ python cv.py
main: 10
Exception in thread Thread-1 (thread):
Traceback (most recent call last):
  File "/home/chris/.pyenv/versions/3.12.2/lib/python3.12/threading.py", line 1073, in _bootstrap_inner
    self.run()
  File "/home/chris/.pyenv/versions/3.12.2/lib/python3.12/threading.py", line 1010, in run
    self._target(*self._args, **self._kwargs)
  File "/home/chris/tmp/cv.py", line 14, in thread
    print('thread:', var.get())
                     ^^^^^^^^^
LookupError: <ContextVar name='var' at 0x73a5bc9ea390>
```
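The same script with the thread's target wrapped in `contextvars.copy_context().run`, which is the standard-library way to carry the current context into a new thread. This is a sketch of the general technique, not anything Prefect-specific; the `seen` dict is only there so the result can be checked.

```python
import contextvars
import threading

var = contextvars.ContextVar('var')
seen = {}  # lets the value observed by the thread be checked afterwards


def main():
    var.set(10)
    print('main:', var.get())
    # Snapshot the current context and run the thread's target inside that copy.
    ctx = contextvars.copy_context()
    t = threading.Thread(target=ctx.run, args=(thread,))
    t.start()
    t.join()


def thread():
    seen['thread'] = var.get()
    print('thread:', var.get())


if __name__ == '__main__':
    main()
```

Running it should print `main: 10` followed by `thread: 10`. Each thread needs its own copy, since a `Context` object cannot be entered by more than one thread at a time.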
Chris Guidry
05/14/2024, 2:07 PM
...`flow_run_id` copied onto the `LogRecord` before you leave the context, that should work out.
Chris Guidry
05/14/2024, 2:08 PM
Chris Guidry
05/14/2024, 2:08 PM
Alexandra Valentine-Ketchum
05/14/2024, 2:08 PM
Chris Guidry
05/14/2024, 2:09 PM
```python
def logger_consumer_thread(logging_q: LoggingQueue) -> None:
    logger = logging.getLogger("<redacted>")
    while True:
        record = logging_q.get()
        if record is None:
            break
        logger.log(level=record.levelno, msg=record.msg)
```
Alexandra Valentine-Ketchum
05/14/2024, 2:09 PM
Chris Guidry
05/14/2024, 2:09 PM
Chris Guidry
05/14/2024, 2:09 PM
...`record` already.
Chris Guidry
05/14/2024, 2:10 PM
...`logger.log` from your thread, you can just use a handler and do `handler.emit`.
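A sketch of what that could look like, using the standard library's `Logger.handle`, which passes an existing `LogRecord` through the logger's filters and on to its handlers (each handler's `handle` then calls `emit`). Unlike `logger.log(level=record.levelno, msg=record.msg)`, this keeps the worker's original record, including its `lineno`, `process`, args, and any custom attributes. `queue.Queue` stands in for the multiprocessing queue, the `None` sentinel follows the original consumer, and `CaptureHandler` and the `flow_run_id` value are hypothetical.

```python
import logging
import queue
import threading


class CaptureHandler(logging.Handler):
    """Keep handled records in memory so the demo can inspect them."""

    def __init__(self) -> None:
        super().__init__()
        self.records = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


def logger_consumer_thread(logging_q: queue.Queue) -> None:
    logger = logging.getLogger("consumer-demo")
    while True:
        record = logging_q.get()
        if record is None:  # sentinel: stop consuming
            break
        # Dispatch the existing record instead of building a new one with logger.log().
        logger.handle(record)


capture = CaptureHandler()
demo_logger = logging.getLogger("consumer-demo")
demo_logger.addHandler(capture)
demo_logger.propagate = False

q = queue.Queue()
# A record as a worker might have produced it, with a hypothetical custom attribute.
original = logging.LogRecord(
    name="consumer-demo", level=logging.WARNING, pathname="worker.py",
    lineno=42, msg="disk at %d%%", args=(93,), exc_info=None,
)
original.flow_run_id = "run-123"
q.put(original)
q.put(None)

t = threading.Thread(target=logger_consumer_thread, args=(q,))
t.start()
t.join()

handled = capture.records[0]
print(handled.getMessage(), handled.lineno, handled.flow_run_id)
```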
Chris Guidry
05/14/2024, 2:10 PM
Alexandra Valentine-Ketchum
05/14/2024, 2:12 PM
Alexandra Valentine-Ketchum
05/14/2024, 2:13 PM
Chris Guidry
05/14/2024, 2:13 PM
Chris Guidry
05/14/2024, 2:15 PM
Alexandra Valentine-Ketchum
05/14/2024, 2:15 PM
Chris Guidry
05/14/2024, 2:17 PM
...`flow_run_id` on them. I misunderstood the direction of the flow here. Okay, so really the whole problem boils down to the context not being there in your thread.
Chris Guidry
05/14/2024, 2:18 PM
...`copy_context`
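A Prefect-free sketch of why running the consumer under `copy_context` matters. `ContextAwareHandler` is a hypothetical stand-in for a handler that, presumably like the one raising the warning at the top of the thread, looks up the current run in a ContextVar when it handles a record; `current_run_id` is likewise hypothetical. It models only the context lookup, not Prefect itself.

```python
import contextvars
import logging
import queue
import threading

# Hypothetical stand-in for Prefect's run context (not Prefect's actual API).
current_run_id = contextvars.ContextVar("current_run_id", default=None)


class ContextAwareHandler(logging.Handler):
    """Records which run id, if any, is visible in the context where emit() runs."""

    def __init__(self) -> None:
        super().__init__()
        self.seen_run_ids = []

    def emit(self, record: logging.LogRecord) -> None:
        self.seen_run_ids.append(current_run_id.get())


def consume(logging_q: queue.Queue, handler: logging.Handler) -> None:
    while True:
        record = logging_q.get()
        if record is None:  # sentinel: stop consuming
            break
        handler.handle(record)


def run_consumer(use_copied_context: bool) -> list:
    handler = ContextAwareHandler()
    q = queue.Queue()
    q.put(logging.makeLogRecord({"msg": "from a worker"}))
    q.put(None)
    if use_copied_context:
        ctx = contextvars.copy_context()  # snapshot taken in the calling thread
        t = threading.Thread(target=ctx.run, args=(consume, q, handler), daemon=True)
    else:
        t = threading.Thread(target=consume, args=(q, handler), daemon=True)
    t.start()
    t.join()
    return handler.seen_run_ids


current_run_id.set("run-123")  # as if we were inside a flow run
print("plain thread:  ", run_consumer(use_copied_context=False))
print("copied context:", run_consumer(use_copied_context=True))
```

On a standard CPython build this should print `[None]` for the plain thread and `['run-123']` for the copied context, because a new thread starts with an empty context unless it is given one.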
Alexandra Valentine-Ketchum
05/14/2024, 2:21 PM
Alexandra Valentine-Ketchum
05/14/2024, 2:21 PM
Alexandra Valentine-Ketchum
05/14/2024, 2:21 PM
Alexandra Valentine-Ketchum
05/14/2024, 2:21 PM
Chris Guidry
05/14/2024, 2:21 PM
Alexandra Valentine-Ketchum
05/14/2024, 5:12 PM
```python
import contextvars
import threading

current_thread_context = contextvars.copy_context()
logging_q_consumer_thread = threading.Thread(
    target=current_thread_context.run,
    args=(lambda: logger_consumer_thread(logging_q),),
    daemon=True,
)
logging_q_consumer_thread.start()
```
And then I change the function run by that thread to:
```python
import contextvars
import logging
from collections.abc import Mapping
from typing import cast

def logger_consumer_thread(logging_q: LoggingQueue) -> None:
    # Maps each ContextVar object visible in this thread to its current value.
    current_thread_context = cast(
        Mapping[str, object],
        dict(contextvars.copy_context().items()),
    )
    logger = logging.getLogger("<redacted>")
    while True:
        record = logging_q.get()
        if record is None:
            break
        logger.log(level=record.levelno, msg=record.msg, extra=current_thread_context)
```
Then everything works.
Chris Guidry
05/14/2024, 5:12 PM
Chris Guidry
05/14/2024, 5:13 PM
Alexandra Valentine-Ketchum
05/14/2024, 5:13 PM
Chris Guidry
05/14/2024, 5:13 PM
Alexandra Valentine-Ketchum
05/14/2024, 5:14 PM
Alexandra Valentine-Ketchum
05/14/2024, 5:14 PM