# prefect-community
i
Hi again, I have a question about extra logging. We have our own Python module 'bear', in which logging is defined as:
logger = logging.getLogger(__name__)
and I want log messages from that module to appear in Prefect. I registered an extra logger, and log messages defined in the Task (AWSPOC) appear, but not the log messages defined inside 'bear'. My code for "prefect_flow.py":
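import logging

from prefect import Flow, Task
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import DockerRun
from prefect.storage import S3
# Loader and load_file come from the custom module (import not shown in the original)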

format_string = "%(asctime)s %(name)s [%(levelname)s] %(message)s"
logger = logging.getLogger('magic_logger')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(format_string)
handler.setFormatter(formatter)
logger.addHandler(handler)

class AWSPOC(Task):

    def __init__(self, name: str, config_file: str):
        self.config_file = config_file
        super().__init__(name=name)

    def run(self):
        configuration = load_file(self.config_file)
        loader = Loader(configuration=configuration)
        logger.info("This message appears in Prefect log output")
        loader.run()

executor = LocalDaskExecutor()
host_config = {...}
storage = S3(...)
env = {
    "PREFECT__LOGGING__EXTRA_LOGGERS": "['magic_logger']"
}
docker_run_config = DockerRun(image=..., host_config=host_config, env=env)

with Flow(name="AWS POC", executor=executor, storage=storage, run_config=docker_run_config) as flow:
    task = AWSPOC(name='...', config_file='...')
    task()

flow.register(project_name='AWS POC')
Where am I going wrong? The only idea I have is that 'bear' should have its own named logger.
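A small standard-library sketch of why the logger name could matter here. It assumes, purely for illustration, that a handler attached to 'magic_logger' stands in for whatever Prefect attaches to each name listed in PREFECT__LOGGING__EXTRA_LOGGERS; ListHandler is a hypothetical helper, not a Prefect class.

```python
import logging


class ListHandler(logging.Handler):
    """Illustrative handler that stores messages in a list."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(f"{record.name}: {record.getMessage()}")


# Stand-in (assumption) for the handler attached to a registered extra logger.
captured = ListHandler()
flow_logger = logging.getLogger("magic_logger")
flow_logger.setLevel(logging.DEBUG)
flow_logger.addHandler(captured)

# Inside the module 'bear', getLogger(__name__) resolves to the name "bear".
bear_logger = logging.getLogger("bear")
bear_logger.setLevel(logging.DEBUG)

flow_logger.info("from the task")
bear_logger.info("from bear")  # 'bear' is not a child of 'magic_logger'

# A child name such as 'magic_logger.bear' propagates to the parent's handler.
child_logger = logging.getLogger("magic_logger.bear")
child_logger.info("from a child logger")

print(captured.messages)
```

Only records from 'magic_logger' and its children reach the handler, so under this assumption the name used by the module would also have to be registered (or be a child of a registered name).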
k
This looks fine on first glance. Will look more later
i
also have no ideas
k
Oh sorry slipped my mind to return. Looking now
So everything you do to the logger outside the Flow will not take effect in the flow run, because it executes at flow registration time, not at flow run time. So you either need to use script-based storage, so that it is evaluated at runtime, or you need to put that logic in a task.
The setup on this side looks right to me, so I am wondering about magic_logger here. Could you give me a snippet of how it's defined in the custom module?
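The "put that logic in a task" option could look roughly like the sketch below. It uses only the standard library; ExampleTask is a hypothetical stand-in for a Prefect Task subclass and configure_logger is an illustrative helper, neither comes from the thread.

```python
import logging
import sys


def configure_logger(name: str = "magic_logger") -> logging.Logger:
    """Attach a stdout handler once; safe to call on every task run."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    if not logger.handlers:  # avoid duplicate handlers on repeated calls
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(name)s [%(levelname)s] %(message)s")
        )
        logger.addHandler(handler)
    return logger


class ExampleTask:
    """Hypothetical stand-in for a Prefect Task subclass."""

    def run(self) -> str:
        # Runs at flow run time, so the handler exists in the run process.
        logger = configure_logger()
        logger.info("configured inside run()")
        return logger.name
```

Because the setup happens inside run(), it does not depend on module-level code having been executed in the process that actually runs the flow.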
i
magic_logger is not defined in the custom module. It's just above in the script. The custom module has just "logger = logging.getLogger(__name__)" and that's it.
I'm using script-based storage. The script (Flow) is published to S3 and taken from there. "storage = S3(...)"
k
Ahh I see. Sorry, I misunderstood the first question. I think S3 storage is still pickle-based by default. Do you have stored_as_script=True?
i
yeap
k
There is no stream in your StreamHandler, maybe you need logging.StreamHandler(sys.stdout)? Also, just confirming you have debug-level logs set on the flow as well?
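For reference, a StreamHandler created without arguments still has a stream: in the standard library it falls back to sys.stderr. Passing sys.stdout therefore changes where the output goes rather than enabling it. A quick check:

```python
import logging
import sys

default_handler = logging.StreamHandler()           # no stream argument
stdout_handler = logging.StreamHandler(sys.stdout)  # explicit stream

# With no argument, the handler writes to sys.stderr.
print(default_handler.stream is sys.stderr)
print(stdout_handler.stream is sys.stdout)
```

Whether stderr or stdout is the better choice inside a Docker-based flow run depends on how the container's output is collected, which the thread does not specify.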
i
I've added sys.stdout. I really missed that, but nothing changed.
Log messages appear only inside the Task, but not from custom modules (or any library used in the flow).
Maybe you have an example of it?
k
This is a full working example of using a custom module
i
Is there any extra support available, personal, paid, or otherwise? I cannot explain what is wrong with it. If I run the library outside of Prefect, logs appear as usual. With Prefect added, the logs disappear, and even if the flow fails inside, the flow still completes successfully in Prefect.
# poc.py
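import logging

from prefect import Flow, Task
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import DockerRun
from prefect.storage import S3

from custom_module import Loader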

logger = logging.getLogger('custom_logger')

class POC(Task):

    def __init__(self, name: str, config_file: str):
        self.config_file = config_file
        super().__init__(name=name)

    def run(self):
        configuration = load_file(self.config_file)
        loader = Loader(configuration=configuration)
        logger.info(f"Starting POC Loader. This message appears. Logger is {logger.name}")
        loader.start()
        logger.info("Completed POC Loader. This message appears")

env = {'PREFECT__LOGGING__EXTRA_LOGGERS': "['custom_logger']"}

storage = S3(stored_as_script=True, local_script_path='poc.py', bucket='...',
             client_options={'aws_access_key_id': ...,
                             'aws_secret_access_key': ...}
             )

docker_run_config = DockerRun(image='...', env=env)

with Flow(name="POC", executor=LocalDaskExecutor(), storage=storage, run_config=docker_run_config) as flow:
    load_data = POC(name="POC", config_file='/root/prefect/flows/poc/poc.json')
    load_data()

flow.register(project_name='AWS')
custom_module:
# custom_module.py
import logging
import sys

def init_logger(name: str = 'custom_logger', level=logging.DEBUG, stream=sys.stdout,
                format_string: str = "%(asctime)s %(name)s %(levelname)s %(message)s"):
    logger = logging.getLogger(name)
    logger.setLevel(level)
    handler = logging.StreamHandler(stream)
    handler.setLevel(level)
    formatter = logging.Formatter(format_string)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
	
init_logger(level=logging.INFO)
logger = logging.getLogger('custom_logger')


class Loader:
    def __init__(self):
        logger.info("This message doesn't appear in Prefect")

    def start():
        logger.info("This message doesn't appear in Prefect")
k
Looking into this now
So i got this to work on my end
Flow file:
from prefect import Flow, Task 
from themodule import Loader
import prefect
from prefect.run_configs import LocalRun

class POC(Task):

    def __init__(self):
        super().__init__()

    def run(self):
        logger = prefect.context.get("logger")
        loader = Loader()
        logger.info(f"Starting POC Loader. This message appears. Logger is {logger.name}")
        loader.start()
        logger.info("Completed POC Loader. This message appears")

run_config = LocalRun(env={"PREFECT__LOGGING__EXTRA_LOGGERS": "['custom_logger']"})

with Flow(name="POC", run_config=run_config) as flow:
    load_data = POC()
    load_data()

flow.register("databricks")
Modified custom module:
import logging
import sys

def init_logger(name: str = 'custom_logger', level=logging.DEBUG, stream=sys.stdout,
                format_string: str = "%(asctime)s %(name)s %(levelname)s %(message)s"):
    logger = logging.getLogger(name)
    logger.setLevel(level)
    handler = logging.StreamHandler(stream)
    handler.setLevel(level)
    formatter = logging.Formatter(format_string)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
	
init_logger(level=logging.INFO)
logger = logging.getLogger('custom_logger')


class Loader:
    def __init__(self):
        logger.info("This message doesn't appear in Prefect")

    def start(self):
        logger.info("This message doesn't appear in Prefect")
Can you try this and see if you can run a flow and whether the logs appear? This uses local storage and a local agent; then we can think about why yours was not working.
i
doing
Without flow.register(), just flow.run() gives the desired effect based on your example.
k
flow.register() didn't work for you?
i
Need a few more minutes to pull Prefect locally 🙂
k
Ah ok lol
i
The local run also doesn't show any messages from the custom module. Only those in POC(Task) are visible. Also, you mentioned that you modified custom_module, but I don't see any difference.
k
So I see them. I think I may have just added self to the start method for that.
You used LocalRun + LocalAgent?
i
Yes. Ok it started working, but I cannot explain why. Need to compare step by step. I also had to add working_dir to run_config:
run_config = LocalRun(env={"PREFECT__LOGGING__EXTRA_LOGGERS": "['custom_logger']"},
                      working_dir='.....')
k
Ah ok that’s fine. I can’t explain fully either myself for now, but at least we have a working example to start with 😅
i
@Kevin Kho, ok so the root cause is a combination of factors: 1. a missing definition of prefect.context.logger; 2. https://prefect-community.slack.com/archives/CL09KU1K7/p1648115064317039 (this thread); 3. https://prefect-community.slack.com/archives/CL09KU1K7/p1648114337459679 (this thread). Overall, the messages inside custom_module were generated from threads. Since multithreading in Prefect works a bit differently, this is what happened: the task was submitted from Prefect, the threads in custom_module were created and running, but Prefect immediately marked the task as succeeded and all thread output was missed. Thus: task completed, no logs. I think the proper way will be to wrap each separate function of custom_module in a Prefect Task, which allows running them in parallel.
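The failure mode described above can be reproduced without Prefect. A minimal sketch, assuming the custom module starts plain threading.Thread workers (the names work, start_without_waiting, and start_and_wait are hypothetical): if the caller returns without joining, whatever runs the task can treat it as finished before the workers have logged anything, whereas joining makes the call block until they are done.

```python
import logging
import threading
import time

logger = logging.getLogger("custom_logger")
logger.setLevel(logging.INFO)


def work(item: int) -> None:
    time.sleep(0.05)  # simulate some work before logging
    logger.info("processed item %d", item)


def start_without_waiting(items):
    """Start worker threads and return immediately (the problematic pattern)."""
    threads = [threading.Thread(target=work, args=(i,)) for i in items]
    for t in threads:
        t.start()
    return threads


def start_and_wait(items):
    """Start worker threads and block until all of them have finished."""
    threads = start_without_waiting(items)
    for t in threads:
        t.join()
    return threads
```

Calling start_and_wait from a task's run method would keep the task open until every worker has logged; wrapping each function in its own Prefect Task, as suggested above, is the alternative that lets Prefect track them individually.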
I need to write an essay.
k
Ahhh that clears things for me as well. Yes the LocalDaskExecutor will propagate logs better than multiprocessing