# ask-community
o
Hey all, I'm trying to run subflows in different processes using the run_flow_in_subprocess utility function that was added in the last release (thanks for that btw 🙌 🙌 ). I know it's not a common pattern, but I have a use case where it will improve performance. Here is an example:
import asyncio
import multiprocessing
from prefect import flow, task
from prefect.flow_engine import run_flow_in_subprocess

@task
async def long_running_task(sleep: int):
    await asyncio.sleep(sleep)

@flow
async def my_flow(items: list[int]):
    return await asyncio.gather(*[long_running_task(i) for i in items])

@flow
async def my_flow_distributed(items: list[int]):
    # one batch of items per CPU core, each run as a subflow in its own subprocess
    n_procs = multiprocessing.cpu_count()
    batch_size = max(1, len(items) // n_procs)  # guard against fewer items than cores
    procs = []
    for i in range(0, len(items), batch_size):
        proc = run_flow_in_subprocess(flow=my_flow, parameters={"items": items[i : i + batch_size]})
        procs.append(proc)

    for p in procs:
        p.join()
    # Process.join() returns None; exit codes live on Process.exitcode
    exit_codes = [p.exitcode for p in procs]
    if any(exit_codes):
        raise ValueError("at least one subflow subprocess exited with a non-zero code")
    return True

if __name__ == "__main__":
    items = list(range(10000))
    asyncio.run(my_flow_distributed(items))
The above is what I had in mind; I'm not sure if there are any gotchas I'm missing. Also, when I tested running the above script, the UI was correctly picking up the subflows, but it wasn't displaying any logs or task runs (locally I could see the subflow logs in my terminal). Any help is appreciated 🙏
Update: it seems like when I run it with only 2 procs, everything runs fine (apart from the subflow logs not being displayed on the parent flow); I can see both subflows, their tasks, and their logs when I click on them. I'd still be happy to have any feedback on running subflows this way, thanks.
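In case it helps, here is roughly how I'm splitting the items into a fixed number of batches for the 2-proc test. This is plain Python chunking on my side, nothing Prefect-specific, so treat it as a sketch:

def split_into_batches(items: list[int], n_batches: int) -> list[list[int]]:
    # ceiling division gives at most n_batches roughly equal chunks
    batch_size = -(-len(items) // n_batches)
    return [items[i : i + batch_size] for i in range(0, len(items), batch_size)]

# e.g. the 2-subprocess test: one subflow per batch
batches = split_into_batches(list(range(10000)), n_batches=2)

Each batch then gets passed to run_flow_in_subprocess exactly like in the script above.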
a
Hey @Ouail Bendidi! This seems reasonable, and I'd expect it to work as written. I'll need to dig in deeper to figure out why the logs aren't propagating to the server. Could you open an issue on GitHub so we can investigate further?
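In the meantime, one quick thing worth ruling out (just a sanity check, and assuming your Prefect version still exposes the setting objects in prefect.settings) is that logging to the API hasn't been disabled in the environment the subprocesses inherit:

# run in the parent process; the spawned subprocesses inherit this environment
from prefect.settings import PREFECT_LOGGING_TO_API_ENABLED

print(PREFECT_LOGGING_TO_API_ENABLED.value())  # should be True for logs to reach the UI

If that comes back True, then it's most likely something on our side, so the GitHub issue is still the way to go.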