https://prefect.io logo
c

Conor

07/20/2023, 11:42 PM
Hi, new to prefect but enjoying it so far I was using prefect’s asyncutils library to write some code and I noticed that the gather() implementation here does not allow for arguments to be passed: https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/asyncutils.py#L407 Is this by design, e.g. is there another way that is intended to run flows in parallel where you can submit arguments to the flows? Here is a backwards compatible implementation of gather where you can do that
Copy code
from typing import Callable, Coroutine, Any, Tuple, List

async def gather(*calls: Callable[..., Coroutine[Any, Any, Any]], args: List[Tuple] = None) -> List[Any]:
    """
    Run calls concurrently and gather their results.

    Unlike `asyncio.gather` this expects to receive _callables_ not _coroutines_.
    This matches `anyio` semantics.

    Args:
        *calls: Functions or coroutines to be run concurrently.
        args: A list of tuples, where each tuple contains the arguments for the corresponding callable in `calls`. 
            If no arguments are provided for a callable, use an empty tuple.

    Returns:
        A list containing the results of the calls.
    """
    if args is None:
        args = [()] * len(calls)  # If no arguments provided, use empty tuples

    if len(calls) != len(args):
        raise ValueError("The length of 'calls' and 'args' should be the same.")

    keys = []
    async with create_gather_task_group() as tg:
        for call, arg in zip(calls, args):
            keys.append(tg.start_soon(call, *arg))
    return [tg.get_result(key) for key in keys]
and the call might look like this:
Copy code
results = await gather(func1, func2, func3, args=[(arg1,), (arg2, arg3), (arg4, arg5, arg6)])
Is it worthwhile to submit a pr to do something like this? Or am i thinking of how to submit arguments to flows fundamentally wrong?
Here is a more robust implementation that also supports kwargs.