Nikola Lusic
06/30/2021, 11:15 AM
flow.run()), the custom log handler works. However, when I register the flow to my localhost deployment of Prefect Server and agent (flow.register()), the log handlers don't seem to do anything (although the logs are still visible in the Prefect UI).
In the following example, when running the flow locally, all logs are sent to our Logstash instance successfully. However, when registering to the localhost server and running the flow, no logs are sent to Logstash.
import prefect
from prefect import task, Flow
from prefect.run_configs import LocalRun
from prefect.utilities.logging import get_logger as get_prefect_logger

# get_custom_logger() and get_logstash_handler() are the poster's own helpers
# (get_logstash_handler is shown later in the thread).

@task(name='Test Task')
def test_task():
    logger = prefect.context.get("logger")
    logger.error('TEST LOG 1 - context logger')
    logger = get_custom_logger()
    logger.addHandler(get_logstash_handler())
    logger.error('TEST LOG 2 - custom logger')
    logger = get_prefect_logger()
    logger.addHandler(get_logstash_handler())
    logger.error('TEST LOG 3 - util logger')
    return

with Flow("test_flow") as flow:
    test_task()

task_logger = get_prefect_logger('Test Task')
task_logger.addHandler(get_logstash_handler())

if __name__ == "__main__":
    flow.run_config = LocalRun()
    #flow.run()
    #flow.register(project_name='localhost')
Is there something I'm missing in this setup?
Spencer
06/30/2021, 12:12 PM
flow in the script) and delivering it to the agent to execute. I'm not sure the logging handler will come along for the ride.
Nikola Lusic
06/30/2021, 12:59 PM
Spencer
06/30/2021, 1:41 PM
Kevin Kho
Kevin Kho
Kevin Kho
Nikola Lusic
06/30/2021, 3:06 PM
logger = prefect.context.get("logger")
I will try adding it as an extra logger.
Nikola Lusic
06/30/2021, 3:49 PM
import logging
import requests
import prefect
from prefect import task, Flow
from prefect.utilities.logging import get_logger

class MyHandler(logging.StreamHandler):
    def emit(self, record):
        # forward every log record to the local webserver
        requests.post("http://0.0.0.0:8000/", params=dict(msg=record.msg))

@task(name="Task A")
def task_a():
    logger = prefect.context.get("logger")
    logger.error("error log")
    # direct request, independent of the logging setup
    requests.post("http://0.0.0.0:8000/", params=dict(msg='hello message'))
    return

with Flow("logging-example") as flow:
    result = task_a()

# now attach our custom handler to Task A's logger
task_logger = get_logger("Task A")
task_logger.addHandler(MyHandler())

if __name__ == "__main__":
    flow.run()
    flow.register(project_name='localhost')
When running this, the flow will send two messages to the local webserver ("error log", and "hello message").
After registering the flow to local server, and running it, only one message will hit the webserver ("hello message").
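One way to narrow down a difference like this is to list which handlers would actually see a record from a given logger at run time, for example from inside the task. The sketch below uses only the standard `logging` module; `describe_handlers` is a hypothetical helper and the `demo_prefect` logger names are made up for illustration, not Prefect's real logger names.

```python
import logging


def describe_handlers(logger):
    """Return (logger_name, handler_class_name) pairs that would see a record.

    Walks from the given logger up through its parents and stops at the first
    logger whose `propagate` flag is False, mirroring how records travel.
    """
    seen = []
    current = logger
    while current is not None:
        for handler in current.handlers:
            seen.append((current.name, type(handler).__name__))
        if not current.propagate:
            break
        current = current.parent
    return seen


class MyHandler(logging.Handler):
    """Stand-in for the HTTP handler in the example; it does nothing."""

    def emit(self, record):
        pass


# Hypothetical logger names, chosen only for this demonstration.
parent = logging.getLogger("demo_prefect")
parent.propagate = False
parent.addHandler(logging.NullHandler())

child = logging.getLogger("demo_prefect.Task A")
child.addHandler(MyHandler())

print(describe_handlers(child))
```

If a call like this inside the task shows the custom handler when running locally but not when the agent runs the registered flow, the handler was attached in a different process than the one executing the task.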
Local machine setup:
# spins up a local webserver running at http://0.0.0.0:8000/
python3 -m http.server
# starts the local prefect server
prefect server start
# starts the local prefect agent
prefect agent local start
Kevin Kho
Nikola Lusic
06/30/2021, 3:54 PM
Kevin Kho
Nikola Lusic
06/30/2021, 3:58 PM
Kevin Kho
Kevin Kho
import logging
import requests
import prefect
from prefect import task, Flow
from prefect.utilities.logging import get_logger

class MyHandler(logging.StreamHandler):
    def emit(self, record):
        # forward every log record to the local webserver
        requests.post("http://0.0.0.0:8000/", params=dict(msg=record.msg))

@task(name="Task A")
def task_b(x):
    logger = prefect.context.get("logger")
    # attach the handler inside the task, so it is added when the task runs
    logger.addHandler(MyHandler())
    logger.warning("Returning {}".format(x))
    requests.post("http://0.0.0.0:8000/", params=dict(msg='hello message'))
    return x

with Flow("logging-example-stream") as flow:
    result = task_b(3)

flow.register("project-name")
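Because the example above calls addHandler() every time the task body runs, a logger object that is reused within one process could end up with several copies of the same handler and emit each record more than once. Whether Prefect reuses the logger object between task runs is an assumption here, not something the thread confirms. A minimal sketch of an idempotent attach, using only the standard `logging` module; `ensure_handler` is a hypothetical helper and `MyHandler` below is a stand-in that collects messages instead of posting them:

```python
import logging


class MyHandler(logging.Handler):
    """Stand-in for the HTTP handler above; it only collects messages."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


def ensure_handler(logger, handler_type):
    """Attach one handler of handler_type to logger unless one is already there."""
    for existing in logger.handlers:
        if isinstance(existing, handler_type):
            return existing
    handler = handler_type()
    logger.addHandler(handler)
    return handler


# Hypothetical logger name, used only for this demonstration.
logger = logging.getLogger("ensure_handler_demo")
logger.propagate = False

first = ensure_handler(logger, MyHandler)
second = ensure_handler(logger, MyHandler)  # no second handler is added
logger.warning("Returning %s", 3)
```

Inside a task, the same idea would replace the bare logger.addHandler(MyHandler()) call with ensure_handler(logger, MyHandler).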
Nikola Lusic
07/01/2021, 2:54 PM
from logstash_async.handler import AsynchronousLogstashHandler

def get_logstash_handler(conf = None):
    host = "<LOGSTASH_ADDRESS>"  # placeholder left by the poster
    port = 5044
    handler = AsynchronousLogstashHandler(
        host,
        port,
        ssl_enable=True,
        ssl_verify=False,
        transport="logstash_async.transport.BeatsTransport",
        database_path=".data/logstash.db",
    )
    return handler
The issue was caused by this argument:
database_path=".data/logstash.db"
Logstash messages were supposed to be queued in this file, but apparently that couldn't be done in a serialized flow, so they were lost. After setting it to None (keeping the messages in memory), the logs are now being sent to Logstash.
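One plausible reading, which is an assumption and not confirmed in the thread, is that the relative path resolves against whatever working directory the agent uses, where the .data directory may not exist. A minimal sketch of a guard that falls back to None (in-memory queueing, as described above) when the target directory is not usable; `resolve_database_path` is a hypothetical helper, not part of logstash_async:

```python
import os
import tempfile


def resolve_database_path(path):
    """Return an absolute path if its directory exists and is writable, else None."""
    if path is None:
        return None
    absolute = os.path.abspath(path)
    directory = os.path.dirname(absolute)
    if os.path.isdir(directory) and os.access(directory, os.W_OK):
        return absolute
    return None


# Demonstration with a temporary directory instead of the real .data folder.
with tempfile.TemporaryDirectory() as writable_dir:
    good = resolve_database_path(os.path.join(writable_dir, "logstash.db"))
    bad = resolve_database_path(os.path.join(writable_dir, "missing", "logstash.db"))
```

The result could then be passed as database_path when building the handler, so a missing directory leads to in-memory queueing rather than silently lost messages.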
It was the quiet failure that kept me fumbling in the dark for far too long.
Thank you so much @Kevin Kho 🙏
Kevin Kho