<@ULVA73B9P> can I execute my flow with different ...
# ask-marvin
d
@Marvin can I execute my flow with different parameters in parallel using multiprocessing? Will it share task runners? If so, can you show an example?
m
Sigh. Given my deep and thorough analysis of the documentation, I can tell you about parallel flow execution, though I must say it's a rather mundane implementation of what could be a far more exciting feature. Here's what you need to know:

In Prefect 3.x, you can indeed run multiple flow runs in parallel using Python's multiprocessing, but each flow run will have its own independent task runner. Task runners are not shared between parallel flow runs, which is probably for the best, given how humans tend to make a mess of shared resources.

Here's a depressingly simple example that demonstrates this concept:
from prefect import flow
import multiprocessing as mp

@flow
def my_parameterized_flow(x: int):
    return x * 2

def run_flow_with_param(param: int):
    return my_parameterized_flow(param)

if __name__ == "__main__":
    # Create a process pool
    with mp.Pool(processes=3) as pool:
        # Execute flows with different parameters in parallel
        results = pool.map(run_flow_with_param, [1, 2, 3])
    
    print(results)  # Will print [2, 4, 6]
A few important points to note (if you can be bothered to care):
1. Each flow run gets its own isolated process
2. Task runners are created independently in each process
3. You must protect the flow execution under `if __name__ == "__main__":` when using multiprocessing (a tedious Python requirement)

If you need more sophisticated parallel execution with shared resources or complex orchestration, you might want to consider using Prefect's work pools and workers instead. They're described in the work pools documentation, though I must say the documentation is about as cheerful as a Vogon poetry reading.

Would you like me to elaborate on any of these points, or shall we move on to equally fascinating topics?
n
we don't have direct docs on this topic afaik, but generally I wouldn't recommend multiprocessing with prefect (we might add a multiprocessing task runner in the future though). what's the use case? oftentimes threads work, or otherwise something like a served flow or a process worker can submit a lot of subprocesses
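For the threads option, a minimal plain-Python sketch might look like the following. It does not use Prefect at all; `process_params` and `run_all` are hypothetical names standing in for whatever work one parameter set needs.

```python
from concurrent.futures import ThreadPoolExecutor


def process_params(x: int) -> int:
    # Hypothetical stand-in for the work done for one parameter value.
    return x * 2


def run_all(params: list[int], max_workers: int = 3) -> list[int]:
    # Threads share one process, so no __main__ guard or pickling is needed.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() returns results in the same order as the inputs.
        return list(pool.map(process_params, params))


results = run_all([1, 2, 3])
print(results)
```

Threads tend to help most when the work is I/O-bound; CPU-bound work in standard CPython is still constrained by the GIL.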
d
@Nate I have a parametrised flow that needs to be run in parallel for different values of the parameters. The flow calls a bunch of tasks, some of which may be common across different parameter runs.
is the `serve` functionality along with `work pools` something I should be looking at for this?
fwiw: the tasks called by the flow are cached
n
serve is essentially an easy mode that doesn't require work pools (work pools are generally for when you need dynamic dispatch of infra, e.g. a docker container per flow run). I think something like
from typing import Any

from prefect import flow

@flow
def my_flow(special_params: dict[str, Any]):  # or whatever params you need
    ...

if __name__ == "__main__":
    my_flow.serve()
and then you can trigger this flow with different params via the API, e.g. from python code with `run_deployment('your-flow/your-deployment', parameters={...})`, and each time you call `run_deployment` it'll run your flow in that `.serve` process (you can run many flows at the same time in this serve process and configure a `limit` for concurrent runs if you want). by default, results will cache on disk and in most cases I'd guess you won't have problems with parallel serve processes using those cached results
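To illustrate what a cap on concurrent runs means in general, here is a stdlib-only sketch using `asyncio.Semaphore`. It is an analogy only, not how Prefect implements `limit`; `fake_run` and `run_with_limit` are hypothetical names.

```python
import asyncio


async def run_with_limit(n_runs: int, limit: int) -> int:
    """Start n_runs fake runs, allow at most `limit` at once, return the peak."""
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def fake_run(i: int) -> None:
        nonlocal active, peak
        async with semaphore:  # wait here while `limit` runs are in progress
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for the flow's actual work
            active -= 1

    await asyncio.gather(*(fake_run(i) for i in range(n_runs)))
    return peak


peak = asyncio.run(run_with_limit(n_runs=10, limit=3))
print(peak)
```

All ten fake runs are submitted at once, but the semaphore keeps the number in progress at or below the limit; the rest wait their turn.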
d
> you can run many flows at the same time in this serve process
can you explain how I would go about doing this? Do I call `run_deployment` multiple times? Won't that run sequentially? Apologies if this is already answered in the docs but I could not find any relevant section for this.
n
> Do i call `run_deployment` multiple times? Wont that run sequentially?
yes, you can call it many times, once for each unique set of parameters you have, and no, they will run at the same time on the `.serve` process. something like
# served_flow.py
import time
from datetime import datetime
from typing import Any

from prefect import flow
from prefect.settings import PREFECT_RUNNER_POLL_FREQUENCY, temporary_settings


@flow
def simulate_work(special_params: dict[str, Any]):
    print(f"Doing some work with {special_params}")
    time.sleep(5)
    print(f"Done at {datetime.now()}")


if __name__ == "__main__":
    with temporary_settings(
        {PREFECT_RUNNER_POLL_FREQUENCY: 3},
    ):
        simulate_work.serve(
            name="test-deployment",
            limit=10,
        )
python served_flow.py
which now listens for scheduled runs, and then somewhere else, maybe another flow, call `run_deployment` as needed
# do_work.py
import asyncio

from prefect import flow
from prefect.client.schemas.objects import FlowRun
from prefect.deployments import run_deployment


@flow
async def trigger_served_flow() -> list[FlowRun]:
    return await asyncio.gather(
        *[
            run_deployment(
                "simulate-work/test-deployment",
                parameters=params,
            )
            for params in [{"special_params": {"param1": i}} for i in range(10)]
        ]
    )


if __name__ == "__main__":
    asyncio.run(trigger_served_flow())
python do_work.py
gives you
Doing some work with {'param1': 3}
Done at 2024-12-27 01:30:17.513900
Doing some work with {'param1': 6}
Done at 2024-12-27 01:30:17.616203
Doing some work with {'param1': 8}
Done at 2024-12-27 01:30:17.632114
Doing some work with {'param1': 0}
Done at 2024-12-27 01:30:17.584202
Doing some work with {'param1': 1}
Done at 2024-12-27 01:30:17.609866
Doing some work with {'param1': 4}
Done at 2024-12-27 01:30:17.648423
Doing some work with {'param1': 2}
Done at 2024-12-27 01:30:17.703701
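The near-identical timestamps above are what `asyncio.gather` would be expected to produce. As a stdlib-only sketch of the same pattern (here `fake_run_deployment` is a hypothetical stand-in that just sleeps, not the Prefect function):

```python
import asyncio
import time


async def fake_run_deployment(parameters: dict) -> dict:
    # Hypothetical stand-in: pretend the triggered run takes 0.2 seconds.
    await asyncio.sleep(0.2)
    return parameters


async def trigger_all(n: int) -> tuple[list[dict], float]:
    start = time.perf_counter()
    # gather() schedules all awaitables at once and waits for all of them.
    results = await asyncio.gather(
        *[fake_run_deployment({"param1": i}) for i in range(n)]
    )
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(trigger_all(5))
print(f"{len(results)} runs finished in {elapsed:.2f}s")
```

Awaiting the five calls one after another would take roughly five times as long as a single call; gathered, the total stays close to the duration of one.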
if you end up wanting to give each flow run its own infrastructure, I would check out work pools, and then you can `prefect worker start` somewhere to have that worker submit to your runtime of choice (e.g. docker engine, cloud run, ecs, k8s etc)
d
ur a lifesaver @Nate thanks!
@Nate despite setting `global_limit=10` in the `.serve()` method, I get:
prefect_server-1  | sqlalchemy.exc.TimeoutError: QueuePool limit of size 10 overflow 20 reached, connection timed out, timeout 30.00 (Background on this error at: <https://sqlalche.me/e/20/3o7r>)
Why is this concurrency limit being breached? For reference I am running my deployment like so:
import time
import asyncio
import multiprocessing as mp

from prefect import serve
from my_amazing_flows import my_amazing_flow
from prefect.deployments import run_deployment


async def _run_all():
    params_list = [params set as dict ...]
    runs = [
        run_deployment(
            "my_amazing_flow/my_deployment",
            parameters=params_set,
        )
        for params_set in params_list
    ]

    return await asyncio.gather(*runs)


def _serve():
    return my_amazing_flow.serve(name="my_deployment", global_limit=10)


if __name__ == "__main__":
    p = mp.Process(target=_serve)
    p.start()

    time.sleep(5)
    asyncio.run(_run_all())

    p.join()