Faheem Khan

07/02/2022, 9:50 AM
Prefect 2.0: Hello everyone, is there a way to use multiprocessing.Pool in Prefect? Sample code is in the thread; it works if I run it without Prefect. How can I integrate it into Prefect? Thanks in advance.
listA = ['1', '2', '3']

@flow
def main():
    def funcA():
        print("A")

    def funB():
        print("b")

    if __name__ == '__main__':
        pool = multiprocessing.Pool()
        pool.map(funcA, listA)
        pool.close()
        pool.join()

DeploymentSpec(
    name="docker-example-01",
    flow=main,
    flow_runner=DockerFlowRunner(
        # image='prefect-orion:beta6',
        image='prefect-orion-main_prefect-server',
        image_pull_policy='IF_NOT_PRESENT',
        volumes=["C:/Users/data:/data"],
        networks=['prefect-server'],
        env={
            "USE_SSL": False,
            "AWS_ACCESS_KEY_ID": "abc",
            "AWS_SECRET_ACCESS_KEY": "abc",
            "ENDPOINT_URL": 'http://minio:9000',
        },
    ),
)

Anna Geller

07/02/2022, 10:33 AM
Check out task runners in the docs. There are many ways of achieving parallelism and concurrency in your flows, and task runners are the recommended way, but you can also use the SequentialTaskRunner and move this multiprocessing code into the flow.

Faheem Khan

07/03/2022, 1:30 AM
@Anna Geller I have used SequentialTaskRunner in my code, but it's not running the multiprocessing function, i.e. it's not printing the "Hello" message. Code below:
listA = ['1', '2', '3']

@flow(task_runner=SequentialTaskRunner())
def main():
    def funcA():
        print("A")

    def funB():
        print("b")

    if __name__ == '__main__':
        logger = get_run_logger()
        logger.info(f"Hello")
        pool = multiprocessing.Pool()
        pool.map(funcA, listA)
        pool.close()
        pool.join()

DeploymentSpec(
    name="docker-example-01",
    flow=main,
    flow_runner=DockerFlowRunner(
        # image='prefect-orion:beta6',
        image='prefect-orion-main_prefect-server',
        image_pull_policy='IF_NOT_PRESENT',
        volumes=["C:/Users/data:/data"],
        networks=['prefect-server'],
        env={
            "USE_SSL": False,
            "AWS_ACCESS_KEY_ID": "abc",
            "AWS_SECRET_ACCESS_KEY": "abc",
            "ENDPOINT_URL": 'http://minio:9000',
        },
    ),
)

Anna Geller

07/04/2022, 1:21 PM
I don't understand your code, and in the form you shared it, it wouldn't work with or without Prefect. Can you share a complete example?
even if I refactor this to be:
import multiprocessing

from prefect import flow, get_run_logger
from prefect.task_runners import SequentialTaskRunner
from prefect.flow_runners import DockerFlowRunner


listA = ["1", "2", "3"]


def funcA():
    print("A")


def funB():
    print("b")


@flow(task_runner=SequentialTaskRunner())
def main():
    logger = get_run_logger()
    logger.info(f"Hello")
    pool = multiprocessing.Pool()
    pool.map(funcA, listA)
    pool.close()
    pool.join()


if __name__ == "__main__":
    main()
your multiprocessing code doesn't work:
  File "/Users/anna/.conda/envs/prefect-2-0/lib/python3.10/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Users/anna/.conda/envs/prefect-2-0/lib/python3.10/multiprocessing/pool.py", line 771, in get
    raise self._value
TypeError: funcA() takes 0 positional arguments but 1 was given
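The TypeError above comes from how `Pool.map` works: it calls the worker function once per element of the iterable and passes that element as a single positional argument, so a function with no parameters cannot be mapped. A minimal sketch of one way to fix it, leaving Prefect out so it runs on its own. The names `func_a`, `run_pool`, and the `item` parameter are hypothetical, and returning a string instead of printing is an assumption made to keep the result easy to check:

```python
import multiprocessing

list_a = ["1", "2", "3"]


def func_a(item):
    # Pool.map passes each element of the iterable as one positional
    # argument, so the worker must accept a parameter.
    return f"A{item}"


def run_pool(items):
    # The context manager shuts the pool down on exit; map() has already
    # collected every result by then and returns them in input order.
    with multiprocessing.Pool() as pool:
        return pool.map(func_a, items)


if __name__ == "__main__":
    # The guard keeps worker processes from re-running this block
    # when they import the module.
    print(run_pool(list_a))
```

The worker is defined at module level because `multiprocessing` pickles it by name, which fails for functions nested inside another function. Under that assumption, the body of `run_pool` could presumably be called from inside the flow in the refactored example above.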