I'm creating an integration with datadog and I nee...
# ask-community
l
I'm creating an integration with Datadog and I need to add a custom handler to Prefect's flows. I need a handler that gets all logs from the run and spits them out as JSON. I've seen a lot of examples like the one below, but that's not what I need: I don't want to add a handler to each logger.
```python
logger = prefect.context.get('logger')
logger.addHandler(log_handler)
```
I need something like this:
```python
log_handler = logging.StreamHandler()
log_handler.setFormatter(DatadogFormatter())
get_logger().addHandler(log_handler)
```
This adds the handler to the 'root' logger and everything is logged the way I need. I've tried the latter and it works fine with `flow.run()`, but when I register the flow I can't get it to work on k8s. More details in the thread.
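For reference, the single-handler approach described above can be sketched with the standard library alone. `JsonFormatter` here is a hypothetical stand-in for the thread's `DatadogFormatter` (no `python-json-logger` dependency), and this is a sketch rather than Prefect's API. One detail worth checking: as far as I know, Prefect 1.x's `prefect.utilities.logging.get_logger()` with no argument returns Prefect's top-level `prefect` logger, whereas the stdlib `logging.getLogger()` with no name returns Python's actual root logger.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Hypothetical stdlib-only stand-in for the thread's DatadogFormatter."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "message": record.getMessage(),
            "status": record.levelname.lower(),
            "logger": {"name": record.name},
        }
        if record.exc_info:
            # Mirror the thread's "error" field when an exception is attached
            payload["error"] = {
                "kind": record.exc_info[0].__name__,
                "stack": self.formatException(record.exc_info),
            }
        return json.dumps(payload)


def install_root_json_handler(stream: io.TextIOBase) -> logging.Handler:
    """Attach one JSON handler to the root logger (getLogger() with no name)."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    root = logging.getLogger()
    root.addHandler(handler)
    root.setLevel(logging.INFO)
    return handler


buf = io.StringIO()
handler = install_root_json_handler(buf)
# Loggers propagate to the root by default, so no per-logger handler is needed
logging.getLogger("prefect.FlowRunner").info("Beginning Flow run for 'Test'")
logging.getLogger().removeHandler(handler)
print(buf.getvalue().strip())
# {"message": "Beginning Flow run for 'Test'", "status": "info", "logger": {"name": "prefect.FlowRunner"}}
```

The design relies only on propagation: one handler on a common ancestor sees every record from loggers below it, as long as none of them sets `propagate = False`.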
`flow.run()` output:
```text
INFO - prefect.FlowRunner | Beginning Flow run for 'Test'
{"message": "Beginning Flow run for 'Test'", "timestamp": "2021-04-06T10:50:57.480545+00:00", "status": "info", "logger": {"name": "prefect.FlowRunner"}}
INFO - prefect.TaskRunner | Task 'run': Starting task run...
{"message": "Task 'run': Starting task run...", "flow_name": "Test", "flow_run_id": "1a72ec67-7aba-4d3c-8ef1-d68c6f9363eb", "task_name": "run", "task_slug": "run-1", "task_run_id": "06f292f0-034a-4c32-9e0b-52437724444a", "timestamp": "2021-04-06T10:50:57.522043+00:00", "status": "info", "logger": {"name": "prefect.TaskRunner"}}
INFO - prefect.run | ######## Hand/lers
{"message": "######## Hand/lers", "flow_name": "Test", "flow_run_id": "1a72ec67-7aba-4d3c-8ef1-d68c6f9363eb", "task_name": "run", "task_slug": "run-1", "task_run_id": "06f292f0-034a-4c32-9e0b-52437724444a", "timestamp": "2021-04-06T10:50:57.522366+00:00", "status": "info", "logger": {"name": "prefect.run"}}
INFO - prefect.run | []
{"message": "[]", "flow_name": "Test", "flow_run_id": "1a72ec67-7aba-4d3c-8ef1-d68c6f9363eb", "task_name": "run", "task_slug": "run-1", "task_run_id": "06f292f0-034a-4c32-9e0b-52437724444a", "timestamp": "2021-04-06T10:50:57.522527+00:00", "status": "info", "logger": {"name": "prefect.run"}}
INFO - prefect.TaskRunner | Task 'run': Finished task run for task with final state: 'Success'
{"message": "Task 'run': Finished task run for task with final state: 'Success'", "flow_name": "Test", "flow_run_id": "1a72ec67-7aba-4d3c-8ef1-d68c6f9363eb", "task_name": "run", "task_slug": "run-1", "task_run_id": "06f292f0-034a-4c32-9e0b-52437724444a", "timestamp": "2021-04-06T10:50:57.603722+00:00", "status": "info", "logger": {"name": "prefect.TaskRunner"}}
INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
{"message": "Flow run SUCCESS: all reference tasks succeeded", "flow_name": "Test", "flow_run_id": "1a72ec67-7aba-4d3c-8ef1-d68c6f9363eb", "timestamp": "2021-04-06T10:50:57.604533+00:00", "status": "info", "logger": {"name": "prefect.FlowRunner"}}
```
K8s output:
```text
$ kubectl logs prefect-job-ee17eab3-z6pvp -f
[2021-04-06 10:49:50+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'Test'
[2021-04-06 10:49:53+0000] INFO - prefect.CloudTaskRunner | Task 'run': Starting task run...
[2021-04-06 10:49:54+0000] INFO - prefect.run | ######## Hand/lers
[2021-04-06 10:49:54+0000] INFO - prefect.run | []
[2021-04-06 10:49:55+0000] INFO - prefect.CloudTaskRunner | Task 'run': Finished task run for task with final state: 'Success'
[2021-04-06 10:49:55+0000] INFO - prefect.CloudFlowRunner | Flow run SUCCESS: all reference tasks succeeded
```
As you can see, my flow ignores my custom handler when it executes on k8s: only the default Prefect format is printed.
Python file:
```python
# datadog.py
import sys
import logging
import prefect
from prefect.engine.flow_runner import FlowRunner
from prefect import Flow, task
from pythonjsonlogger import jsonlogger
from prefect.storage import Docker
from prefect.run_configs import KubernetesRun
from prefect.utilities.logging import get_logger


class DatadogFormatter(jsonlogger.JsonFormatter):
    def __init__(self):
        super().__init__(timestamp=True)

    def add_fields(self, log_record, record, message_dict):
        super(DatadogFormatter, self).add_fields(log_record, record, message_dict)
        log_record["status"] = record.levelname.lower()
        log_record["logger"] = {"name": record.name}
        if record.exc_info:
            log_record["error"] = {
                "kind": record.exc_info[0].__name__,
                "stack": message_dict.get("stack_info"),
                "message": message_dict.get("exc_info"),
            }
            log_record.pop("exc_info", None)
            log_record.pop("stack_info", None)

        if log_record['message'].startswith('Flow run FAILED:'):
            log_record['status'] = 'failure'


###### Log Handler
# Module-level setup: this runs whenever the script itself is executed
# (flow.run() or flow.register())
log_handler = logging.StreamHandler()
log_handler.setFormatter(DatadogFormatter())
get_logger().addHandler(log_handler)


@task(log_stdout=True)
def run():
    logger = prefect.context.get('logger')
    # logger.addHandler(log_handler)  I don't want to use this, I want all logs

    logger.info('######## Hand/lers')
    logger.info(logger.handlers)

    # 2/0


storage = Docker(
    image_name='datadog_test',
    registry_url="ghcr.io/company",
    python_dependencies=['python-json-logger'],
)
with Flow('Test', storage=storage) as flow:
    run()


flow.run_config = KubernetesRun(
    labels=['datadog'],
    cpu_request=1,
)

#flow.run()
flow.register(project_name='default')
```
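About the `[]` printed by `logger.info(logger.handlers)` in both outputs: in the standard `logging` module, `Logger.handlers` lists only the handlers attached directly to that logger, not those a record reaches through propagation. So an empty list on the task logger (`prefect.run`) does not by itself show that the handler is missing; in the `flow.run()` output the list is `[]` even though the JSON handler fires. A small stdlib-only sketch; the logger names are illustrative and `effective_handlers` is a hypothetical helper.

```python
import io
import logging
from typing import List


def effective_handlers(logger: logging.Logger) -> List[logging.Handler]:
    """Collect the handlers a record from `logger` would reach via propagation."""
    found: List[logging.Handler] = []
    current = logger
    while current is not None:
        found.extend(current.handlers)
        if not current.propagate:
            break
        current = current.parent  # the root logger's parent is None
    return found


parent = logging.getLogger("prefect_demo")
parent.propagate = False  # keep the demo isolated from the root logger
handler = logging.StreamHandler(io.StringIO())
parent.addHandler(handler)

child = parent.getChild("run")  # logger named "prefect_demo.run"
print(child.handlers)                          # []
print(child.hasHandlers())                     # True
print(effective_handlers(child) == [handler])  # True
```

`Logger.hasHandlers()` is the built-in way to ask "will anything handle this?", while the helper shows exactly which handlers those are.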
m
Hi @Levi Leal, it looks to me like you need to “register” the logger (basically, add the logger name under `PREFECT__LOGGING__EXTRA_LOGGERS`). See https://docs.prefect.io/core/concepts/logging.html#extra-loggers
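As a rough stdlib-only illustration of what registering extra loggers amounts to, assuming the setting works by attaching Prefect's handlers to each listed logger name: the hypothetical `attach_to_named_loggers` helper below attaches one shared handler to every name in a list. It is not Prefect's implementation, and the logger names are only examples. In the stdlib, `logging.getLogger("")` (an empty name) returns the root logger.

```python
import io
import logging
from typing import Iterable


def attach_to_named_loggers(names: Iterable[str], handler: logging.Handler,
                            level: int = logging.INFO) -> None:
    """Attach the same handler to every named logger (hypothetical helper)."""
    for name in names:
        logger = logging.getLogger(name)
        # addHandler ignores a handler object already attached to this logger,
        # so calling the helper twice does not duplicate output
        logger.addHandler(handler)
        logger.setLevel(level)


buf = io.StringIO()
shared = logging.StreamHandler(buf)
shared.setFormatter(logging.Formatter("%(name)s | %(message)s"))
attach_to_named_loggers(["boto3", "my-named-logger"], shared)

logging.getLogger("boto3").info("from a library")
logging.getLogger("my-named-logger").info("from my code")
print(buf.getvalue())
```

The catch raised in the thread still applies: this covers only the names you list, so anything logged under other names is not captured.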
l
@Marwan Sarieddine, my logger has no name. I need it to be the root logger and catch all logging info.
m
Hmm, I see. Sorry, I am not quite sure how one would go about registering a root logger; maybe someone on the Prefect team can help.
k
Is this what you’re looking for?
```python
import os
import sys
import logging
import prefect
from prefect import task, Flow

def get_logger():
    logger = logging.getLogger("my-named-logger")
    # Attach the handler only once: without this check, every call would add
    # another StreamHandler and each message would be printed repeatedly
    if not logger.handlers:
        logger.addHandler(logging.StreamHandler(sys.stdout))
    logger.setLevel(logging.DEBUG)
    return logger

@task()
def log_message():
    logger = get_logger()
    logger.info("Hello world!")
    logger.error("Foo!")

with Flow('flow-with-extra-logger') as flow:
    log_message()

flow.run()
```
l
@Kevin Kho, nope, I don't want to add the handler to each logger I get. Also, this method doesn't capture Prefect's "default" logging. By "default" I mean the statements Flow started, Task start, Task end, Flow end, etc.
k
Ok will look into it
z
Hey @Levi Leal, adding log handlers to the `FlowRunner`/`TaskRunner` loggers isn't currently supported. We get this request often, so I'll open an issue to track a feature to do this.
l
@Zanie Thanks
m
@Zanie or @Levi Leal, do you know if this issue was solved in the meantime?
l
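No real solution. I'm using a task to "initialize" the logger and add the handler:
```python
import logging
import sys

import prefect
from prefect import Flow, task
from prefect.utilities.logging import get_logger


@task
def init_logger():
    # Runs as a task, i.e. at flow-run time (inside the k8s job), not at registration
    logger = get_logger()
    logger.addHandler(logging.StreamHandler(sys.stdout))
    logger.setLevel(logging.DEBUG)


@task
def log_message():
    logger = prefect.context.get("logger")
    logger.info("Hello world!")
    logger.error("Foo!")


with Flow("flow-with-extra-logger") as flow:
    # Make init_logger run first so the handler is attached before logging
    init = init_logger()
    log_message(upstream_tasks=[init])
```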
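One thing to watch with this workaround: it creates a new `StreamHandler` every time the init task runs. `Logger.addHandler` only skips a handler *object* that is already attached, so if several flow runs share one Python process (for example calling `flow.run()` repeatedly in a local session), handlers pile up and each line is printed several times. A stdlib-only guard is sketched below; `ensure_named_handler` and the name `"datadog-json"` are hypothetical, and the check matches on `Handler.get_name()`.

```python
import io
import logging
from typing import Callable


def ensure_named_handler(logger: logging.Logger, name: str,
                         factory: Callable[[], logging.Handler]) -> logging.Handler:
    """Return the handler called `name` on `logger`, creating it only once."""
    for existing in logger.handlers:
        if existing.get_name() == name:
            return existing
    handler = factory()
    handler.set_name(name)
    logger.addHandler(handler)
    return handler


buf = io.StringIO()
logger = logging.getLogger("init_logger_demo")
logger.setLevel(logging.DEBUG)
logger.propagate = False  # keep the demo output in `buf` only

# Simulate the init task running twice in the same process
for _ in range(2):
    ensure_named_handler(logger, "datadog-json", lambda: logging.StreamHandler(buf))

logger.info("Hello world!")
print(len(logger.handlers))         # 1
print(buf.getvalue().splitlines())  # ['Hello world!']
```

Inside the init task, the factory would build the handler with your JSON formatter; the name check keeps it to one instance per process.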
i
Hello, thank you for sharing. I need "default" logging as well. What is the latest status of that feature?
@Zanie @Kevin Kho
z
Hey @Ismail Cenik, we've sketched a solution to logging in https://github.com/PrefectHQ/prefect/pull/4550 and it's on our roadmap for the year.