Elliot Oram
07/21/2021, 3:53 PM
main() in the attached code snippet.
When I execute this in a Prefect flow, I get the logging from main but not from the thread. Any ideas? Thanks in advance.
Kevin Kho
import threading
import time
import logging
from prefect import task, Flow
import prefect


def _logging_thread(args: dict, logger) -> None:
    while not args["stop"]:
        logger.info("Hello from thread")
        time.sleep(1)


@task
def main():
    logger = prefect.context.get("logger")
    logger.info("Hello from main")
    info = dict(stop=False)
    thread = threading.Thread(target=_logging_thread, args=(info, logger))
    thread.start()
    logger.info("More from main")
    time.sleep(15)  # Do some interesting function calls
    logger.info("Finally from main")
    info['stop'] = True


with Flow("aaa") as flow:
    main()

flow.run()
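For comparison, here is a stdlib-only sketch of the same pattern with Prefect taken out of the picture. It assumes nothing beyond Python's logging and threading modules; ListHandler and run_demo are hypothetical names introduced for illustration, and a threading.Event replaces the shared dict as the stop signal. It suggests that a logger object handed to a thread delivers records to the same handlers as the main thread, so if the thread's lines go missing inside a flow, the cause is more likely in how handlers are set up for the run than in the threading pattern itself.

```python
import logging
import threading
import time


class ListHandler(logging.Handler):
    """Hypothetical in-memory handler so the records can be inspected."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


def _logging_thread(stop: threading.Event, logger: logging.Logger) -> None:
    # Event.wait returns False on timeout and True once the event is set,
    # so this logs roughly every 0.05 s until the caller sets the event.
    while not stop.wait(timeout=0.05):
        logger.info("Hello from thread")


def run_demo():
    logger = logging.getLogger("thread_demo")
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep the demo output out of the root logger
    handler = ListHandler()
    logger.addHandler(handler)

    stop = threading.Event()
    thread = threading.Thread(target=_logging_thread, args=(stop, logger))
    logger.info("Hello from main")
    thread.start()
    time.sleep(0.3)  # stand-in for the real work
    stop.set()
    thread.join()  # wait for the thread so no record arrives after we finish
    logger.info("Finally from main")
    logger.removeHandler(handler)
    return handler.messages
```

Joining the thread before returning also avoids the situation where the task ends while the background thread is still trying to log.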
Elliot Oram
07/22/2021, 8:10 AM
## prefect_log_test.py
import prefect  # needed for prefect.context below
from prefect import Flow, task
from scripts import test_thread_logging


@task
def my_function_task():
    logger = prefect.context.get("logger")
    test_thread_logging.main(logger)


def main():
    flow = Flow('Test logging from thread')
    flow.add_task(my_function_task)
    flow.register(project_name='my_project')


if __name__ == '__main__':
    main()
## test_thread_logging.py
import threading
import time


def _logging_thread(args: dict, logger) -> None:
    while not args["stop"]:
        logger.info("Hello from thread")
        time.sleep(1)


def main(prefect_logger):
    prefect_logger.info("Hello from main")
    info = dict(stop=False)
    thread = threading.Thread(target=_logging_thread, args=(info, prefect_logger))
    thread.start()
    prefect_logger.info("More from main")
    time.sleep(15)  # Do some interesting function calls
    prefect_logger.info("Finally from main")
    info['stop'] = True
As you can see, I tried fetching the Prefect context logger in the task and forwarding it to the function. Unfortunately, the above still doesn't show the logs from the thread.
Kevin Kho
Elliot Oram
07/23/2021, 4:16 PM
\prefect_log_test.py
\scripts\test_thread_logging.py
config.toml
extra_loggers = "['scripts.test_thread_logging']"
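A note on the naming side, as a stdlib-only sketch: the name in extra_loggers is an ordinary logging name, and a module imported as scripts.test_thread_logging gets exactly that name from logging.getLogger(__name__). The sketch assumes (this is my understanding, not confirmed in this thread) that Prefect attaches its handlers to the loggers listed there; ListHandler is a hypothetical stand-in for such a handler, and MODULE_LOGGER_NAME, worker and run_demo are illustrative names.

```python
import logging
import threading

# Assumption: the helper module is imported as `scripts.test_thread_logging`,
# so logging.getLogger(__name__) inside it would return this logger.
MODULE_LOGGER_NAME = "scripts.test_thread_logging"


class ListHandler(logging.Handler):
    """Hypothetical stand-in for a handler attached to a named logger."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(
            (record.name, record.threadName, record.getMessage())
        )


def worker():
    # The thread looks the logger up by name; loggers are process-wide,
    # so this is the same object the handler was attached to.
    logging.getLogger(MODULE_LOGGER_NAME).info("Hello from thread")


def run_demo():
    handler = ListHandler()
    named = logging.getLogger(MODULE_LOGGER_NAME)
    named.setLevel(logging.INFO)
    named.addHandler(handler)

    # A logger under a different name does not reach the handler.
    other = logging.getLogger("some.other.module")
    other.setLevel(logging.INFO)

    thread = threading.Thread(target=worker, name="worker-1")
    thread.start()
    thread.join()
    other.info("not captured")

    named.removeHandler(handler)
    return handler.records
```

In this sketch only the record logged through the matching name is captured, regardless of which thread emitted it. If the same holds for the handlers Prefect attaches, the name itself is probably not the problem here.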
This configuration successfully ensures that the log statements in main() are shown, but still no luck with those in the additional thread. Probably missing something obvious with the naming convention for this...?
Kevin Kho
LocalDaskExecutor would do it for you and be simpler, I think, and would parallelize the code for you.
Elliot Oram
07/26/2021, 11:50 AM
import logging
import threading

log = logging.getLogger(__name__)


def log_progress(current_index):
    log.info(current_index)


def main(prefect_logger):
    logging_args = dict()
    current_timer = threading.Timer(10, log_progress, kwargs=logging_args)
    current_timer.start()
    for i in range(0, 999):
        logging_args['current_index'] = i
        # Start a new timer once the previous one has fired
        if not current_timer.is_alive():
            current_timer = threading.Timer(10, log_progress, kwargs=logging_args)
            current_timer.start()
        ...
    current_timer.cancel()
Our specific use case: when the script is run manually on the command line, we want progress output to the console via tqdm, but we also want time-interval logging every x minutes for Prefect.
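One way to get interval logging without a second thread is to check the elapsed time inside the loop itself. This is a different technique from the Timer approach above, offered only as a sketch: log_every and its clock parameter are hypothetical names, not part of Prefect or tqdm, and the logger is whatever logger the caller passes in.

```python
import logging
import time
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def log_every(
    items: Iterable[T],
    logger: logging.Logger,
    interval_s: float,
    clock: Callable[[], float] = time.monotonic,
) -> Iterator[T]:
    """Yield items unchanged, logging the index at most once per interval_s."""
    last_logged = clock()
    for index, item in enumerate(items):
        now = clock()
        # Log only when at least interval_s seconds have passed
        # since the previous progress message (or since the start).
        if now - last_logged >= interval_s:
            logger.info("Progress: index %d", index)
            last_logged = now
        yield item
```

Usage would look like `for i in log_every(range(0, 999), logger, 600): ...`. Because the logging happens in the thread that runs the task, it sidesteps the question of whether records from a background thread are picked up. Since the wrapper accepts any iterable, it should also be possible to wrap a tqdm iterator with it for console runs, though that combination is untested here.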
Thanks again for the suggestions.
Kevin Kho