Hello everyone. I've got a question regarding log...
# ask-community
n
Hello everyone. I've got a question regarding logging. In the documentation it is mentioned that you can add custom log handlers to prefect logs. When adding a custom log handler, and simply running the flow via the local script (
flow.run()
), the custom log handler works. However, when registering the flow to my localhost deployment of prefect server and agent (
flow.register()
), the log handlers don't seem to do anything (although the logs are still visible in the Prefect UI). In the following example, when running the flow locally, all logs are sent to our Logstash instance successfully. However, when registering to localhost server and running the flow, no logs are sent to the Logstash.
Copy code
import prefect
from prefect import task, Flow
from prefect.run_configs import LocalRun
from prefect.utilities.logging import get_logger as get_prefect_logger



@task(name='Test Task')
def test_task():
    logger = prefect.context.get("logger")
    logger.error('TEST LOG 1 - context logger')

    logger = get_custom_logger()
    logger.addHandler(get_logstash_handler())
    logger.error('TEST LOG 2 - custom logger')

    logger = get_prefect_logger()
    logger.addHandler(get_logstash_handler())
    logger.error('TEST LOG 3 - util logger')

    return


with Flow("test_flow") as flow:
    test_task()

task_logger = get_prefect_logger('Test Task')
task_logger.addHandler(get_logstash_handler())

if __name__ == "__main__":
    flow.run_config = LocalRun()
    #flow.run()
    #flow.register(project_name='localhost')
Is there something I'm missing in this setup?
s
I'm not sure if that documentation applies for agent driven flows 😓 When you register your flow, it's serializing the flow itself (variable
flow
in the script) and delivering it to the agent to execute. I'm not sure the logging handler will come along for the ride.
n
Are you aware of any workaround how to get the agent driven flows to handle logs in any way other than output them to UI?
s
I think they get output to STDOUT by default, so you could use logstash as a sidecar?
k
Hi @Nikola Lusic, I’ll ask the team about this
👍 1
Did you try adding it as an extra logger ?
This may also help. Could you try attaching the handler outside the task so it’s not deffered?
n
Yup, out of 3 loggers in the example I pasted, one of them is attached outside of the task, and is fetched inside the task exactly as in the link you provided
logger = prefect.context.get("logger")
I will try adding it as an extra logger.
@Kevin Kho, none of the suggested helped solve the issue. Here is the minimal code to recreate the issue (this is pretty much the example you linked):
Copy code
import logging
import requests

import prefect
from prefect import task, Flow
from prefect.utilities.logging import get_logger

class MyHandler(logging.StreamHandler):
    def emit(self, record):
        <http://requests.post|requests.post>("<http://0.0.0.0:8000/>", params=dict(msg=record.msg))

@task(name="Task A")
def task_a():
    logger = prefect.context.get("logger")
    logger.error("error log")
    <http://requests.post|requests.post>("<http://0.0.0.0:8000/>", params=dict(msg='hello message'))
    return

with Flow("logging-example") as flow:
    result = task_a()

# now attach our custom handler to Task B's logger
task_logger = get_logger("Task A")
task_logger.addHandler(MyHandler())


if __name__ == "__main__":
    flow.run()
    flow.register(project_name='localhost')
When running this, the flow will send two messages to the local webserver ("error log", and "hello message"). After registering the flow to local server, and running it, only one message will hit the webserver ("hello message"). Local machine setup:
Copy code
# spins up a local webserver running at <http://0.0.0.0:8000/>
python3 -m http.server

# starts the local prefect server
prefect agent local start

# starts the local prefect agent
prefect agent local start
k
Sure I ’ll look deeper into this. Will take me a bit of time though.
n
Thank you very much Kevin!
k
Can you give me the logstash handler function? Take out any sensitive info. I’ll spin up a free trial and test.
n
I think you can test with the local python webserver from my last example - just to make it as simple as possible.
k
Ok sounds good!
I tested adding it in the task and it worked for me (on Cloud though). Not so sure about Server at the moment, but I think it should work based on the code. Seems like your first code snippet should have worked. Could you try this and see if it works? I’ll also chat with someone on the team (today) who knows more about this to understand the behavior better
Copy code
import logging
import requests

import prefect
from prefect import task, Flow
from prefect.utilities.logging import get_logger


class MyHandler(logging.StreamHandler):
    def emit(self, record):
        <http://requests.post|requests.post>("<http://0.0.0.0:8000/>", params=dict(msg=record.msg))

@task(name="Task A")
def task_b(x):
    logger = prefect.context.get("logger")
    logger.addHandler(MyHandler())
    logger.warning("Returning {}".format(x))
    <http://requests.post|requests.post>("<http://0.0.0.0:8000/>", params=dict(msg='hello message'))
    return x


with Flow("logging-example-stream") as flow:
    result = task_b(3)

flow.register("project-name")
n
I can confirm this setup works! Which lead me to believe there are issues with how the logstash handler from my first code snippet works. And upon even further inspection, there are!
Copy code
from logstash_async.handler import AsynchronousLogstashHandler

def get_logstash_handler(conf = None):
    host = <LOGSTASH_ADDRESS>
    port = 5044
    handler = AsynchronousLogstashHandler(
        host,
        port,
        ssl_enable=True,
        ssl_verify=False,
        transport="logstash_async.transport.BeatsTransport",
        database_path=.data/logstash.db",
    )
    return handler
The issue was caused by the:
database_path=.data/logstash.db
Logstash messages were supposed to be queued in this file, but obviously that couldn't be done in a serialized flow, so they were lost. When setting it to
None
(keeping the messages in memory), the logs are now being sent to Logstash. It was the quiet failure that kept me tapping in dark far too long. Thank you so much @Kevin Kho 🙏
k
No worries! I was suspecting also there could be something in your logstash handler. Glad you got it figured out!