Syméon del Marmol
10/11/2025, 12:11 PMRuntimeError: .... is bound to a different event loop
. What is your advice to use async clients singleton across tasks ?Marvin
10/11/2025, 12:11 PMMarvin
10/11/2025, 12:16 PMfrom prefect import task
@task
async def fetch(url: str):
import httpx
async with httpx.AsyncClient(timeout=30.0) as client:
r = await client.get(url)
r.raise_for_status()
return r.json()
2) Loop-local singleton (per-event-loop, per-process)
- This reuses one client within the same event loop, but never shares it across loops or processes.
# http_client.py
import asyncio
import httpx
_client = None
_client_loop = None
_lock = asyncio.Lock()
async def get_httpx_client() -> httpx.AsyncClient:
global _client, _client_loop
loop = asyncio.get_running_loop()
# Fast path
if _client is not None and not _client.is_closed and _client_loop is loop:
return _client
async with _lock:
if _client is None or _client.is_closed or _client_loop is not loop:
# Optionally tune limits/timeouts as needed
_client = httpx.AsyncClient(timeout=30.0)
_client_loop = loop
return _client
async def close_httpx_client():
global _client
if _client is not None and not _client.is_closed:
await _client.aclose()
Use it in your Prefect tasks:
from prefect import task
from .http_client import get_httpx_client, close_httpx_client
@task
async def fetch(url: str):
client = await get_httpx_client()
r = await client.get(url)
r.raise_for_status()
return r.json()
# In your flow, after awaiting all background tasks:
# await close_httpx_client()
Notes specific to Prefect and your setup
- Background tasks with .delay(): These run concurrently within the same process that’s executing your flow. A loop-local client cache works well here. Avoid passing the client object as a task argument; keep the client acquisition inside the task via a helper like get_httpx_client().
- Fanning out to multiple workers: When your flow spawns work that lands on different Prefect workers (i.e., different processes/machines), each worker must create its own client. You cannot share a client instance across processes. The loop-local pattern above naturally becomes “worker-local” in that case.
- Why not a module-level singleton? If your code runs in contexts that may create new event loops (e.g., different threads, different async runners), a module-global client will eventually be accessed from a different loop than the one it was created on. That’s when you get the RuntimeError. Always bind the client to the loop that will use it.
- Connection reuse: httpx only reuses connections within the same client. If each task does multiple requests, the per-task context manager is usually enough. If you need pooling across many background tasks inside the same flow run, use the loop-local cache.
Debug tip
- Log the current loop id to confirm what’s happening:
import asyncio
print("loop:", id(asyncio.get_running_loop()))
If you see different ids between where you create and where you use the client, you’ve found the culprit.
If you can share whether these tasks run only as in-process background tasks or also on separate workers via deployments, I can tailor the pattern (and cleanup/teardown) to your exact setup.
Reference: httpx AsyncClient lifecycle and concurrency notes
- httpx AsyncClient docs