Conor
07/20/2023, 11:42 PMfrom typing import Callable, Coroutine, Any, Tuple, List
async def gather(*calls: Callable[..., Coroutine[Any, Any, Any]], args: List[Tuple] = None) -> List[Any]:
"""
Run calls concurrently and gather their results.
Unlike `asyncio.gather` this expects to receive _callables_ not _coroutines_.
This matches `anyio` semantics.
Args:
*calls: Functions or coroutines to be run concurrently.
args: A list of tuples, where each tuple contains the arguments for the corresponding callable in `calls`.
If no arguments are provided for a callable, use an empty tuple.
Returns:
A list containing the results of the calls.
"""
if args is None:
args = [()] * len(calls) # If no arguments provided, use empty tuples
if len(calls) != len(args):
raise ValueError("The length of 'calls' and 'args' should be the same.")
keys = []
async with create_gather_task_group() as tg:
for call, arg in zip(calls, args):
keys.append(tg.start_soon(call, *arg))
return [tg.get_result(key) for key in keys]
and the call might look like this:
results = await gather(func1, func2, func3, args=[(arg1,), (arg2, arg3), (arg4, arg5, arg6)])
Is it worthwhile to submit a pr to do something like this? Or am i thinking of how to submit arguments to flows fundamentally wrong?