# data-tricks-and-tips
Nice trick to limit task concurrency without relying on tags and the CLI/UI
import asyncio
from typing import Any, Iterable, List

from prefect.tasks import P, R, Task
from prefect.utilities.asyncutils import sync as async_to_sync

def concurrency_limited_map(task: Task[P, R], iterable: Iterable[Any], *, max_concurrency: int, **kwargs) -> List[R]:
    """
    Map a task over an iterable with a maximum concurrency.

    :param task: The task to map over the iterable.
    :param iterable: The iterable to map over.
    :param max_concurrency: The maximum number of tasks to run concurrently.
    :param kwargs: Additional keyword arguments to pass to the task
    """

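    async def submit(item, semaphore):
        # Hold a semaphore slot for the whole lifetime of the task run.
        async with semaphore:
            future = task.submit(item, **kwargs)
            # _wait() is a private Prefect method and may change between releases.
            await future._wait()
            return future.result()

    async def process():
        # Create the semaphore inside the running event loop: before Python 3.10,
        # asyncio primitives bind to whichever loop is current at construction.
        semaphore = asyncio.Semaphore(max_concurrency)
        futures = [asyncio.ensure_future(submit(item, semaphore)) for item in iterable]

        # gather() returns results in the same order as the input iterable.
        return await asyncio.gather(*futures)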

    return async_to_sync(process)
You may prefer to return the future itself instead of calling `result()` on it.
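For anyone who wants to see the semaphore pattern on its own, here is a minimal sketch in plain asyncio with no Prefect involved. The names `bounded_map`, `PeakTracker`, and `demo` are made up for illustration and are not part of any library; the short sleep is just a stand-in for real work.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, Tuple, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def bounded_map(
    fn: Callable[[T], Awaitable[R]],
    items: Iterable[T],
    *,
    max_concurrency: int,
) -> List[R]:
    """Run fn over items with at most max_concurrency calls in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(item: T) -> R:
        # Each call waits here until one of the slots is free.
        async with semaphore:
            return await fn(item)

    # gather() returns results in input order, regardless of completion order.
    return list(await asyncio.gather(*(run_one(item) for item in items)))


class PeakTracker:
    """Hypothetical helper that records the highest number of overlapping calls."""

    def __init__(self) -> None:
        self.active = 0
        self.peak = 0

    async def square(self, x: int) -> int:
        self.active += 1
        self.peak = max(self.peak, self.active)
        await asyncio.sleep(0.01)  # stand-in for real work
        self.active -= 1
        return x * x


def demo() -> Tuple[List[int], int]:
    tracker = PeakTracker()
    results = asyncio.run(
        bounded_map(tracker.square, range(10), max_concurrency=3)
    )
    return results, tracker.peak
```

Calling `demo()` returns the squared values together with the peak number of overlapping calls, which should never exceed `max_concurrency`. The Prefect version above follows the same idea, except that each slot is held until the submitted task run has finished.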