Frank Colson
05/07/2024, 4:50 AM
from prefect.tasks import P, R, Task
import asyncio
import inspect
from typing import Any, Iterable, List

from prefect.utilities.asyncutils import sync as async_to_sync
def concurrency_limited_map(
    task: Task[P, R],
    iterable: Iterable[Any],
    *,
    max_concurrency: int,
    **kwargs,
) -> List[R]:
    """
    Map a task over an iterable, keeping at most ``max_concurrency`` runs in flight.

    Each item is submitted as ``task.submit(item, **kwargs)``. A submission only
    starts once fewer than ``max_concurrency`` earlier submissions are still
    waiting for their result.

    :param task: The task to map over the iterable.
    :param iterable: The items to map over; each item is passed as the first
        positional argument of the task.
    :param max_concurrency: The maximum number of task runs in flight at once.
        Must be at least 1.
    :param kwargs: Additional keyword arguments passed to every task call.
    :return: The task results, in the same order as ``iterable``.
    :raises ValueError: If ``max_concurrency`` is less than 1.
    """
    if max_concurrency < 1:
        # Semaphore(0) would make every submission wait forever (deadlock).
        raise ValueError(f"max_concurrency must be >= 1, got {max_concurrency!r}")

    async def _resolve(value):
        # NOTE(review): in some contexts (e.g. async flows) Prefect may return an
        # awaitable from ``submit``/``result`` rather than a plain value; confirm
        # against the Prefect version in use. Awaiting only when needed leaves
        # the plain synchronous path unchanged.
        if inspect.isawaitable(value):
            return await value
        return value

    async def process() -> List[R]:
        # Build the semaphore inside the loop that will use it: before Python
        # 3.10, asyncio primitives bind to the loop current at construction, and
        # ``async_to_sync`` may run this coroutine in a different (new) loop.
        semaphore = asyncio.Semaphore(max_concurrency)

        async def submit(item):
            async with semaphore:
                future = await _resolve(task.submit(item, **kwargs))
                await future._wait()
                return await _resolve(future.result())

        # gather() returns results in the order its awaitables were passed,
        # i.e. the order of ``iterable``.
        return await asyncio.gather(*(submit(item) for item in iterable))

    return async_to_sync(process)
You may prefer to return the future itself instead of calling `result()` on it.
Bring your towel and join one of the fastest-growing data communities. Welcome to our second-generation open-source orchestration platform, a completely rethought approach to dataflow automation.
Powered by