Chris Arderne

02/02/2022, 12:09 PM
@task decorator has log_stdout=True but is there any way to add stderr to logging?
And if not, is there any interest in a PR for this? (Looks like it should just be a few lines adding redirect_stderr to task_runner.py.)

Anna Geller

02/02/2022, 12:20 PM
This should be automatically included. Any time your task run encounters an error, the error message should be logged in your Prefect Cloud UI. The way I understand it is that log_stdout=True turns all print statements into messages with log level INFO and error messages are automatically logged with ERROR level.
Did you come across any issues with this? If so, was it when using Dask?

Chris Arderne

02/02/2022, 12:27 PM
I'm using loguru which outputs to stderr by default. This is in called functions with no Prefect code (i.e., not Tasks). And yes, using Dask, but the behaviour seems to be similar when I just ran a tiny local test.
If I run this with
prefect run -p test-prefect-logging.py
then the loguru log shows up, but doesn't seem to be captured by Prefect. E.g. if I do
prefect run -p test-prefect-logging.py > output.txt
then the loguru output isn't there.
(And yes, none of the stderr output from loguru on the Dask cluster shows up in Prefect Cloud.)

Anna Geller

02/02/2022, 12:30 PM
Yes, exactly. You would need to use Prefect logger instead:
logger = prefect.context.get("logger")
or just print with log_stdout=True. Regarding Dask logs from worker nodes, this is a known issue; this page gives more info.

Chris Arderne

02/02/2022, 12:31 PM
If I add
import sys
logger.add(sys.stdout)
just below the imports in the gist, then the "hello from fn2" shows up twice, which I assume means that Prefect is picking it up...
We'd prefer not to use Prefect in those called functions, as they are used in other ways as well, where prefect context is not available...

Anna Geller

02/02/2022, 12:34 PM
I would encourage you to use the Prefect logger within the Prefect tasks, and when you log within your custom code, you could e.g. just use print statements and then it will be picked up by Prefect logger. Or alternatively (way more setup), you would need to configure extra loggers but I wouldn’t go that route. Having said that, we hear you and this is easier and more configurable in Orion

Chris Arderne

02/02/2022, 12:37 PM
Okay thanks! Will look into your suggestions in the Dask link, hopefully we can get something working!
👍 1

Anna Geller

02/02/2022, 12:39 PM
@Andrew Black FYI - example logging issue

Chris Arderne

02/02/2022, 2:03 PM
@Anna Geller I'm confused by what you said about stderr. I've written the following basic flow:
import sys
from prefect import Flow, task
from prefect.executors import DaskExecutor

@task(log_stdout=True)
def task_one():
    print("Hello from stdout")
    print("Hello from STDERR", file=sys.stderr)

with Flow("test-stderr", executor=DaskExecutor()) as flow:
    a = task_one()
Edit: added DaskExecutor().

Anna Geller

02/02/2022, 2:04 PM
Sorry for causing confusion, happy to clarify 🙂

Chris Arderne

02/02/2022, 2:04 PM
I registered it with Prefect Cloud and ran it via a Local Agent. Only the stdout makes it to the Cloud logs.
Note that it is running with distributed.

Anna Geller

02/02/2022, 2:09 PM
What I meant by stderr was rather the case when your task throws an exception because e.g. the API request timed out or your task run ended in a database error due to a constraint violation. So something like this would show what I meant:
from prefect import Flow, task
from prefect.executors import DaskExecutor

@task(log_stdout=True)
def task_one():
    print("Hello from stdout")
    raise RuntimeError("Something bad happened in this task run. This message will appear in your Prefect Cloud UI")

with Flow("test-stderr", executor=DaskExecutor()) as flow:
    a = task_one()

Chris Arderne

02/02/2022, 2:18 PM
So stderr by itself isn't picked up unless an error is raised. Do you think it's worth creating a GH issue on this/posting a PR? Or is this excluded for a reason?
Separately, Prefect picks up stdout but for some reason doesn't pick up logging to stdout, e.g.
import logging
import sys

logging.basicConfig(stream=sys.stdout)
logger = logging.getLogger()
logger.warning("This should show up in Prefect Cloud but doesn't")
EDIT: Have checked and this is something with redirect_stdout and not Prefect.
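Note: a likely reason for this behaviour is that a logging StreamHandler keeps a reference to whatever sys.stdout object existed when the handler was created, while redirect_stdout only swaps the sys.stdout name afterwards. The sketch below uses only the standard library to show the difference. LateBoundStdoutHandler and capture are hypothetical names for illustration, not part of Prefect or the logging module.

```python
import contextlib
import io
import logging
import sys


class LateBoundStdoutHandler(logging.StreamHandler):
    """Hypothetical handler that looks up sys.stdout on every emit."""

    def __init__(self):
        # Skip StreamHandler.__init__, which would pin a stream object.
        logging.Handler.__init__(self)

    @property
    def stream(self):
        return sys.stdout


def capture(handler):
    """Return what redirect_stdout captures from print() and a log call."""
    logger = logging.getLogger("redirect_demo")
    logger.propagate = False
    logger.addHandler(handler)
    buffer = io.StringIO()
    try:
        with contextlib.redirect_stdout(buffer):
            print("printed")
            logger.warning("logged")
    finally:
        logger.removeHandler(handler)
    return buffer.getvalue()


# Handler created before the redirect: the log line bypasses the buffer.
early = capture(logging.StreamHandler(sys.stdout))
# Handler that resolves sys.stdout late: the log line is captured too.
late = capture(LateBoundStdoutHandler())
```

With the early-bound handler only the print output lands in the buffer; with the late-bound one both lines do.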

Zanie

02/02/2022, 2:43 PM
Hey Chris, the stdout capturing is a bit of a hack. There’s not really a great mechanism to actually accomplish it. I’m not surprised it doesn’t work for log messages.
It seems like we could add a log_stderr bool in the same way, but I’m not sure enough people would use it for it to be worth it.
It seems like what you want is a way to add the Prefect log handler to loguru
👍 2
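Note: the idea of a log_stderr option could be sketched roughly as below, using contextlib.redirect_stderr with a small file-like object that forwards writes to a logger. This is a standard-library illustration only; LoggerWriter and ListHandler are hypothetical helpers, and nothing here is Prefect's actual implementation.

```python
import contextlib
import logging
import sys


class LoggerWriter:
    """Hypothetical file-like object that forwards writes to a logger."""

    def __init__(self, logger, level):
        self.logger = logger
        self.level = level

    def write(self, text):
        # print() sends the message and the trailing newline as separate
        # writes, so chunks that are only whitespace are skipped.
        stripped = text.rstrip()
        if stripped:
            self.logger.log(self.level, stripped)
        return len(text)

    def flush(self):
        pass


class ListHandler(logging.Handler):
    """Collects (level, message) pairs so the result can be inspected."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append((record.levelname, record.getMessage()))


demo_logger = logging.getLogger("stderr_demo")
demo_logger.setLevel(logging.DEBUG)
demo_logger.propagate = False
collector = ListHandler()
demo_logger.addHandler(collector)

with contextlib.redirect_stderr(LoggerWriter(demo_logger, logging.ERROR)):
    print("Hello from STDERR", file=sys.stderr)
```

After the block runs, collector.records holds a single ERROR record with the stderr message.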

Chris Arderne

02/02/2022, 2:51 PM
Btw have posted on Stackoverflow about the stdout thing. I guess the solution is either (a) use the Extra loggers approach, (b) write logs to disk somewhere or (c) investigate KubeCluster.get_logs().
Yeah was hoping for an easy generic solution but looks like I should just go and figure that out.

Zanie

02/02/2022, 3:16 PM
Feel free to open an issue if you can’t attach the handler to loguru easily.

Anna Geller

02/02/2022, 3:28 PM
I’m reading this doc and it looks like sink accepts a handler object as well. Can you try adding this to your custom non-Prefect code to add Prefect’s handlers as Michael suggested?
from loguru import logger
from prefect.utilities.logging import get_logger

prefect_logger = get_logger()
stream_handler, cloud_handler = prefect_logger.handlers
logger.add(sink=stream_handler)
logger.add(sink=cloud_handler)

Zanie

02/02/2022, 3:53 PM
This change probably won’t persist to a worker though

Anna Geller

02/02/2022, 3:56 PM
ah true, thanks

Chris Arderne

02/02/2022, 4:22 PM
Tried this and got
_pickle.PicklingError: Can't pickle <function task_first at 0x7fc1d6147c10>: attribute lookup task_first on __main__ failed

Anna Geller

02/02/2022, 4:38 PM
Thanks for trying this approach. This error makes sense since Dask requires tasks to be cloudpickle-serializable. Could you share how you configured it in your Prefect flow and non-Prefect modules?

Chris Arderne

02/02/2022, 9:37 PM
Code that gives the pickle error:
from loguru import logger
from prefect import Flow, task
from prefect.executors import DaskExecutor
from prefect.utilities.logging import get_logger

prefect_logger = get_logger()
stream_handler, cloud_handler = prefect_logger.handlers
logger.add(sink=stream_handler)
logger.add(sink=cloud_handler)

def called():
    logger.warning("LOGGER")

@task(log_stdout=True)
def task_first():
    called()

executor = DaskExecutor()
with Flow("test-logging-pickle", executor=executor) as flow:
    a = task_first()
And now what I'm trying to use is EXTRA_LOGGERS to capture the output of my other loggers:
import logging
from dask_kubernetes import KubeCluster
from prefect import Flow, task
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import GitHub

logger = logging.getLogger("foo")

def called():
    print("PRINT")
    logger.warning("LOGGER")

@task(log_stdout=True)
def task_first():
    called()

executor = DaskExecutor(cluster_class=KubeCluster, cluster_kwargs=...)
storage = GitHub(...)
run_config = KubernetesRun(env={"PREFECT__LOGGING__EXTRA_LOGGERS": '["foo"]'})
with Flow("test", executor=executor, storage=storage, run_config=run_config) as flow:
    a = task_first()
• First I tried this with just a plain DaskExecutor() and LocalRun and triggered it from the Cloud to run on a local agent. It worked and LOGGER was captured.
• Then I tried running as above, running on the k8s cluster with KubeCluster, and the LOGGER is not captured.
In either case, the PRINT is still captured, so it clearly is possible to get at least stdout out from the Dask cluster. Is there a way to get Prefect logs playing nicely with a Dask cluster? Am I doing anything wrong above?

Zanie

02/02/2022, 10:12 PM
I think you may need to set the extra loggers variable on the Dask workers as well, since logging is configured per-process.
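Note: the per-process point can be illustrated without Dask at all. In the sketch below a child Python process stands in for a worker, and it only sees PREFECT__LOGGING__EXTRA_LOGGERS when the variable is part of the environment it is started with. run_worker is a hypothetical helper, and a real worker pod would get its environment from the cluster spec instead of subprocess.

```python
import os
import subprocess
import sys

VAR = "PREFECT__LOGGING__EXTRA_LOGGERS"

# Code run by the stand-in worker: report the variable or "unset".
CHILD_CODE = f"import os; print(os.environ.get({VAR!r}, 'unset'))"


def run_worker(env):
    """Start a separate Python process with the given environment."""
    result = subprocess.run(
        [sys.executable, "-c", CHILD_CODE],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


# Environment of a worker that was never given the variable.
base_env = {k: v for k, v in os.environ.items() if k != VAR}

without_var = run_worker(base_env)
with_var = run_worker({**base_env, VAR: '["foo"]'})
```

Setting the variable only where the flow is launched does not help a process that was started with a different environment, which matches the KubeCluster behaviour described above.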

Chris Arderne

02/03/2022, 10:10 AM
Woohoo that did it! Thanks @Zanie and @Anna Geller, sorry for being dense. 🙂 Still need to figure out how to get loguru to play nicely but that's not your problem. Sharing code below for posterity, also KubeCluster ref. I'm using a function for the executor for other reasons:
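from dask_kubernetes import KubeCluster, make_pod_spec
from prefect.executors import DaskExecutor

# The Dask workers need the variable too, since logging is configured per-process.
env = {"PREFECT__LOGGING__EXTRA_LOGGERS": '["foo"]'}

def cluster(**kwargs):
    spec = make_pod_spec(..., env=env)          # put env here
    return KubeCluster(spec, env=env, **kwargs) # or here

executor = DaskExecutor(cluster_class=cluster)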
Otherwise just do this:
executor = DaskExecutor(
    cluster_class=KubeCluster,
    cluster_kwargs={"env": env, ...},
)
upvote 1

Anna Geller

02/03/2022, 1:15 PM
@Chris Arderne I looked a bit into loguru logs and wrote down what I’ve found so far in this Discourse topic - not sure if this works with a remote distributed Dask cluster but using a local Dask Cluster seemed to work - LMK if you can’t reproduce.

Chris Arderne

02/03/2022, 1:48 PM
Confirming that it works on a distributed KubeCluster. Only difference from what I tried before is to move the stream_handler stuff inside the called function, which is pretty obvious in hindsight!
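Note: the pattern of attaching handlers inside the called function, so that it happens in whichever process actually runs the function, can be sketched with the standard library alone. Here plain logging loggers stand in for both loguru and the Prefect logger, and all names are hypothetical. With loguru itself, logger.add would need its own guard against repeated registration.

```python
import logging


class ListHandler(logging.Handler):
    """Collects messages so the result can be inspected."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


# Stand-in for the orchestrator's logger and its handlers.
source_logger = logging.getLogger("orchestrator_stand_in")
collector = ListHandler()
source_logger.addHandler(collector)

# Stand-in for the library logger used by non-orchestrator code.
library_logger = logging.getLogger("library_stand_in")
library_logger.propagate = False


def called():
    # Attach handlers at call time, in the process that runs the function.
    # Skipping handlers that are already attached keeps repeated calls
    # from producing duplicate log lines.
    for handler in source_logger.handlers:
        if handler not in library_logger.handlers:
            library_logger.addHandler(handler)
    library_logger.warning("LOGGER")


called()
called()
```

Each call produces exactly one message in collector.messages, even though the attach step runs every time.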

Anna Geller

02/03/2022, 2:49 PM
Amazing and thanks for confirming that! I will add that insight to the topic

Zanie

02/03/2022, 4:02 PM
Great 🙂 Thanks for sharing!