# ask-community
e
Hi all, looking for some advice about logging with Prefect and threads (note: I'm no expert on any of these three concepts!). I have a Prefect flow task that runs main() in the attached code snippet. When I execute this in a Prefect flow, I get the logging from main, but not from the thread. Any ideas? Thanks in advance 🙏
k
Hey @Elliot Oram, would this work?
```python
import threading
import time
import logging
from prefect import task, Flow
import prefect

def _logging_thread(args: dict, logger) -> None:
    # Log once a second until main() sets args["stop"].
    while not args["stop"]:
        logger.info("Hello from thread")
        time.sleep(1)

@task
def main():
    # Grab Prefect's task logger and hand it to the thread explicitly.
    logger = prefect.context.get("logger")
    logger.info("Hello from main")
    info = dict(stop=False)
    thread = threading.Thread(target=_logging_thread, args=(info, logger))
    thread.start()

    logger.info("More from main")
    time.sleep(15)  # Do some interesting function calls
    logger.info("Finally from main")
    info['stop'] = True

with Flow("aaa") as flow:
    main()

flow.run()
```
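A variation on the snippet above, sketched with only the standard library so it runs without Prefect: a threading.Event replaces the stop flag kept in the dict, and Event.wait(interval) lets the thread exit as soon as the event is set instead of finishing its sleep. The logger name thread_demo and the short sleep are hypothetical; inside a Prefect task you would pass prefect.context.get("logger") instead, as above.

```python
import logging
import threading
import time

def _logging_thread(stop: threading.Event, logger: logging.Logger, interval: float = 1.0) -> None:
    # Log until the stop event is set; wait() returns early as soon as it is.
    while not stop.is_set():
        logger.info("Hello from thread")
        stop.wait(interval)

def main(logger: logging.Logger) -> None:
    logger.info("Hello from main")
    stop = threading.Event()
    thread = threading.Thread(target=_logging_thread, args=(stop, logger))
    thread.start()
    try:
        logger.info("More from main")
        time.sleep(0.3)  # stand-in for the real work
        logger.info("Finally from main")
    finally:
        stop.set()     # signal the thread to exit...
        thread.join()  # ...and wait for it, so nothing is logged after main returns

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(threadName)s: %(message)s")
    main(logging.getLogger("thread_demo"))  # hypothetical logger name
```

Joining the thread in a finally block also means an exception in the work section still stops the logging thread.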
e
Thanks! This does work; however, I omitted some details to keep the problem simple to explain, so this isn't quite right for my scenario (apologies for not giving the whole problem up front!). Our system setup means that this script needs to be able to run independently of Prefect as well as in Prefect. As such, there is a separate script that is responsible for setting up the flow and the tasks:
```python
## prefect_log_test.py

import prefect  # needed for prefect.context below
from prefect import Flow, task
from scripts import test_thread_logging

@task
def my_function_task():
    # Forward Prefect's task logger into the plain-Python script.
    logger = prefect.context.get("logger")
    test_thread_logging.main(logger)

def main():
    flow = Flow('Test logging from thread')
    flow.add_task(my_function_task)
    flow.register(project_name='my_project')

if __name__ == '__main__':
    main()
```
```python
## test_thread_logging.py

import threading
import time

def _logging_thread(args: dict, logger) -> None:
    while not args["stop"]:
        logger.info("Hello from thread")
        time.sleep(1)

def main(prefect_logger):
    prefect_logger.info("Hello from main")
    info = dict(stop=False)
    thread = threading.Thread(target=_logging_thread, args=(info, prefect_logger))
    thread.start()
    prefect_logger.info("More from main")
    time.sleep(15)  # Do some interesting function calls
    prefect_logger.info("Finally from main")
    info['stop'] = True
```
As you can see, I tried to get the Prefect context logger inside the task and forward it to the script; unfortunately, the above still doesn't show the logs from the thread.
k
I see. I think what needs to happen is that the script has its own logger, which you then add to Prefect as an extra logger via the extra_loggers setting.
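A minimal sketch of what "its own logger" could look like in test_thread_logging.py, assuming the module is imported as scripts.test_thread_logging (as prefect_log_test.py does), so that logging.getLogger(__name__) yields the name listed in extra_loggers. In this version the task would call main() without passing a logger; the duration and interval parameters are hypothetical additions so the sketch can run quickly.

```python
import logging
import threading
import time

# Module-level logger: its name is "scripts.test_thread_logging" when imported
# as part of the scripts package, and "__main__" when the file is run directly.
log = logging.getLogger(__name__)

def _logging_thread(stop: threading.Event, interval: float) -> None:
    # The thread logs through the module logger instead of a passed-in one.
    while not stop.is_set():
        log.info("Hello from thread")
        stop.wait(interval)

def main(duration: float = 15.0, interval: float = 1.0) -> None:
    log.info("Hello from main")
    stop = threading.Event()
    thread = threading.Thread(target=_logging_thread, args=(stop, interval))
    thread.start()
    time.sleep(duration)  # Do some interesting function calls
    stop.set()
    thread.join()
    log.info("Finally from main")

if __name__ == "__main__":
    # Standalone run: configure console output ourselves.
    logging.basicConfig(level=logging.INFO)
    main(duration=2.0)
```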
e
Hey Kevin, I've tried this, but still no luck. The folder structure is:
```
\prefect_log_test.py
\scripts\test_thread_logging.py
config.toml
```
```toml
extra_loggers = "['scripts.test_thread_logging']"
```
This successfully makes the log statements in main() show up, but still no luck with the ones in the additional thread. Am I missing something obvious about the naming convention for this?
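One way to check the naming side, as a standard-library sketch: logger names form a dot-separated hierarchy, records propagate from a child logger to handlers on its ancestors, and the thread that emits a record plays no part in that routing. Note that __name__ is "__main__" when a file is run directly, which would not match an extra_loggers entry of 'scripts.test_thread_logging'. The child name scripts.test_thread_logging.worker and the ListHandler class are hypothetical.

```python
import logging

# Loggers form a dot-separated hierarchy: a record logged on a child
# propagates up to handlers attached to any of its ancestors.
parent = logging.getLogger("scripts.test_thread_logging")
child = logging.getLogger("scripts.test_thread_logging.worker")  # hypothetical child name
unrelated = logging.getLogger("__main__")  # what __name__ is when a file is run directly

class ListHandler(logging.Handler):
    """Collects messages so we can see which records reached this handler."""
    def __init__(self) -> None:
        super().__init__()
        self.messages = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(f"{record.name}: {record.getMessage()}")

handler = ListHandler()
parent.addHandler(handler)
parent.setLevel(logging.INFO)

child.info("from child")         # reaches the parent's handler via propagation
unrelated.info("from __main__")  # does not: "__main__" is not under the parent's name

print(handler.messages)  # ['scripts.test_thread_logging.worker: from child']
```

So if the thread's messages are missing, comparing the name of the logger the thread actually uses against the extra_loggers entry is a reasonable first check.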
k
Do you need to manage the threads manually? I think the LocalDaskExecutor would be simpler, and it would parallelize the code for you.
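Prefect's LocalDaskExecutor is not reproduced here. As a swapped-in, standard-library illustration of the same idea (an executor owns the worker threads, so there is no manual start/stop bookkeeping), concurrent.futures.ThreadPoolExecutor can be used as below; process_item and the pool size are hypothetical.

```python
import logging
from concurrent.futures import ThreadPoolExecutor

log = logging.getLogger(__name__)

def process_item(i: int) -> int:
    # Hypothetical unit of work; logging from a pool thread needs no special setup.
    log.info("processing item %d", i)
    return i * i

def run_all(n: int, workers: int = 4) -> list:
    # The executor creates, reuses, and joins its threads; map() keeps input order.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(process_item, range(n)))

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(threadName)s %(message)s")
    print(run_all(8))
```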
e
Hey Kevin, thanks for the help! I ended up going in a different, simpler direction. For closure, the following plays much nicer with Prefect:
```python
import logging
import threading

log = logging.getLogger(__name__)

def log_progress(current_index):
    log.info("Current index: %d", current_index)


def main(prefect_logger):
    # The Timer keeps a reference to this dict, so it sees the latest index when it fires.
    logging_args = dict(current_index=0)
    current_timer = threading.Timer(10, log_progress, kwargs=logging_args)
    current_timer.start()
    for i in range(0, 999):
        logging_args['current_index'] = i
        # A Timer fires only once; start a new one after the previous one has fired.
        if not current_timer.is_alive():
            current_timer = threading.Timer(10, log_progress, kwargs=logging_args)
            current_timer.start()
        ...  # process item i
    current_timer.cancel()
```
Our specific use case is that we want progress to be output to the console via tqdm when the script is run manually on the command line, but we also want time-interval logging every x minutes when it runs in Prefect. Thanks again for the suggestions 🙂
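For the "log every x minutes" part, an alternative sketch that avoids re-creating a Timer after each one fires: a single background thread loops on Event.wait(interval), which returns False on timeout and True once the event is set, so reporting stops promptly when the work finishes. The state dict, report_progress, and the interval values are hypothetical, and the tqdm console output is left out.

```python
import logging
import threading
import time

log = logging.getLogger(__name__)

def report_progress(state: dict, stop: threading.Event, interval: float) -> None:
    # Event.wait returns False on timeout (time to report) and True once stop is set.
    while not stop.wait(interval):
        log.info("Processed up to index %s", state.get("current_index"))

def main(n: int = 999, interval: float = 10.0) -> None:
    state = {"current_index": None}
    stop = threading.Event()
    reporter = threading.Thread(
        target=report_progress, args=(state, stop, interval), daemon=True
    )
    reporter.start()
    try:
        for i in range(n):
            state["current_index"] = i
            time.sleep(0.001)  # placeholder for the real work on item i
    finally:
        stop.set()       # wake the reporter immediately...
        reporter.join()  # ...and wait for it to exit

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    main(n=300, interval=0.1)
```

Compared with restarting a Timer, there is only one thread for the whole run, and there is no window between one Timer firing and the next being started.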
k
I see. Thanks for circling back on this!