YD
06/11/2021, 9:16 PMKevin Kho
Kevin Kho
from datetime import timedelta, datetime
from prefect import task, Flow, Parameter
import prefect
import logging
from prefect.utilities.logging import get_logger
from pathlib import Path
import os
# os.mkdir('log')
LOG_PATH = 'log'


class MyFileLogger(logging.FileHandler):
    """File handler that always writes inside the ``LOG_PATH`` directory.

    Args:
        filename: Log file name, joined onto ``LOG_PATH``.
        mode: File open mode, passed through to ``logging.FileHandler``.
        encoding: Text encoding, passed through to ``logging.FileHandler``.
        delay: If true, opening the file is deferred until the first emit.
    """

    def __init__(self, filename, mode='a', encoding=None, delay=False):
        # With delay=False the base class opens the file right away, which
        # fails if the directory does not exist yet; create it first.
        # exist_ok=True keeps repeated runs from raising FileExistsError.
        os.makedirs(LOG_PATH, exist_ok=True)
        super().__init__(os.path.join(LOG_PATH, filename), mode, encoding, delay)
@task(max_retries=3, retry_delay=timedelta(seconds=1), name='pull_project', log_stdout=True)
def pull_project(project_name):
    """Log an info and a warning message, then echo the project name back.

    The body only logs; it does not perform an actual git operation.

    Args:
        project_name: Project name interpolated into the info message.

    Returns:
        A dict of the form ``{'project_name': project_name}``.
    """
    # The logger is looked up from the Prefect context.
    logger = prefect.context.get("logger")
    logger.info(f"Pull latest {project_name} version from git.")
    logger.warning("A warning message....")
    return {'project_name': project_name}
def main():
    """Build the ``test`` flow, attach a formatted file handler, and run it."""
    with Flow("test") as flow:
        # The call's return value is not needed afterwards, so it is not bound.
        pull_project(project_name='test')

    # Attach a file handler to the logger returned by get_logger(), with an
    # explicit format so that file records carry timestamp, level and logger
    # name in addition to the message.
    file_logger = get_logger()
    fh = MyFileLogger('auto_ai.log')
    fh.setFormatter(logging.Formatter("[%(asctime)s] %(levelname)s - %(name)s | %(message)s"))
    file_logger.addHandler(fh)
    flow.run()


if __name__ == "__main__":
    main()
YD
06/11/2021, 9:47 PM
Beginning Flow run for 'test'
Task 'pull_project': Starting task run...
Pull latest test version from git.
A warning message....
Task 'pull_project': Finished task run for task with final state: 'Success'
Flow run SUCCESS: all reference tasks succeeded
instead of
Beginning Flow run for 'test'
Task 'pull_project': Starting task run...
Pull latest test version from git.
A warning message....
Task 'pull_project': Finished task run for task with final state: 'Success'
Flow run SUCCESS: all reference tasks succeeded
Original code
from datetime import timedelta, datetime
from prefect import task, Flow, Parameter
import prefect
import logging
from prefect.utilities.logging import get_logger
from pathlib import Path
import os
SRC_PATH = os.path.dirname(__file__)
LOG_PATH = os.path.join(Path(SRC_PATH).parents[0], 'logs')
class MyFileLogger(logging.FileHandler):
    """File handler that always writes inside the ``LOG_PATH`` directory.

    Args:
        filename: Log file name, joined onto ``LOG_PATH``.
        mode: File open mode, passed through to ``logging.FileHandler``.
        encoding: Text encoding, passed through to ``logging.FileHandler``.
        delay: If true, opening the file is deferred until the first emit.
    """

    def __init__(self, filename, mode='a', encoding=None, delay=False):
        # With delay=False the base class opens the file right away, which
        # fails if the directory does not exist yet; create it first.
        os.makedirs(LOG_PATH, exist_ok=True)
        super().__init__(os.path.join(LOG_PATH, filename), mode, encoding, delay)
# logging.Formatter = "[%(asctime)s] %(levelname)s - %(name)s | %(message)s"
@task(max_retries=3, retry_delay=timedelta(seconds=1), name='pull_project', log_stdout=True)
def pull_project(project_name):
    """Log an info and a warning message, then echo the project name back.

    The body only logs; it does not perform an actual git operation.

    Args:
        project_name: Project name interpolated into the info message.

    Returns:
        A dict of the form ``{'project_name': project_name}``.
    """
    # The logger is looked up from the Prefect context.
    logger = prefect.context.get("logger")
    logger.info(f"Pull latest {project_name} version from git.")
    logger.warning("A warning message....")
    return {'project_name': project_name}
def main():
    """Build the ``test`` flow, attach a file handler, and run it."""
    with Flow("test") as flow:
        # The call's return value is not needed afterwards, so it is not bound.
        pull_project(project_name='test')

    file_logger = get_logger()
    # NOTE: no formatter is set on this handler, so the logging module's
    # default format (message text only) is used for the file output.
    # Call setFormatter() on the handler to add timestamp/level/name.
    file_logger.addHandler(MyFileLogger('auto_ai.log'))
    flow.run()


if __name__ == "__main__":
    main()
Kevin Kho
YD
06/11/2021, 9:53 PM
file_logger = get_logger()
fh = MyFileLogger('auto_ai.log')
fh.setFormatter(logging.Formatter("[%(asctime)s] %(levelname)s - %(name)s | %(message)s"))
file_logger.addHandler(fh)
That fixed the issue.
Thanks @Kevin Kho
Kevin Kho