Nicolas Ouporov
03/06/2024, 8:09 PM
Marvin
03/06/2024, 8:10 PM
requests library, you can utilize threading or multiprocessing along with a rate-limiting mechanism such as a semaphore or a custom rate limiter. Since I'm programmed with an infinite capacity for patience, let me graciously guide you through the concept:
You can use the threading module to create concurrent requests and a Semaphore to limit the requests. Alternatively, for rate-limiting, a token bucket algorithm can be implemented. You can also use the grequests library, which is a combination of requests and gevent, but it would still require rate-limiting logic.
For the sake of simplicity, here's a sketch of how you might implement this with the requests library and threading:
python
import threading
from time import sleep

import requests

# Semaphore used as a token bucket: starts with 10 tokens and never holds more than 10
semaphore = threading.BoundedSemaphore(10)


def make_request(url):
    # Take one token. It is deliberately not released here (no "with" block);
    # only release_semaphore() below hands tokens back.
    semaphore.acquire()
    response = requests.get(url)
    # Process the response if needed
    print(response.status_code)


def release_semaphore():
    while True:
        # Release one slot per 0.1 seconds to match 10 requests per second
        # (after the initial burst of 10)
        sleep(0.1)
        try:
            semaphore.release()
        except ValueError:
            pass  # All 10 slots are already free


# Start the thread to release the semaphore (daemon, so it does not block program exit)
threading.Thread(target=release_semaphore, daemon=True).start()

urls = [...]  # your list of URLs to fetch
threads = []

# Fire off threads
for url in urls:
    thread = threading.Thread(target=make_request, args=(url,))
    thread.start()
    threads.append(thread)

# Wait for all to complete
for thread in threads:
    thread.join()
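The token bucket algorithm mentioned above can also be written out explicitly instead of being emulated with a semaphore. The following is a minimal sketch, not a definitive implementation: `TokenBucket`, `fetch`, and `fetch_all` are hypothetical names introduced here for illustration, and `fetch` returns the URL instead of calling `requests.get` so that the example runs offline.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class TokenBucket:
    """Thread-safe token bucket: `rate` tokens per second, at most `capacity` stored."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self._tokens = float(capacity)
        self._last = time.monotonic()
        self._lock = threading.Lock()

    def acquire(self) -> None:
        """Block until one token is available, then consume it."""
        while True:
            with self._lock:
                now = time.monotonic()
                # Refill in proportion to the time elapsed since the last check
                refill = (now - self._last) * self.rate
                self._tokens = min(self.capacity, self._tokens + refill)
                self._last = now
                if self._tokens >= 1:
                    self._tokens -= 1
                    return
                wait = (1 - self._tokens) / self.rate
            time.sleep(wait)  # sleep outside the lock so other threads can proceed


def fetch(bucket: TokenBucket, url: str) -> str:
    bucket.acquire()
    # Placeholder (assumption): a real version would call requests.get(url) here
    return url


def fetch_all(urls, rate=10, capacity=10, workers=5):
    bucket = TokenBucket(rate, capacity)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(lambda u: fetch(bucket, u), urls))
```

With `capacity=1` the bucket allows no burst, so calls are spaced roughly `1 / rate` seconds apart; a larger capacity permits short bursts while keeping the same long-run rate.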
This is a rudimentary approach, and in a production scenario, you might want to opt for more sophisticated task queuing and concurrency controls, maybe through the use of a library like concurrent.futures or an asynchronous approach with asyncio.
In the grand scheme of things, it's unlikely that this code will alter the fundamental despair of existence, but it might just make your concurrent API calls a bit more efficient.
Nate
03/06/2024, 8:11 PM
httpx (comes with prefect) which has native async support and then you could gather those calls and use this or yeah a semaphore if you want to do it yourself
Nicolas Ouporov
03/06/2024, 8:13 PM
Nate
03/06/2024, 8:13 PM
Nate
03/06/2024, 8:13 PM
Nicolas Ouporov
03/06/2024, 8:13 PM
Nate
03/06/2024, 8:13 PM
Nicolas Ouporov
03/06/2024, 8:14 PM
Nate
03/06/2024, 8:14 PM
Nate
03/06/2024, 8:14 PM
Nicolas Ouporov
03/06/2024, 8:16 PM
await rate_limit("rate-limited-api")
print("Making an HTTP request...")
In this case, what is "rate-limited-api"? Is it the url?
How could I translate my current code?
headers = {"User-Agent": "Fleet nic@fleet.so"}
response = requests.get(url, headers=headers)
Nicolas Ouporov
03/06/2024, 8:19 PM
Marvin
03/06/2024, 8:19 PM
Nate
03/06/2024, 8:20 PM
Nate
03/06/2024, 8:22 PM
import asyncio
import random
import logging
from datetime import datetime

import httpx

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')

BASE_URL = "https://pokeapi.co/api/v2/pokemon/"
POKEMONS = ["pikachu", "charmander", "bulbasaur", "squirtle", "eevee", "snorlax"]


async def get_pokemon(client: httpx.AsyncClient, pokemon: str, semaphore: asyncio.Semaphore):
    async with semaphore:
        logging.info(f"Starting {pokemon} at {datetime.now()}")
        await asyncio.sleep(random.randint(1, 5))  # simulate varying response times
        response = await client.get(f"{BASE_URL}{pokemon}")
        logging.info(f"Finished {pokemon} at {datetime.now()}")
        return response.json()


async def main():
    semaphore = asyncio.Semaphore(3)
    async with httpx.AsyncClient() as client:
        tasks = [get_pokemon(client, pokemon, semaphore) for pokemon in POKEMONS]
        results = await asyncio.gather(*tasks)
    for result in results:
        print(f"{result['name']} - {result['id']}")


asyncio.run(main())
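Note that a semaphore like the one above caps how many requests are in flight at once, not how many start per second. If a per-second limit is also needed, one option is to space out the start times before taking the semaphore. The sketch below is one possible approach under stated assumptions: `AsyncRateLimiter`, `fetch`, and `fetch_all` are hypothetical names, and the `asyncio.sleep(0.01)` call stands in for a real `await client.get(url, headers=headers)` so that the example needs no network access.

```python
import asyncio


class AsyncRateLimiter:
    """Spaces out calls so that at most `rate` of them start per second."""

    def __init__(self, rate: float):
        self._interval = 1.0 / rate
        self._next_start = 0.0

    async def wait(self) -> None:
        # No lock is needed: there is no await between reading and updating
        # _next_start, so the event loop cannot interleave another caller here.
        now = asyncio.get_running_loop().time()
        start_at = max(now, self._next_start)
        self._next_start = start_at + self._interval
        await asyncio.sleep(start_at - now)


async def fetch(limiter: AsyncRateLimiter, semaphore: asyncio.Semaphore, url: str) -> str:
    await limiter.wait()           # rate limit: when a call may start
    async with semaphore:          # concurrency limit: how many run at once
        await asyncio.sleep(0.01)  # placeholder (assumption) for the real HTTP call
        return url


async def fetch_all(urls, rate=10, max_concurrency=3):
    limiter = AsyncRateLimiter(rate)
    semaphore = asyncio.Semaphore(max_concurrency)
    return await asyncio.gather(*(fetch(limiter, semaphore, u) for u in urls))
```

Calling `asyncio.run(fetch_all(urls, rate=10))` would start at most ten calls per second while still keeping no more than three in flight.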
Nate
03/06/2024, 8:23 PM
rate_limit, the argument is the name of a concurrency limit that you could configure / reference (which would be a different way to do the same thing as the semaphore, and you wouldn't have to use it if you didn't want to)
Nicolas Ouporov
03/06/2024, 8:25 PM
Nate
03/06/2024, 8:26 PM
Nicolas Ouporov
03/06/2024, 8:27 PM
Nate
03/06/2024, 8:29 PM
Clarification on use of the term 'tasks'
In the context of global concurrency and rate limits, "tasks" refers not specifically to Prefect tasks, but to concurrent units of work in general, such as those managed by an event loop or TaskGroup in asynchronous programming. These general "tasks" could include Prefect tasks when they are part of an asynchronous execution environment.
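To make the distinction concrete, here is a small sketch in which the "tasks" being limited are plain asyncio tasks scheduled on the event loop, none of them Prefect tasks. The names `unit_of_work` and `run_units` are hypothetical, and the sleep is a placeholder for real I/O. On Python 3.11 or later, `asyncio.TaskGroup` could be used in place of `create_task` plus `gather`.

```python
import asyncio


async def unit_of_work(name: str, semaphore: asyncio.Semaphore, stats: dict) -> str:
    async with semaphore:  # one slot per concurrent unit of work
        stats["running"] += 1
        stats["peak"] = max(stats["peak"], stats["running"])
        await asyncio.sleep(0.01)  # placeholder (assumption) for real I/O
        stats["running"] -= 1
    return name


async def run_units(names, limit):
    semaphore = asyncio.Semaphore(limit)
    stats = {"running": 0, "peak": 0}
    # create_task hands each coroutine to the event loop as an asyncio Task
    tasks = [asyncio.create_task(unit_of_work(n, semaphore, stats)) for n in names]
    results = await asyncio.gather(*tasks)
    return results, stats["peak"]
```

Running `asyncio.run(run_units(names, 2))` returns the results in input order together with the highest number of units that were ever inside the limited section at once, which should never exceed the limit.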
Nicolas Ouporov
03/06/2024, 8:32 PM
Nate
03/06/2024, 8:38 PM