Alex Shtuchkin

09/18/2023, 7:15 PM
Hi team! How can we limit task concurrency when using .map() with 1000+ items? Especially when running local flow runs (with a remote server), it's currently not a great experience. Prefect starts all of these tasks at the same time, which clutters both the logs and the web UI. I've read about the new global concurrency limits and the concurrency context manager, but 1) afaik it only limits concurrency within the tasks, not at the .map level (we see all the tasks started and waiting for their slot, which looks pretty bad in the web UI), 2) the concurrency is global and not per-flow-run, which doesn't make sense when running flows locally (and arguably remotely too, in a lot of cases). How do you deal with this in your projects?
My ideal interface to this would be my_task.map(..., concurrency=10), which would make sure to start new tasks only if fewer than 10 are running in the current flow run. Not sure how feasible that is.
Note, I've also tried the tag-based concurrency limits, but they have their own set of downsides: 1) similar to above, concurrency is global and not per-flow-run, 2) unexpectedly, we need to edit the concurrency level via the web UI or CLI, 3) not sure why exactly this happens, but it fetches a bunch of tasks, processes them, then waits until the end of a 30-second period, then fetches another bunch. This slows down the processing immensely.
Here's the flow I'm using to test this:
import numpy as np
from prefect import flow, task
from prefect.concurrency.sync import concurrency


@task(tags=["test-concurrency"])  # either use tag-based concurrency limit
def test_task(i: int) -> float:
    with concurrency("test-concurrency"):  # or context-manager based
        arr = np.random.random((1000, 1000))
        return np.matmul(arr, arr).sum()  # Example of task work, takes ~1 second on my machine.


@flow
def test_flow():
    return test_task.map(range(1000))


if __name__ == "__main__":
    test_flow()
I'm considering writing a utility function that would do the limiting on top of regular .submit calls, but it seems like this functionality could be valuable to a lot of Prefect users, and it would be nice to have it built in.
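For illustration, a minimal sketch of what such a utility could look like. This is not a Prefect API: the name submit_with_limit and its parameters are hypothetical, and it only assumes that the submit callable returns a future-like object with a blocking .result() method. It keeps a sliding window of in-flight futures and waits on the oldest one before submitting the next item.

```python
from collections import deque
from typing import Any, Callable, Iterable, List


def submit_with_limit(
    submit: Callable[[Any], Any],
    items: Iterable[Any],
    limit: int,
) -> List[Any]:
    """Submit one call per item, keeping at most `limit` unfinished at a time.

    Hypothetical helper, not part of Prefect. `submit` is assumed to return
    a future-like object exposing a blocking `.result()` method.
    """
    if limit < 1:
        raise ValueError("limit must be at least 1")

    in_flight = deque()  # futures that may still be running, oldest first
    futures = []         # every future, in submission order

    for item in items:
        if len(in_flight) >= limit:
            # Block until the oldest submitted call finishes. If `.result()`
            # raises on failure, the exception propagates from here.
            in_flight.popleft().result()
        future = submit(item)
        in_flight.append(future)
        futures.append(future)

    return futures
```

Inside the flow above, the .map call could then be swapped for something like submit_with_limit(test_task.submit, range(1000), limit=10). Waiting on the oldest future keeps the helper simple, but a single slow task can leave fewer than `limit` tasks running until it finishes, and only tasks submitted through this helper are counted, so the limit applies per call rather than globally.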