Ievgenii Martynenko
03/17/2022, 10:54 AMlogger = logging.getLogger(__name__)
and I want log messages from that module to appear in Prefect.
I registered an extra logger, and the log messages defined in the Task (AWSPOC) appear, but not the log messages defined inside 'bear'.
My code for "prefect_flow.py" :
import logging
# Record layout shared by everything emitted through the handler below.
format_string = "%(asctime)s %(name)s [%(levelname)s] %(message)s"

# Prepare the formatter and handler first, then wire them onto the logger.
# StreamHandler() with no argument writes to its default stream.
formatter = logging.Formatter(format_string)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
handler.setLevel(logging.DEBUG)

# Both the logger and its handler pass DEBUG and above.
logger = logging.getLogger('magic_logger')
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
class AWSPOC(Task):
    """Task that loads a configuration file and runs a ``Loader`` built from it."""

    def __init__(self, name: str, config_file: str):
        """Store the configuration path and initialise the base task.

        Args:
            name: Task name forwarded to the base ``Task`` constructor.
            config_file: Path passed to ``load_file`` when the task runs.
        """
        self.config_file = config_file
        super().__init__(name=name)

    def run(self):
        """Load the configuration, log a marker message and run the loader."""
        configuration = load_file(self.config_file)
        loader = Loader(configuration=configuration)
        # Logged through the module-level 'magic_logger' instance.
        logger.info("This message appears in Prefect log output")
        loader.run()
executor = LocalDaskExecutor()
host_config = {...}
storage = S3(...)
# Environment handed to the Docker run config; it lists 'magic_logger'
# under Prefect's extra-loggers setting.
env = {
    "PREFECT__LOGGING__EXTRA_LOGGERS": "['magic_logger']"
}
docker_run_config = DockerRun(image=..., host_config=host_config, env=env)

with Flow(name="AWS POC", executor=executor, storage=storage, run_config=docker_run_config) as flow:
    task = AWSPOC(name='...', config_file='...')
    task()

flow.register(project_name='AWS POC')
What am I doing wrong? The only idea I have is that 'bear' should have its own named logger.Kevin Kho
03/17/2022, 1:56 PMIevgenii Martynenko
03/18/2022, 11:42 AMKevin Kho
03/18/2022, 1:34 PMmagic_logger
here. Could you give me a snippet on how it’s defined in the custom module?Ievgenii Martynenko
03/18/2022, 2:20 PMKevin Kho
03/18/2022, 2:24 PMstored_as_script=True
?Ievgenii Martynenko
03/18/2022, 2:34 PMKevin Kho
03/18/2022, 2:39 PMlogging.StreamHandler(sys.stdout)
?
Also, just confirming you have debug level logs set on the flow as well?Ievgenii Martynenko
03/23/2022, 1:38 PMKevin Kho
03/23/2022, 1:46 PMIevgenii Martynenko
03/23/2022, 3:20 PM# poc.py
from custom_module import Loader
logger = logging.getLogger('custom_logger')
class POC(Task):
    """Task that loads a configuration file and starts a ``Loader`` built from it."""

    def __init__(self, name: str, config_file: str):
        """Store the configuration path and initialise the base task.

        Args:
            name: Task name forwarded to the base ``Task`` constructor.
            config_file: Path passed to ``load_file`` when the task runs.
        """
        self.config_file = config_file
        super().__init__(name=name)

    def run(self):
        """Load the configuration and start the loader, logging before and after."""
        configuration = load_file(self.config_file)
        loader = Loader(configuration=configuration)
        logger.info(f"Starting POC Loader. This message appears. Logger is {logger.name}")
        loader.start()
        logger.info("Completed POC Loader. This message appears")
env['PREFECT__LOGGING__EXTRA_LOGGERS'] = "['custom_logger']"
storage = S3(
    stored_as_script=True,
    local_script_path='poc.py',
    bucket='...',
    # Credential values are elided in the post; `...` stands in for them.
    client_options={
        'aws_access_key_id': ...,
        'aws_secret_access_key': ...,
    },
)
docker_run_config = DockerRun(image='...', env=env)

with Flow(name="POC", executor=LocalDaskExecutor(), storage=storage, run_config=docker_run_config) as flow:
    load_data = POC(name="POC", config_file='/root/prefect/flows/poc/poc.json')
    load_data()

