Eric
04/03/2024, 12:33 AM
Marvin
04/03/2024, 12:34 AM
Nate
04/03/2024, 12:44 AM
Eric
04/03/2024, 12:44 AM
Nate
04/03/2024, 12:46 AM
Eric
04/03/2024, 12:46 AM
Eric
04/03/2024, 12:47 AM
Nate
04/03/2024, 12:59 AM
import asyncio
from prefect import flow
from prefect.concurrency.asyncio import concurrency
# Maps a customer id to the integer that customer_flow passes as `occupy`
# when it enters the concurrency context named after that customer.
CUSTOMER_LIMIT_BY_ID: dict[str, int] = {"customer_1": 1}
async def do_customer_work(customer_id: str) -> None:
    """Stand in for a unit of work done on behalf of one customer.

    Prints a progress line naming the customer, then awaits a five-second
    ``asyncio.sleep`` so that other tasks on the event loop can run in the
    meantime.

    Args:
        customer_id: Identifier interpolated into the printed message.
    """
    print(f"Working on customer {customer_id}")
    await asyncio.sleep(5)
@flow
async def customer_flow(customer_id: str) -> None:
    """Run one customer's work inside that customer's concurrency context.

    The concurrency context is keyed by ``customer_id`` itself, so a limit
    with that exact name is expected to exist on the Prefect side.

    Args:
        customer_id: Name of the concurrency limit to acquire and key into
            ``CUSTOMER_LIMIT_BY_ID``.

    Raises:
        KeyError: If ``customer_id`` has no entry in ``CUSTOMER_LIMIT_BY_ID``.
    """
    # NOTE(review): the mapping's value is passed as `occupy`, i.e. the number
    # of slots this run takes, not the size of the limit. With a value of 1
    # the two coincide; confirm the intent before using larger values.
    slots = CUSTOMER_LIMIT_BY_ID[customer_id]
    async with concurrency(customer_id, occupy=slots):
        await do_customer_work(customer_id)
@flow
async def delegating_flow(customer_ids: list[str]) -> None:
    """Fan out one ``customer_flow`` call per id and wait for all of them.

    The calls are handed to ``asyncio.gather`` together, so they are awaited
    concurrently rather than one after another. Duplicate ids are not
    collapsed: each occurrence gets its own ``customer_flow`` call.

    Args:
        customer_ids: Customer identifiers, one ``customer_flow`` call each.
    """
    pending = [customer_flow(customer_id) for customer_id in customer_ids]
    await asyncio.gather(*pending)
if __name__ == "__main__":
    # Three calls for the same customer, so all of them go through the
    # concurrency context named "customer_1".
    asyncio.run(delegating_flow(["customer_1"] * 3))
where in this case I just went into the UI to activate my global concurrency limit called `customer_1`, but you can use this to do it programmatically
Eric
04/03/2024, 1:01 AM
Nate
04/03/2024, 1:02 AM
Eric
04/03/2024, 1:04 AM
Nate
04/03/2024, 1:04 AM