Is there a way to dynamically adjust the concurrency limit?
# ask-community
Derek
Is there a way to dynamically adjust the concurrency limit? Say we start with 8, and we later want to change this to 4 based on the number of tasks running
Nate
hi @Derek, what kind of concurrency are you using, the context manager?
Derek
Open to using anything in Prefect if it’s possible!
Derek
Does this provide dynamic control, though? We're trying to use it for fair scheduling, where the number of jobs determines how many tasks each flow can have, so that the Ray cluster doesn't get completely overwhelmed by any one job in particular.
Nate
you can pass in `occupy: int` when you use the context manager, yeah
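For example, a minimal sketch of the context manager with `occupy` (the limit name "my-limit" is just a placeholder for a global concurrency limit that already exists on your server):

```python
import asyncio

from prefect.concurrency.asyncio import concurrency


async def do_work():
    # Hold 4 of the limit's slots while this block runs; they are
    # released automatically when the context manager exits.
    async with concurrency("my-limit", occupy=4):
        await asyncio.sleep(1)  # placeholder for real work


if __name__ == "__main__":
    asyncio.run(do_work())
```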
Derek
Hm, I'm not clear yet on how that would solve our case. For instance, we have a deployment with 8 CPUs. At first, we may have just one job, so it's able to create 8 Ray tasks at a time. Then, when another job comes in, each job should only be able to create 4 Ray tasks at a time. That way we're still saturating the cluster, but we're doing fair scheduling.
Nate
does this help?
```python
import asyncio

from prefect import flow, task, unmapped
from prefect.concurrency.asyncio import concurrency

# Module-level counter of flow runs active in this process (illustrative only)
active_flow_runs = 0
GCL_NAME = "dynamic-limit"
TOTAL_SLOTS = 8


@task
async def process_data(item: int, ctx_kwargs: dict) -> int:
    # Each task holds `occupy` slots of the GCL for the duration of this block
    async with concurrency(**ctx_kwargs):
        await asyncio.sleep(0.1)  # Simulate some processing time
        return item * 2


@flow(log_prints=True)
async def dynamic_concurrency_flow(flow_run_id: str, data: list):
    global active_flow_runs
    active_flow_runs += 1

    # Decide how many of the GCL's slots each mapped task should occupy,
    # based on how many flow runs are currently active
    max_concurrent = max(1, TOTAL_SLOTS // active_flow_runs)

    print(
        f"Flow run {flow_run_id}: Active runs: {active_flow_runs}, Max concurrent: {max_concurrent}"
    )

    results = process_data.map(
        data, ctx_kwargs=unmapped({"names": [GCL_NAME], "occupy": max_concurrent})
    ).result()

    active_flow_runs -= 1
    return results


async def main():
    print("Starting first job")
    await dynamic_concurrency_flow("job1", list(range(20)))

    print("\nStarting second job")
    await dynamic_concurrency_flow("job2", list(range(20, 40)))


if __name__ == "__main__":
    asyncio.run(main())
```
• created a GCL with a limit of 8 (arbitrarily) and activated it in the UI (you could also use `client.update_global_concurrency_limit` to do this automatically if you're dynamically creating the limits themselves)
• dynamically determine, based on whatever runtime signal, how many slots from that limit we should `occupy`

this allows me to submit N tasks and we have a rolling window of 8 slots being filled at a time, as the work completes
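If you'd rather create the limit from code than in the UI, a minimal sketch (this assumes a recent Prefect 3.x client exposing `create_global_concurrency_limit` and `GlobalConcurrencyLimitCreate`; the name and limit just mirror the example above):

```python
import asyncio

from prefect import get_client
from prefect.client.schemas.actions import GlobalConcurrencyLimitCreate


async def create_gcl():
    async with get_client() as client:
        # Create the "dynamic-limit" GCL with 8 slots and mark it active
        await client.create_global_concurrency_limit(
            concurrency_limit=GlobalConcurrencyLimitCreate(
                name="dynamic-limit",
                limit=8,
                active=True,
            )
        )


if __name__ == "__main__":
    asyncio.run(create_gcl())
```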
Derek
Got it, thanks for the example, Nate! Would it be reasonable to adjust the limits of the GCL, or is this considered an anti-pattern?
Nate
I think that's fair game. I think of `limit` as a safeguard / large number that no user of the context manager should ever be able to `occupy` more than, so `occupy` is typically what I'd recommend tweaking based on runtime logic, but I'm sure there are cases where `limit` would be appropriate to tweak as well. If you find yourself in such a case, something like this might be useful:
```python
from contextlib import asynccontextmanager

from prefect import flow, get_client, task
from prefect.client.schemas.actions import GlobalConcurrencyLimitUpdate


@asynccontextmanager
async def update_limits(name: str, limit: int):
    async with get_client() as client:
        # Set the GCL to the new limit and make sure it's active
        await client.update_global_concurrency_limit(
            name=name,
            concurrency_limit=GlobalConcurrencyLimitUpdate(
                limit=limit,
                active=True,
            ),
        )
        try:
            yield
        finally:
            # Deactivate the limit again once the block exits
            await client.update_global_concurrency_limit(
                name=name,
                concurrency_limit=GlobalConcurrencyLimitUpdate(
                    limit=limit,
                    active=False,
                ),
            )
```
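Combined with the pattern from the first example, usage might look something like this (a sketch only; `process_data`, `GCL_NAME`, and `unmapped` come from the earlier snippet, and here each mapped task occupies a single slot while the GCL's limit is temporarily set to 4):

```python
@flow(log_prints=True)
async def limit_tweaking_flow(data: list):
    # Temporarily set the "dynamic-limit" GCL to 4 slots; the finally
    # branch in update_limits deactivates it again once we're done
    async with update_limits(GCL_NAME, limit=4):
        results = process_data.map(
            data, ctx_kwargs=unmapped({"names": [GCL_NAME], "occupy": 1})
        ).result()
    return results
```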
Derek
Great, thanks for the information!