What would be the best way to distribute dynamically API requests across a day into multiple subflows? Can I create and schedule subflows dynamically from one main flow?
My idea was to have something like: mainflow(numberapicalls, chunksize) -> fanout[subflows(chunk, schedule). Perhaps I don't need a mainflow, just a basic python function. But still how do I create flows dynamically?
d
Dominic Tarro
02/07/2024, 9:04 PM
Can I create and schedule subflows dynamically from one main flow?
Yes. If you want to concurrently run a subflow within the running flow, you can use asynchronous flows. If you want to schedule a deployment, check out
prefect.deployments.deployments.run_deployment
.
To add concurrency control, I recommend using a semaphore and
anyio
. Something like
Copy code
class Controller:
def __init__(self, size: int):
self.semaphore = anyio.Semaphore(size)
async def run(self, flow, *args):
async with self.semaphore:
await flow(*args, **kwargs)
@flow
async child_flow():
...
@flow
async parent_flow():
controller = Controller(3)
async with anyio.create_task_group() as tg:
tg.start_soon(controller.run, child_flow)
Dominic Tarro
02/07/2024, 9:05 PM
If the API calls can be run in tasks, I have a utility library with a class that takes
Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.