`@task` decorator has `log_stdout=True` but is the...
# ask-community
c
`@task` decorator has `log_stdout=True`, but is there any way to add `stderr` to logging?
And if not, is there any interest in a PR for this? (Looks like it should just be a few lines adding `redirect_stderr` to `task_runner.py`.)
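For context, `contextlib.redirect_stderr` (like `redirect_stdout`) swaps the process-wide `sys.stderr` for the duration of a `with` block, so a `log_stderr` option would presumably wrap the task call the same way. Below is a minimal stdlib-only sketch of the idea. `LoggerWriter` and `run_with_stderr_logged` are hypothetical names, and this is not Prefect's actual `task_runner.py` code.

```python
import io
import logging
import sys
from contextlib import redirect_stderr


class LoggerWriter(io.TextIOBase):
    """Hypothetical file-like adapter that forwards each written line to a logger."""

    def __init__(self, logger: logging.Logger, level: int) -> None:
        super().__init__()
        self.logger = logger
        self.level = level
        self._buffer = ""

    def writable(self) -> bool:
        return True

    def write(self, text: str) -> int:
        # print() writes the message and the newline separately, so buffer
        # partial writes and emit one log record per complete line.
        self._buffer += text
        while "\n" in self._buffer:
            line, self._buffer = self._buffer.split("\n", 1)
            if line:
                self.logger.log(self.level, line)
        return len(text)

    def flush(self) -> None:
        # Emit any trailing text that did not end in a newline.
        if self._buffer:
            self.logger.log(self.level, self._buffer)
            self._buffer = ""


def run_with_stderr_logged(fn, logger: logging.Logger):
    """Call fn() with sys.stderr redirected into `logger` at WARNING level."""
    writer = LoggerWriter(logger, logging.WARNING)
    with redirect_stderr(writer):
        try:
            return fn()
        finally:
            writer.flush()
```

Because only the `sys.stderr` name is rebound, anything that saved a reference to the original stream earlier (for example a logging handler created beforehand) still writes to the real stderr and bypasses this.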
a
This should be automatically included. Any time your task run encounters an error, the error message should be logged in your Prefect Cloud UI. The way I understand it, `log_stdout=True` turns all print statements into messages with log level INFO, and error messages are automatically logged at ERROR level.
Did you come across any issues with this? If so, was it when using Dask?
c
I'm using `loguru`, which outputs to `stderr` by default. This is in called functions with no Prefect code (i.e., not `Tasks`). And yes, I'm using Dask, but the behaviour seems to be similar when I just ran a tiny local test.
If I run this with `prefect run -p test-prefect-logging.py`, then the loguru log shows up but doesn't seem to be captured by Prefect. E.g., if I do `prefect run -p test-prefect-logging.py > output.txt`, then the loguru output isn't there.
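The missing lines in `output.txt` are expected at the shell level: `>` only redirects stdout (file descriptor 1), while loguru's default sink writes to stderr (descriptor 2), so those lines still go to the terminal. Appending `2>&1` (i.e. `> output.txt 2>&1`) would capture both streams. A small stdlib sketch in which a child process stands in for the flow run and the two streams arrive separately:

```python
import subprocess
import sys

# Child program writing one line to each stream: print() goes to stdout,
# while a stderr sink (like loguru's default) goes to stderr.
CHILD = (
    "import sys\n"
    "print('to stdout')\n"
    "print('to stderr', file=sys.stderr)\n"
)

result = subprocess.run(
    [sys.executable, "-c", CHILD],
    stdout=subprocess.PIPE,  # roughly what `> output.txt` captures
    stderr=subprocess.PIPE,  # captured separately, as `2> errors.txt` would
    text=True,
    check=True,
)
print("stdout:", repr(result.stdout))
print("stderr:", repr(result.stderr))
```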
(And yes, none of the `stderr` output from loguru on the Dask cluster shows up in Prefect Cloud.)
a
Yes, exactly. You would need to use the Prefect logger instead:
import prefect
logger = prefect.context.get("logger")
or just print with `log_stdout=True`. Regarding Dask logs from worker nodes, this is a known issue; this page gives more info.
c
If I add
import sys
logger.add(sys.stdout)
just below the imports in the gist, then the "hello from fn2" shows up twice, which I assume means that Prefect is picking it up...
We'd prefer not to use Prefect in those called functions, as they are also used in other ways where the Prefect context is not available...
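One way to keep shared functions free of a hard Prefect dependency is to look up the logger at call time and fall back to standard `logging` when no Prefect context is present. This is a minimal sketch that assumes Prefect 1.x's `prefect.context.get("logger")`, as in the reply above. `get_logger_for` is a hypothetical helper name, and Prefect is treated as an optional import.

```python
import logging


def get_logger_for(name: str) -> logging.Logger:
    """Hypothetical helper: use Prefect 1.x's run logger when one is active,
    otherwise fall back to a plain stdlib logger."""
    try:
        import prefect  # optional dependency, only useful inside Prefect runs

        run_logger = prefect.context.get("logger")
    except (ImportError, AttributeError):
        # Prefect not installed, or a version whose `context` has no `.get`.
        run_logger = None
    if isinstance(run_logger, logging.Logger):
        return run_logger
    return logging.getLogger(name)


def fn2() -> None:
    # Library code asks the helper for a logger instead of importing Prefect.
    get_logger_for(__name__).warning("hello from fn2")
```

Outside a flow run this behaves like ordinary `logging`. Whether records from the stdlib fallback reach Prefect Cloud still depends on Prefect's own configuration (for example extra loggers).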
a
I would encourage you to use the Prefect logger within Prefect tasks. When you log within your custom code, you could e.g. just use print statements, and they will be picked up by the Prefect logger. Alternatively (with much more setup), you would need to configure extra loggers, but I wouldn’t go that route. Having said that, we hear you, and this is easier and more configurable in Orion.
c
Okay thanks! Will look into your suggestions in the Dask link, hopefully we can get something working!
a
@Andrew Black FYI - example logging issue
c
@Anna Geller I'm confused by what you said about `stderr`. I've written the following basic flow:
import sys
from prefect import Flow, task
from prefect.executors import DaskExecutor

@task(log_stdout=True)
def task_one():
    print("Hello from stdout")
    print("Hello from STDERR", file=sys.stderr)

with Flow("test-stderr", executor=DaskExecutor()) as flow:
    a = task_one()
Edit: added `DaskExecutor()`.
a
Sorry for causing confusion, happy to clarify 🙂
c
I registered it with Prefect Cloud and ran it via a Local Agent. Only the stdout makes it to the Cloud logs.
Note that it is running with `distributed`.
a
What I meant by `stderr` was rather the case where your task throws an exception because, e.g., the API request timed out or your task run ended in a database error due to a constraint violation. So something like this would show what I meant:
from prefect import Flow, task
from prefect.executors import DaskExecutor

@task(log_stdout=True)
def task_one():
    print("Hello from stdout")
    raise RuntimeError("Something bad happened in this task run. This message will appear in your Prefect Cloud UI")

with Flow("test-stderr", executor=DaskExecutor()) as flow:
    a = task_one()
c
So `stderr` by itself isn't picked up unless an error is raised. Do you think it's worth creating a GH issue on this/posting a PR? Or is this excluded for a reason?
Separately, Prefect picks up `stdout` but for some reason doesn't pick up logging to stdout, e.g.:
import logging
import sys
logging.basicConfig(stream=sys.stdout)
logger = logging.getLogger()
logger.warning("This should show up in Prefect Cloud but doesn't")
EDIT: Have checked, and this is something with `redirect_stdout` and not Prefect.
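A likely explanation for that EDIT is that `redirect_stdout` works by rebinding `sys.stdout`, whereas `logging.basicConfig(stream=sys.stdout)` creates a `StreamHandler` that stores the stream object it was given at creation time. Later redirection therefore never reaches it. The stdlib sketch below contrasts that behaviour with a hypothetical `CurrentStdoutHandler` that looks up `sys.stdout` for every record.

```python
import io
import logging
import sys
from contextlib import redirect_stdout

# A handler built the usual way keeps a reference to whatever sys.stdout was
# when the handler was constructed (here: before any redirection).
bound_logger = logging.getLogger("demo.bound")
bound_logger.propagate = False
bound_logger.addHandler(logging.StreamHandler(sys.stdout))


class CurrentStdoutHandler(logging.Handler):
    """Hypothetical handler: writes each record to sys.stdout as it is at emit time."""

    def emit(self, record: logging.LogRecord) -> None:
        try:
            sys.stdout.write(self.format(record) + "\n")
        except Exception:
            self.handleError(record)


dynamic_logger = logging.getLogger("demo.dynamic")
dynamic_logger.propagate = False
dynamic_logger.addHandler(CurrentStdoutHandler())

buffer = io.StringIO()
with redirect_stdout(buffer):
    print("PRINT")                             # captured
    bound_logger.warning("bound handler")      # escapes to the original stdout
    dynamic_logger.warning("dynamic handler")  # follows the redirection

captured = buffer.getvalue()
```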
z
Hey Chris, the stdout capturing is a bit of a hack. There’s not really a great mechanism to actually accomplish it. I’m not surprised it doesn’t work for log messages.
It seems like we could add a `log_stderr` bool in the same way, but I’m not sure enough people would use it for it to be worth it.
It seems like what you want is a way to add the Prefect log handler to loguru.
c
Btw, I've posted on Stack Overflow about the stdout thing. I guess the solution is either (a) use the extra loggers approach, (b) write logs to disk somewhere, or (c) investigate `KubeCluster.get_logs()`.
Yeah, I was hoping for an easy generic solution, but it looks like I should just go and figure that out.
z
Feel free to open an issue if you can’t attach the handler to loguru easily.
a
I’m reading this doc and it looks like `sink` accepts a handler object as well. Can you try adding this to your custom non-Prefect code to add Prefect’s handlers, as Michael suggested?
from loguru import logger
from prefect.utilities.logging import get_logger

prefect_logger = get_logger()
stream_handler, cloud_handler = prefect_logger.handlers
logger.add(sink=stream_handler)
logger.add(sink=cloud_handler)
z
This change probably won’t persist to a worker though
a
ah true, thanks
c
Tried this and got `_pickle.PicklingError: Can't pickle <function task_first at 0x7fc1d6147c10>: attribute lookup task_first on __main__ failed`.
a
Thanks for trying this approach. This error makes sense, since Dask requires tasks to be cloudpickle-serializable. Could you share how you configured it in your Prefect flow and non-Prefect modules?
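The following detail is easy to check with the standard library. Logger objects pickle by name, because unpickling simply calls `logging.getLogger` again. Handler objects, on the other hand, hold a thread lock and usually an open stream, so pickling them fails. That is one plausible reason why wiring handlers at module level can break Dask serialization. The exact cause of the error above may differ, since it also involves loguru and cloudpickle. `can_pickle` is a hypothetical helper name.

```python
import logging
import pickle
import sys


def can_pickle(obj) -> bool:
    """Return True if `obj` survives a plain pickle round trip."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except (TypeError, AttributeError, pickle.PicklingError):
        return False


# Loggers reduce to `logging.getLogger(name)`, so the same object comes back.
named = logging.getLogger("foo")

# Handlers carry a threading lock (and here a live stream), so they do not pickle.
handler = logging.StreamHandler(sys.stderr)

print("logger picklable:", can_pickle(named))
print("handler picklable:", can_pickle(handler))
```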
c
Code that gives the pickle error:
from loguru import logger
from prefect import Flow, task
from prefect.executors import DaskExecutor
from prefect.utilities.logging import get_logger

prefect_logger = get_logger()
stream_handler, cloud_handler = prefect_logger.handlers
logger.add(sink=stream_handler)
logger.add(sink=cloud_handler)

def called():
    logger.warning("LOGGER")

@task(log_stdout=True)
def task_first():
    called()

executor = DaskExecutor()
with Flow("test-logging-pickle", executor=executor) as flow:
    a = task_first()
And now what I'm trying to use is `EXTRA_LOGGERS` to capture the output of my other loggers:
import logging
from dask_kubernetes import KubeCluster
from prefect import Flow, task
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import GitHub

logger = logging.getLogger("foo")

def called():
    print("PRINT")
    logger.warning("LOGGER")

@task(log_stdout=True)
def task_first():
    called()

executor = DaskExecutor(cluster_class=KubeCluster, cluster_kwargs=...)
storage = GitHub(...)
run_config = KubernetesRun(env={"PREFECT__LOGGING__EXTRA_LOGGERS": '["foo"]'})
with Flow("test", executor=executor, storage=storage, run_config=run_config) as flow:
    a = task_first()
• First I tried this with just a plain `DaskExecutor()` and `LocalRun`, and triggered it from the Cloud to run on a local agent. It worked, and LOGGER was captured.
• Then I tried running as above, on the k8s cluster with KubeCluster, and LOGGER is not captured.
In either case, the PRINT is still captured, so it clearly is possible to get at least stdout out of the Dask cluster. Is there a way to get Prefect logs playing nicely with a Dask cluster? Am I doing anything wrong above?
z
I think you may need to set the extra loggers variable on the Dask workers as well, since logging is configured per-process.
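The per-process point can be seen with plain processes. Logging handlers live in the interpreter that created them, so configuration applied only in the flow-runner process never reaches a separately started worker. An environment variable passed to the worker, by contrast, lets it configure itself on startup. In this stdlib sketch, `EXTRA_LOGGERS` is a hypothetical stand-in for `PREFECT__LOGGING__EXTRA_LOGGERS`, and a subprocess stands in for a Dask worker.

```python
import json
import logging
import os
import subprocess
import sys

# Child program: configures logging only from its own environment, the way a
# freshly started worker process would, then reports handlers on "foo".
CHILD = """
import json, logging, os
for name in json.loads(os.environ.get("EXTRA_LOGGERS", "[]")):
    logging.getLogger(name).addHandler(logging.StreamHandler())
print(len(logging.getLogger("foo").handlers))
"""


def handlers_on_foo_in_child(env_extra: dict) -> int:
    """Start a new interpreter and report how many handlers 'foo' has there."""
    env = {k: v for k, v in os.environ.items() if k != "EXTRA_LOGGERS"}
    env.update(env_extra)
    out = subprocess.run(
        [sys.executable, "-c", CHILD],
        env=env, capture_output=True, text=True, check=True,
    )
    return int(out.stdout.strip())


# Configuring "foo" in the parent does not carry over to the child...
logging.getLogger("foo").addHandler(logging.StreamHandler())
print(handlers_on_foo_in_child({}))
# ...but passing the setting through the child's environment does.
print(handlers_on_foo_in_child({"EXTRA_LOGGERS": json.dumps(["foo"])}))
```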
c
Woohoo, that did it! Thanks @Zanie and @Anna Geller, sorry for being dense. 🙂 Still need to figure out how to get loguru to play nicely, but that's not your problem. Sharing code below for posterity, also KubeCluster ref. I'm using a function for the executor for other reasons:
from dask_kubernetes import KubeCluster, make_pod_spec
from prefect.executors import DaskExecutor

env = {"PREFECT__LOGGING__EXTRA_LOGGERS": '["foo"]'}

def cluster(**kwargs):
    spec = make_pod_spec(..., env=env)          # put env here
    return KubeCluster(spec, env=env, **kwargs) # or here

executor = DaskExecutor(cluster_class=cluster)
Otherwise just do this:
executor = DaskExecutor(
    cluster_class=KubeCluster,
    cluster_kwargs={"env": env, ...},
)
a
@Chris Arderne I looked a bit into loguru logs and wrote down what I’ve found so far in this Discourse topic. Not sure if this works with a remote distributed Dask cluster, but using a local Dask cluster seemed to work. LMK if you can’t reproduce.
c
Confirming that it works on a distributed KubeCluster. The only difference from what I tried before is moving the `stream_handler` stuff inside the called function, which is pretty obvious in hindsight!
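The same move-it-inside idea can be sketched with the standard library, without assuming anything about loguru's or Prefect's handlers. The handler is attached the first time the function runs in a given process, so nothing handler-related has to be pickled and each worker wires up its own. `ListHandler` and `_ensure_handler` are hypothetical names for illustration. In the thread's version, the setup step would instead fetch Prefect's handlers and pass them to loguru's `logger.add(sink=...)`, and a similar guard would avoid adding duplicate sinks on every call.

```python
import logging

log = logging.getLogger("mylib")


class ListHandler(logging.Handler):
    """Hypothetical stand-in for a Prefect/Cloud handler: collects messages."""

    def __init__(self) -> None:
        super().__init__()
        self.messages = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


def _ensure_handler() -> ListHandler:
    """Attach the handler once per process; later calls reuse it."""
    for handler in log.handlers:
        if isinstance(handler, ListHandler):
            return handler
    handler = ListHandler()
    log.addHandler(handler)
    return handler


def called() -> None:
    # Handler setup happens here, in whichever process runs the function,
    # rather than at module import time on the client.
    _ensure_handler()
    log.warning("LOGGER")
```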
a
Amazing and thanks for confirming that! I will add that insight to the topic
z
Great 🙂 Thanks for sharing!