Frank Colson
05/07/2024, 4:50 AMfrom prefect.tasks import P, R, Task
from prefect.utilities.asyncutils import sync as async_to_sync
import asyncio
def concurrency_limited_map(task: Task[P, R], iterable: Iterable[P], *, max_concurrency: int, **kwargs) -> List[R]:
"""
Map a task over an iterable with a maximum concurrency.
:param task: The task to map over the iterable.
:param iterable: The iterable to map over.
:param max_concurrency: The maximum number of tasks to run concurrently.
:param kwargs: Additional keyword arguments to pass to the task
"""
semaphore = asyncio.Semaphore(max_concurrency)
futures = []
async def submit(item):
async with semaphore:
future = task.submit(item, **kwargs)
await future._wait()
return future.result()
async def process():
for item in iterable:
future = asyncio.ensure_future(submit(item))
futures.append(future)
results = await asyncio.gather(*futures)
return results
return async_to_sync(process)
You may opt to want to just return the future instead of getting the result()
.