Albert Puig
01/13/2025, 11:13 PM
I have a batch task (migrate_cluster_batch) which ran single migrations (migrate_single_cluster) with reasonable concurrency, all thanks to Prefect and simple async. This is limited in that, since it all runs in a single process, I am basically CPU bound once I have enough concurrency.
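For context, that first version had roughly this shape (a simplified sketch, not my exact code; the semaphore limit and the coroutine/task split are illustrative):

import asyncio

from prefect import task


async def migrate_single_cluster(cluster_id: str) -> None:
    """Run the migration for a single cluster (details elided)."""
    ...


@task
async def migrate_cluster_batch(cluster_ids: list[str], limit: int = 10) -> None:
    """Migrate a batch of clusters, at most `limit` at a time."""
    sem = asyncio.Semaphore(limit)

    async def bounded(cluster_id: str) -> None:
        async with sem:
            await migrate_single_cluster(cluster_id)

    await asyncio.gather(*(bounded(c) for c in cluster_ids))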
The next step is to parallelize the previous flow using prefect_dask; in theory this would let me fully parallelize and use all the CPUs in our server. However, this is not really working. I call migrate_cluster_batch.submit and keep a few tasks in flight using a window (I sketch the submission loop below, after the engine snippet). Everything gets submitted, but there is a mess with the event loops. I think Dask creates a new event loop for each async task, while the async DB pool is kept per process? It is created like this:
from functools import lru_cache

from sqlalchemy.ext.asyncio import create_async_engine


@lru_cache()
def get_migration_engine():
    """Create or get cached engine instance for the current process."""
    return create_async_engine(
        settings.migration_db_url,  # `settings` is our app configuration
        isolation_level="REPEATABLE READ",
        pool_recycle=settings.migration_db_pool_recycle,
        pool_pre_ping=True,
        pool_size=settings.migration_db_pool_connections,
    )
(with the cache there to avoid recreating the engine every time)
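The Dask version submits the batches roughly like this (again a sketch, not my actual code; migrate_all_clusters and the window size are placeholders):

from prefect import flow
from prefect_dask import DaskTaskRunner


@flow(task_runner=DaskTaskRunner())
def migrate_all_clusters(batches: list[list[str]], window: int = 4) -> None:
    """Submit batch tasks to Dask, keeping at most `window` in flight."""
    in_flight = []
    for batch in batches:
        # migrate_cluster_batch is the task from the earlier sketch.
        in_flight.append(migrate_cluster_batch.submit(batch))
        if len(in_flight) >= window:
            # Block on the oldest future before submitting more.
            in_flight.pop(0).wait()
    for fut in in_flight:
        fut.wait()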
The issue is that I am hitting errors such as:
worker | RuntimeError: Task <Task pending name='Task-405' coro=<run_task_async() running at /usr/local/lib/python3.11/site-packages/prefect/task_engine.py:1425> cb=[_run_until_complete_cb() at /usr/local/lib/python3.11/asyncio/base_events.py:181]> got Future <Future pending> attached to a different loop
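If my theory is right, it boils down to this kind of mismatch (a toy illustration, nothing Prefect-specific; the connection URL is a placeholder):

import asyncio

from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

_engine = None  # cached once per process, like the lru_cache above


async def run_query() -> None:
    global _engine
    if _engine is None:
        # The pool's asyncio internals get bound to whatever loop is running now.
        _engine = create_async_engine("postgresql+asyncpg://...")
    async with _engine.connect() as conn:
        await conn.execute(text("SELECT 1"))


# Each Dask async task effectively runs on its own fresh loop:
asyncio.run(run_query())  # engine (and pool) created on loop #1
asyncio.run(run_query())  # reused on loop #2 -> "attached to a different loop"?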
I'm at my wit's end on how to approach this issue. Any ideas? I'm sure this is a reasonably common case. Thanks!!

Nate
01/13/2025, 11:53 PM

Albert Puig
01/14/2025, 12:44 PM