I have a reasonably large codebase in prefect that...
# ask-community
g
I have a reasonably large codebase in Prefect that processes a lot of data. Currently it is all synchronous, and I am in the process of making some performance enhancements by making it async. One thing that my flow needs to do is fetch a lot of data. It makes upwards of several thousand requests. Instead of having a task for each request, they are batched together so that each task handles a certain number of them. Inside this task, the concurrency of fetching data is globally limited using an asyncio.Semaphore. Unfortunately, when mapping over the data I get an error:
<asyncio.locks.Semaphore object at 0x7c48c3981d80 [locked, waiters:1]> is bound to a different event loop
As far as I can tell, when you do an await my_task.map() it spins up multiple threads when using the ConcurrentRunner, and this is causing problems. It's worth noting that the semaphore is not the only thing affected. We also use async_lru to cache some of this data and reduce how much is fetched, and it also raises an error with the same root cause (multiple loops). So I guess I have a couple of questions:
• Are there any suggested ways to provide shared limits across tasks? This isn't a global "max number of tasks" but a "max number of things inside a task".
• Is there any good way to share/cache data between tasks?
@Marvin
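On the caching question, one possible direction is a cache that stores only finished results behind a threading.Lock, so nothing in it belongs to a particular event loop. This is a rough sketch, not a drop-in replacement for async_lru: cached_call and its module-level dictionary are hypothetical names, there is no eviction, concurrent misses for the same key may each trigger a fetch, and it only helps when the tasks share one process.

```python
import asyncio
import threading

_results = {}                      # completed results only, keyed by a hashable argument
_results_lock = threading.Lock()   # guards _results across threads


async def cached_call(key, fetcher):
    """Return the cached value for key, awaiting fetcher(key) on a miss."""
    with _results_lock:
        if key in _results:
            return _results[key]
    value = await fetcher(key)  # the lock is not held while awaiting
    with _results_lock:
        # If another task stored a value first, keep it so all callers agree.
        return _results.setdefault(key, value)
```

Because only plain values are stored (no futures or tasks), the same cache can be read from any thread or loop.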
y
Why do you need an asyncio.Semaphore inside each task? Can't you just use code like this inside each task:
import aiohttp
import asyncio

async def fetch(url, session):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        "https://jsonplaceholder.typicode.com/todos/1",
        "https://jsonplaceholder.typicode.com/todos/2",
        "https://jsonplaceholder.typicode.com/todos/3",
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(url, session) for url in urls]
        responses = await asyncio.gather(*tasks)
    
    for response in responses:
        print(response)
g
We are querying an external server that is quite slow, so we don't want to overload it. The structure of our code is more like:
import asyncio
import itertools

import httpx
from async_lru import alru_cache
from prefect import flow, task

sem = asyncio.Semaphore(5)
session = httpx.AsyncClient()

@alru_cache  # <-- to avoid duplicate fetches, but input must be hashable, so can't be a session object
async def fetch(url):
    async with sem:  # without the semaphore you can get PoolTimeout errors if lots of requests are made concurrently
        response = await session.get(url)
        return response.content

aggregate_sem = asyncio.Semaphore(3)
async def slow_aggregate_function(data):
    # Since this is CPU/RAM bound, doing more than a handful of them creates resource contention and can cause OOMs
    async with aggregate_sem:
        return sum(data)


@task
async def process_data_point(point):
    nearby_points = find_nearby_points(point)
    data = await asyncio.gather(*[fetch(p) for p in nearby_points])
    return await slow_aggregate_function(data)

@task
async def process_batched_points(points):
    return await asyncio.gather(*[process_data_point(p) for p in points])

@flow
async def main():
    data_points = [...]  # really really long
    batched = list(itertools.batched(data_points, 50))  # itertools.batched needs Python 3.12+
    process_batched_points.map(batched)
So here the semaphores gate how many connections can occur across all tasks, and how many aggregates can run at the same time. Why both asyncio.gather() and a task map? The threading of the task map is useful because there are other CPU-bound operations that it is nice to have Prefect parallelize. The asyncio.gather is good because it means we don't need to create a bajillion Prefect tasks (and can pre-fetch data).
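One way to keep a single limit across all tasks when each may run on its own thread and event loop is to base the limiter on threading.BoundedSemaphore instead of asyncio.Semaphore. The following is only a sketch under that assumption; CrossLoopLimiter, fetch_limiter and limited are hypothetical names, polling is used for simplicity rather than efficiency, and the limit only holds within one process.

```python
import asyncio
import threading
from contextlib import asynccontextmanager


class CrossLoopLimiter:
    """Hypothetical helper: cap concurrency across threads and event loops."""

    def __init__(self, limit, poll_interval=0.01):
        self._sem = threading.BoundedSemaphore(limit)
        self._poll_interval = poll_interval

    @asynccontextmanager
    async def slot(self):
        # A non-blocking acquire plus a short sleep keeps the event loop free
        # and never ties the limiter to one particular loop.
        while not self._sem.acquire(blocking=False):
            await asyncio.sleep(self._poll_interval)
        try:
            yield
        finally:
            self._sem.release()


fetch_limiter = CrossLoopLimiter(5)  # same limit of 5 as the fetch semaphore above


async def limited(coro_fn, *args):
    """Run coro_fn(*args) while holding one slot of the shared limiter."""
    async with fetch_limiter.slot():
        return await coro_fn(*args)
```

In the structure above, the body of fetch could run inside fetch_limiter.slot() in place of the asyncio.Semaphore. Waiters are not served in FIFO order with this approach, which may or may not matter for the slow server.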
I will admit I hadn't thought to pass a session object into the task, but it feels a bit strange to me to pass something that isn't data into a task. I'll have a play with that
y
Also, maybe you could use a counter in an external DB (Postgres, Mongo) as the limiter for how many requests are currently in flight (instead of that semaphore).
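A rough sketch of the counter idea, with the standard-library sqlite3 module standing in for Postgres or Mongo so that it runs without a server. The table name, the function names and the db_path argument are all hypothetical. The key point is that the check and the increment happen in one UPDATE statement, so two workers cannot both claim the last slot.

```python
import sqlite3
from contextlib import closing


def _execute(db_path, sql, params=()):
    """Run one statement in its own transaction and return the affected row count."""
    with closing(sqlite3.connect(db_path)) as conn:
        with conn:  # commit on success, roll back on error
            return conn.execute(sql, params).rowcount


def init_counter(db_path, name):
    _execute(db_path, "CREATE TABLE IF NOT EXISTS limits (name TEXT PRIMARY KEY, in_use INTEGER NOT NULL)")
    _execute(db_path, "INSERT OR IGNORE INTO limits (name, in_use) VALUES (?, 0)", (name,))


def try_acquire(db_path, name, limit):
    """Claim one slot; return True on success, False if the limit is already reached."""
    changed = _execute(
        db_path,
        "UPDATE limits SET in_use = in_use + 1 WHERE name = ? AND in_use < ?",
        (name, limit),
    )
    return changed == 1


def release(db_path, name):
    """Give one slot back, never going below zero."""
    _execute(db_path, "UPDATE limits SET in_use = in_use - 1 WHERE name = ? AND in_use > 0", (name,))
```

A caller would retry try_acquire with a short sleep until it returns True, and call release in a finally block. A worker that crashes while holding a slot never releases it, so a real version would likely need some form of expiry on top of this.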