# best-practices
a
What does a best practice look like for setting up hundreds of concurrent REST API calls using Prefect tasks? We want to use some kind of concurrency limit to avoid overwhelming the API. However, the concurrency limits in Prefect make tasks wait 30 seconds before checking for an available slot. This results in a HUGE slowdown, assuming each API call takes significantly less than 30 seconds a pop (less than 1 second each is normal).
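(For context, the built-in mechanism being described is, as far as I can tell, Prefect 2's tag-based task-run concurrency limit. A minimal sketch, with a hypothetical tag name and limit:)
```python
from prefect import task


# Hypothetical tag name and limit: every task run carrying the "api-calls"
# tag counts against whatever limit has been created for that tag.
@task(tags=["api-calls"])
def make_api_call(value):
    ...  # the actual REST call


# The limit itself is created out of band, e.g. via the Prefect CLI:
#   prefect concurrency-limit create api-calls 10
```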
z
a
It looks like that could work. Is there any solution that avoids using asyncio? We'd like to avoid migrating our pipelines to asyncio if at all possible. It creates a significant technical hurdle.
z
How are you going to be running them concurrently?
a
If I wasn't in prefect, I'd use:
```python
from concurrent.futures import ThreadPoolExecutor

values = fetch_values()
with ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(make_api_call, values)
```
In Prefect I'm working with the default concurrent task runner:
```python
@flow
def run_pipeline():
    values = fetch_values()
    make_api_call.map(values)
```
Switching over to use asyncio or anyio would require a substantial refactor of the whole codebase to be asyncio compatible, including needing to eliminate dependencies that use e.g. the `requests` package
z
It sounds like https://github.com/PrefectHQ/prefect/pull/7013 would be what you’re looking for
It turned out to be more difficult, so we don’t have that kind of local concurrency limit yet. I’m interested in pursuing it, but there are some other concurrency-related things I need to get done first.
a
That does look like exactly what I'm interested in!
This is a bit heavy-handed, but this workaround seems to work for me:
```python
import time

from prefect import flow, task


class Semaphore:
    slots: int = 2


@task
def concurrent_task(value, semaphore):
    while True:
        if semaphore.slots:
            semaphore.slots -= 1
            time.sleep(1)  # Do whatever I actually want to do here (e.g. make an API call)
            semaphore.slots += 1
            return value
        else:
            time.sleep(0.1)


@flow
def my_flow():
    values = range(30)
    semaphore = Semaphore()
    concurrent_task.map(values, semaphore)


if __name__ == "__main__":
    my_flow()
```
w
Yeah, I do think we can improve on this in the future, but as a workaround I'm glad you found something.
z
You could use a real `threading.Semaphore` too
a
That is probably wise...time to climb a little further down the threading rabbithole...
z
Let me know if you have questions. I’m very deep in the concurrency rabbithole these days.
a
This is a little more elegant:
```python
import time
from functools import wraps
from threading import Semaphore
from typing import Callable

from prefect import flow, task


def limit_concurrency(
    max_workers: int,
) -> Callable[[Callable], Callable]:

    semaphore = Semaphore(max_workers)

    def pseudo_decorator(func: Callable):
        @wraps(func)
        def limited_concurrent_func(*args, **kwargs):
            semaphore.acquire()
            result = func(*args, **kwargs)
            semaphore.release()
            return result

        return limited_concurrent_func

    return pseudo_decorator


@task
@limit_concurrency(max_workers=2)
def concurrent_task(value):
    print(f"starting {value}")
    time.sleep(1)
    print(f"ending {value}")


@flow
def my_flow():
    values = range(30)
    concurrent_task.map(values)


if __name__ == "__main__":
    my_flow()
```
Is there anything off the top of your head that might go sideways with this?
z
If your function fails you won’t release the semaphore
a
ah word good call
z
You’ll want to use try/finally or use it as a context manager
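(A minimal sketch of that fix, reusing the `limit_concurrency` decorator from above: `threading.Semaphore` is itself a context manager, so the slot gets released even if `func` raises.)
```python
from functools import wraps
from threading import Semaphore
from typing import Callable


def limit_concurrency(max_workers: int) -> Callable[[Callable], Callable]:
    semaphore = Semaphore(max_workers)

    def pseudo_decorator(func: Callable):
        @wraps(func)
        def limited_concurrent_func(*args, **kwargs):
            # Acquires on entry and releases on exit, including when func raises.
            with semaphore:
                return func(*args, **kwargs)

        return limited_concurrent_func

    return pseudo_decorator
```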
If you submit this to a non-local task runner it won’t work
i.e. if you use Dask you’d probably need to figure out how to use a Dask semaphore instead
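(For the Dask case, one possible direction, sketched here without having tested it, is `dask.distributed.Semaphore`, which coordinates leases through the scheduler. The name and lease count below are placeholders, and an already-running Dask client is assumed.)
```python
from dask.distributed import Semaphore

# Placeholder name and lease count; assumes a Dask client/cluster is already
# running. Leases are coordinated through the scheduler, so the limit applies
# across all workers rather than just local threads.
semaphore = Semaphore(max_leases=2, name="api-calls")


def make_api_call(value):
    with semaphore:  # blocks until a lease is free; released on exit
        ...  # the actual API call
```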
a
Got it. Thanks so much! Really appreciate your feedback
z
You’re welcome 🙂
a
I'm seeing this implementation use 100% CPU on one core for something simple like running 10 threads with this kind of function:
```python
while True:
    response = requests.get(some_url)
    if response.json()["success"]:
        return
    else:
        sleep(30)
```
It's barely using any memory, but the CPU is maxing out. Any idea why that would happen?
z
Are you releasing the semaphore while sleeping?
The semaphore might be polling repeatedly trying to acquire?
a
no, not releasing the semaphore while sleeping
z
`strace` might help
I’ve seen high CPU usage when a thread is waiting on a lock though
c
Hi y'all. Great thread here. I've been testing out this solution and I am noticing that it creates as many threads as there are elements in my `values` object. My expectation is that this would create as many workers and threads as specified by the `max_workers` variable. For my test case I have tens of thousands of elements I need to operate on, and when using this solution Prefect creates tens of thousands of tasks before working on any of them. What am I missing? What am I not understanding about Prefect? Thanks!
a
That's correct and is the normal behavior for `task.map`. The solution here helps with slowing down the execution of the tasks, but all the task instantiation happens up front.
z
We submit tasks concurrently using the event loop
Submission != Running
a
To rate-limit the creation of tasks, rather than using `task.map`, I can imagine a concurrency-limited function that calls `task.submit` and awaits task results to release a semaphore, although I think you'd need to break Prefect patterns a little more to implement this 🤔 (a rough sketch follows below)
I do worry a little about overhead with thousands of threads/tasks, even if only a few of them are running at a time. Haven't dug deep enough yet to understand how big of a problem that is.
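(One rough, untested way to sketch that idea, using a sliding window of in-flight futures in place of a semaphore so only a handful of task runs exist at any moment; the names and the limit of 5 are placeholders.)
```python
from prefect import flow, task


@task
def make_api_call(value):
    ...  # the real API call


@flow
def run_pipeline():
    values = list(range(30))
    max_in_flight = 5  # placeholder limit
    in_flight = []

    for value in values:
        if len(in_flight) >= max_in_flight:
            # Block on the oldest submitted task before creating another,
            # so at most max_in_flight task runs exist at once.
            in_flight.pop(0).result()
        in_flight.append(make_api_call.submit(value))

    # Drain whatever is still outstanding.
    for future in in_flight:
        future.result()
```
The obvious caveat is that waiting on the oldest future blocks even when a later submission finishes first, so this throttles task creation rather than perfectly packing the slots.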
c
In my case, when I unleashed this on the tens of thousands after testing on a much smaller sample, Prefect created all the tasks and then just hung there and none of the tasks ever completed.
z
Yeah these are things I’m working on very actively
I think we’ll still hang in some cases, but we’re making a lot of progress in the area