# ask-community
Aric Huang
Hi, I'm trying to run some flows that use `ProcessPoolExecutor` and am having trouble getting print statements and stack traces from subprocesses to show up in the Prefect Cloud UI. When running the flow locally the logs show up in stdout/stderr, and defining the task with `@task(log_stdout=True)` makes print statements in subprocesses show up with a `└── <time> | INFO` prefix (which I thought meant they were getting picked up by the Prefect logger). However, the subprocess print statements don't show up at all in the Prefect Cloud UI logs. Stack traces from subprocesses that crash also print locally but don't show up in the UI, which only shows a stack trace for the main process with `concurrent.futures.process.BrokenProcessPool`. Is there a way to have a flow capture all subprocess output so it's visible in the UI?
Here is a simple example - my flow basically imports and wraps `run_process_pool` in a task. The prints in `initialize` and `process` are what aren't showing up in the UI:
from concurrent.futures import ProcessPoolExecutor, as_completed

def initialize():
    # Runs once in each worker process when the pool starts
    print("Initializing")

def process(input):
    # Runs in a worker subprocess; this print is what goes missing in the UI
    print(f"Processing: {input}")
    return input

def run_process_pool():
    inputs = [1, 2, 3, 4]
    with ProcessPoolExecutor(
        max_workers=4, initializer=initialize
    ) as pool:
        futures = [pool.submit(process, input) for input in inputs]
        out = [f.result() for f in as_completed(futures)]
    return out
I'm looking for a way to do this at the flow level without altering the multiprocessing code (e.g. the above), since we have a bunch of existing code that we want to run in Prefect. For reference, the task wrapper looks roughly like the sketch below.
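A minimal sketch of the task wrapper described above (the flow wiring here is hypothetical; the message only says `run_process_pool` is imported and wrapped in a task):

from prefect import Flow, task

@task(log_stdout=True)
def run_process_pool_task():
    # run_process_pool is the existing function shown above
    return run_process_pool()

with Flow("process-pool-flow") as flow:
    run_process_pool_task()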
Kevin Kho
Honestly, the only way I've seen this done is passing the Prefect logger directly into that `process` function. You can also maybe try adding an explicit handler to the Prefect logger for stderr or stdout? I think `@task(log_stdout=True)` might do this already, though. Something printing locally is no indication it will print on Cloud, especially if it's in a different subprocess.
Aric Huang
@Kevin Kho I see - I tried using `contextlib` to redirect stdout/stderr to call the Prefect logger directly, and again that seemed to work locally (I can see the subprocess logs/stack traces prefixed with `└── <time> | INFO` or `└── <time> | ERROR`) but I don't see those logs in the Cloud UI. Here is an example:
from prefect import task
import contextlib
import logging
from prefect.utilities.logging import get_logger

logger = get_logger()

class OutputLogger:
    """File-like object that forwards writes to the Prefect logger."""
    def __init__(self, name="root", level="INFO"):
        self.logger = logger
        self.name = self.logger.name
        self.level = getattr(logging, level)

    def write(self, msg):
        # Skip the bare newlines that print() emits as separate writes
        if msg and not msg.isspace():
            self.logger.log(self.level, msg)

    def flush(self):
        pass

@task(log_stdout=True)
def run_process_pool_task():
    # run_process_pool is the existing function from the earlier example
    with contextlib.redirect_stdout(OutputLogger("prefect", "INFO")):
        with contextlib.redirect_stderr(OutputLogger("prefect", "ERROR")):
            run_process_pool()
I also tried attaching handlers like this; it also looked good locally but didn't send logs to the Cloud UI:
import logging, sys
from prefect import task
from prefect.utilities.logging import get_logger

logger = get_logger()

@task(log_stdout=True)
def run_process_pool_task():
    # Attach explicit stream handlers so log records also go to
    # stdout/stderr while the pool runs
    stdout_handler = logging.StreamHandler(sys.stdout)
    stderr_handler = logging.StreamHandler(sys.stderr)
    stdout_handler.setLevel(logging.INFO)
    stderr_handler.setLevel(logging.ERROR)
    logger.addHandler(stdout_handler)
    logger.addHandler(stderr_handler)
    try:
        run_process_pool()
    finally:
        logger.removeHandler(stdout_handler)
        logger.removeHandler(stderr_handler)
@Kevin Kho ^^ is that how you would add an explicit handler for stdout/stderr?
Kevin Kho
Yes, that is what I had in mind. I think what this indicates is that the logger will not see `sys.stdout` and `sys.stderr` because they're in a different subprocess. I think you might really need to edit the task to take in the Prefect logger, because these attempts look very good.
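A minimal sketch of the logger-passing approach Kevin describes, as a hypothetical adaptation of the earlier example (Prefect 1.x; `logging.Logger` objects pickle by name, so each worker resolves the same logger - though whether the Cloud handler ships records emitted in a child process is exactly what's in question here):

import prefect
from concurrent.futures import ProcessPoolExecutor, as_completed
from prefect import task

def process(logger, input):
    # Log through the Prefect logger instead of print()
    logger.info(f"Processing: {input}")
    return input

@task
def run_process_pool_task():
    logger = prefect.context.get("logger")
    inputs = [1, 2, 3, 4]
    with ProcessPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(process, logger, i) for i in inputs]
        return [f.result() for f in as_completed(futures)]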
Aric Huang
Got it, I was hoping that wouldn't be necessary, but I'll give it a try. I'm really curious why the subprocess logs seem to get picked up by the Prefect logger locally 🤔 If you find some way of capturing the subprocess logs without having to pass in the logger, I'd love to know.
Kevin Kho
I can ask the team about this just to double check
Is there any reason you don’t want to use the LocalDaskExecutor?
Aric Huang
We've only used LocalExecutor so far but no reason we couldn't use LocalDaskExecutor
Kevin Kho
LocalDaskExecutor with a map would spin up a local processing pool and give you the Prefect logging I think?
Aric Huang
Oh, do you mean changing `process()` to a task and mapping the input to it, instead of using `ProcessPoolExecutor`?
Kevin Kho
Yes exactly, then let LocalDaskExecutor deal with it.
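Roughly, that refactor might look like this (a sketch assuming the Prefect 1.x API; the flow name is made up):

from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

@task(log_stdout=True)
def process(input):
    print(f"Processing: {input}")  # captured per mapped task run
    return input

with Flow("mapped-process-pool") as flow:
    process.map([1, 2, 3, 4])

# scheduler="processes" backs the executor with a local multiprocessing
# pool, similar to ProcessPoolExecutor with four workers
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=4)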
Aric Huang
The main reason is that we have a lot of code that is already written to use `ProcessPoolExecutor`, and it would save us a lot of time if we could run it as-is. Right now things are working pretty well, but the biggest complaint is that the subprocess logs are missing, which makes debugging harder.
Something interesting I found is that if I do `multiprocessing.set_start_method("spawn")` in the above flow code, subprocess logs are NOT prefixed with `└── <time> | <LEVEL>` (which seems to mean they're not being picked up by the Prefect logger), but with `multiprocessing.set_start_method("fork")` they are. Explicitly setting it to `fork` doesn't change the Cloud behavior unfortunately, but maybe the way subprocesses are run on Cloud has something to do with why logs aren't picked up there?
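For anyone reproducing this, the start method has to be set before the pool is created - a minimal sketch, with one plausible explanation for the difference as a comment:

import multiprocessing

# Under "fork" the workers inherit the parent's (redirected) sys.stdout,
# so prints pick up the Prefect prefix locally; under "spawn" each worker
# starts a fresh interpreter with the original stdout instead.
multiprocessing.set_start_method("fork", force=True)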
Zach Angell
Hey @Aric Huang - you may be able to create a context manager for logging to stdout like we do in our TaskRunner here, although the task is already being run in this context so I'm not sure it will work: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/task_runner.py#L852
Aric Huang
@Zach Angell Thanks, I did try using `log_stdout=True` in the task, which looks like it uses this, but still wasn't able to get any stdout/stderr output into the UI. An approach that ended up working for me was to use `redirect_stdout`/`redirect_stderr` to write to a file, then read the file and pass the contents to the Prefect `logger`. For some reason directly redirecting stdout/stderr to the Prefect `logger` only worked locally, but using an intermediate file I can see logs in the UI.
Basically I created a context manager like this:
import contextlib
from prefect.utilities.logging import get_logger

logger = get_logger()

@contextlib.contextmanager
def capture_logs():
    stdout_file = "/tmp/stdout.log"
    stderr_file = "/tmp/stderr.log"

    # Redirect both streams to files; forked subprocesses inherit the
    # redirected streams, so their output lands in the files too
    with open(stdout_file, "w") as stdout, open(stderr_file, "w") as stderr:
        with contextlib.redirect_stdout(stdout):
            with contextlib.redirect_stderr(stderr):
                yield

    # Replay the captured output through the Prefect logger so it is
    # shipped to the Cloud UI
    with open(stdout_file, "r") as f:
        contents = f.readlines()
        if contents:
            logger.info("".join(contents))
    with open(stderr_file, "r") as f:
        contents = f.readlines()
        if contents:
            logger.error("".join(contents))
And my task looks like:
from prefect import task

@task(log_stdout=True)
def run_process_pool_task():
    with capture_logs():
        run_process_pool()
Zach Angell
Nicely done! A few Stack Overflow posts I read through mentioned something similar.