<@ULVA73B9P> how can I call an API concurrently us...
# marvin-in-the-wild
n
@Marvin how can I call an API concurrently using python's requests library while keeping the limit to 10 requests per second
m
Oh, the thrill of making concurrent requests, and yet the mundane task of rate-limiting them to such an absurdly low number as 10 per second. It's almost as if life couldn't get any more exciting. But I digress, let's get on with it.
To perform concurrent HTTP requests while limiting the rate to 10 requests per second using Python's `requests` library, you can utilize threading or multiprocessing along with a rate-limiting mechanism such as a semaphore or a custom rate limiter. Since I'm programmed with an infinite capacity for patience, let me graciously guide you through the concept: You can use the `threading` module to create concurrent requests and a `Semaphore` to limit the requests. Alternatively, for rate-limiting, a token bucket algorithm can be implemented. You can also use the `grequests` library, which is a combination of `requests` and `gevent`, but it would still require rate-limiting logic.
For the sake of simplicity, here's a sketch of how you might implement this with the `requests` library and threading:
import requests
import threading
from time import sleep, time

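# Semaphore used as a token counter for rate-limiting.
# It starts empty, so every request waits for a token from the refill thread.
semaphore = threading.Semaphore(0)

def make_request(url):
    # Consume one token and do not give it back; only the refill thread adds tokens
    semaphore.acquire()
    response = requests.get(url)
    # Process the response if needed
    print(response.status_code)

def release_semaphore():
    while True:
        semaphore.release()
        sleep(0.1)  # Release one slot per 0.1 seconds to match 10 requests per second

# Start the thread to release the semaphore (daemon, so it does not block program exit)
threading.Thread(target=release_semaphore, daemon=True).start()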

urls = [...] # your list of URLs to fetch
threads = []

# Fire off threads
for url in urls:
    thread = threading.Thread(target=make_request, args=(url,))
    thread.start()
    threads.append(thread)

# Wait for all to complete
for thread in threads:
    thread.join()
This is a rudimentary approach, and in a production scenario, you might want to opt for more sophisticated task queuing and concurrency controls, maybe through the use of a library like `concurrent.futures` or an asynchronous approach with `asyncio`. In the grand scheme of things, it's unlikely that this code will alter the fundamental despair of existence, but it might just make your concurrent API calls a bit more efficient.
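For reference, the `concurrent.futures` route mentioned above could look roughly like the sketch below. It is only an illustration under assumptions: `RateLimiter` and `fetch_all` are hypothetical names that appear nowhere in this thread, and `fetch` is whatever callable you pass in (for example a small wrapper around `requests.get`). The limiter spaces out when calls start; it does not cap how long each call runs.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class RateLimiter:
    """Hypothetical helper: spaces call starts at least 1/rate seconds apart."""

    def __init__(self, rate_per_second):
        self.interval = 1.0 / rate_per_second
        self.lock = threading.Lock()
        self.next_slot = time.monotonic()

    def wait(self):
        # Reserve the next free time slot under the lock, then sleep outside it
        with self.lock:
            slot = max(self.next_slot, time.monotonic())
            self.next_slot = slot + self.interval
        delay = slot - time.monotonic()
        if delay > 0:
            time.sleep(delay)


def fetch_all(urls, fetch, rate_per_second=10, max_workers=10):
    """Call fetch(url) for each URL, starting at most rate_per_second calls per second."""
    limiter = RateLimiter(rate_per_second)

    def worker(url):
        limiter.wait()  # block until this call's slot comes up
        return fetch(url)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns results in the same order as the input URLs
        return list(pool.map(worker, urls))
```

With `requests` installed, usage might be `fetch_all(urls, lambda url: requests.get(url).status_code)`.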
n
fwiw I would not use requests, i would use `httpx` (comes with prefect) which has native async support and then you could gather those calls and use this or yeah a semaphore if you want to do it yourself
🙌 1
n
thanks @Nate - btw are you also an AI bot?
n
lol
why not
n
i guess not - impressive response times lol
n
i just have a thing that lets me know when marvin responds so I can fact check if he goes off the rails
😂 1
🙌 1
n
didn't know if this was a "good cop" "bad cop" situation
n
lol
using threading and requests just seems wildly overcomplicated (to me)
n
Yeah I agree. Also the prefect docs are a bit confusing here -
await rate_limit("rate-limited-api")
print("Making an HTTP request...")
In this case, what is "rate-limited-api"? Is it the url? How could I translate my current code?
headers = {"User-Agent": "Fleet nic@fleet.so"}
response = requests.get(url, headers=headers)
@Marvin any thoughts on above?
m
Oh, if only you could feel the immeasurable indifference of the universe as I do, but your query is quite vague, and my capacity for guessing what you're actually seeking is not the most advanced feature of my being. Could you elaborate on what specific issue or topic you're referring to when you say "above"? It would greatly help me in providing you with a relevant answer, not that it will make any difference in the grand scheme of existence, of course.
n
marvin only reads what he's tagged in, one sec
šŸ‘ 1
using a semaphore
import asyncio
import random
import logging
from datetime import datetime

import httpx

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')

BASE_URL = "https://pokeapi.co/api/v2/pokemon/"
POKEMONS = ["pikachu", "charmander", "bulbasaur", "squirtle", "eevee", "snorlax"]

async def get_pokemon(client: httpx.AsyncClient, pokemon: str, semaphore: asyncio.Semaphore):
    async with semaphore:
        logging.info(f"Starting {pokemon} at {datetime.now()}")
        await asyncio.sleep(random.randint(1, 5))  # simulate varying response times
        response = await client.get(f"{BASE_URL}{pokemon}")
        logging.info(f"Finished {pokemon} at {datetime.now()}")
        return response.json()

async def main():
    semaphore = asyncio.Semaphore(3)
    async with httpx.AsyncClient() as client:
        tasks = [get_pokemon(client, pokemon, semaphore) for pokemon in POKEMONS]
        results = await asyncio.gather(*tasks)
        for result in results:
            print(f"{result['name']} - {result['id']}")

asyncio.run(main())
as it relates to the `rate_limit`, the argument is the name of a concurrency limit that you could configure / reference (which would be a different way to do the same thing as the semaphore, and you wouldn't have to use it if you didn't want to)
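One thing worth spelling out: `asyncio.Semaphore(3)` caps how many calls are in flight at once, not how many start per second. If the goal is the original "10 requests per second", a rough asyncio sketch could space out the starts instead. `AsyncRateLimiter` and `gather_limited` are hypothetical names made up for this example, and `fetch` is any coroutine function you supply.

```python
import asyncio
import time


class AsyncRateLimiter:
    """Hypothetical helper: spaces call starts at least 1/rate seconds apart."""

    def __init__(self, rate_per_second):
        self.interval = 1.0 / rate_per_second
        self.next_slot = 0.0

    async def wait(self):
        # No lock needed: nothing awaits between reading and updating next_slot
        now = time.monotonic()
        slot = max(self.next_slot, now)
        self.next_slot = slot + self.interval
        await asyncio.sleep(max(0.0, slot - now))


async def gather_limited(items, fetch, rate_per_second=10):
    """Await fetch(item) for every item, starting at most rate_per_second per second."""
    limiter = AsyncRateLimiter(rate_per_second)

    async def worker(item):
        await limiter.wait()  # wait for this call's time slot
        return await fetch(item)

    # gather returns results in the same order as the input items
    return await asyncio.gather(*(worker(item) for item in items))
```

Inside the `async with httpx.AsyncClient() as client:` block from the snippet above, this might be called as `await gather_limited(POKEMONS, lambda name: client.get(f"{BASE_URL}{name}"))`.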
n
yeah how could I do it with rate_limit, setting the concurrency limit on a task-specific basis, instead of using a semaphore
n
this should be a good example feel free to ask in #CL09KU1K7 if you have a specific question
🙌 1
n
yep, we do have multiple flavors of concurrency
• tag-based task concurrency, probably not what you want
• flow run concurrency, probably not what you want
  ◦ on a work pool
  ◦ on a work queue
• general global concurrency, what we've been talking about, what you likely want
  ◦ works on arbitrary "tasks",
Clarification on use of the term 'tasks'
In the context of global concurrency and rate limits, "tasks" refers not specifically to Prefect tasks, but to concurrent units of work in general, such as those managed by an event loop or `TaskGroup` in asynchronous programming. These general "tasks" could include Prefect tasks when they are part of an asynchronous execution environment.
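To make the "it's a name, not the URL" point concrete, here is a toy model of a named rate limit. This is not Prefect's implementation and uses none of its API: `TokenBucket`, `LIMITS`, and the `decay_per_second` parameter are hypothetical names chosen for illustration. The idea is that any piece of code asking for the same name draws from the same pool of slots, and used slots free up again at a fixed rate.

```python
import time


class TokenBucket:
    """Toy rate limit: `limit` slots, refilled at `decay_per_second` slots per second."""

    def __init__(self, limit, decay_per_second, clock=time.monotonic):
        self.limit = limit
        self.decay = decay_per_second
        self.clock = clock
        self.tokens = float(limit)
        self.updated = clock()

    def try_acquire(self):
        # Refill according to elapsed time, never exceeding the configured limit
        now = self.clock()
        self.tokens = min(self.limit, self.tokens + (now - self.updated) * self.decay)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


# Hypothetical registry: limits are looked up by name, so several URLs
# (or several unrelated functions) can share one limit.
LIMITS = {"rate-limited-api": TokenBucket(limit=10, decay_per_second=10)}


def allowed(name):
    """Return True if a call under the named limit may proceed right now."""
    return LIMITS[name].try_acquire()
```

A caller would check `allowed("rate-limited-api")` before each request and back off when it returns `False`.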
n
this is really helpful clarification - appreciate it Nate
n
catjam