# ask-marvin
@Marvin I have a reasonably large codebase in Prefect that processes a lot of data. Currently it is all synchronous, and I am in the process of doing some performance enhancements by making it async. One thing that my flow needs to do is fetch a lot of data; it makes upwards of several thousand requests. Instead of having a task for each, the requests are batched together so each task handles a certain number of them. Inside each task, the concurrency of fetching data is globally limited using an `asyncio.Semaphore`. Unfortunately, when mapping over the data I get an error:
```
<asyncio.locks.Semaphore object at 0x7c48c3981d80 [locked, waiters:1]> is bound to a different event loop
```
As far as I can tell, when you do an `await my_task.map()` it spins up multiple threads when using the ConcurrentTaskRunner, and this is causing problems. It's worth noting that the semaphore is not the only thing causing this issue. We also use `async_lru` to cache some of this data to reduce how much is fetched, and it also fails with an error with the same root cause (multiple loops). So I guess I have a couple of questions:
• Are there any suggested ways to provide shared limits across tasks? This isn't a global "max number of tasks" but a "max number of <thing> inside a task"
• Is there any good way to share/cache data between tasks?
Here are some insights and suggestions for dealing with your concurrency and caching challenges in Prefect:

1. Concurrency limits across tasks: Prefect allows you to limit task concurrency using tags. You can set concurrency limits for tasks with specific tags, which helps in managing the number of concurrent task runs. However, this is about limiting the number of task runs rather than operations inside a task. For your use case, where you need to limit operations inside a task, you might need to implement a custom solution using `asyncio.Semaphore` carefully, ensuring that all operations are bound to the same event loop. Learn more about task concurrency limits here.
2. Sharing and caching data between tasks: Prefect provides options for task caching, which can be useful for sharing results between tasks. You can use custom cache key functions or Pydantic models for consistent serialization. This can help reduce redundant data fetching by caching the results of expensive operations. Explore Prefect's task caching options here.
3. Handling event loop issues: The error you encountered suggests that the concurrency model in your tasks is conflicting with Prefect's task runner. Ensure that the semaphore and any other async constructs are initialized within the same event loop context as the tasks. You may need to manage your event loops explicitly if Prefect's default behavior isn't aligning with your needs.

For further exploration, you can check the Prefect API and modules to see if there are any additional utilities that might help in your specific scenario. If you have more specific questions or need further assistance, feel free to ask!