Dhruv Laad
12/27/2024, 6:02 AM
Marvin
12/27/2024, 6:02 AM
python
from prefect import flow
import multiprocessing as mp
@flow
def my_parameterized_flow(x: int):
    """Return twice the value of ``x``."""
    doubled = x * 2
    return doubled
def run_flow_with_param(param: int):
    """Call ``my_parameterized_flow`` with ``param`` and hand back its result.

    This is the plain module-level callable that the script passes to
    ``Pool.map``.
    """
    flow_result = my_parameterized_flow(param)
    return flow_result
if __name__ == "__main__":
    # One input per flow invocation; the pool spreads them over 3 processes.
    flow_inputs = [1, 2, 3]
    with mp.Pool(processes=3) as worker_pool:
        # map() returns the results in the same order as the inputs.
        results = worker_pool.map(run_flow_with_param, flow_inputs)
    
    print(results)  # Will print [2, 4, 6]
if __name__ == "__main__":
Nate
12/27/2024, 6:04 AM
Dhruv Laad
12/27/2024, 6:06 AM
Dhruv Laad
12/27/2024, 6:07 AM
serve
work pools
Dhruv Laad
12/27/2024, 6:09 AM
Nate
12/27/2024, 6:11 AM
@flow
def my_flow(special_params: dict[str, Any], (or whatever params):
if __name__ == "__main__":
  my_flow.serve()
run_deployment('your-flow/your-deployment', parameters={...})
run_deployment
.serve
limit
Dhruv Laad
12/27/2024, 6:18 AM
> you can run many flows at the same time in this serve process
Can you explain how I would go about doing this? Do I call run_deployment?
Nate
12/27/2024, 7:34 AM
run_deployment
.serve
# served_flow.py
import time
from datetime import datetime
from typing import Any
from prefect import flow
from prefect.settings import PREFECT_RUNNER_POLL_FREQUENCY, temporary_settings
@flow
def simulate_work(special_params: dict[str, Any]):
    """Simulate a unit of work for ``special_params``.

    Prints a start message, blocks for five seconds, then prints the
    wall-clock time at which it finished.
    """
    print(f"Doing some work with {special_params}")
    time.sleep(5)
    finished_at = datetime.now()
    print(f"Done at {finished_at}")
if __name__ == "__main__":
    with temporary_settings(
        {PREFECT_RUNNER_POLL_FREQUENCY: 3},
    ):
        simulate_work.serve(
            name="test-deployment",
            limit=10,
        )
python served_flow.py
run_deployment
# do_work.py
import asyncio
from prefect import flow
from prefect.client.schemas.objects import FlowRun
from prefect.deployments import run_deployment
@flow
async def trigger_served_flow() -> list[FlowRun]:
    """Start ten runs of ``simulate-work/test-deployment`` and await them together.

    Each run receives ``{"special_params": {"param1": i}}`` for ``i`` in
    ``range(10)``. The awaited results of the ``run_deployment`` calls are
    returned as gathered by ``asyncio.gather``.
    """
    pending_runs = []
    for index in range(10):
        run_parameters = {"special_params": {"param1": index}}
        pending_runs.append(
            run_deployment(
                "simulate-work/test-deployment",
                parameters=run_parameters,
            )
        )
    return await asyncio.gather(*pending_runs)
if __name__ == "__main__":
    asyncio.run(trigger_served_flow())
python do_work.py
Doing some work with {'param1': 3}
Done at 2024-12-27 01:30:17.513900
Doing some work with {'param1': 6}
Done at 2024-12-27 01:30:17.616203
Doing some work with {'param1': 8}
Done at 2024-12-27 01:30:17.632114
Doing some work with {'param1': 0}
Done at 2024-12-27 01:30:17.584202
Doing some work with {'param1': 1}
Done at 2024-12-27 01:30:17.609866
Doing some work with {'param1': 4}
Done at 2024-12-27 01:30:17.648423
Doing some work with {'param1': 2}
Done at 2024-12-27 01:30:17.703701
Nate
12/27/2024, 7:35 AM
prefect worker start
Dhruv Laad
12/27/2024, 8:25 AM
Dhruv Laad
12/27/2024, 4:35 PM
global_limit=10
.serve()
prefect_server-1  | sqlalchemy.exc.TimeoutError: QueuePool limit of size 10 overflow 20 reached, connection timed out, timeout 30.00 (Background on this error at: <https://sqlalche.me/e/20/3o7r>)
import time
import asyncio
import multiprocessing as mp
from prefect import serve
from my_amazing_flows import amazing_flow
from prefect.deployments import run_deployment
async def _run_all():
    params_list = [params set as dict ...]
    runs = [
        run_deployment(
            "my_amazing_flow/my_deployment",
            parameters=params_set,
        )
        for params_set in params_list
    ]
    return await asyncio.gather(*runs)
def _serve():
    """Serve the imported flow as deployment ``my_deployment``.

    Used as the target of the child process started by the script entry
    point. Returns whatever ``serve`` returns.
    """
    # The file imports ``amazing_flow``; the original body referenced the
    # undefined name ``my_amazing_flow`` and raised NameError when called.
    # NOTE(review): if the flow object is really called ``my_amazing_flow``,
    # fix the import line instead.
    return amazing_flow.serve(name="my_deployment", global_limit=10)
if __name__ == "__main__":
    # Serve the deployment from a child process so this process stays free
    # to trigger runs against it.
    p = mp.Process(target=_serve)
    p.start()
    # Fixed pause before triggering runs -- presumably to give the serving
    # process time to start up; TODO confirm / replace with a readiness check.
    time.sleep(5)
    # The coroutine defined in this file is ``_run_all``; the original called
    # the undefined ``_backfill`` and raised NameError.
    asyncio.run(_run_all())
    # Block until the serving process exits.
    p.join()