
Austin Weisgrau

02/27/2023, 9:34 PM
What does best practice look like for setting up hundreds of concurrent REST API calls using Prefect tasks? We want to use some kind of concurrency limit to avoid overwhelming the API. However, the concurrency limits in Prefect make tasks wait 30 seconds before checking for an available slot. This results in a HUGE slowdown, given that each API call takes far less than 30 seconds (under 1 second each is normal).

Zanie

02/27/2023, 9:47 PM

Austin Weisgrau

02/27/2023, 11:04 PM
It looks like that could work. Is there any solution that avoids using asyncio? We'd like to avoid migrating our pipelines to asyncio if at all possible. It creates a significant technical hurdle.

Zanie

02/28/2023, 1:35 AM
How are you going to be running them concurrently?

Austin Weisgrau

02/28/2023, 1:38 AM
If I wasn't in prefect, I'd use:
```python
from concurrent.futures import ThreadPoolExecutor

# fetch_values and make_api_call are defined elsewhere in the pipeline
values = fetch_values()
with ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(make_api_call, values)
```
In Prefect, I'm working with the default concurrent task runner:
```python
from prefect import flow


@flow
def run_pipeline():
    values = fetch_values()
    make_api_call.map(values)  # make_api_call is a @task defined elsewhere
```
Switching over to use asyncio or anyio would require a substantial refactor of the whole codebase to be asyncio compatible, including needing to eliminate dependencies that use e.g. the `requests` package.
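For comparison, a minimal self-contained sketch of the plain-Python pattern above. `fetch_values` and `make_api_call` are hypothetical stand-ins (the real API call is replaced by a short `time.sleep`), and a lock-protected counter records how many calls are in flight at once, to show that `max_workers` caps concurrency without any fixed polling delay between calls.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


# Hypothetical stand-ins for the functions named in the message above.
def fetch_values():
    return list(range(20))


_lock = threading.Lock()
_in_flight = 0  # calls currently running
peak_in_flight = 0  # highest number of simultaneous calls observed


def make_api_call(value):
    global _in_flight, peak_in_flight
    with _lock:
        _in_flight += 1
        peak_in_flight = max(peak_in_flight, _in_flight)
    try:
        time.sleep(0.05)  # simulate a fast (< 1 s) API call
        return value * 2
    finally:
        with _lock:
            _in_flight -= 1


values = fetch_values()
with ThreadPoolExecutor(max_workers=5) as executor:
    # list() drains the lazy iterator, so results and any exceptions surface here
    results = list(executor.map(make_api_call, values))

print(f"peak concurrent calls: {peak_in_flight}")  # never more than 5
```

Each worker thread picks up the next value as soon as it finishes the previous one, so the only wait is the call itself.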

Zanie

02/28/2023, 2:13 AM
It sounds like https://github.com/PrefectHQ/prefect/pull/7013 would be what you’re looking for
It turned out to be more difficult, so we don't have that kind of local concurrency limit yet. I'm interested in pursuing it, but there are some other concurrency-related things I need to get done first.

Austin Weisgrau

02/28/2023, 2:25 AM
That does look like exactly what I'm interested in!
This is a bit heavy-handed, but this workaround seems to work for me:
```python
import time

from prefect import flow, task


class Semaphore:
    # Shared slot counter. Note: the check and the decrement below are two
    # separate steps, not one atomic operation, so two threads can both see
    # the last free slot and take it.
    slots: int = 2


@task
def concurrent_task(value, semaphore):
    while True:
        if semaphore.slots:
            semaphore.slots += -1
            time.sleep(1)  # Do whatever I actually want to do here (e.g. make an API call)
            semaphore.slots += 1
            return value
        else:
            time.sleep(0.1)  # no free slot yet; check again in 0.1 s


@flow
def my_flow():
    values = range(30)
    semaphore = Semaphore()
    concurrent_task.map(values, semaphore)


if __name__ == "__main__":
    my_flow()
```

Will Raphaelson

03/02/2023, 10:34 PM
Yeah, I do think we can improve on this in the future, but as a workaround I'm glad you found something.

Zanie

03/02/2023, 10:35 PM
You could use a real `threading.Semaphore` too
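Applied to the workaround above, a `threading.Semaphore` replaces both the hand-rolled counter and the 0.1 s polling loop: `acquire()` blocks until a slot frees up, and the check-and-decrement happens atomically, so two threads can't both take the last slot. A minimal sketch, assuming plain threads as a stand-in for Prefect's thread-based task runner so it runs on its own; `do_work` is a hypothetical placeholder for the API call, and the `running`/`peak` counters exist only to show the limit holding.

```python
import threading
import time

semaphore = threading.Semaphore(2)  # at most 2 calls at a time
_lock = threading.Lock()  # protects the bookkeeping counters below
running = 0
peak = 0
results = []


def do_work(value):
    time.sleep(0.02)  # hypothetical placeholder for the real API call
    return value


def concurrent_task(value, semaphore):
    global running, peak
    # acquire() blocks until a slot is free (no polling loop), and the
    # with-block releases the slot when the body exits.
    with semaphore:
        with _lock:
            running += 1
            peak = max(peak, running)
        result = do_work(value)
        with _lock:
            running -= 1
            results.append(result)
    return result


# Plain threads stand in for Prefect's thread-based task runner here.
threads = [
    threading.Thread(target=concurrent_task, args=(v, semaphore)) for v in range(30)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"peak concurrent calls: {peak}")  # never more than 2
```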

Austin Weisgrau

03/02/2023, 10:35 PM
That is probably wise...time to climb a little further down the threading rabbithole...

Zanie

03/02/2023, 10:38 PM
Let me know if you have questions. I’m very deep in the concurrency rabbithole these days.

Austin Weisgrau

03/02/2023, 10:38 PM
This is a little more elegant:
```python
import time
from functools import wraps
from threading import Semaphore
from typing import Callable

from prefect import flow, task


def limit_concurrency(
    max_workers: int,
) -> Callable[[Callable], Callable]:

    semaphore = Semaphore(max_workers)

    def pseudo_decorator(func: Callable):
        @wraps(func)
        def limited_concurrent_func(*args, **kwargs):
            semaphore.acquire()
            result = func(*args, **kwargs)
            semaphore.release()
            return result

        return limited_concurrent_func

    return pseudo_decorator


@task
@limit_concurrency(max_workers=2)
def concurrent_task(value):
    print(f"starting {value}")
    time.sleep(1)
    print(f"ending {value}")


@flow
def my_flow():
    values = range(30)
    concurrent_task.map(values)


if __name__ == "__main__":
    my_flow()
```
Is there anything off the top of your head that might go sideways with this?

Zanie

03/02/2023, 10:41 PM
If your function fails, you won't release the semaphore

Austin Weisgrau

03/02/2023, 10:41 PM
ah word good call

Zanie

03/02/2023, 10:41 PM
You’ll want to use try/finally or use it as a context manager
If you submit this to a non-local task runner, it won't work
e.g. if you use Dask, you'd probably need to figure out how to use a Dask semaphore instead
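A minimal sketch of the decorator with the fix applied, using the semaphore as a context manager so the slot is released whether the wrapped function returns or raises. `flaky_call` is a hypothetical function for the demo, and the `.semaphore` attribute is exposed only so the example can inspect the slot count. As noted above, a `threading.Semaphore` only coordinates threads within one process, so this does not carry over to Dask workers.

```python
import threading
from functools import wraps
from typing import Callable


def limit_concurrency(max_workers: int) -> Callable[[Callable], Callable]:
    semaphore = threading.Semaphore(max_workers)

    def pseudo_decorator(func: Callable) -> Callable:
        @wraps(func)
        def limited_concurrent_func(*args, **kwargs):
            # The with-block releases the slot whether func returns or raises,
            # equivalent to acquire() followed by try/finally: release().
            with semaphore:
                return func(*args, **kwargs)

        # Exposed only so the demo below can inspect the slot count.
        limited_concurrent_func.semaphore = semaphore
        return limited_concurrent_func

    return pseudo_decorator


@limit_concurrency(max_workers=2)
def flaky_call(value):
    if value < 0:
        raise ValueError("simulated API failure")
    return value


# Three failures in a row: with bare acquire()/release(), the first two would
# leak both slots and the third call would block forever.
for _ in range(3):
    try:
        flaky_call(-1)
    except ValueError:
        pass

print(flaky_call(5))  # 5
```

It stacks under `@task` the same way as the original (`@task` on top, `@limit_concurrency(...)` beneath it).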

Austin Weisgrau

03/02/2023, 10:44 PM
Got it. Thanks so much! Really appreciate your feedback

Zanie

03/02/2023, 10:46 PM
You’re welcome 🙂

Austin Weisgrau

03/20/2023, 5:56 PM
I'm seeing this implementation use 100% CPU on one core for something simple like running 10 threads with this kind of function:
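```python
import time
import requests

def wait_for_success(some_url):  # hypothetical name for the polling function
    while True:
        response = requests.get(some_url)
        # assumes the API returns a JSON body with a "success" field
        if response.json()["success"]:
            return
        time.sleep(30)
```
It's barely using any memory, but the CPU is maxing out. Any idea why that would happen?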

Zanie

03/20/2023, 6:11 PM
Are you releasing the semaphore while sleeping?
The semaphore might be polling repeatedly trying to acquire?

Austin Weisgrau

03/20/2023, 6:13 PM
no, not releasing the semaphore while sleeping

Zanie

03/20/2023, 6:14 PM
`strace` might help
I’ve seen high CPU usage when a thread is waiting on a lock though

Cody Frisby

03/23/2023, 4:48 PM
Hi y'all. Great thread here. I've been testing out this solution and I am noticing that it creates as many threads as there are elements in my `values` object. My expectation is that this would create as many workers and threads as specified by the `max_workers` variable. For my test case I have tens of thousands of elements I need to operate on, and when using this solution Prefect creates tens of thousands of tasks before working on any of them. What am I missing? What am I not understanding about Prefect? Thanks!

Austin Weisgrau

03/23/2023, 4:49 PM
That's correct, and it's the normal behavior for `task.map`. The solution here helps with slowing down the execution of the tasks, but all the task instantiation happens up front.

Zanie

03/23/2023, 4:50 PM
We submit tasks concurrently using the event loop
Submission != Running
a

Austin Weisgrau

03/23/2023, 4:53 PM
To rate-limit the creation of tasks, rather than using `task.map`, I can imagine a concurrency-limited function that calls `task.submit` and waits on task results to release a semaphore, although I think you'd need to break Prefect patterns a little more to implement this 🤔
I do worry a little about overhead with thousands of threads/tasks, even if only a few of them are running at a time. Haven't dug deep enough yet to understand how big of a problem that is.
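A minimal sketch of that idea, limiting how many tasks are *submitted* rather than how many run: keep at most `max_in_flight` futures outstanding and wait on the oldest before submitting another. `bounded_map` is a hypothetical helper written against any `submit` callable whose return value has a `.result()` method, demonstrated here with `ThreadPoolExecutor` so it runs on its own. Inside a flow, passing a Prefect task's `.submit` should fit the same shape, since Prefect futures also expose `.result()`, but that pairing is an untested assumption here.

```python
import time
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterable, List


def bounded_map(
    submit: Callable[[Any], Any], values: Iterable[Any], max_in_flight: int
) -> List[Any]:
    """Submit one value at a time, never holding more than max_in_flight
    submitted-but-uncollected futures; return results in submission order."""
    pending = deque()
    results = []
    for value in values:
        if len(pending) >= max_in_flight:
            # Wait on the oldest future before creating another one.
            results.append(pending.popleft().result())
        pending.append(submit(value))
    # Drain whatever is still outstanding.
    while pending:
        results.append(pending.popleft().result())
    return results


def slow_double(x):
    time.sleep(0.01)  # stand-in for an API call
    return 2 * x


with ThreadPoolExecutor(max_workers=4) as executor:
    out = bounded_map(lambda v: executor.submit(slow_double, v), range(50), max_in_flight=8)

print(out[:3])  # [0, 2, 4]
```

The trade-off of this design: new work is only created as earlier work is collected, so a large input never turns into all of its tasks up front, but because results are collected oldest-first, one slow call holds back the window until it finishes.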

Cody Frisby

03/23/2023, 4:57 PM
In my case, when I unleashed this on the tens of thousands after testing on a much smaller sample, Prefect created all the tasks and then just hung there and none of the tasks ever completed.

Zanie

03/23/2023, 4:58 PM
Yeah, these are things I'm working on very actively
I think we’ll still hang in some cases, but we’re making a lot of progress in the area