Levi Leal
04/06/2021, 11:04 AM
logger = prefect.context.get('logger')
logger.addHandler(log_handler)
I need something like this
log_handler = logging.StreamHandler()
log_handler.setFormatter(DatadogFormatter())
get_logger().addHandler(log_handler)
I add the handler to the 'root' logger and everything is logged the way I need.
I've tried the latter and it works fine with flow.run(), but when I register the flow I can't get it to work with k8s.
More details in the thread
Levi Leal
04/06/2021, 11:06 AM
INFO - prefect.FlowRunner | Beginning Flow run for 'Test'
{"message": "Beginning Flow run for 'Test'", "timestamp": "2021-04-06T10:50:57.480545+00:00", "status": "info", "logger": {"name": "prefect.FlowRunner"}}
INFO - prefect.TaskRunner | Task 'run': Starting task run...
{"message": "Task 'run': Starting task run...", "flow_name": "Test", "flow_run_id": "1a72ec67-7aba-4d3c-8ef1-d68c6f9363eb", "task_name": "run", "task_slug": "run-1", "task_run_id": "06f292f0-034a-4c32-9e0b-52437724444a", "timestamp": "2021-04-06T10:50:57.522043+00:00", "status": "info", "logger": {"name": "prefect.TaskRunner"}}
INFO - prefect.run | ######## Hand/lers
{"message": "######## Hand/lers", "flow_name": "Test", "flow_run_id": "1a72ec67-7aba-4d3c-8ef1-d68c6f9363eb", "task_name": "run", "task_slug": "run-1", "task_run_id": "06f292f0-034a-4c32-9e0b-52437724444a", "timestamp": "2021-04-06T10:50:57.522366+00:00", "status": "info", "logger": {"name": "prefect.run"}}
INFO - prefect.run | []
{"message": "[]", "flow_name": "Test", "flow_run_id": "1a72ec67-7aba-4d3c-8ef1-d68c6f9363eb", "task_name": "run", "task_slug": "run-1", "task_run_id": "06f292f0-034a-4c32-9e0b-52437724444a", "timestamp": "2021-04-06T10:50:57.522527+00:00", "status": "info", "logger": {"name": "prefect.run"}}
INFO - prefect.TaskRunner | Task 'run': Finished task run for task with final state: 'Success'
{"message": "Task 'run': Finished task run for task with final state: 'Success'", "flow_name": "Test", "flow_run_id": "1a72ec67-7aba-4d3c-8ef1-d68c6f9363eb", "task_name": "run", "task_slug": "run-1", "task_run_id": "06f292f0-034a-4c32-9e0b-52437724444a", "timestamp": "2021-04-06T10:50:57.603722+00:00", "status": "info", "logger": {"name": "prefect.TaskRunner"}}
INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
{"message": "Flow run SUCCESS: all reference tasks succeeded", "flow_name": "Test", "flow_run_id": "1a72ec67-7aba-4d3c-8ef1-d68c6f9363eb", "timestamp": "2021-04-06T10:50:57.604533+00:00", "status": "info", "logger": {"name": "prefect.FlowRunner"}}
K8s output:
$ kubectl logs prefect-job-ee17eab3-z6pvp -f
[2021-04-06 10:49:50+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'Test'
[2021-04-06 10:49:53+0000] INFO - prefect.CloudTaskRunner | Task 'run': Starting task run...
[2021-04-06 10:49:54+0000] INFO - prefect.run | ######## Hand/lers
[2021-04-06 10:49:54+0000] INFO - prefect.run | []
[2021-04-06 10:49:55+0000] INFO - prefect.CloudTaskRunner | Task 'run': Finished task run for task with final state: 'Success'
[2021-04-06 10:49:55+0000] INFO - prefect.CloudFlowRunner | Flow run SUCCESS: all reference tasks succeeded
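The `[]` printed for `logger.handlers` in both outputs is consistent with how the standard `logging` module behaves: a child logger such as `prefect.run` usually owns no handlers, and its records propagate to handlers attached higher up the hierarchy. A minimal stdlib-only sketch of that behaviour follows. The logger names `demo` and `demo.run` are hypothetical stand-ins, not Prefect's actual loggers.

```python
import io
import logging

# Hypothetical logger names for illustration; not Prefect's actual loggers.
parent = logging.getLogger("demo")
child = logging.getLogger("demo.run")

# Attach a single handler to the parent only.
buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(logging.Formatter("%(levelname)s - %(name)s | %(message)s"))
parent.addHandler(handler)
parent.setLevel(logging.INFO)
parent.propagate = False  # keep the demo output away from the root logger

child.info("hello")

child_handlers = child.handlers  # the child owns no handlers
captured = buffer.getvalue()     # yet the parent's handler received the record
```

So an empty `logger.handlers` inside the task does not by itself show that no handler is active; the handler may simply live on a parent logger.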
Levi Leal
04/06/2021, 11:06 AM
Levi Leal
04/06/2021, 11:08 AM
# datadog.py
import sys
import logging
import prefect
from prefect.engine.flow_runner import FlowRunner
from prefect import Flow, task
from pythonjsonlogger import jsonlogger
from prefect.storage import Docker
from prefect.run_configs import KubernetesRun
from prefect.utilities.logging import get_logger
class DatadogFormatter(jsonlogger.JsonFormatter):
    def __init__(self):
        super().__init__(timestamp=True)
    def add_fields(self, log_record, record, message_dict):
        super(DatadogFormatter, self).add_fields(log_record, record, message_dict)
        log_record["status"] = record.levelname.lower()
        log_record["logger"] = {"name": record.name}
        if record.exc_info:
            log_record["error"] = {
                "kind": record.exc_info[0].__name__,
                "stack": message_dict.get("stack_info"),
                "message": message_dict.get("exc_info"),
            }
            log_record.pop("exc_info", None)
            log_record.pop("stack_info", None)
        if log_record['message'].startswith('Flow run FAILED:'):
            log_record['status'] = 'failure'
###### Log Handler
log_handler = logging.StreamHandler()
log_handler.setFormatter(DatadogFormatter())
get_logger().addHandler(log_handler)
@task(log_stdout=True)
def run():
    logger = prefect.context.get('logger')
    # logger.addHandler(log_handler) I don't want to use this, I want all logs
    logger.info('######## Hand/lers')
    logger.info(logger.handlers)
    # 2/0
storage = Docker(
    image_name='datadog_test',
    registry_url="ghcr.io/company",
    python_dependencies=['python-json-logger'],
)
with Flow('Test', storage=storage) as flow:
    run()
flow.run_config = KubernetesRun(
    labels=['datadog'],
    cpu_request=1,
)
#flow.run()
flow.register(project_name='default')
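For anyone who wants to try the field mapping without installing `python-json-logger`, it can be approximated with the standard library alone. This is only a sketch and not the formatter used above: `SimpleJsonFormatter` is a hypothetical name, and it reproduces just the `message`, `status`, `logger` and `error` fields (no timestamp or Prefect context fields).

```python
import json
import logging


class SimpleJsonFormatter(logging.Formatter):
    """Hypothetical stdlib-only stand-in for DatadogFormatter."""

    def format(self, record):
        message = record.getMessage()
        payload = {
            "message": message,
            "status": record.levelname.lower(),
            "logger": {"name": record.name},
        }
        if record.exc_info:
            payload["error"] = {
                "kind": record.exc_info[0].__name__,
                "stack": self.formatException(record.exc_info),
            }
        # Same status override as in the formatter above.
        if message.startswith("Flow run FAILED:"):
            payload["status"] = "failure"
        return json.dumps(payload)


# Build two records by hand so the sketch runs without Prefect.
ok_record = logging.LogRecord(
    "prefect.FlowRunner", logging.INFO, "demo.py", 1,
    "Beginning Flow run for 'Test'", None, None,
)
failed_record = logging.LogRecord(
    "prefect.FlowRunner", logging.ERROR, "demo.py", 2,
    "Flow run FAILED: some reference tasks failed.", None, None,
)
formatter = SimpleJsonFormatter()
ok_line = formatter.format(ok_record)
failed_line = formatter.format(failed_record)
```

Attaching it works the same way as in the snippet above: `log_handler.setFormatter(SimpleJsonFormatter())`.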
Marwan Sarieddine
04/06/2021, 12:24 PM
PREFECT__LOGGING__EXTRA_LOGGERS
See https://docs.prefect.io/core/concepts/logging.html#extra-loggers
Levi Leal
04/06/2021, 12:25 PM
Marwan Sarieddine
04/06/2021, 12:27 PM
Kevin Kho
import os
import sys
import logging
import prefect
from prefect import task, Flow
def get_logger():
    logger = logging.getLogger("my-named-logger")
    logger.addHandler(logging.StreamHandler(sys.stdout))
    logger.setLevel(logging.DEBUG)
    return logger
@task()
def log_message():
    logger = get_logger()
    logger.info("Hello world!")
    logger.error("Foo!")
with Flow('flow-with-extra-logger') as flow:
    log_message()
flow.run()
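This example appears to be meant for use together with the extra-loggers setting mentioned earlier in the thread, so that Prefect also picks up `my-named-logger`. A sketch of setting that variable from Python follows. It rests on two assumptions: the value is a string holding a Python-style list of logger names, as described on the linked docs page, and it is set before `prefect` is imported. In a Kubernetes job it would more likely be set in the job's environment.

```python
import ast
import os

# Assumption: Prefect reads this variable when it is imported and expects a
# string containing a Python-style list of logger names.
os.environ["PREFECT__LOGGING__EXTRA_LOGGERS"] = "['my-named-logger']"

# Sanity check that the value parses as the list we intended.
extra_loggers = ast.literal_eval(os.environ["PREFECT__LOGGING__EXTRA_LOGGERS"])
```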
Levi Leal
04/06/2021, 1:20 PM
Kevin Kho
Zanie
FlowRunner / TaskRunner loggers isn't currently supported. We get this request often, so I'll open an issue to track a feature to do this.
Levi Leal
04/06/2021, 3:08 PM
Manuel Mourato
05/09/2021, 2:35 PM
Levi Leal
05/09/2021, 2:45 PM
Levi Leal
05/09/2021, 2:45 PM
import logging
import sys
import prefect
from prefect import Flow, task
from prefect.utilities.logging import get_logger
@task()
def init_logger():
    # Attach the handler to the logger returned by get_logger() (the 'root' Prefect logger).
    logger = get_logger()
    logger.addHandler(logging.StreamHandler(sys.stdout))
    logger.setLevel(logging.DEBUG)
@task()
def log_message():
    logger = prefect.context.get('logger')
    logger.info("Hello world!")
    logger.error("Foo!")
with Flow('flow-with-extra-logger', tasks=[init_logger]) as flow:
    log_message()
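One thing to watch with the snippets in this thread is that every call to a function that does `logger.addHandler(...)` attaches another handler, so a message can end up printed several times. A minimal sketch of guarding against that follows. `ensure_stream_handler` and the logger name are hypothetical and not part of Prefect.

```python
import logging
import sys


def ensure_stream_handler(logger, stream=sys.stdout):
    """Attach a StreamHandler for `stream` only if one is not already present."""
    for handler in logger.handlers:
        if isinstance(handler, logging.StreamHandler) and handler.stream is stream:
            return handler  # reuse the handler that is already attached
    handler = logging.StreamHandler(stream)
    logger.addHandler(handler)
    return handler


demo_logger = logging.getLogger("my-named-logger-demo")  # hypothetical name
first = ensure_stream_handler(demo_logger)
second = ensure_stream_handler(demo_logger)  # second call adds nothing new
handler_count = len(demo_logger.handlers)
```

Calling a helper like this from inside a task should leave the logger with a single stream handler however many times the task runs in the same process.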
Ismail Cenik
06/16/2021, 11:03 PM
Ismail Cenik
06/16/2021, 11:03 PM
Zanie