
Thomas Fredriksen

11/11/2022, 9:39 AM
Hi there. I have a task where I want to limit concurrency, as it is quite memory intensive. I have been playing around with task concurrency limits, but so far I have not been able to limit my task concurrency. Before I go down the rabbit hole, will something like this work?
@asynccontextmanager
async def concurrency_limit(task: Task, limit: int = 10) -> AsyncIterator[Task]:

    # Build a tag unique to this flow run and task so the limit only applies here
    context = get_run_context()

    tag = f"{context.flow_run.id}-{task.name}"

    client = get_client()

    await client.create_concurrency_limit(tag, limit)
    decorated_task = task.with_options(tags=[tag])

    try:
        yield decorated_task
    finally:
        await client.delete_concurrency_limit_by_tag(tag)
Example flow:
import asyncio

from prefect import flow, task, get_run_logger


@task
async def hello_number(nmbr: int):
    log = get_run_logger()
    log.info("Hello, my number is: %d", nmbr)
    await asyncio.sleep(1)


@flow(name="example_flow_concurrency_limit")
async def main():

    r = list(range(20))

    # Limit to one concurrent run of hello_number within this flow run
    async with concurrency_limit(hello_number, 1) as tsk:
        await tsk.map(r)

Kalise Richmond

11/11/2022, 4:51 PM
Hey @Thomas Fredriksen, have you tried setting the concurrency limit through the CLI, using tags on the tasks? https://docs.prefect.io/concepts/tasks/#task-run-concurrency-limits
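Roughly, per those docs (sketch only, with a made-up tag name), you create the limit for a tag once and then apply that tag to the task:
# From the CLI, create a limit of 1 for a hypothetical tag "memory-intensive":
#   prefect concurrency-limit create "memory-intensive" 1
from prefect import task

@task(tags=["memory-intensive"])
async def hello_number(nmbr: int):
    ...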

Thomas Fredriksen

11/14/2022, 10:56 AM
@Kalise Richmond Setting a tag manually seems to work somewhat; however, the tasks run quite slowly. The task in question should take roughly a second to run (it has a 1-second sleep), but appears to take 10 or 20 seconds. I also noticed that removing the line
await client.delete_concurrency_limit_by_tag(tag)
from the code above produces the same result.
@Kalise Richmond I have been troubleshooting this a bit, and found that waiting for the tasks to complete is the most stable way to ensure the concurrency limit is not deleted prematurely. Since the task tag is unique, simply reading the state of task runs matching the tag gives us all tasks that the concurrency limit applies to:
# Assumes `time`, `anyio`, a module-level LOG logger, and Prefect's OrionClient,
# TaskRunFilter, and TERMINAL_STATES are imported (exact paths vary across 2.x releases).
async def wait_for_concurrency_limit(
    client: OrionClient,
    tag: str,
    check_interval: float = 0.5,
    timeout: float = 0.0
):

    start_time = time.time()

    while True:
        if 0 < timeout < (time.time() - start_time):
            LOG.warning("Concurrency wait timeout")
            break

        tasks = await client.read_task_runs(task_run_filter=TaskRunFilter(tags={"all_": [tag]}))
        active_tasks = [x for x in tasks if x.state.type not in TERMINAL_STATES]

        if len(active_tasks) > 0:
            await anyio.sleep(check_interval)
        else:
            # All tagged task runs have reached a terminal state
            break
Dynamic concurrency limit context:
@asynccontextmanager
async def concurrency_limit(task: Task, limit: int = 10) -> AsyncIterator[Task]:

    context = get_run_context()

    tag = f"{context.flow_run.id}-{task.name}"

    client = get_client()

    await client.create_concurrency_limit(tag, limit)
    decorated_task: Task = task.with_options(tags=[tag])

    try:
        yield decorated_task
        # Wait for all tagged task runs to finish before the limit is removed
        await wait_for_concurrency_limit(client, tag)
    finally:
        await client.delete_concurrency_limit_by_tag(tag)
There appears to be a 30-second delay between one task finishing (freeing up a slot in the concurrency limit) and the next task starting.

Mason Menges

11/15/2022, 7:56 PM
Hey @Thomas Fredriksen, we've raised this with our engineering team; they're currently working on improvements to the engine, specifically around performance with large numbers of concurrent tasks. One thing to note: we wouldn't normally recommend creating concurrency limits dynamically, as this can cause issues if a task attempts to reference a concurrency limit that no longer exists. It sounds like you were already seeing that to some extent. TLDR: we're working on improvements to the Orion engine that should address this 😄

Thomas Fredriksen

11/15/2022, 7:57 PM
awesome, looking forward to seeing what you come up with 🙂
@Mason Menges Did the engineering team get a chance to look at the concurrency limit issue raised above? I see version 2.7.0 has some concurrency limiting stuff for agents

Mason Menges

12/06/2022, 4:19 PM
Still a work in progress. The engine improvements are a pretty significant refactor, so they'll take some time, and they won't land in a single release; it's more of a concentrated effort to optimize different aspects of the engine for flow/task runs at scale, including concurrent runs. I don't have specific timelines, since a lot of this work is likely to span multiple releases, though we'd ideally like to have the core parts within the next month or two.
TLDR: keep an eye on the releases, as we'll be announcing these changes as we push them through. That said, if you'd like, you can open a separate issue specifically for this use case with a reproducible example of the behavior you're seeing, though I'd say it should be covered by the improvements we're making overall, so that would mostly be for tracking. We have several issues we're tracking around this already, like https://github.com/PrefectHQ/prefect/issues/6891 and https://github.com/PrefectHQ/prefect/issues/6961

Thomas Fredriksen

12/06/2022, 4:29 PM
Awesome! Thank you so much for your feedback. I will keep an eye on the releases