Geoff Irons
02/23/2025, 10:29 PM
<asyncio.locks.Semaphore object at 0x7c48c3981d80 [locked, waiters:1]> is bound to a different event loop
As far as I can tell, when you do an await my_task.map(),
it spins up multiple threads when using the ConcurrentRunner, and this is causing problems.
It's worth noting that the semaphore is not the only thing causing this issue. We also use async_lru
to cache some of this data to reduce how often it is fetched, and it also raises an error with the same root cause (multiple event loops).
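(An aside, not from the thread: a minimal sketch of the failure mode. On Python 3.10+, an asyncio.Semaphore binds to whichever event loop first has to park a waiter on it; since the runner executes tasks on multiple threads, each with its own loop, a module-level semaphore shared across tasks can hit exactly this RuntimeError.)
import asyncio

sem = asyncio.Semaphore(1)  # created once at import time, not yet tied to any loop

async def work():
    async with sem:
        await asyncio.sleep(0.01)

async def main():
    # Two concurrent acquirers force the semaphore to create a waiter
    # future, which binds it to the currently running loop.
    await asyncio.gather(work(), work())

asyncio.run(main())  # binds sem to loop #1
asyncio.run(main())  # loop #2 -> RuntimeError: <...> is bound to a different event loop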
So I guess I have a couple questions:
• Are there any suggested ways to provide shared limits across tasks? This isn't a global "max number of tasks" but a "max number of things inside a task"
• Is there any good way to share/cache data between tasks?
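(An aside on these two questions, not from the thread: Prefect itself has tag-based task concurrency limits, enforced by the API server rather than by in-process primitives, and task result caching can cover some cross-task data sharing. A sketch, assuming Prefect 2.x; the "fetch" tag, the limit of 5, and the one-hour expiration are placeholder choices.)
from datetime import timedelta

from prefect import task
from prefect.tasks import task_input_hash

# Tag-based concurrency: at most 5 task runs tagged "fetch" execute at once,
# enforced server-side across threads/loops/workers. Create the limit with:
#   prefect concurrency-limit create fetch 5
@task(
    tags=["fetch"],
    # Result caching keyed on the task's inputs is one orchestration-layer
    # answer to sharing/caching data between tasks.
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
async def fetch(url: str) -> bytes:
    ...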
Yaron Levi
02/24/2025, 9:37 AM
import aiohttp
import asyncio

async def fetch(url, session):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        "https://jsonplaceholder.typicode.com/todos/1",
        "https://jsonplaceholder.typicode.com/todos/2",
        "https://jsonplaceholder.typicode.com/todos/3",
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(url, session) for url in urls]
        responses = await asyncio.gather(*tasks)
        for response in responses:
            print(response)

asyncio.run(main())
Geoff Irons
02/24/2025, 8:43 PM
import asyncio
import itertools  # batched() is available in Python 3.12+

import httpx
from async_lru import alru_cache
from prefect import flow, task

sem = asyncio.Semaphore(5)
session = httpx.AsyncClient()

@alru_cache  # <-- to avoid duplicate fetches, but input must be hashable, so can't be a session object
async def fetch(url):
    async with sem:  # without the semaphore you can get pool timeouts if lots of requests are made concurrently
        response = await session.get(url)
        return response.content

aggregate_sem = asyncio.Semaphore(3)

async def slow_aggregate_function(data):
    # Since this is CPU/RAM bound, doing more than a handful of them
    # creates resource contention and can cause OOMs
    async with aggregate_sem:
        return sum(data)

@task
async def process_data_point(point):
    nearby_points = find_nearby_points(point)
    data = await asyncio.gather(*[fetch(p) for p in nearby_points])
    return await slow_aggregate_function(data)

@task
async def process_batched_points(points):
    return await asyncio.gather(*[process_data_point(p) for p in points])

@flow
async def main():
    data_points = [...]  # really really long
    batched = itertools.batched(data_points, 50)
    await process_batched_points.map(batched)
So here the semaphores gate how many connections can occur across all tasks, and how many aggregates can run at the same time.
Why both asyncio.gather() and a task map? The threading of the task map is useful because there are other CPU-bound operations that it is nice to have Prefect parallelize. The asyncio.gather is good because it means we don't need to create a bajillion Prefect tasks (and can pre-fetch data)
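(An aside, not something proposed in this thread: a common workaround for the loop-binding error is to create the asyncio primitive lazily, one per running event loop, so nothing is ever awaited on a foreign loop; the same trick works for the async_lru cache by giving each loop its own cached wrapper. The get_fetch_semaphore helper below is hypothetical, and the trade-off is that the cap becomes per-loop rather than process-wide.)
import asyncio

# Hypothetical helper: one semaphore per event loop, created on first use.
_fetch_semaphores: dict[asyncio.AbstractEventLoop, asyncio.Semaphore] = {}

def get_fetch_semaphore(limit: int = 5) -> asyncio.Semaphore:
    loop = asyncio.get_running_loop()
    if loop not in _fetch_semaphores:
        _fetch_semaphores[loop] = asyncio.Semaphore(limit)
    return _fetch_semaphores[loop]

async def fetch(url):
    # Each worker loop gets its own semaphore, so the "bound to a different
    # event loop" RuntimeError cannot occur. Note the limit is now per loop;
    # a process-wide cap needs a threading.Semaphore or a server-side
    # concurrency limit instead.
    async with get_fetch_semaphore():
        ...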
Yaron Levi
02/24/2025, 9:31 PM