Ouail Bendidi
02/03/2025, 2:11 PM
run_flow_in_subprocess
(Thanks for that btw 🙌 🙌), I know it's not a common pattern, but I have a use case where it will improve performance. Here is an example:
import asyncio
import multiprocessing

from prefect import flow, task
from prefect.flow_engine import run_flow_in_subprocess


@task
async def long_running_task(sleep: int):
    await asyncio.sleep(sleep)


@flow
async def my_flow(items: list[int]):
    return await asyncio.gather(*[long_running_task(i) for i in items])


@flow
async def my_flow_distributed(items: list[int]):
    n_procs = multiprocessing.cpu_count()
    # Guard against a zero step when there are fewer items than CPUs
    batch_size = max(1, len(items) // n_procs)
    procs = []
    for i in range(0, len(items), batch_size):
        proc = run_flow_in_subprocess(flow=my_flow, parameters={"items": items[i : i + batch_size]})
        procs.append(proc)
    # Assuming each proc is a multiprocessing Process: join() returns None,
    # so wait first and then read .exitcode
    for p in procs:
        p.join()
    if any(p.exitcode != 0 for p in procs):
        raise ValueError("at least one subprocess flow run exited with a non-zero code")
    return True


if __name__ == "__main__":
    items = list(range(10000))
    asyncio.run(my_flow_distributed(items))
The above is what I had in mind; I'm not sure if there are any gotchas I'm missing.
Also, when I tested the script above, the UI correctly picked up the subflows, but it didn't display any logs or task runs (locally I could see the subflow logs in my terminal).
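On the gotchas question, one thing that may be worth checking is the batching step. When the batch size comes from `len(items) // n_procs`, floor division can yield more batches than processes (10 items over 4 processes gives 5 batches of 2), and a result of 0 would make `range()` fail unless it is guarded. A minimal sketch of an alternative, assuming a hypothetical helper `split_into_batches` (plain Python, not part of Prefect) that uses ceiling division instead:

```python
import math


def split_into_batches(items: list[int], n_batches: int) -> list[list[int]]:
    """Split items into at most n_batches contiguous batches.

    Hypothetical helper for illustration only; not a Prefect API.
    """
    if n_batches < 1:
        raise ValueError("n_batches must be at least 1")
    if not items:
        return []
    # Ceiling division keeps the batch size >= 1 and the batch count <= n_batches.
    batch_size = math.ceil(len(items) / n_batches)
    return [items[i : i + batch_size] for i in range(0, len(items), batch_size)]
```

Each returned batch could then be passed as the `items` parameter of one subprocess flow run, so the number of subprocesses never exceeds the CPU count.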
any help is appreciated 🙏
Ouail Bendidi
02/03/2025, 2:28 PM
alex
02/03/2025, 7:39 PM
Ouail Bendidi
02/04/2025, 11:01 AM