flow.register(project_name='AWS')
custom_module:
######
custom_module.py
def init_logger(name: str = 'custom_logger', level=logging.DEBUG, stream=sys.stdout,
                format_string: str = "%(asctime)s %(name)s %(levelname)s %(message)s"):
    """Configure the logger called ``name`` to write formatted records to ``stream``.

    Calling this again for the same logger and stream reuses the handler
    that is already attached instead of stacking a second one, so each
    record is written to the stream once.

    Args:
        name: Name of the logger to configure.
        level: Threshold applied to both the logger and its handler.
        stream: File-like object the handler writes to.
        format_string: ``logging.Formatter`` format for emitted records.
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)

    # Look for a stream handler already bound to this exact stream object.
    handler = next(
        (h for h in logger.handlers
         if isinstance(h, logging.StreamHandler) and h.stream is stream),
        None,
    )
    if handler is None:
        handler = logging.StreamHandler(stream)
        logger.addHandler(handler)

    # Apply the latest level/format whether the handler is new or reused.
    handler.setLevel(level)
    handler.setFormatter(logging.Formatter(format_string))
# Configure 'custom_logger' at import time, then bind the module-level logger.
init_logger(level=logging.INFO)
logger = logging.getLogger('custom_logger')
class Loader:
    """Minimal loader that only logs, used to show logging from the custom module."""

    def __init__(self):
        logger.info("This message doesn't appear in Prefect")

    # NOTE(review): ``start`` is declared without ``self``, so calling
    # ``loader.start()`` on an instance raises TypeError. Kept as posted
    # because later replies in this thread address it.
    def start():
        logger.info("This message doesn't appear in Prefect")
Kevin Kho
03/23/2022, 4:37 PMfrom prefect import Flow, Task
from themodule import Loader
import prefect
from prefect.run_configs import LocalRun
class POC(Task):
    """Task that starts a ``Loader`` and logs through the Prefect context logger."""

    def __init__(self):
        super().__init__()

    def run(self):
        """Fetch the logger from the Prefect context and log around ``Loader.start()``."""
        logger = prefect.context.get("logger")
        loader = Loader()
        logger.info(f"Starting POC Loader. This message appears. Logger is {logger.name}")
        loader.start()
        logger.info("Completed POC Loader. This message appears")
# Local run whose environment lists 'custom_logger' under Prefect's
# extra-loggers setting.
run_config = LocalRun(env={"PREFECT__LOGGING__EXTRA_LOGGERS": "['custom_logger']"})

with Flow(name="POC", run_config=run_config) as flow:
    load_data = POC()
    load_data()

flow.register("databricks")
import logging
import sys
def init_logger(name: str = 'custom_logger', level=logging.DEBUG, stream=sys.stdout,
                format_string: str = "%(asctime)s %(name)s %(levelname)s %(message)s"):
    """Configure the logger called ``name`` to write formatted records to ``stream``.

    Calling this again for the same logger and stream reuses the handler
    that is already attached instead of stacking a second one, so each
    record is written to the stream once.

    Args:
        name: Name of the logger to configure.
        level: Threshold applied to both the logger and its handler.
        stream: File-like object the handler writes to.
        format_string: ``logging.Formatter`` format for emitted records.
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)

    # Look for a stream handler already bound to this exact stream object.
    handler = next(
        (h for h in logger.handlers
         if isinstance(h, logging.StreamHandler) and h.stream is stream),
        None,
    )
    if handler is None:
        handler = logging.StreamHandler(stream)
        logger.addHandler(handler)

    # Apply the latest level/format whether the handler is new or reused.
    handler.setLevel(level)
    handler.setFormatter(logging.Formatter(format_string))
# Configure 'custom_logger' at import time, then bind the module-level logger.
init_logger(level=logging.INFO)
logger = logging.getLogger('custom_logger')
class Loader:
    """Minimal loader that only logs, used to show logging from the custom module."""

    def __init__(self):
        logger.info("This message doesn't appear in Prefect")

    def start(self):
        """Log a single marker message through the module-level logger."""
        logger.info("This message doesn't appear in Prefect")
Ievgenii Martynenko
03/23/2022, 4:53 PMKevin Kho
03/23/2022, 5:05 PMflow.register()
didn't work for you?Ievgenii Martynenko
03/23/2022, 5:06 PMKevin Kho
03/23/2022, 5:06 PMIevgenii Martynenko
03/23/2022, 5:29 PMKevin Kho
03/23/2022, 5:31 PMself
to the start
method for thatIevgenii Martynenko
03/23/2022, 5:36 PMrun_config = LocalRun(env={"PREFECT__LOGGING__EXTRA_LOGGERS": "['custom_logger']"},
working_dir='.....')
Kevin Kho
03/23/2022, 5:37 PMIevgenii Martynenko
03/24/2022, 2:35 PMKevin Kho
03/24/2022, 2:37 PM