# ask-community
Albert Puig:
Hi all, I am trying to build something very complicated with Prefect and I am totally failing. Let me give the context: I am trying to run a data migration that relies on tons of API calls, and it needs to run very fast. Importantly, it uses Postgres via SQLAlchemy, also async. Until now, what I had was a simple flow that ran an async task (`migrate_cluster_batch`) which ran single migrations (`migrate_single_cluster`) with reasonable concurrency, all thanks to Prefect and plain async. This is limited in that I am basically CPU bound once I have enough concurrency. The next step is to parallelize the previous flow using `prefect_dask`; in theory this would allow me to fully parallelize and use all the CPUs in our server. However, this is not really working. I call `migrate_cluster_batch.submit` and keep a few tasks in the queue using a window. Everything gets submitted, but there is a mess with the event loops. I think Dask creates a new event loop for each async task, while the DB async pool is kept per process? The engine is created like this:
```python
from functools import lru_cache

import sqlalchemy.ext.asyncio

@lru_cache()
def get_migration_engine():
    """Create or get cached engine instance for the current process."""
    return sqlalchemy.ext.asyncio.create_async_engine(
        settings.migration_db_url,
        isolation_level="REPEATABLE READ",
        pool_recycle=settings.migration_db_pool_recycle,
        pool_pre_ping=True,
        pool_size=settings.migration_db_pool_connections,
    )
```

(with the cache there to avoid recreating the engine every time.) The issue is that I am hitting errors such as:

```
worker        | RuntimeError: Task <Task pending name='Task-405' coro=<run_task_async() running at /usr/local/lib/python3.11/site-packages/prefect/task_engine.py:1425> cb=[_run_until_complete_cb() at /usr/local/lib/python3.11/asyncio/base_events.py:181]> got Future <Future pending> attached to a different loop
```
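If I understand it right, this is the usual asyncio failure when a Future created on one event loop is awaited from another; here is a toy reproduction of just that mechanism (no Prefect, SQLAlchemy, or Dask involved; the names below are made up for illustration):

```python
import asyncio

async def make_future():
    # The Future is bound to whichever event loop is running when it is created.
    return asyncio.get_running_loop().create_future()

# Loop #1 creates the future, then asyncio.run() closes that loop.
fut = asyncio.run(make_future())

async def await_it():
    # Loop #2 tries to await a future owned by loop #1.
    await fut

# Raises: RuntimeError: Task <...> got Future <...> attached to a different loop
asyncio.run(await_it())
```

The cached engine's pool holds exactly that kind of loop-bound state, so a task that lands on a different loop than the one the pool was first used on blows up the same way.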
I'm at my wits' end on how to approach this issue. Any ideas? I'm sure this is a reasonably common case. Thanks!!
n:
hi @Albert Puig - it seems like you might be trying to share a db engine object between processes (each task will run in its own process via dask, or in its own thread if you use the default task runner), which is not going to work. i'd recommend creating a new connection object in each task, or instead just using plain async python
Albert Puig:
indeed, thanks! I had to create a connection pool per task and then clean it up afterwards. It's a bit more complicated, but this makes everything work.
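Roughly, the pattern looks like this (a minimal sketch assuming SQLAlchemy 2.0; `settings` comes from the snippet above, and the exact signature of `migrate_single_cluster` shown here is just an illustration):

```python
import sqlalchemy.ext.asyncio as sa_async
from prefect import task

# `settings` and `migrate_single_cluster` are defined elsewhere in the project;
# the (session, cluster_id) signature below is assumed for illustration.

@task
async def migrate_cluster_batch(cluster_ids: list[str]) -> None:
    # Create the engine inside the task, so its connection pool is bound to
    # the event loop this task actually runs on (Dask may give each task its
    # own process and loop).
    engine = sa_async.create_async_engine(
        settings.migration_db_url,
        isolation_level="REPEATABLE READ",
        pool_pre_ping=True,
    )
    try:
        async with sa_async.async_sessionmaker(engine)() as session:
            for cluster_id in cluster_ids:
                await migrate_single_cluster(session, cluster_id)
    finally:
        # Clean up: return pooled connections before this task's loop goes away.
        await engine.dispose()
```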