# ask-community
Sean Harkins
I am investigating how we can aggregate module level logs from our Dask workers in Prefect Cloud. I had discussed this a bit with @Jim Crist-Harif in this thread https://prefect-community.slack.com/archives/CL09KU1K7/p1617040250140900?thread_ts=1616957545.069800&cid=CL09KU1K7 where he mentioned that configuring this was somewhat complex. The extra loggers documentation section https://docs.prefect.io/core/concepts/logging.html#extra-loggers demonstrates how to configure module level loggers to use Prefect’s `CloudHandler` via environment settings. I used the appropriate environment settings in my `run_config` with
Copy code
from prefect.run_configs import ECSRun

# worker_image and definition are defined earlier in the flow setup
flow.run_config = ECSRun(
    image=worker_image,
    labels=["dask_test"],
    task_definition=definition,
    env={"PREFECT__LOGGING__EXTRA_LOGGERS": "pangeo_forge.recipe"},
)
I have also configured the task’s regular Python logging with `logging.getLogger("pangeo_forge.recipe").setLevel(level=logging.DEBUG)` and this logger is successfully writing to my Cloudwatch logs for the Dask worker. But none of this module’s log entries are being written to Prefect Cloud. Any suggestions on how I should configure things so that my Dask workers’ log streams are also written to Cloud?
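For context, here is a minimal sketch of how the module-level logger is used (the function name is only illustrative; the real `pangeo_forge.recipe` code is what emits the “Caching input” messages shown later in this thread):

```python
# Illustrative sketch of pangeo_forge/recipe.py, not the actual module.
import logging

# Standard module-level logger; Prefect's extra-loggers mechanism should
# attach its CloudHandler to loggers named like this.
logger = logging.getLogger("pangeo_forge.recipe")


def cache_input(input_key):
    # This shows up in the Dask worker's CloudWatch stream, but reaches
    # Prefect Cloud only if the logger is registered as an extra logger
    # in the worker's environment.
    logger.info("Caching input %s", input_key)
```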
Kevin Kho
Hi @Sean Harkins! Thanks for the detail in your posts, but just a reminder to keep some of the details in the thread so they don't drown out other messages in the channel. I think your syntax might be off with the `EXTRA_LOGGERS` here. Can you try `"['pangeo_forge.recipe']"` instead? Note the outer quotes and the brackets. I was able to replicate your behavior with Cloud and LocalRun by removing the outer quotes and brackets.
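In other words, something like this (a sketch reusing your `ECSRun` snippet from above; only the env value changes):

```python
# worker_image and definition as in your earlier snippet
flow.run_config = ECSRun(
    image=worker_image,
    labels=["dask_test"],
    task_definition=definition,
    # Quoted, bracketed value so it is parsed as a list of logger names
    env={"PREFECT__LOGGING__EXTRA_LOGGERS": "['pangeo_forge.recipe']"},
)
```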
Sean Harkins
@Kevin Kho Apologies for the long initial message, as any of my colleagues will tell you, my Slack skills are pretty weak 🤦 😄 . In your test, what type of executor were you using? I re-ran using the bracketed string (again, this is running using a DaskExecutor so the logger is operating on the Dask worker) but I am still seeing the same behavior where my module’s logs are not in Prefect Cloud. To illustrate, here is a sample of the logs in Prefect Cloud
Copy code
17 April 2021,10:07:45 	prefect.CloudTaskRunner	INFO	Task 'MappedTaskWrapper[0]': Starting task run...
17 April 2021,10:07:47 	prefect.CloudTaskRunner	INFO	Task 'MappedTaskWrapper[0]': Finished task run for task with final state: 'Success'
17 April 2021,10:07:48 	prefect.CloudTaskRunner	INFO	Task 'MappedTaskWrapper[11]': Starting task run...
17 April 2021,10:07:49 	prefect.CloudTaskRunner	INFO	Task 'MappedTaskWrapper[11]': Finished task run for task with final state: 'Success'
17 April 2021,10:07:49 	prefect.CloudTaskRunner	INFO	Task 'MappedTaskWrapper[10]': Starting task run...
17 April 2021,10:07:51 	prefect.CloudTaskRunner	INFO	Task 'MappedTaskWrapper[10]': Finished task run for task with final state: 'Success'
And here are the logs captured in Cloudwatch for the Dask worker
Copy code
INFO:prefect.CloudTaskRunner:Task 'MappedTaskWrapper[0]': Starting task run...
INFO:pangeo_forge.recipe:Caching input (0,)
[2021-04-17 15:07:47+0000] INFO - prefect.CloudTaskRunner | Task 'MappedTaskWrapper[0]': Finished task run for task with final state: 'Success'
INFO:prefect.CloudTaskRunner:Task 'MappedTaskWrapper[0]': Finished task run for task with final state: 'Success'
[2021-04-17 15:07:48+0000] INFO - prefect.CloudTaskRunner | Task 'MappedTaskWrapper[11]': Starting task run...
INFO:prefect.CloudTaskRunner:Task 'MappedTaskWrapper[11]': Starting task run...
INFO:pangeo_forge.recipe:Caching input (11,)
[2021-04-17 15:07:49+0000] INFO - prefect.CloudTaskRunner | Task 'MappedTaskWrapper[11]': Finished task run for task with final state: 'Success'
INFO:prefect.CloudTaskRunner:Task 'MappedTaskWrapper[11]': Finished task run for task with final state: 'Success'
The module logs from the worker don’t seem to be broadcast to the CloudHandler.
Kevin Kho
No worries! Ah gotcha. Will take a deeper look at this later. Will probably get back to you on Monday
Sean Harkins
👍
Kevin Kho
Hey @Sean Harkins! Unfortunately, as Jim said in that thread, collecting logs from Dask workers is not something Prefect supports. This has to do with the fact that worker logs go to standard error and Dask itself does not really move logs around, so by default workers don't send logs to the Client. It seems like you would have to set up your own service or find a way to delegate this to the cluster manager. This is not something we can really help with. Sorry about that.
Sean Harkins
@Kevin Kho The thread from @Jim Crist-Harif above was specifically about an edge case timing issue when using the Prefect logger within a task running on a Dask worker, and this PR addresses that https://github.com/PrefectHQ/prefect/pull/4334 so the Prefect logger can write from a worker to Prefect Cloud. I am not specifically interested in the Dask `distributed` logger, but as stated in the link you mentioned, `distributed` uses the standard logger, so according to the extra logger docs I should be able to specify it and have its log output directed to Cloud. At the moment I was just trying to get my custom module’s logger output directed to Cloud via extra loggers. The issue was that I was configuring `PREFECT__LOGGING__EXTRA_LOGGERS` on the `ECSRun` rather than on the `DaskExecutor`, which defines the environment settings for the Dask scheduler and worker nodes. So using
Copy code
from prefect.executors import DaskExecutor

flow.executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={
        # Environment variables set on the Dask scheduler and worker containers
        "environment": {
            "PREFECT__LOGGING__EXTRA_LOGGERS": "['pangeo_forge.recipe']"
        },
    },
)
my module’s log output gets directed to Cloud:
Copy code
17 April 2021,03:08:45 	prefect.CloudTaskRunner	INFO	Task 'MappedTaskWrapper[6]': Starting task run...
17 April 2021,03:08:45 	pangeo_forge.recipe	INFO	Caching input (6,)
17 April 2021,03:08:46 	prefect.CloudTaskRunner	INFO	Task 'MappedTaskWrapper[6]': Finished task run for task with final state: 'Success'
But now as a follow-up I would like to do what you were discussing and configure the `distributed` logger in `PREFECT__LOGGING__EXTRA_LOGGERS`, but it doesn’t seem to be directing logs to the `CloudHandler`. Is this because Dask has already configured the handler? https://github.com/dask/distributed/blob/611414c0529daba3007a13a2ee7230e1881942b9/distributed/config.py#L86
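For reference, the attempt looks roughly like this (a sketch; the exact logger list is illustrative, and only the extra-loggers value differs from the working configuration above):

```python
# Sketch: same executor configuration as above, with the distributed
# logger added to the extra-loggers list. The pangeo_forge.recipe
# messages still reach Cloud, but the distributed logs do not.
flow.executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={
        "environment": {
            "PREFECT__LOGGING__EXTRA_LOGGERS": "['pangeo_forge.recipe', 'distributed']"
        },
    },
)
```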
Kevin Kho
Thanks for the very descriptive resolution regarding the environment variable on ECS. I learned a lot. About the last question, I can't elaborate more beyond Chris's statement in an earlier Github issue that "the Python logging module just re-initializes itself after pickling". So attaching the handler does not survive being sent to the workers.
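A quick way to see the point (illustrative sketch, not Prefect-specific): Python loggers are pickled by name only, so handlers attached in one process never travel to another.

```python
import logging
import pickle

logger = logging.getLogger("pangeo_forge.recipe")
logger.addHandler(logging.StreamHandler())  # stand-in for Prefect's CloudHandler

# Logger.__reduce__ serializes only the logger *name*; the handler list is
# not part of the payload, so a Dask worker that unpickles this gets the
# logger with whatever handlers (if any) its own process has configured.
print(pickle.dumps(logger))
```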
Sean Harkins
Thanks for all your assistance on this @Kevin Kho. 🙇 Can you point me to the link for that Github issue?
Kevin Kho
It's very short. Sorry I can't be of more help here.
Re-reading that issue, I guess it's a bit different, but I believe the explanation is still related: the logger configuration will not survive being sent to the workers.