Derek
07/31/2024, 10:39 PMNate
08/01/2024, 2:38 AMDerek
08/01/2024, 2:45 AMDerek
08/01/2024, 5:24 PMNate
08/01/2024, 5:28 PMoccupy: int
when you use the context manager, yeahDerek
08/01/2024, 5:30 PMNate
08/01/2024, 6:08 PMimport asyncio
from prefect import flow, task, unmapped
from prefect.concurrency.asyncio import concurrency
active_flow_runs = 0
GCL_NAME = "dynamic-limit"
TOTAL_SLOTS = 8
@task
async def process_data(item: int, ctx_kwargs: dict) -> int:
async with concurrency(**ctx_kwargs):
await asyncio.sleep(0.1) # Simulate some processing time
return item * 2
@flow(log_prints=True)
async def dynamic_concurrency_flow(flow_run_id: str, data: list):
global active_flow_runs
active_flow_runs += 1
max_concurrent = max(1, TOTAL_SLOTS // active_flow_runs)
print(
f"Flow run {flow_run_id}: Active runs: {active_flow_runs}, Max concurrent: {max_concurrent}"
)
results = process_data.map(
data, ctx_kwargs=unmapped({"names": [GCL_NAME], "occupy": max_concurrent})
).result()
active_flow_runs -= 1
return results
async def main():
print("Starting first job")
await dynamic_concurrency_flow("job1", list(range(20)))
print("\nStarting second job")
await dynamic_concurrency_flow("job2", list(range(20, 40)))
if __name__ == "__main__":
asyncio.run(main())
Nate
08/01/2024, 6:09 PMclient.update_global_concurrency_limit
to do this automatically if you're dynamically creating limits themselves)
• dynamically determine based on whatever how many slots from that limit we should occupy
Nate
08/01/2024, 6:10 PMDerek
08/01/2024, 6:22 PMNate
08/01/2024, 6:24 PMlimit
as some safe guard / large number that no user of the context manager should ever be able to occupy
more than, so i think occupy
is typically what id recommend tweaking based on runtime logic, but im sure there are cases where limit
would be appropriate to tweak as well
if you find yourself in such a case, something like this might be useful
from contextlib import asynccontextmanager
from prefect import flow, get_client, task
from prefect.client.schemas.actions import GlobalConcurrencyLimitUpdate
@asynccontextmanager
async def update_limits(name: str, limit: int):
async with get_client() as client:
await client.update_global_concurrency_limit(
name=name,
concurrency_limit=GlobalConcurrencyLimitUpdate(
limit=limit,
active=True,
),
)
try:
yield
finally:
await client.update_global_concurrency_limit(
name=name,
concurrency_limit=GlobalConcurrencyLimitUpdate(
limit=limit,
active=False,
),
)
Derek
08/01/2024, 6:28 PM