# prefect-community
f
Prefect 2.0: Hello everyone, is there a way I can use multiprocessing.Pool in Prefect? Sample code is in the thread; it works if I run it without Prefect. How can I integrate it into Prefect? Thanks in advance.
listA = ['1', '2', '3']

@flow
def main():
    def funcA():
        print("A")

    def funB():
        print("b")

    if __name__ == '__main__':
        pool = multiprocessing.Pool()
        pool.map(funcA, listA)
        pool.close()
        pool.join()

DeploymentSpec(
    name="docker-example-01",
    flow=main,
    flow_runner=DockerFlowRunner(
        #image = 'prefect-orion:beta6',
        image = 'prefect-orion-main_prefect-server',
        image_pull_policy = 'IF_NOT_PRESENT',
        volumes=["C/Users/data/data"],
        networks = ['prefect-server'],
        env = {
            "USE_SSL": False,
            "AWS_ACCESS_KEY_ID": "abc",
            "AWS_SECRET_ACCESS_KEY": "abc",
            "ENDPOINT_URL": 'http://minio:9000',
        }
    ),
)
a
Check out task runners in the docs. There are many ways of achieving parallelism and concurrency in your flows, and task runners are the recommended way, but you can also use the SequentialTaskRunner and move this multiprocessing code into the flow.
f
@Anna Geller I have used the SequentialTaskRunner in my code, but it's not running the multiprocessing function, i.e. it's not printing the hello message. Code below:
listA = ['1', '2', '3']

@flow(task_runner=SequentialTaskRunner())
def main():
    def funcA():
        print("A")

    def funB():
        print("b")

    if __name__ == '__main__':
        logger = get_run_logger()
        logger.info(f"Hello")
        pool = multiprocessing.Pool()
        pool.map(funcA, listA)
        pool.close()
        pool.join()

DeploymentSpec(
    name="docker-example-01",
    flow=main,
    flow_runner=DockerFlowRunner(
        #image = 'prefect-orion:beta6',
        image = 'prefect-orion-main_prefect-server',
        image_pull_policy = 'IF_NOT_PRESENT',
        volumes=["C/Users/data/data"],
        networks = ['prefect-server'],
        env = {
            "USE_SSL": False,
            "AWS_ACCESS_KEY_ID": "abc",
            "AWS_SECRET_ACCESS_KEY": "abc",
            "ENDPOINT_URL": 'http://minio:9000',
        }
    ),
)
a
I don't understand your code, and in the form you shared it, it wouldn't work with or without Prefect. Can you share a complete example?
even if I refactor this to be:
import multiprocessing

from prefect import flow, get_run_logger
from prefect.task_runners import SequentialTaskRunner
from prefect.flow_runners import DockerFlowRunner


listA = ["1", "2", "3"]


def funcA():
    print("A")


def funB():
    print("b")


@flow(task_runner=SequentialTaskRunner())
def main():
    logger = get_run_logger()
    logger.info(f"Hello")
    pool = multiprocessing.Pool()
    pool.map(funcA, listA)
    pool.close()
    pool.join()


if __name__ == "__main__":
    main()
your multiprocessing code doesn't work:
File "/Users/anna/.conda/envs/prefect-2-0/lib/python3.10/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Users/anna/.conda/envs/prefect-2-0/lib/python3.10/multiprocessing/pool.py", line 771, in get
    raise self._value
TypeError: funcA() takes 0 positional arguments but 1 was given
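The traceback points at the cause: `Pool.map` calls the function once per element of the iterable and passes that element as a single positional argument, while `funcA` is declared with no parameters. A minimal sketch of one possible fix, assuming the intent is to process each item of `listA`; the `item` parameter, the return value, and the `run_pool` helper are illustrative and not from the thread:

```python
import multiprocessing

listA = ["1", "2", "3"]


def funcA(item):
    # Pool.map passes each element of the iterable as one positional argument,
    # so the function must accept it.
    print("A", item)
    return f"A{item}"


def run_pool(items):
    # Hypothetical helper: the worker function lives at module level so it can
    # be pickled and sent to the worker processes.
    with multiprocessing.Pool() as pool:
        # map blocks until every item is processed and keeps the input order.
        return pool.map(funcA, items)
```

Calling `run_pool(listA)` from the flow body, or from under an `if __name__ == "__main__":` guard in a plain script, should then run without the `TypeError`. Whether the pool behaves well inside a Docker flow run is not confirmed by this thread.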