Thread
#prefect-community
    Sean Harkins
    1 year ago
    I am investigating how we can aggregate module-level logs from our Dask workers in Prefect Cloud. I discussed this a bit with @Jim Crist-Harif in this thread https://prefect-community.slack.com/archives/CL09KU1K7/p1617040250140900?thread_ts=1616957545.069800&cid=CL09KU1K7 where he mentioned that configuring this is somewhat complex. The extra loggers documentation section https://docs.prefect.io/core/concepts/logging.html#extra-loggers demonstrates how to configure module-level loggers to use Prefect’s CloudHandler via environment settings. I used the appropriate environment settings in my run_config with
    flow.run_config = ECSRun(
        image=worker_image,
        labels=["dask_test"],
        task_definition=definition,
        env={"PREFECT__LOGGING__EXTRA_LOGGERS": "pangeo_forge.recipe"},
    )
    I have also configured the task’s regular Python logging with
    logging.getLogger("pangeo_forge.recipe").setLevel(level=logging.DEBUG)
    and this logger is successfully writing to my CloudWatch logs for the Dask worker. But none of this module’s log entries are being written to Prefect Cloud. Any suggestions on how I should configure things so that my Dask workers’ log streams are also written to Cloud?
    Kevin Kho
    1 year ago
    Hi @Sean Harkins! Thanks for the detail in your posts, but just a reminder to keep some of the details in the thread so they don’t drown out other messages in the channel. I think your syntax for EXTRA_LOGGERS might be off here. Can you try "['pangeo_forge.recipe']" instead? Note the outer quotes and the brackets. I was able to replicate your behavior with Cloud and LocalRun by removing the outer quotes and brackets.
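    A quick illustration of why the outer quotes and brackets matter (an editor’s sketch; it assumes Prefect parses the environment-variable value as a Python list literal, which the thread does not confirm):

```python
import ast

# The env var value arrives as a string; parsing it as a list literal
# yields one logger name per element.
raw = "['pangeo_forge.recipe']"
loggers = ast.literal_eval(raw)
assert loggers == ["pangeo_forge.recipe"]

# Without the brackets, code that iterates over the value would see
# individual characters instead of logger names.
bare = "pangeo_forge.recipe"
assert list(bare)[0] == "p"
```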
    Sean Harkins
    1 year ago
    @Kevin Kho Apologies for the long initial message; as any of my colleagues will tell you, my Slack skills are pretty weak 🤦 😄 . In your test, what type of executor were you using? I re-ran using the bracketed string (again, this is running with a DaskExecutor, so the logger is operating on the Dask worker), but I am still seeing the same behavior where my module’s logs are not in Prefect Cloud. To illustrate, here is a sample of the logs in Prefect Cloud:
    17 April 2021,10:07:45 	prefect.CloudTaskRunner	INFO	Task 'MappedTaskWrapper[0]': Starting task run...
    17 April 2021,10:07:47 	prefect.CloudTaskRunner	INFO	Task 'MappedTaskWrapper[0]': Finished task run for task with final state: 'Success'
    17 April 2021,10:07:48 	prefect.CloudTaskRunner	INFO	Task 'MappedTaskWrapper[11]': Starting task run...
    17 April 2021,10:07:49 	prefect.CloudTaskRunner	INFO	Task 'MappedTaskWrapper[11]': Finished task run for task with final state: 'Success'
    17 April 2021,10:07:49 	prefect.CloudTaskRunner	INFO	Task 'MappedTaskWrapper[10]': Starting task run...
    17 April 2021,10:07:51 	prefect.CloudTaskRunner	INFO	Task 'MappedTaskWrapper[10]': Finished task run for task with final state: 'Success'
    And here are the logs captured in CloudWatch for the Dask worker:
    INFO:prefect.CloudTaskRunner:Task 'MappedTaskWrapper[0]': Starting task run...
    INFO:pangeo_forge.recipe:Caching input (0,)
    [2021-04-17 15:07:47+0000] INFO - prefect.CloudTaskRunner | Task 'MappedTaskWrapper[0]': Finished task run for task with final state: 'Success'
    INFO:prefect.CloudTaskRunner:Task 'MappedTaskWrapper[0]': Finished task run for task with final state: 'Success'
    [2021-04-17 15:07:48+0000] INFO - prefect.CloudTaskRunner | Task 'MappedTaskWrapper[11]': Starting task run...
    INFO:prefect.CloudTaskRunner:Task 'MappedTaskWrapper[11]': Starting task run...
    INFO:pangeo_forge.recipe:Caching input (11,)
    [2021-04-17 15:07:49+0000] INFO - prefect.CloudTaskRunner | Task 'MappedTaskWrapper[11]': Finished task run for task with final state: 'Success'
    INFO:prefect.CloudTaskRunner:Task 'MappedTaskWrapper[11]': Finished task run for task with final state: 'Success'
    The module logs from the worker don’t seem to be broadcast to the CloudHandler.
    Kevin Kho
    1 year ago
    No worries! Ah, gotcha. I’ll take a deeper look at this later and will probably get back to you on Monday.
    Sean Harkins
    1 year ago
    👍
    Kevin Kho
    1 year ago
    Hey @Sean Harkins! Unfortunately, as Jim said in that thread, collecting logs from Dask workers is not something Prefect supports. This has to do with the fact that worker logs go to standard error, and Dask itself does not really move logs around, so by default workers don’t send logs to the client. It seems like you would have to set up your own service or find a way to delegate this to the cluster manager. This is not something we can really help with. Sorry about that.
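    As a possible workaround (an editor’s sketch, not something suggested in the thread): distributed’s Client.run executes a function on every worker, which can be used to attach handlers to a module logger worker-side. The handler choice below is illustrative; shipping records onward to Cloud would still require a custom handler.

```python
import logging

def setup_worker_logging(level=logging.DEBUG):
    # Intended to run on each Dask worker: attach a stream handler to the
    # module logger so its records at least reach the worker's stderr
    # (and hence CloudWatch, in this deployment).
    logger = logging.getLogger("pangeo_forge.recipe")
    logger.setLevel(level)
    if not logger.handlers:
        logger.addHandler(logging.StreamHandler())
    return logger.name

# With a live cluster this would be invoked as:
#   from distributed import Client
#   client = Client(cluster)
#   client.run(setup_worker_logging)
```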
    Sean Harkins
    1 year ago
    @Kevin Kho The thread from @Jim Crist-Harif above was specifically about an edge-case timing issue when using the Prefect logger within a task running on a Dask worker, and this PR https://github.com/PrefectHQ/prefect/pull/4334 addresses it, so the Prefect logger can write from a worker to Prefect Cloud. I am not specifically interested in the Dask distributed logger, but as stated in the link you mentioned, distributed uses the standard logger, so according to the extra loggers docs I should be able to specify it and have its log output directed to Cloud. At the moment I was just trying to get my custom module’s logger output directed to Cloud via extra loggers. The issue was that I was configuring PREFECT__LOGGING__EXTRA_LOGGERS for the ECSRun rather than the DaskExecutor, which defines the environment settings for the Dask scheduler and worker nodes. So using
    DaskExecutor(
        cluster_class="dask_cloudprovider.aws.FargateCluster",
        cluster_kwargs={
            "environment": {
                "PREFECT__LOGGING__EXTRA_LOGGERS": "['pangeo_forge.recipe']"
            }
        },
    )
    my module’s log output gets directed to Cloud.
    17 April 2021,03:08:45 	prefect.CloudTaskRunner	INFO	Task 'MappedTaskWrapper[6]': Starting task run...
    17 April 2021,03:08:45 	pangeo_forge.recipe	INFO	Caching input (6,)
    17 April 2021,03:08:46 	prefect.CloudTaskRunner	INFO	Task 'MappedTaskWrapper[6]': Finished task run for task with final state: 'Success'
    But now, as a follow-up, I would like to do what you were discussing and add distributed to PREFECT__LOGGING__EXTRA_LOGGERS, but it doesn’t seem to be directing logs to the CloudHandler. Is this because Dask has already configured the handler? https://github.com/dask/distributed/blob/611414c0529daba3007a13a2ee7230e1881942b9/distributed/config.py#L86
    Kevin Kho
    1 year ago
    Thanks for the very descriptive resolution with the environment variable on ECS. I learned a lot. About the last question, I can’t elaborate beyond Chris’s statement in a GitHub issue earlier that the Python logging module just re-initializes itself after pickling. So attaching the handler does not survive being sent to the workers.
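    The “re-initializes after pickling” behavior can be seen directly in the standard library (an editor’s illustration): a Logger pickles by name only, so handlers attached in the driver process are not part of what reaches a worker.

```python
import logging
import pickle

logger = logging.getLogger("demo.pickling")
logger.addHandler(logging.StreamHandler())

# Logger.__reduce__ returns (getLogger, (name,)): only the name is
# serialized, so a fresh process reconstructs a logger with no handlers.
fn, args = logger.__reduce__()
assert fn is logging.getLogger
assert args == ("demo.pickling",)

# In this same process, unpickling just returns the registered logger.
assert pickle.loads(pickle.dumps(logger)) is logger
```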
    Sean Harkins
    1 year ago
    Thanks for all your assistance on this @Kevin Kho. 🙇 Can you point me to the link for that GitHub issue?
    Kevin Kho
    1 year ago
    It’s very short. Sorry I can’t be of more help here.
    Re-reading that issue, I guess it’s a bit different, but I believe the explanation is still related: the logger will not survive being sent to the workers.