Miguel Silva
02/07/2024, 12:05 PMDominic Tarro
02/07/2024, 9:04 PMCan I create and schedule subflows dynamically from one main flow?Yes. If you want to concurrently run a subflow within the running flow, you can use asynchronous flows. If you want to schedule a deployment, check out
prefect.deployments.deployments.run_deployment
.
To add concurrency control, I recommend using a semaphore and anyio
. Something like
class Controller:
def __init__(self, size: int):
self.semaphore = anyio.Semaphore(size)
async def run(self, flow, *args):
async with self.semaphore:
await flow(*args, **kwargs)
@flow
async child_flow():
...
@flow
async parent_flow():
controller = Controller(3)
async with anyio.create_task_group() as tg:
tg.start_soon(controller.run, child_flow)
Dominic Tarro
02/07/2024, 9:05 PMTask.map
and breaks it up into batches.Miguel Silva
02/11/2024, 12:54 PM