Sandeep Aggarwal
06/27/2020, 12:41 PM
PREFECT__LOGGING__LOG_ATTRIBUTES?
I am trying to send a unique ID when creating a flow run with the Python client. I want this UUID to be available in the logs so I can track the request flow. However, so far I haven't managed to set the extra log attributes correctly.
This is how I start the Docker agent:
prefect agent start docker -e PREFECT__LOGGING__LOG_ATTRIBUTES="['uuid']"
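A note on the value above: anything passed with -e reaches the Python process as a plain string, so something has to turn "['uuid']" into a list before it can be used as one. The sketch below only illustrates that step with ast.literal_eval from the standard library. parse_list_env is a hypothetical helper written for this example, and it is an assumption that a literal-style parse is appropriate here; it is not Prefect's config loader.

```python
import ast
import os


def parse_list_env(name, default=()):
    """Hypothetical helper: read an env var and interpret it as a Python list literal."""
    raw = os.environ.get(name)
    if raw is None:
        return list(default)
    # literal_eval only accepts Python literals, so arbitrary code is not executed.
    value = ast.literal_eval(raw)
    if not isinstance(value, (list, tuple)):
        raise ValueError(f"{name} must be a list literal, got {raw!r}")
    return list(value)


# Simulate what the process sees after `-e PREFECT__LOGGING__LOG_ATTRIBUTES="['uuid']"`
# (the shell removes the outer double quotes).
os.environ["PREFECT__LOGGING__LOG_ATTRIBUTES"] = "['uuid']"

raw_value = os.environ["PREFECT__LOGGING__LOG_ATTRIBUTES"]
parsed = parse_list_env("PREFECT__LOGGING__LOG_ATTRIBUTES")

print(type(raw_value).__name__, raw_value)
print(type(parsed).__name__, parsed)
```

The raw value is a str and only the parsed value is a list, which is the distinction that matters when checking how the setting is read.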
While debugging, I found that the log attributes are fetched as a string here, instead of a list.
Chris White
06/27/2020, 6:25 PM
# start an ipython session with env vars
PREFECT__LOGGING__LOG_ATTRIBUTES="['uuid']" PREFECT__LOGGING__FORMAT="%(message)s | %(uuid)s" ipython
and then
import prefect
logger = prefect.utilities.logging.get_logger()
with prefect.context(uuid="42"):
    logger.debug("hi")
I get:
hi | 42
as I expect
Sandeep Aggarwal
06/29/2020, 3:07 AM
prefect agent start docker -v -f -e PREFECT__LOGGING__LOG_ATTRIBUTES="['uuid']" -e PREFECT__LOGGING__FORMAT="%(message)s | %(uuid)s" --network prefect_logging_default
# This is dask worker service in my compose file:
dask_worker:
  image: "daskdev/dask"
  command: dask-worker dask_scheduler:8786
  extra_hosts:
    - "host.docker.internal:172.17.0.1"
  environment:
    - PREFECT__LOGGING__LEVEL=DEBUG
    - PREFECT__LOGGING__LOG_ATTRIBUTES="['uuid']"
    - PREFECT__LOGGING__FORMAT="%(message)s | %(uuid)s"
# Agent output:
Beginning Flow run for 'logging-flow' | None
Starting flow run. | 42
Flow 'logging-flow': Handling state change from Scheduled to Running | 42
Flow run SUCCESS: all reference tasks succeeded | 42
Flow 'logging-flow': Handling state change from Running to Success | 42
# Worker output:
distributed.core - INFO - Starting established connection
Task 'log_with_attrs': Starting task run... | 42
Task 'log_with_attrs': Handling state change from Pending to Running | 42
Task 'log_with_attrs': Calling task.run() method... | 42
hi | 42
Task 'log_with_attrs': Handling state change from Running to Success | 42
Task 'log_with_attrs': finished task run for task with final state: 'Success' | 42
But when the logging config is not passed to the agent (it is still set in the worker environment), the exception below is raised. The flow still runs as expected.
prefect agent start docker -v -f --network prefect_logging_default
# This is dask worker service in my compose file:
dask_worker:
  image: "daskdev/dask"
  command: dask-worker dask_scheduler:8786
  extra_hosts:
    - "host.docker.internal:172.17.0.1"
  environment:
    - PREFECT__LOGGING__LEVEL=DEBUG
    - PREFECT__LOGGING__LOG_ATTRIBUTES="['uuid']"
    - PREFECT__LOGGING__FORMAT="%(message)s | %(uuid)s"
# Agent output:
[2020-06-29 02:53:18] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'logging-flow'
[2020-06-29 02:53:18] INFO - prefect.CloudFlowRunner | Starting flow run.
[2020-06-29 02:53:18] DEBUG - prefect.CloudFlowRunner | Flow 'logging-flow': Handling state change from Scheduled to Running
[2020-06-29 02:53:18] INFO - prefect.CloudFlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2020-06-29 02:53:18] DEBUG - prefect.CloudFlowRunner | Flow 'logging-flow': Handling state change from Running to Success
# Docker Worker logs
  File "/usr/local/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.7/site-packages/distributed/threadpoolexecutor.py", line 55, in _worker
    task.run()
  File "/usr/local/lib/python3.7/site-packages/distributed/_concurrent_futures_thread.py", line 65, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.7/site-packages/distributed/worker.py", line 3419, in apply_function
    result = function(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 696, in run_task
    context=context,
  File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 257, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/cloud/task_runner.py", line 315, in run
    is_mapped_parent=is_mapped_parent,
  File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 257, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 291, in run
    state = self.get_task_run_state(state, inputs=task_inputs)
  File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 66, in inner
    return runner_method(self, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 822, in get_task_run_state
    self.task.run, timeout=self.task.timeout, **raw_inputs
  File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 186, in timeout_handler
    return fn(*args, **kwargs)
  File "/home/sandeep/workspace/playground/prefect_logging/logging_flow.py", line 10, in log_with_attrs
Message: 'hi'
Arguments: ()
--- Logging error ---
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/logging/__init__.py", line 1025, in emit
    msg = self.format(record)
  File "/usr/local/lib/python3.7/logging/__init__.py", line 869, in format
    return fmt.format(record)
  File "/usr/local/lib/python3.7/logging/__init__.py", line 611, in format
    s = self.formatMessage(record)
  File "/usr/local/lib/python3.7/logging/__init__.py", line 580, in formatMessage
    return self._style.format(record)
  File "/usr/local/lib/python3.7/logging/__init__.py", line 422, in format
    return self._fmt % record.__dict__
KeyError: 'uuid'
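The last frame of the traceback shows the cause: the format string names %(uuid)s, but the record being formatted has no uuid attribute. The sketch below reproduces that with the standard library alone and shows one possible guard, a logging filter that fills in a fallback value. DefaultAttributeFilter and build_logger are hypothetical names invented for this illustration; this is not how Prefect attaches context attributes to records, only a way to see why a record without the attribute fails to format.

```python
import io
import logging

FORMAT = "%(message)s | %(uuid)s"

# The same failure as the last traceback frame: %-formatting against a
# mapping that has no 'uuid' key.
try:
    FORMAT % {"message": "hi"}
except KeyError as exc:
    missing = exc.args[0]


class DefaultAttributeFilter(logging.Filter):
    """Hypothetical helper: give records a fallback for attributes the format needs."""

    def __init__(self, defaults):
        super().__init__()
        self.defaults = dict(defaults)

    def filter(self, record):
        for name, value in self.defaults.items():
            # Only fill the attribute in when the caller did not supply it.
            if not hasattr(record, name):
                setattr(record, name, value)
        return True  # never drop the record


def build_logger(name, stream):
    """Hypothetical helper: a logger whose handler tolerates a missing uuid."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    logger.propagate = False
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter(FORMAT))
    handler.addFilter(DefaultAttributeFilter({"uuid": None}))
    logger.addHandler(handler)
    return logger


stream = io.StringIO()
logger = build_logger("sketch.uuid_logging", stream)
logger.debug("hi")                        # no uuid on the record
logger.debug("hi", extra={"uuid": "42"})  # uuid supplied by the caller
output_lines = stream.getvalue().splitlines()
print(missing, output_lines)
```

With the filter attached, a record without uuid is formatted with the fallback value instead of raising inside the handler, and a record that carries its own uuid keeps it.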
Chris White
06/29/2020, 3:10 AM
Marvin
06/29/2020, 3:10 AM
Sandeep Aggarwal
06/29/2020, 3:11 AM