Imran Nooraddin
09/02/2024, 12:08 PM
@task
def add(x: int, y: int) -> int:
    return x + y

@task
def multiply(x: int, y: int) -> int:
    return x * y

@flow
def add_and_multiply(x: int, y: int):
    sum = add(x, y)
    product = multiply(x, y)
    return sum, product
If the rate limit for `add` is 5/s and the rate limit for `multiply` is 200/s, how should I modify the code to accomplish this without having to define it in all my flows using these functions?
Alexander Azzam
09/02/2024, 1:29 PM
Imran Nooraddin
09/02/2024, 1:43 PM
import asyncio
from typing import Dict

from prefect import flow, task

@task  # Rate limit 60/min
async def scrape_website(url: str) -> Dict:
    return {"html": "data"}

@task  # Rate limit 20/min
async def analyze_website(html: str) -> Dict:
    return {"analysis": "data"}

@flow
async def fetch_data_from_website(url: str):
    scraped_data = scrape_website.submit(url).result()
    analyze_website.submit(scraped_data['html'])

async def main(urls):
    # Let's say that this pipe gets called 200/min
    # Collect the coroutines without awaiting them so gather() can run them concurrently
    tasks = [fetch_data_from_website(url) for url in urls]
    return await asyncio.gather(*tasks)

if __name__ == "__main__":
    urls = ["https://google.com", "https://facebook.com"] * 100
    asyncio.run(main(urls))
I want to run these all concurrently, but I want each call to wait for an open slot if the rate limit has been exceeded (i.e. it waits 1s per `scrape_website` request and about 3s per `analyze_website` request).
As far as I can tell, I cannot define this in Python code and instead have to rely on YAML, which means I need to set global rates, which doesn't really make sense to me.
Nate
09/02/2024, 9:10 PM
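For reference, one way the per-function limits asked about above could be sketched in plain Python, so the limit lives next to the function instead of in each flow. This is not the orchestration library's own rate-limit API: `IntervalLimiter` and `rate_limited` are hypothetical names made up for this sketch, it only uses `asyncio` from the standard library, and the limiter is local to one process (it would not be shared across workers). Stacking `@task` on top of `@rate_limited` is an untested assumption. At 60/min calls are spaced 1s apart, and at 20/min they are spaced 3s apart.

```python
import asyncio
import functools
import time
from typing import Dict


class IntervalLimiter:
    """Hypothetical helper: spaces calls at least period / max_calls seconds apart."""

    def __init__(self, max_calls: int, period: float) -> None:
        self.interval = period / max_calls
        self._next_slot = 0.0  # monotonic time of the next free slot

    async def acquire(self) -> None:
        # No await happens before the slot is reserved, so this bookkeeping
        # is safe between coroutines running on the same event loop.
        now = time.monotonic()
        slot = max(now, self._next_slot)
        self._next_slot = slot + self.interval
        await asyncio.sleep(slot - now)  # wait until the reserved slot opens


def rate_limited(max_calls: int, period: float):
    """Hypothetical decorator: one limiter per decorated async function."""
    limiter = IntervalLimiter(max_calls, period)

    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            await limiter.acquire()
            return await fn(*args, **kwargs)

        return wrapper

    return decorator


# Assumption: with an orchestration library, @task would go above @rate_limited.
@rate_limited(max_calls=60, period=60.0)  # 60/min
async def scrape_website(url: str) -> Dict:
    return {"html": "data"}


@rate_limited(max_calls=20, period=60.0)  # 20/min
async def analyze_website(html: str) -> Dict:
    return {"analysis": "data"}


async def fetch_data_from_website(url: str) -> Dict:
    scraped = await scrape_website(url)
    return await analyze_website(scraped["html"])


async def main(urls):
    # All pipelines start at once; each call waits for its function's next slot.
    return await asyncio.gather(*(fetch_data_from_website(u) for u in urls))
```

Running `asyncio.run(main(["https://google.com"] * 3))` would start all three pipelines concurrently and take several seconds, since the `analyze_website` calls are held 3s apart. If the limit has to be shared across processes or machines, a server-side limit would still be needed.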