# ask-community
a
Hi, I'm seeing some strange behavior when using `concurrent.futures.ProcessPoolExecutor` in a Prefect task. With the following code as `test.py`, if run using `python3 test.py` it completes successfully, but with `prefect run --path test.py` it stays stuck at 0% completion.
```python
from prefect import task, Flow
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed

def process(inp):
    return inp

@task
def test():
    print("start")
    with ProcessPoolExecutor(
        max_workers=4,
    ) as pool:
        futures = [pool.submit(process, inp) for inp in range(1000)]
        out = [f.result() for f in tqdm(as_completed(futures), total=1000)]
    print('done')

with Flow("Test") as flow:
    test()

if __name__ == "__main__":
    test.run()
```
Any ideas what could be happening here?
I modified the code to only use one input to the `process` function, and now Prefect runs return this error:
```
Retrieving local flow... Done
Running flow locally...
└── 22:30:20 | INFO    | Beginning Flow run for 'Test'
└── 22:30:20 | INFO    | Task 'test': Starting task run...
start
  0%|                                                                                                                                                | 0/1 [00:00<?, ?it/s]└── 22:30:20 | ERROR   | Task 'test': Exception encountered during task execution!
concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/opt/pyenv/versions/3.8.12/lib/python3.8/multiprocessing/queues.py", line 239, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/opt/pyenv/versions/3.8.12/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function process at 0x7fc59d7f88b0>: import of module '<flow>' failed
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/pyenv/versions/3.8.12/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/opt/pyenv/versions/3.8.12/lib/python3.8/site-packages/prefect/utilities/executors.py", line 454, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "test_deadlock.py", line 16, in test
    out = [f.result() for f in tqdm(as_completed(futures), total=1)]
  File "test_deadlock.py", line 16, in <listcomp>
    out = [f.result() for f in tqdm(as_completed(futures), total=1)]
  File "/opt/pyenv/versions/3.8.12/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/opt/pyenv/versions/3.8.12/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/opt/pyenv/versions/3.8.12/lib/python3.8/multiprocessing/queues.py", line 239, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/opt/pyenv/versions/3.8.12/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function process at 0x7fc59d7f88b0>: import of module '<flow>' failed
└── 22:30:20 | INFO    | Task 'test': Finished task run for task with final state: 'Failed'
└── 22:30:20 | INFO    | Flow run FAILED: some reference tasks failed.
```
Running with `python3 test.py` still works fine 🤔
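For context, here is a minimal, Prefect-free sketch that reproduces the same `PicklingError`. It assumes (based on the `'<flow>'` module name in the traceback) that `prefect run` exec()s the flow file under a synthetic module name, so functions defined there can't be re-imported by worker processes:

```python
from concurrent.futures import ProcessPoolExecutor

SOURCE = "def process(inp):\n    return inp\n"

# Define the function under a module name that can't be imported,
# mimicking how a flow file's namespace gets a synthetic __name__.
namespace = {"__name__": "<flow>"}
exec(compile(SOURCE, "<flow>", "exec"), namespace)
process = namespace["process"]  # process.__module__ == '<flow>'

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=1) as pool:
        future = pool.submit(process, 1)
        # Pickling the call item fails in the queue feeder thread and
        # surfaces here as: _pickle.PicklingError: Can't pickle
        # <function process ...>: import of module '<flow>' failed
        print(future.result())
```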
k
Hey @Aric Huang, I think this is what you're running into. I suspect that using the CLI takes a different code path wherein it loads the flow from a file and then executes it. This probably makes it a nested object, causing the issue, whereas using `python test.py` leaves it as a top-level object. Is there a reason you don't want to use the LocalDaskExecutor? Could you move the traceback to the thread when you get the chance, to keep the main channel a bit neater? (I think the code block can remain)
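A minimal sketch of the LocalDaskExecutor route suggested here, assuming Prefect 1.x: let Prefect map the task across inputs instead of spawning a `ProcessPoolExecutor` inside the task body.

```python
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor

@task
def process(inp):
    return inp

with Flow("Test") as flow:
    # Map the task over the inputs; the executor supplies the parallelism.
    results = process.map(list(range(1000)))

if __name__ == "__main__":
    # Run mapped task instances in 4 local processes instead of a manual pool.
    flow.run(executor=LocalDaskExecutor(scheduler="processes", num_workers=4))
```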
a
Sure, pasting the traceback here. If I Ctrl-C the Prefect flow run, it shows these messages in the stack trace:
```
File "/opt/pyenv/versions/3.8.12/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/pyenv/versions/3.8.12/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/pyenv/versions/3.8.12/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
    call_item = call_queue.get(block=True)
  File "/opt/pyenv/versions/3.8.12/lib/python3.8/multiprocessing/queues.py", line 96, in get
    with self._rlock:
  File "/opt/pyenv/versions/3.8.12/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
```
k
Thank you!
a
Got it, that makes sense. We have some existing code using `ProcessPoolExecutor` that intermittently hangs when running in Prefect flows, and I'm investigating why - I was trying to replicate the issue in a minimal example and ran into this.
The existing code that's hanging looks like a different issue, since it only happens sometimes. In that code the function used for multiprocessing is imported from another module, so I think it wouldn't hit the same pickling problem.
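One commonly cited culprit for that kind of intermittent hang is the default fork start method inheriting locked state from a threaded parent (and the Ctrl-C trace above shows a worker blocked on a queue lock). A hedged sketch of the usual mitigation, not verified against this particular case, is to use a "spawn" context:

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

def process(inp):
    return inp

if __name__ == "__main__":
    # "spawn" starts fresh interpreter processes, so children don't inherit
    # locks that another thread in a forked parent may have been holding.
    ctx = multiprocessing.get_context("spawn")
    with ProcessPoolExecutor(max_workers=4, mp_context=ctx) as pool:
        print(pool.submit(process, 1).result())
```

Note that spawn also requires the submitted function to be importable by reference, which ties back to the pickling error above.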
k
Ah I see. I don't know if the import would change anything. First time I've seen this myself.
a
I tried making a separate pip package and updating `test` to look like this, and it works with `prefect run`:
```python
from concurrent.futures import ProcessPoolExecutor, as_completed
from prefect import task
from tqdm import tqdm

@task
def test():
    import mptest  # the worker function now lives in an installed, importable package
    print("start")
    with ProcessPoolExecutor(
        max_workers=4,
    ) as pool:
        inp = 1
        futures = [pool.submit(mptest.noop, inp)]
        out = [f.result() for f in tqdm(as_completed(futures), total=1)]
    print('done')
```
This is using the package at this repo:
```
python -m pip install git+https://github.com/concreted/mptest.git
```
k
I see the difference, but I'm not understanding why it worked 😅. Could you explain?
a
I'm not entirely sure myself, but I think it could be that it's getting hung up trying to serialize something in the original flow file?
The error was `Can't pickle <function process at 0x7fc59d7f88b0>: import of module '<flow>' failed`, which makes me think it was trying to serialize a flow object.
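A small sketch of why the module name matters: pickle serializes functions by reference (module plus qualified name), not by value, and it verifies at dump time that the reference is resolvable.

```python
import pickle

def process(inp):
    return inp

payload = pickle.dumps(process)
# The payload is just a reference: the module name and qualified name.
print(process.__module__.encode() in payload, b"process" in payload)  # True True

# pickle checks resolvability during dumps() by importing the module;
# a module named '<flow>' can't be imported, hence the PicklingError.
print(pickle.loads(payload) is process)  # True
```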
k
Ah I see