Jainish Savalia
12/05/2024, 11:29 AMimport asyncio
from prefect import flow, task
from prefect.concurrency.asyncio import concurrency
from prefect.concurrency.asyncio import rate_limit
from prefect.deployments import run_deployment
@task
async def process_data(data):
# async with concurrency("database", occupy=1, strict=True):
# await rate_limit("database")
await asyncio.sleep(60)
return data
@flow(
name="Child Flow",
)
async def sf(task_runs: int):
futures = process_data.map(data=[{"d": "Hello"}] * task_runs, return_state=True)
@flow(name="Root Flow")
async def my_flow(task_runs: int = 5, flow_runs: int = 2):
tasks = []
for _ in range(flow_runs):
tasks.append(
run_deployment(
"Child Flow/Child Flow",
parameters=dict(task_runs=task_runs),
as_subflow=True,
)
)
runs = await asyncio.gather(*tasks)
# if __name__ == "__main__":
# my_flow()
Now I am running 50 flows at a time and 500 task in each flow. Still I am not hitting the orchestration api rate limit, although I have hit the log rate limit.
Does anybody knows what is going on?