How should I handle logging in a Prefect-compatible way in a library that uses multiprocessing?
No matter what I try, I keep getting the following warning whenever my code that uses Prefect calls my library:
UserWarning: Logger '<redacted>' attempted to send logs to the API without a flow run id. The API log handler can only send logs within flow run contexts unless the flow run id is manually provided.
In every single process I'm spawning in my library, I'm configuring the logger as follows (where log_queue is a multiprocessing queue):
import logging
from logging.handlers import QueueHandler
queue_handler = QueueHandler(log_queue)
logger.addHandler(queue_handler)
logger.setLevel(logging.DEBUG)
Then, in the library, I'm spawning a thread to listen for the logs sent down the multiprocessing queue:
logging_q_consumer_thread = threading.Thread(target=logger_consumer_thread, args=(log_queue,))
This thread just runs the following function:
def logger_consumer_thread(logging_q: LoggingQueue) -> None:
    logger = logging.getLogger("<redacted>")
    while True:
        record = logging_q.get()
        if record is None:
            break
        logger.log(level=record.levelno, msg=record.msg)
My idea was this: if every process I spawn sends its logs down the multiprocessing queue, and a thread in the main process consumes from that queue, then all my logging effectively happens in the main process. Once all the logging happens in the main process, the modifications that Prefect makes to my logger named <redacted> should work properly.
(I assumed logging wasn't working before because I was logging in processes other than the main process. I think I've now solved that issue, since all my logs are consumed in a thread inside the main process, but that doesn't seem to solve the problem.)
(I've set the PREFECT_LOGGING_EXTRA_LOGGERS=<redacted> environment variable, so Prefect is meant to be setting up any logger with the name <redacted> to log to Prefect's API.)
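For reference, here is a condensed, stdlib-only sketch of the setup described above, with Prefect left out entirely. The names mylib, ListHandler, worker, consume and run_demo are hypothetical stand-ins (ListHandler only plays the role of whatever handler Prefect attaches to the logger), and the "fork" start method is an assumption made to keep the example short, so it is POSIX-only. It shows that the queue plumbing itself delivers every child-process record to a handler in the main process.

```python
import logging
import multiprocessing as mp
import threading
from logging.handlers import QueueHandler

LOGGER_NAME = "mylib"  # hypothetical stand-in for the redacted logger name


class ListHandler(logging.Handler):
    """Hypothetical capture handler; stands in for the handler Prefect would attach."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


def worker(log_queue, worker_id):
    # Runs in a child process: send every record down the shared queue.
    logger = logging.getLogger(LOGGER_NAME)
    logger.addHandler(QueueHandler(log_queue))
    logger.setLevel(logging.DEBUG)
    logger.info("hello from worker %d", worker_id)


def consume(log_queue):
    # Runs in a thread of the main process: re-log each record locally.
    logger = logging.getLogger(LOGGER_NAME)
    while True:
        record = log_queue.get()
        if record is None:  # sentinel: stop consuming
            break
        logger.log(record.levelno, record.msg)


def run_demo(num_workers=2):
    ctx = mp.get_context("fork")  # assumption: POSIX-only start method
    log_queue = ctx.Queue()
    capture = ListHandler()
    logger = logging.getLogger(LOGGER_NAME)
    logger.addHandler(capture)
    logger.setLevel(logging.DEBUG)

    # Start the child processes before the consumer thread exists.
    procs = [ctx.Process(target=worker, args=(log_queue, i)) for i in range(num_workers)]
    for p in procs:
        p.start()
    consumer = threading.Thread(target=consume, args=(log_queue,))
    consumer.start()
    for p in procs:
        p.join()
    log_queue.put(None)  # tell the consumer thread to stop
    consumer.join()
    logger.removeHandler(capture)
    return sorted(capture.messages)
```

Calling run_demo() returns the messages captured in the main process, one per worker. So in this stripped-down version the records do arrive in the main process, which suggests the warning comes from something other than the queue plumbing.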
I'm thoroughly stuck and have no idea what I could be doing wrong.