# prefect-getting-started
m
What would be the best way to dynamically distribute API requests across a day into multiple subflows? Can I create and schedule subflows dynamically from one main flow? My idea was to have something like: mainflow(numberapicalls, chunksize) -> fanout[subflows(chunk, schedule)]. Perhaps I don't need a main flow, just a basic Python function. But still, how do I create flows dynamically?
d
Can I create and schedule subflows dynamically from one main flow?
Yes. If you want to concurrently run a subflow within the running flow, you can use asynchronous flows. If you want to schedule a deployment, check out prefect.deployments.deployments.run_deployment. To add concurrency control, I recommend using a semaphore and anyio. Something like:
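import anyio
from prefect import flow


class Controller:
    """Caps how many flows run at the same time."""

    def __init__(self, size: int):
        self.semaphore = anyio.Semaphore(size)

    async def run(self, flow, *args):
        # Wait for a free slot, then run the flow with the given arguments.
        async with self.semaphore:
            await flow(*args)


@flow
async def child_flow():
    ...


@flow
async def parent_flow():
    controller = Controller(3)  # at most 3 child flows at once
    async with anyio.create_task_group() as tg:
        # Call start_soon once per child flow you want to launch.
        tg.start_soon(controller.run, child_flow)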
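For the "across a day" part of the question, one option is to work out a start time per chunk first and then schedule each chunk separately. Below is a minimal sketch of that planning step only, using just the standard library. The helper name plan_chunks and its parameters are hypothetical, not part of Prefect.

```python
from datetime import datetime, timedelta, timezone


def plan_chunks(number_api_calls, chunk_size, day_start, window=timedelta(days=1)):
    """Split call indices into chunks and spread their start times over `window`.

    Hypothetical helper for illustration; not part of Prefect.
    Returns a list of (start_time, range_of_call_indices) pairs.
    """
    if number_api_calls < 0 or chunk_size <= 0:
        raise ValueError("number_api_calls must be >= 0 and chunk_size > 0")

    # One range of call indices per chunk; the last chunk may be shorter.
    chunks = [
        range(start, min(start + chunk_size, number_api_calls))
        for start in range(0, number_api_calls, chunk_size)
    ]
    if not chunks:
        return []

    # Evenly spaced start times: chunk i starts at day_start + i * step.
    step = window / len(chunks)
    return [(day_start + i * step, chunk) for i, chunk in enumerate(chunks)]


if __name__ == "__main__":
    start = datetime(2024, 1, 1, tzinfo=timezone.utc)
    for when, chunk in plan_chunks(10, 4, start):
        print(when.isoformat(), list(chunk))
```

Each (start time, chunk) pair could then be handed to run_deployment, assuming your Prefect version accepts a scheduled_time and parameters for it. Check the signature for the version you have installed.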
If the API calls can be run in tasks, I have a utility library with a class that takes Task.map and breaks it up into batches.
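The library isn't named here, so the following is not its code. It is only a rough sketch of the batching idea, with plain asyncio standing in for Task.map. The names batched, map_in_batches and fake_api_call are made up for illustration.

```python
import asyncio
from itertools import islice


def batched(items, batch_size):
    """Yield successive lists of at most `batch_size` items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be > 0")
    it = iter(items)
    while batch := list(islice(it, batch_size)):
        yield batch


async def map_in_batches(func, items, batch_size):
    """Apply async `func` to every item, one batch at a time.

    Items inside a batch run concurrently; the next batch starts only
    after the current one has finished. Results keep the input order.
    """
    results = []
    for batch in batched(items, batch_size):
        results.extend(await asyncio.gather(*(func(item) for item in batch)))
    return results


async def fake_api_call(i):
    # Stand-in for a real API request (hypothetical).
    await asyncio.sleep(0)
    return i * 2


if __name__ == "__main__":
    print(asyncio.run(map_in_batches(fake_api_call, range(5), batch_size=2)))
```

With Prefect tasks, the same loop shape would apply: map over one batch, wait for its results, then move on to the next batch.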
m
That's perfect! Thank you so much!