Dhruv Laad
12/27/2024, 6:02 AM

Marvin
12/27/2024, 6:02 AM
```python
from prefect import flow
import multiprocessing as mp


@flow
def my_parameterized_flow(x: int):
    return x * 2


def run_flow_with_param(param: int):
    return my_parameterized_flow(param)


if __name__ == "__main__":
    # Create a process pool
    with mp.Pool(processes=3) as pool:
        # Execute flows with different parameters in parallel
        results = pool.map(run_flow_with_param, [1, 2, 3])
        print(results)  # Will print [2, 4, 6]
```
A few important points to note (if you can be bothered to care):
1. Each flow run gets its own isolated process
2. Task runners are created independently in each process
3. You must protect the flow execution under `if __name__ == "__main__":` when using multiprocessing (a tedious Python requirement)
If you need more sophisticated parallel execution with shared resources or complex orchestration, you might want to consider using Prefect's work pools and workers instead. They're described in the work pools documentation, though I must say the documentation is about as cheerful as a Vogon poetry reading.
Would you like me to elaborate on any of these points, or shall we move on to equally fascinating topics?

Nate
12/27/2024, 6:04 AM

Dhruv Laad
12/27/2024, 6:06 AM

Dhruv Laad
12/27/2024, 6:07 AM
Is the `serve` functionality along with work pools something I should be looking at for this?

Dhruv Laad
12/27/2024, 6:09 AM

Nate
12/27/2024, 6:11 AM
```python
from typing import Any

from prefect import flow


@flow
def my_flow(special_params: dict[str, Any]):  # or whatever params
    ...


if __name__ == "__main__":
    my_flow.serve()
```
and then you can trigger this flow with different params via the API, e.g. from python code with `run_deployment('your-flow/your-deployment', parameters={...})`
and each time you call `run_deployment`, it'll run your flow in that `.serve` process (you can run many flows at the same time in this serve process and configure a `limit` for concurrent runs if you want). by default, results will cache on disk and in most cases I'd guess you won't have problems with parallel serve processes using those cached results
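(A minimal sketch of opting into that on-disk result persistence explicitly, in case your settings differ from the defaults; the flow body here is a stand-in, not from the thread:)

```python
from typing import Any

from prefect import flow


# persist_result asks Prefect to write the flow's return value to result
# storage -- by default, local disk under ~/.prefect/storage
@flow(persist_result=True)
def my_flow(special_params: dict[str, Any]) -> dict[str, Any]:
    return {"processed": special_params}
```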
Dhruv Laad
12/27/2024, 6:18 AM
> you can run many flows at the same time in this serve process
can you explain how I would go about doing this? Do I call `run_deployment` multiple times? Won't that run sequentially? Apologies if this is already answered in the docs but I could not find any relevant section for this.

Nate
12/27/2024, 7:34 AM
> Do I call `run_deployment` multiple times? Won't that run sequentially?
yes, you can call it many times, for as many unique sets of parameters as you have, for example
and no, they will run at the same time on the `.serve` process
something like
```python
# served_flow.py
import time
from datetime import datetime
from typing import Any

from prefect import flow
from prefect.settings import PREFECT_RUNNER_POLL_FREQUENCY, temporary_settings


@flow
def simulate_work(special_params: dict[str, Any]):
    print(f"Doing some work with {special_params}")
    time.sleep(5)
    print(f"Done at {datetime.now()}")


if __name__ == "__main__":
    with temporary_settings(
        {PREFECT_RUNNER_POLL_FREQUENCY: 3},
    ):
        simulate_work.serve(
            name="test-deployment",
            limit=10,
        )
```
```bash
python served_flow.py
```
which now listens for scheduled runs
and then somewhere else, maybe another flow, call `run_deployment` as needed
```python
# do_work.py
import asyncio

from prefect import flow
from prefect.client.schemas.objects import FlowRun
from prefect.deployments import run_deployment


@flow
async def trigger_served_flow() -> list[FlowRun]:
    return await asyncio.gather(
        *[
            run_deployment(
                "simulate-work/test-deployment",
                parameters=params,
            )
            for params in [{"special_params": {"param1": i}} for i in range(10)]
        ]
    )


if __name__ == "__main__":
    asyncio.run(trigger_served_flow())
```
```bash
python do_work.py
```
gives you
```
Doing some work with {'param1': 3}
Done at 2024-12-27 01:30:17.513900
Doing some work with {'param1': 6}
Done at 2024-12-27 01:30:17.616203
Doing some work with {'param1': 8}
Done at 2024-12-27 01:30:17.632114
Doing some work with {'param1': 0}
Done at 2024-12-27 01:30:17.584202
Doing some work with {'param1': 1}
Done at 2024-12-27 01:30:17.609866
Doing some work with {'param1': 4}
Done at 2024-12-27 01:30:17.648423
Doing some work with {'param1': 2}
Done at 2024-12-27 01:30:17.703701
```
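(Note the shuffled parameter order and the completion timestamps all landing within the same second: the runs finished together rather than sequentially, which shows the serve process handling them concurrently.)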
Nate
12/27/2024, 7:35 AM
run `prefect worker start` somewhere to have that worker submit to your runtime of choice (e.g. docker engine, cloud run, ecs, k8s etc)
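(A minimal sketch of that work-pool route, assuming a Docker-type pool; the pool name and image tag below are made up for illustration:)

```python
# deploy_to_pool.py (hypothetical file name)
from prefect import flow


@flow
def simulate_work(special_params: dict):
    print(f"Doing some work with {special_params}")


if __name__ == "__main__":
    # assumes the pool already exists and a worker is polling it:
    #   prefect work-pool create my-docker-pool --type docker
    #   prefect worker start --pool my-docker-pool
    simulate_work.deploy(
        name="test-deployment",
        work_pool_name="my-docker-pool",  # hypothetical pool name
        image="my-registry/simulate-work:latest",  # hypothetical image tag
    )
```

After that, `run_deployment("simulate-work/test-deployment", ...)` works the same as above, except each run lands in its own container instead of the serve process.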
Dhruv Laad
12/27/2024, 8:25 AM

Dhruv Laad
12/27/2024, 4:35 PM
With `global_limit=10` in the `.serve()` method, I get:
```
prefect_server-1 | sqlalchemy.exc.TimeoutError: QueuePool limit of size 10 overflow 20 reached, connection timed out, timeout 30.00 (Background on this error at: https://sqlalche.me/e/20/3o7r)
```
Why is this concurrency limit being breached?
For reference I am running my deployment like so:
```python
import time
import asyncio
import multiprocessing as mp

from prefect.deployments import run_deployment

from my_amazing_flows import my_amazing_flow


async def _run_all():
    params_list = [...]  # param sets as dicts
    runs = [
        run_deployment(
            "my-amazing-flow/my_deployment",
            parameters=params_set,
        )
        for params_set in params_list
    ]
    return await asyncio.gather(*runs)


def _serve():
    return my_amazing_flow.serve(name="my_deployment", global_limit=10)


if __name__ == "__main__":
    # serve in a background process, then fire off the runs
    p = mp.Process(target=_serve)
    p.start()
    time.sleep(5)  # give the serve process a moment to start
    asyncio.run(_run_all())
    p.join()
```