Aric Huang
01/05/2022, 10:28 PM
`concurrent.futures.ProcessPoolExecutor` in a Prefect task. With the following code as `test.py`, if run using `python3 test.py` it completes successfully, but with `prefect run --path test.py` it stays stuck at 0% completion.
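```python
from prefect import task, Flow
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed


# Top-level worker function; ProcessPoolExecutor pickles it to send it
# to the worker processes.
def process(inp):
    return inp


@task
def test():
    print("start")
    with ProcessPoolExecutor(
        max_workers=4,
    ) as pool:
        futures = [pool.submit(process, inp) for inp in range(1000)]
        # tqdm reports progress as each future completes.
        out = [f.result() for f in tqdm(as_completed(futures), total=1000)]
    print('done')


with Flow("Test") as flow:
    test()

if __name__ == "__main__":
    # Calls the task's underlying function directly, outside a flow run.
    test.run()
```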
Any ideas what could be happening here?
Aric Huang
01/05/2022, 10:31 PM
`process` function, and now Prefect runs return this error:
```
Retrieving local flow... Done
Running flow locally...
└── 22:30:20 | INFO | Beginning Flow run for 'Test'
└── 22:30:20 | INFO | Task 'test': Starting task run...
start
0%| | 0/1 [00:00<?, ?it/s]
└── 22:30:20 | ERROR | Task 'test': Exception encountered during task execution!
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File "/opt/pyenv/versions/3.8.12/lib/python3.8/multiprocessing/queues.py", line 239, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/opt/pyenv/versions/3.8.12/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function process at 0x7fc59d7f88b0>: import of module '<flow>' failed
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/pyenv/versions/3.8.12/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/opt/pyenv/versions/3.8.12/lib/python3.8/site-packages/prefect/utilities/executors.py", line 454, in run_task_with_timeout
    return task.run(*args, **kwargs) # type: ignore
  File "test_deadlock.py", line 16, in test
    out = [f.result() for f in tqdm(as_completed(futures), total=1)]
  File "test_deadlock.py", line 16, in <listcomp>
    out = [f.result() for f in tqdm(as_completed(futures), total=1)]
  File "/opt/pyenv/versions/3.8.12/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/opt/pyenv/versions/3.8.12/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/opt/pyenv/versions/3.8.12/lib/python3.8/multiprocessing/queues.py", line 239, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/opt/pyenv/versions/3.8.12/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function process at 0x7fc59d7f88b0>: import of module '<flow>' failed
└── 22:30:20 | INFO | Task 'test': Finished task run for task with final state: 'Failed'
└── 22:30:20 | INFO | Flow run FAILED: some reference tasks failed.
```
Aric Huang
01/05/2022, 10:32 PM
`python3 test.py` still works fine 🤔
Kevin Kho
`python test.py` leaves it as a top level object. Is there a reason you don’t want to use the `LocalDaskExecutor`?
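A minimal stdlib sketch of the mechanism described above, assuming that `prefect run --path` executes the script inside a module namespace named `<flow>` (the name is taken from the traceback; how Prefect's loader actually builds that namespace is an assumption here). `pickle` stores a function by reference, as its module name plus qualified name, so a function whose module cannot be imported by name cannot be sent to a `ProcessPoolExecutor` worker. `flow_module` and `can_pickle` are hypothetical names used only for this illustration.

```python
import pickle
import types

# Assumption: mimic `prefect run --path` by executing the function definition
# inside a module object named "<flow>" that is never registered in
# sys.modules, so pickle cannot import it by name later.
flow_module = types.ModuleType("<flow>")
exec("def process(inp):\n    return inp\n", flow_module.__dict__)


def process(inp):
    # Defined at the top level of this script (module "__main__" when run
    # as `python3 script.py`), so pickle can look it up again by name.
    return inp


def can_pickle(fn):
    """Return True if `fn` can be pickled, as ProcessPoolExecutor requires."""
    try:
        pickle.dumps(fn)
    except pickle.PicklingError as exc:
        print(f"PicklingError: {exc}")
        return False
    return True


if __name__ == "__main__":
    print(process.__module__, can_pickle(process))  # prints: __main__ True
    # Prints a PicklingError message first, then: <flow> False
    print(flow_module.process.__module__, can_pickle(flow_module.process))
```

This is consistent with the later workaround in this thread of importing the worker function from an installed package (`mptest.noop`), whose module name workers can resolve.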
Could you move the traceback to the thread when you get the chance to keep the main channel a bit neater? (I think the code block can remain)
Aric Huang
01/05/2022, 10:39 PM
```
  File "/opt/pyenv/versions/3.8.12/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/pyenv/versions/3.8.12/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/pyenv/versions/3.8.12/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
    call_item = call_queue.get(block=True)
  File "/opt/pyenv/versions/3.8.12/lib/python3.8/multiprocessing/queues.py", line 96, in get
    with self._rlock:
  File "/opt/pyenv/versions/3.8.12/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
```
Kevin Kho
Aric Huang
01/05/2022, 10:42 PM
Aric Huang
01/05/2022, 10:43 PM
Kevin Kho
Aric Huang
01/05/2022, 11:00 PM
`test` to look like this, and it works with `prefect run`:
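```python
@task
def test():
    # `mptest` is an installed package, so `mptest.noop` can be pickled by
    # reference and re-imported in the worker processes.
    import mptest
    print("start")
    with ProcessPoolExecutor(
        max_workers=4,
    ) as pool:
        inp = 1
        futures = [pool.submit(mptest.noop, inp)]
        out = [f.result() for f in tqdm(as_completed(futures), total=1)]
    print('done')
```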
Aric Huang
01/05/2022, 11:00 PM
`python -m pip install git+https://github.com/concreted/mptest.git`
Kevin Kho
Aric Huang
01/05/2022, 11:03 PM
Aric Huang
01/05/2022, 11:04 PM
`Can't pickle <function process at 0x7fc59d7f88b0>: import of module '<flow>' failed`
which makes me think it was trying to serialize a flow object.
Kevin Kho