Chris Arderne, 02/02/2022, 12:09 PM:
The @task decorator has log_stdout=True, but is there any way to add stderr to the logging as well? E.g. by adding redirect_stderr to task_runner.py.
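(For context, log_stdout captures prints roughly like the stdlib sketch below; this is an illustration of the redirect mechanism, not Prefect's actual task-runner code.)

import contextlib
import io

# Rough idea: redirect stdout while the task body runs,
# then forward whatever was printed to the logger.
buf = io.StringIO()
with contextlib.redirect_stdout(buf):
    print("hello from the task")
# ...the runner would then log buf.getvalue() at INFO level.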
Anna Geller:
log_stdout=True turns all print statements into messages with log level INFO, and error messages are automatically logged with ERROR level.
Chris Arderne, 02/02/2022, 12:27 PM:
I'm using loguru, which outputs to stderr by default. This is in called functions with no Prefect code (i.e., not Tasks). And yes, using Dask, but the behaviour seems to be similar when I just ran a tiny local test. If I run prefect run -p test-prefect-logging.py then the loguru log shows up, but doesn't seem to be captured by Prefect. E.g. if I do prefect run -p test-prefect-logging.py > output.txt then the loguru output isn't there. (No stderr output from loguru on the Dask cluster shows up in Prefect Cloud.)
Anna Geller:
You can use logger = prefect.context.get("logger"), or just print with log_stdout=True. Regarding Dask logs from worker nodes, this is a known issue; this page gives more info.
Chris Arderne, 02/02/2022, 12:31 PM:
If I add import sys and logger.add(sys.stdout) just below the imports in the gist, then the "hello from fn2" shows up twice, which I assume means that Prefect is picking it up...
Chris Arderne, 02/02/2022, 2:03 PM:
…stderr. I've written the following basic flow:
import sys
from prefect import Flow, task
from prefect.executors import DaskExecutor

@task(log_stdout=True)
def task_one():
    print("Hello from stdout")
    print("Hello from STDERR", file=sys.stderr)

with Flow("test-stderr", executor=DaskExecutor()) as flow:
    a = task_one()

Edit: added DaskExecutor().
Chris Arderne, 02/02/2022, 2:04 PM:
…distributed.

Anna Geller:
from prefect import Flow, task
from prefect.executors import DaskExecutor

@task(log_stdout=True)
def task_one():
    print("Hello from stdout")
    raise RuntimeError("Something bad happened in this task run. This message will appear in your Prefect Cloud UI")

with Flow("test-stderr", executor=DaskExecutor()) as flow:
    a = task_one()
Chris Arderne, 02/02/2022, 2:18 PM:
So stderr by itself isn't picked up unless an error is raised. Do you think it's worth creating a GH issue on this/posting a PR? Or is this excluded for a reason?

It picks up print statements to stdout, but for some reason doesn't pick up logging to stdout, e.g.:

import logging
import sys

logging.basicConfig(stream=sys.stdout)
logger = logging.getLogger()
logger.warning("This should show up in Prefect Cloud but doesn't")

EDIT: Have checked and this is something with redirect_stdout and not Prefect.
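(A quick stdlib-only sketch of why: logging.basicConfig binds its handler to whatever sys.stdout is at setup time, so a later contextlib.redirect_stdout swap never reaches it.)

import contextlib
import io
import logging
import sys

logging.basicConfig(stream=sys.stdout)  # handler keeps a reference to the original stdout

buf = io.StringIO()
with contextlib.redirect_stdout(buf):
    print("via print")              # print() looks up sys.stdout at call time, so this is captured
    logging.warning("via logging")  # the handler writes to its saved stream, bypassing the redirect

print("captured:", repr(buf.getvalue()))  # only "via print" ends up in buf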
Zanie:
We could support a log_stderr bool in the same way, but I'm not sure enough people would use it for it to be worth it.
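(In the meantime, a user-side workaround sketch, assuming Prefect 1.x; noisy_task is an illustrative name, not anything from the thread: capture stderr inside the task and forward it to the Prefect logger.)

import contextlib
import io
import sys

from prefect import task, context

@task
def noisy_task():
    buf = io.StringIO()
    with contextlib.redirect_stderr(buf):
        print("something on stderr", file=sys.stderr)  # stand-in for library output
    captured = buf.getvalue().strip()
    if captured:
        # forward whatever went to stderr into the Prefect task-run logger
        context.get("logger").info("stderr output: %s", captured)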
Chris Arderne, 02/02/2022, 2:51 PM:
…KubeCluster.get_logs().

Anna Geller:
from loguru import logger
from prefect.utilities.logging import get_logger

# Reuse Prefect's own logging handlers as loguru sinks
prefect_logger = get_logger()
stream_handler, cloud_handler = prefect_logger.handlers
logger.add(sink=stream_handler)
logger.add(sink=cloud_handler)
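(This works because loguru accepts standard logging.Handler objects as sinks, so loguru records get routed through Prefect's stream and Cloud handlers.)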
Chris Arderne, 02/02/2022, 4:22 PM:
_pickle.PicklingError: Can't pickle <function task_first at 0x7fc1d6147c10>: attribute lookup task_first on __main__ failed.
Chris Arderne, 02/02/2022, 9:37 PM:
This is the code that raises the PicklingError:

from loguru import logger
from prefect import Flow, task
from prefect.executors import DaskExecutor
from prefect.utilities.logging import get_logger

prefect_logger = get_logger()
stream_handler, cloud_handler = prefect_logger.handlers
logger.add(sink=stream_handler)
logger.add(sink=cloud_handler)

def called():
    logger.warning("LOGGER")

@task(log_stdout=True)
def task_first():
    called()

executor = DaskExecutor()
with Flow("test-logging-pickle", executor=executor) as flow:
    a = task_first()
I also tried EXTRA_LOGGERS to capture the output of my other loggers:
import logging

from dask_kubernetes import KubeCluster
from prefect import Flow, task
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import GitHub

logger = logging.getLogger("foo")

def called():
    print("PRINT")
    logger.warning("LOGGER")

@task(log_stdout=True)
def task_first():
    called()

executor = DaskExecutor(cluster_class=KubeCluster, cluster_kwargs=...)
storage = GitHub(...)
run_config = KubernetesRun(env={"PREFECT__LOGGING__EXTRA_LOGGERS": '["foo"]'})

with Flow("test", executor=executor, storage=storage, run_config=run_config) as flow:
    a = task_first()
• First I tried with DaskExecutor() and LocalRun, and triggered it from the Cloud to run on a local agent. It worked and LOGGER was captured.
• Then I tried running as above, on the k8s cluster with KubeCluster, and the LOGGER is not captured.
In either case, the PRINT is still captured, so it clearly is possible to get at least stdout out from the Dask cluster. Is there a way to get Prefect logs playing nicely with a Dask cluster? Am I doing anything wrong above?
Chris Arderne, 02/03/2022, 10:10 AM:

from dask_kubernetes import KubeCluster, make_pod_spec

env = {"PREFECT__LOGGING__EXTRA_LOGGERS": '["foo"]'}

def cluster(**kwargs):
    spec = make_pod_spec(..., env=env)  # put env here
    return KubeCluster(spec, env=env, **kwargs)  # or here

executor = DaskExecutor(cluster_class=cluster)

Otherwise just do this:

executor = DaskExecutor(
    cluster_class=KubeCluster,
    cluster_kwargs={"env": env, ...},
)
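(For what it's worth, the reason the env has to go on the cluster/pod spec: KubernetesRun's env only applies to the flow-run pod, while KubeCluster spins up separate worker pods that don't inherit it, so PREFECT__LOGGING__EXTRA_LOGGERS must be set on the workers too.)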
Chris Arderne, 02/03/2022, 1:48 PM:
Moved the stream_handler stuff inside the called function, which is pretty obvious in hindsight!
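(A sketch of what that fix presumably looks like: the handler wiring moves into the function that runs on the worker, so nothing unpicklable is captured at module level. The _sinks_added guard is my addition, to avoid registering duplicate sinks on repeated calls.)

from loguru import logger
from prefect import Flow, task
from prefect.executors import DaskExecutor
from prefect.utilities.logging import get_logger

_sinks_added = False

def called():
    global _sinks_added
    if not _sinks_added:
        # Attach the Prefect handlers here, at call time on the worker,
        # so the flow itself pickles cleanly
        for handler in get_logger().handlers:
            logger.add(sink=handler)
        _sinks_added = True
    logger.warning("LOGGER")

@task(log_stdout=True)
def task_first():
    called()

with Flow("test-logging-pickle", executor=DaskExecutor()) as flow:
    a = task_first()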