s

    Sandeep Aggarwal

    2 years ago
    How to set
    PREFECT__LOGGING__LOG_ATTRIBUTES
    ? I am trying to send a unique id while creating a flow run using python client. I want this uuid to be available in logs for tracking the request flow. However, so far I haven't had any success in correctly setting the extra log attributes. This is how i start the docker agent:
    prefect agent start docker -e PREFECT__LOGGING__LOG_ATTRIBUTES="['uuid']"
    I tried debugging and found that the log attributes are fetched as string here, instead of list.
    Chris White

    Chris White

    2 years ago
    Hi Sandeep - can you describe what error you are seeing? This worked for me:
    # start an ipython session with env vars
    PREFECT__LOGGING__LOG_ATTRIBUTES="['uuid']" PREFECT__LOGGING__FORMAT="%(message)s | %(uuid)s" ipython
    and then
    import prefect
    
    logger = prefect.utilities.logging.get_logger()
    
    with prefect.context(uuid="42"):
        logger.debug("hi")
    I get:
    hi | 42
    as I expect
    s

    Sandeep Aggarwal

    2 years ago
    Thanks for your reply @Chris White. Yes this works as expected when running locally. However, when using server with docker agent and dask cluster running on my local machine, I see a strange behaviour. I start agent with below command and also set prefect logging config in my worker environment variables, everything works fine.
    prefect agent start docker -v -f -e PREFECT__LOGGING__LOG_ATTRIBUTES="['uuid']" -e PREFECT__LOGGING__FORMAT="%(message)s | %(uuid)s" --network prefect_logging_default
    
    # This is dask worker service in my compose file:
    dask_worker:
        image: "daskdev/dask"
        command: dask-worker dask_scheduler:8786
        extra_hosts:
            - "host.docker.internal:172.17.0.1"
        environment:
            - PREFECT__LOGGING__LEVEL=DEBUG
            - PREFECT__LOGGING__LOG_ATTRIBUTES="['uuid']"
            - PREFECT__LOGGING__FORMAT="%(message)s | %(uuid)s"
    
    # Agent output:
    Beginning Flow run for 'logging-flow' | None
    Starting flow run. | 42
    Flow 'logging-flow': Handling state change from Scheduled to Running | 42
    Flow run SUCCESS: all reference tasks succeeded | 42
    Flow 'logging-flow': Handling state change from Running to Success | 42
    
    # Worker output:
    distributed.core - INFO - Starting established connection
    Task 'log_with_attrs': Starting task run... | 42
    Task 'log_with_attrs': Handling state change from Pending to Running | 42
    Task 'log_with_attrs': Calling task.run() method... | 42
    hi | 42
    Task 'log_with_attrs': Handling state change from Running to Success | 42
    Task 'log_with_attrs': finished task run for task with final state: 'Success' | 42
    But when logging config is not passed to agent (The logging config is still set in worker environment though), below exception is raised. The flow still runs as expected.
    prefect agent start docker -v -f  --network prefect_logging_default
    
    # This is dask worker service in my compose file:
    dask_worker:
        image: "daskdev/dask"
        command: dask-worker dask_scheduler:8786
        extra_hosts:
            - "host.docker.internal:172.17.0.1"
        environment:
            - PREFECT__LOGGING__LEVEL=DEBUG
            - PREFECT__LOGGING__LOG_ATTRIBUTES="['uuid']"
            - PREFECT__LOGGING__FORMAT="%(message)s | %(uuid)s"
    
    # Agent output:
    [2020-06-29 02:53:18] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'logging-flow'
    [2020-06-29 02:53:18] INFO - prefect.CloudFlowRunner | Starting flow run.
    [2020-06-29 02:53:18] DEBUG - prefect.CloudFlowRunner | Flow 'logging-flow': Handling state change from Scheduled to Running
    [2020-06-29 02:53:18] INFO - prefect.CloudFlowRunner | Flow run SUCCESS: all reference tasks succeeded
    [2020-06-29 02:53:18] DEBUG - prefect.CloudFlowRunner | Flow 'logging-flow': Handling state change from Running to Success
    
    # Docker Worker logs
    File "/usr/local/lib/python3.7/threading.py", line 926, in _bootstrap_inner
        self.run()
      File "/usr/local/lib/python3.7/threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
      File "/usr/local/lib/python3.7/site-packages/distributed/threadpoolexecutor.py", line 55, in _worker
        task.run()
      File "/usr/local/lib/python3.7/site-packages/distributed/_concurrent_futures_thread.py", line 65, in run
        result = self.fn(*self.args, **self.kwargs)
      File "/usr/local/lib/python3.7/site-packages/distributed/worker.py", line 3419, in apply_function
        result = function(*args, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 696, in run_task
        context=context,
      File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 257, in wrapper
        return func(*args, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/cloud/task_runner.py", line 315, in run
        is_mapped_parent=is_mapped_parent,
      File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 257, in wrapper
        return func(*args, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 291, in run
        state = self.get_task_run_state(state, inputs=task_inputs)
      File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 66, in inner
        return runner_method(self, *args, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 822, in get_task_run_state
        self.task.run, timeout=self.task.timeout, **raw_inputs
      File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 186, in timeout_handler
        return fn(*args, **kwargs)
      File "/home/sandeep/workspace/playground/prefect_logging/logging_flow.py", line 10, in log_with_attrs
    Message: 'hi'
    Arguments: ()
    --- Logging error ---
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/logging/__init__.py", line 1025, in emit
        msg = self.format(record)
      File "/usr/local/lib/python3.7/logging/__init__.py", line 869, in format
        return fmt.format(record)
      File "/usr/local/lib/python3.7/logging/__init__.py", line 611, in format
        s = self.formatMessage(record)
      File "/usr/local/lib/python3.7/logging/__init__.py", line 580, in formatMessage
        return self._style.format(record)
      File "/usr/local/lib/python3.7/logging/__init__.py", line 422, in format
        return self._fmt % record.__dict__
    KeyError: 'uuid'
    These are logs from UI's flow run -> logs tab, which doesn't show any errors.
    Chris White

    Chris White

    2 years ago
    Interesting OK; yea occasionally logging configuration is not properly passed on to dask workers so this sounds like a possible bug! I’ll open an issue for you and we can move this conversation there
    @Marvin open “Adding logging attributes raises error when running on dask”
    Marvin

    Marvin

    2 years ago
    s

    Sandeep Aggarwal

    2 years ago
    Great. Thanks @Chris White. I will follow the progress there.