Aric Huang
09/01/2021, 12:41 AM
I'm using ProcessPoolExecutor and am having trouble getting print statements and stack traces from subprocesses to show up in the Prefect Cloud UI. When running the flow locally the logs show up in stdout/stderr, and having the task defined with @task(log_stdout=True) makes print statements in subprocesses show up with a └── <time> | INFO prefix (which I thought meant they're getting picked up by the Prefect logger). However, the subprocess print statements don't show up at all in the Prefect Cloud UI logs. Stack traces from subprocesses that crash also print locally but don't show up in the UI, which only shows a stack trace for the main process with concurrent.futures.process.BrokenProcessPool. Is there a way to have a flow capture all subprocess output so it's visible in the UI?

Aric Huang
09/01/2021, 12:44 AM
Here's an example. I'm calling run_process_pool in a task; the prints in initialize and process are what aren't showing up in the UI:
from concurrent.futures import ProcessPoolExecutor, as_completed

def initialize():
    print("Initializing")

def process(input):
    print(f"Processing: {input}")
    return input

def run_process_pool():
    inputs = [1, 2, 3, 4]
    with ProcessPoolExecutor(
        max_workers=4, initializer=initialize
    ) as pool:
        futures = [pool.submit(process, input) for input in inputs]
        out = [f.result() for f in as_completed(futures)]
    return out
Aric Huang
09/01/2021, 12:49 AM

Kevin Kho
You could try passing the Prefect logger into the process function. You can also maybe try adding an explicit Handler to the Prefect logger for stderr or stdout? I think @task(log_stdout) might do this though. Stuff printing locally is no indication it will print on Cloud, especially if it's in a different subprocess.

Aric Huang
09/01/2021, 3:42 PM
I tried using contextlib to redirect stdout/stderr to call the Prefect logger directly, and again that seemed to work locally (I can see the subprocess logs/stack traces prefixed with └── <time> | INFO or └── <time> | ERROR), but I don't see those logs in the Cloud UI. Here is an example:
from prefect import task
import contextlib
import logging
from prefect.utilities.logging import get_logger

logger = get_logger()

class OutputLogger:
    def __init__(self, name="root", level="INFO"):
        self.logger = logger
        self.name = self.logger.name
        self.level = getattr(logging, level)

    def write(self, msg):
        if msg and not msg.isspace():
            self.logger.log(self.level, msg)

    def flush(self):
        pass

@task(log_stdout=True)
def run_process_pool_task():
    with contextlib.redirect_stdout(OutputLogger("prefect", "INFO")):
        with contextlib.redirect_stderr(OutputLogger("prefect", "ERROR")):
            run_process_pool()
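The OutputLogger shim above is sound on its own; here is a self-contained version of the same redirect-to-logger pattern using only the stdlib logging module (the logger name and ListHandler are purely for the demo), for anyone wanting to test it in isolation:

```python
import contextlib
import logging

class OutputLogger:
    """File-like object that forwards writes to a logging.Logger."""
    def __init__(self, logger, level=logging.INFO):
        self.logger = logger
        self.level = level

    def write(self, msg):
        # print() calls write() once for the message and again for the
        # trailing newline; skip whitespace-only fragments to avoid
        # emitting empty log records.
        if msg and not msg.isspace():
            self.logger.log(self.level, msg)

    def flush(self):
        pass

class ListHandler(logging.Handler):
    """Collects formatted messages so we can inspect what was logged."""
    def emit(self, record):
        records.append(record.getMessage())

records = []
logger = logging.getLogger("demo")
logger.setLevel(logging.INFO)
logger.addHandler(ListHandler())

with contextlib.redirect_stdout(OutputLogger(logger)):
    print("hello from redirected stdout")

# records now holds ["hello from redirected stdout"]
```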
Aric Huang
09/01/2021, 3:46 PM
Also tried adding explicit stdout/stderr handlers:

from prefect import task
import logging, sys, contextlib
from prefect.utilities.logging import get_logger

logger = get_logger()

@task(log_stdout=True)
def run_process_pool_task():
    stdout_handler = logging.StreamHandler(sys.stdout)
    stderr_handler = logging.StreamHandler(sys.stderr)
    stdout_handler.setLevel(logging.INFO)
    stderr_handler.setLevel(logging.ERROR)
    logger.addHandler(stdout_handler)
    logger.addHandler(stderr_handler)
    run_process_pool()
    logger.removeHandler(stdout_handler)
    logger.removeHandler(stderr_handler)
Aric Huang
09/01/2021, 3:47 PM

Kevin Kho
The subprocesses may not pick up the redirected sys.stdout and sys.stderr because they're in a different subprocess. I think you might need to really edit the task to take in the Prefect logger, because these attempts look very good.

Aric Huang
09/01/2021, 4:35 PM

Kevin Kho
Kevin Kho
Aric Huang
09/01/2021, 4:47 PM

Kevin Kho
Aric Huang
09/01/2021, 4:58 PM
Are you suggesting converting process() to a Task and mapping the input to it, instead of using ProcessPoolExecutor?

Kevin Kho
Aric Huang
09/01/2021, 5:00 PM
We have existing code using ProcessPoolExecutor already, and it would save us a lot of time if we could run it as-is. Right now things are working pretty well, but the biggest complaint is that the subprocess logs are missing, which makes debugging harder.

Aric Huang
09/01/2021, 6:00 PM
One thing I noticed: with multiprocessing.set_start_method("spawn") in the above flow code, subprocess logs are NOT prefixed with └── <time> | <LEVEL> (which seems to mean they're not being picked up by the Prefect logger), but with multiprocessing.set_start_method("fork") they are. Explicitly setting fork doesn't change the Cloud behavior unfortunately, but maybe the way subprocesses are run on Cloud has something to do with why logs aren't picked up there?

Zach Angell
Aric Huang
09/02/2021, 12:31 AM
I tried log_stdout=True in the task, which looks like it uses this, but still wasn't able to get any stdout/stderr output into the UI. An approach that ended up working for me was to use redirect_stdout/redirect_stderr to write to a file, then read the file and pass the contents to the Prefect logger. For some reason, directly redirecting stdout/stderr to the Prefect logger only worked locally, but with an intermediate file I can see logs in the UI.

Aric Huang
09/02/2021, 12:31 AMimport contextlib
from prefect.utilities.logging import get_logger
logger = get_logger()
@contextlib.contextmanager
def capture_logs():
stdout_file="/tmp/stdout.log"
stderr_file="/tmp/stderr.log"
stdout = open(stdout_file, 'w')
stderr = open(stderr_file, 'w')
with contextlib.redirect_stdout(stdout):
with contextlib.redirect_stderr(stderr):
yield
with open(stdout_file, 'r') as f:
contents = f.readlines()
if contents:
<http://logger.info|logger.info>(''.join(contents))
with open(stderr_file, 'r') as f:
contents = f.readlines()
if contents:
logger.error(''.join(contents))
stdout.close()
stderr.close()
Aric Huang
09/02/2021, 12:32 AM
@task(log_stdout=True)
def run_process_pool_task():
    with capture_logs():
        run_process_pool()
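For reference, a stdlib-only sketch of the same file-based capture idea (using a temp file instead of the hard-coded /tmp paths; the logger name and ListHandler are purely illustrative):

```python
import contextlib
import logging
import os
import tempfile

class ListHandler(logging.Handler):
    """Collects log messages so we can verify the capture worked."""
    def emit(self, record):
        messages.append(record.getMessage())

messages = []
logger = logging.getLogger("capture-demo")
logger.setLevel(logging.INFO)
logger.addHandler(ListHandler())

@contextlib.contextmanager
def capture_logs():
    # Redirect stdout to a temp file, then replay its contents
    # through the logger once the body has finished.
    fd, path = tempfile.mkstemp(suffix=".log")
    try:
        with os.fdopen(fd, "w") as out:
            with contextlib.redirect_stdout(out):
                yield
        # The file is closed (and flushed) here, so it's safe to read.
        with open(path) as f:
            contents = f.read()
        if contents:
            logger.info(contents)
    finally:
        os.remove(path)

with capture_logs():
    print("subprocess-style output")

# messages now contains the captured stdout, relayed via the logger
```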
Zach Angell