Geoff Irons
02/23/2025, 10:37 PM
<asyncio.locks.Semaphore object at 0x7c48c3981d80 [locked, waiters:1]> is bound to a different event loop
As far as I can tell, when you do an await my_task.map() it spins up multiple threads when using the ConcurrentRunner, and this is causing problems.
It's worth noting that the semaphore is not the only thing causing this issue. We also use async_lru
to cache some of this data to reduce how often it is fetched, and it also raises an error with the same root cause (multiple event loops).
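To make the root cause concrete, here is a minimal, Prefect-free sketch of what I believe is happening (assuming Python 3.10+, where a Semaphore binds to whichever loop first makes it wait); the names are illustrative:
```python
import asyncio

# A module-level semaphore shared by all tasks, e.g. to cap concurrent API calls.
SEM = asyncio.Semaphore(1)

async def limited_call():
    async with SEM:
        await asyncio.sleep(0.01)

async def run_batch():
    # Two concurrent acquirers force the semaphore to wait, which is the
    # moment it binds itself to the currently running event loop.
    await asyncio.gather(limited_call(), limited_call())

# First event loop: works, and SEM is now bound to this loop.
asyncio.run(run_batch())

# Second event loop (like a task runner worker thread would create):
# RuntimeError: <asyncio.locks.Semaphore ...> is bound to a different event loop
asyncio.run(run_batch())
```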
So I guess I have a couple questions:
• Are there any suggested ways to provide shared limits across tasks? This isn't a global "max number of tasks" limit, but a "max number of <thing>" limit inside each task.
• Is there any good way to share/cache data between tasks?
Marvin
02/23/2025, 10:37 PM
1. Shared Concurrency Limits Across Tasks:
Prefect supports task concurrency limits (applied via task tags) to cap how many task runs execute at once. For a shared limit inside tasks, you would need to manage an asyncio.Semaphore carefully, ensuring that all operations are bound to the same event loop. Learn more about task concurrency limits here.
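As a sketch of "bound to the same event loop" (not a Prefect API; get_semaphore and LIMIT are illustrative), you could create the semaphore lazily, once per running loop, instead of at module import:
```python
import asyncio

LIMIT = 5  # illustrative: max concurrent <thing> per event loop

# One semaphore per event loop, created lazily inside that loop so it can
# never be bound to a different loop than the one awaiting it.
_semaphores: dict[asyncio.AbstractEventLoop, asyncio.Semaphore] = {}

def get_semaphore() -> asyncio.Semaphore:
    loop = asyncio.get_running_loop()
    if loop not in _semaphores:
        _semaphores[loop] = asyncio.Semaphore(LIMIT)
    return _semaphores[loop]

async def fetch_thing(url: str) -> None:
    async with get_semaphore():
        await asyncio.sleep(0.1)  # stand-in for the limited operation
```
Note the trade-off: if each worker thread runs its own loop, this caps concurrency per loop rather than globally across threads; for a truly shared cap, see the concurrency-limit sketch further down.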
2. Sharing Data and Caching Between Tasks:
Prefect provides options for task caching, which can be useful for sharing results between tasks. You can use custom cache key functions or Pydantic models for consistent serialization. This can help in reducing redundant data fetching by caching the results of expensive operations. Explore Prefect's task caching options here.
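As an illustration of that caching option (assuming a Prefect 2.x-style API with task_input_hash; newer releases favor cache policies, so check the docs for your version), a cached task might look like:
```python
import asyncio
from datetime import timedelta

from prefect import flow, task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
async def fetch_reference_data(dataset: str) -> dict:
    await asyncio.sleep(1)  # stand-in for the expensive fetch
    return {"dataset": dataset}

@flow
async def my_flow():
    # Both calls hash to the same cache key, so the fetch runs only once
    # within the one-hour expiration window.
    await fetch_reference_data("prices")
    await fetch_reference_data("prices")
```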
3. Handling Event Loop Issues:
The error you encountered suggests that the concurrency model in your tasks is conflicting with Prefect's task runner: when tasks run on different threads, each thread gets its own event loop, so the semaphore and any other async constructs must be initialized within the same event loop context as the tasks that use them. You might need to manage your event loops explicitly if Prefect's default behavior isn't aligning with your needs.
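If juggling per-loop semaphores gets awkward, a possible alternative, depending on your Prefect version (this assumes the prefect.concurrency.asyncio module shipped in recent releases, and a global concurrency limit named "thing-fetches" created beforehand in the UI or CLI), is to move the shared limit to the Prefect server so it applies across threads, loops, and even separate flow runs:
```python
from prefect import flow, task
from prefect.concurrency.asyncio import concurrency

@task
async def fetch_thing(item: str) -> str:
    # Occupy one slot of the server-side "thing-fetches" limit for the
    # duration of the call; other task runs wait until a slot frees up.
    async with concurrency("thing-fetches", occupy=1):
        ...  # the limited work goes here
        return item

@flow
async def my_flow(items: list[str]):
    await fetch_thing.map(items)
```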
For further exploration, you can check the Prefect API and modules to see if there are any additional utilities that might help in your specific scenario.
If you have more specific questions or need further assistance, feel free to ask!