@task(log_stdout=True) # ... with Flow(..., exe...
# ask-community
j
```
@task(log_stdout=True)
# ...
with Flow(..., executor=LocalDaskExecutor(scheduler="processes", num_workers=2)) as flow:
```
I see stdout when using the threads scheduler, but not with processes ... prefect version is 0.14.19 ... running flow locally in Jupyter at this point, no agent ... I should also mention that using the threads scheduler causes subsequent runs of the notebook cell to do nothing, but it is no surprise to me that threads would be less stable than subprocesses whose end state is cleaned up by the OS
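For context, a fuller sketch of the setup being described, assuming the Prefect 0.14+ API (the flow name and task body here are made up for illustration):
```
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

@task(log_stdout=True)
def say_hello(n):
    # with log_stdout=True, Prefect should capture this print as a log line
    print(f"hello from task {n}")
    return n

with Flow("stdout-repro", executor=LocalDaskExecutor(scheduler="processes", num_workers=2)) as flow:
    say_hello.map([1, 2])

flow.run()
```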
k
Hey @John Muehlhausen, I think processes in Jupyter are just tricky in general. This should work, though, when you run with a backend and an agent
upvote 1
j
@Kevin Kho I've never had any problem with the subprocess module in Jupyter. This smells more to me like the system just not collecting stdout from the subprocesses for reporting, even when log_stdout=True ... but I'll report back once I try it on the agent. In any case, I need a model where local flows in Jupyter work "the same" (hopefully including parallelism), since that is where all preliminary testing will take place. I can't very well tell my flow writers that it will work live but not in testing?
k
I’ll give this a shot now
🙏 1
j
Same behavior in Jupyter with 0.15.7, just thought I'd upgrade and check
the threads scheduler option seems more reliable in 0.15.7, though ... but I do want processes mode with stdout reported
k
So this is what Prefect is doing under the hood, and there’s really no stdout (threads uses a ThreadPoolExecutor as opposed to this pool)
```
from dask.multiprocessing import get_context
from dask import delayed
import dask

context = get_context()
pool = context.Pool(4)

@delayed
def func():
    print("test")
    return 1

a = func()
b = func()
futures = [a, b]

dask.compute(futures, scheduler="processes", pool=pool)[0]
```
But if you do
```
a.compute()
```
you will see the stdout. I’ll dig into that class a bit to see if it’s fixable
It’s dask itself:
```
dask.compute(futures, scheduler="processes")[0]
```
gives no stdout, but
```
dask.compute(futures, scheduler="threads")[0]
```
does. Will dig a bit more
I can’t find anything, and it doesn’t seem like anything is exposed on the dask side, so I think if you want to pursue this, it might have to be an issue in the Dask repo
compute function source here
j
So it won't work in the Agent either, right?
k
Oh, this does work on the agent. That’s just Jupyter specifically
j
Local agent, I mean
How would it work on a local agent?
note: LocalDaskExecutor
I'm using this executor whether in Jupyter or on the local agent
From what I've seen, dask doesn't support capture of stdout/stderr, period, in any context ... so if it works I'd be highly curious why that same method can't be used in the notebook
k
You can just run this as a Python script and you’ll see the stdout.
```
from dask.multiprocessing import get_context
from dask import delayed
import dask

if __name__ == "__main__":
    context = get_context()
    pool = context.Pool(4)

    @delayed
    def func():
        print("test")
        return 1

    a = func()
    b = func()
    futures = [a, b]
    dask.compute(futures, scheduler="processes", pool=pool)[0]
```
I know we’re talking about the LocalDaskExecutor. So the DaskExecutor uses distributed, while the LocalDaskExecutor just uses dask. distributed is the one that doesn’t quite capture stdout, because Dask itself doesn’t ship worker logs back to the driver, so logs there don’t make it back by default.
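For reference, a sketch of the two executor setups being contrasted; the constructor arguments are assumptions based on the 0.14-era API:
```
from prefect.executors import DaskExecutor, LocalDaskExecutor

# LocalDaskExecutor: dask's local schedulers (a thread or process pool)
local_executor = LocalDaskExecutor(scheduler="processes", num_workers=2)

# DaskExecutor: a distributed.Client, spinning up a local cluster by default
distributed_executor = DaskExecutor(cluster_kwargs={"n_workers": 2})
```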
j
oh I see, the stdout/stderr of the subprocesses are attached to the terminal, but not to the Jupyter cell output, which must all come from the parent process. Got it. This may help me devise a workaround.
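A tiny illustration of that distinction, assuming the kernel was launched from a terminal: in a notebook cell, sys.stdout is a stream owned by the kernel, while OS-level file descriptor 1 (which subprocesses inherit) still points at the terminal:
```
import os

# In a plain terminal session both lines appear together. In a Jupyter
# cell, the first line goes to the cell output via the kernel's stream,
# while the second lands in the terminal that launched the kernel.
print("via sys.stdout")                   # goes wherever sys.stdout points
os.write(1, b"via file descriptor 1\n")   # goes to the real fd 1
```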
k
A workaround is beyond me, but I’d love to learn it if you find out more for sure.
j
Well, I'm wondering about allocating a terminal, attaching the Jupyter kernel to that (hopefully with subprocesses), and then collecting the output into the output cell... we'll see
🤔 1
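One possible shape for such a workaround, sketched here as an untested assumption (it relies on pool workers inheriting file descriptor 1 at creation time, and it is not from the thread itself):
```
import os
import tempfile

# Point the process-level fd 1 at a temp file; a pool created afterwards
# spawns children that inherit it, so their prints land in the file.
tmp = tempfile.TemporaryFile(mode="w+b")
saved_fd = os.dup(1)            # keep a handle on the real stdout
os.dup2(tmp.fileno(), 1)        # fd 1 now points at the temp file
try:
    pass  # ... create the pool and run dask.compute(...) here ...
finally:
    os.dup2(saved_fd, 1)        # restore the original fd 1
    os.close(saved_fd)

tmp.seek(0)
print(tmp.read().decode())      # surface the captured output in the cell
```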