# ask-marvin
@Marvin In concurrent tasks, I use libraries relying on httpx AsyncClient to make non-blocking calls to APIs. To avoid creating a new client every time, I'm tempted to create a global singleton, but I often end up with errors like this:
RuntimeError: .... is bound to a different event loop
What is your advice for using an async client singleton across tasks?
thought for 310.2 seconds
Short answer: don't share a single httpx.AsyncClient across different event loops or processes. That "bound to a different event loop" error is exactly what happens when the same client is used from a different loop/thread than the one it was created on.

What works reliably:
- Safest: create and close a client inside each async task (use it for multiple requests within the task). This avoids cross-loop problems entirely.
- If you really want reuse: cache one client per event loop (and per process). That gives you connection pooling without crossing loop boundaries. Each worker process will naturally have its own client.

Patterns you can use

1) Simple and safe per-task
```python
import httpx
from prefect import task

@task
async def fetch(url: str):
    async with httpx.AsyncClient(timeout=30.0) as client:
        r = await client.get(url)
        r.raise_for_status()
        return r.json()
```
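If you want those per-task clients to run concurrently, you can fan the task out from an async flow. A minimal sketch, assuming the fetch task above is in scope; fetch_all and urls are hypothetical names, and depending on your Prefect version you may prefer .submit()/.map() over gathering coroutines:

```python
import asyncio
from prefect import flow

@flow
async def fetch_all(urls: list[str]):
    # Each fetch() opens and closes its own AsyncClient on this flow's
    # event loop, so nothing is ever shared across loops.
    return await asyncio.gather(*(fetch(url) for url in urls))
```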
2) Loop-local singleton (per-event-loop, per-process)

This reuses one client within the same event loop, but never shares it across loops or processes.
```python
# http_client.py
import asyncio
import threading

import httpx

_client = None
_client_loop = None
# Use a threading.Lock rather than asyncio.Lock: an asyncio.Lock created at
# module level would itself get bound to the first event loop that uses it and
# raise the same "bound to a different event loop" RuntimeError from another loop.
_lock = threading.Lock()

async def get_httpx_client() -> httpx.AsyncClient:
    global _client, _client_loop
    loop = asyncio.get_running_loop()

    # Fast path: reuse the client only if it was created on this very loop
    if _client is not None and not _client.is_closed and _client_loop is loop:
        return _client

    with _lock:
        if _client is None or _client.is_closed or _client_loop is not loop:
            # Optionally tune limits/timeouts as needed
            _client = httpx.AsyncClient(timeout=30.0)
            _client_loop = loop
        return _client

async def close_httpx_client():
    # Call this from the same event loop that created the client
    global _client
    if _client is not None and not _client.is_closed:
        await _client.aclose()
```
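As a quick sanity check outside Prefect, you can confirm the helper hands out one client per event loop. A small sketch, assuming the module above is importable as http_client:

```python
import asyncio
import http_client  # the loop-local helper sketched above

async def main():
    c1 = await http_client.get_httpx_client()
    c2 = await http_client.get_httpx_client()
    assert c1 is c2  # same loop -> same cached client
    await http_client.close_httpx_client()

# Each asyncio.run() creates a fresh event loop, so the helper
# builds a fresh client for the second run instead of reusing the first.
asyncio.run(main())
asyncio.run(main())
```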
Use it in your Prefect tasks:
```python
from prefect import task
from .http_client import get_httpx_client, close_httpx_client

@task
async def fetch(url: str):
    client = await get_httpx_client()
    r = await client.get(url)
    r.raise_for_status()
    return r.json()

# In your flow, after awaiting all background tasks:
# await close_httpx_client()
```
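Spelled out, that teardown might look like this in an async flow. A sketch only, assuming the fetch task above is in scope; fetch_many and urls are hypothetical names, and whether you gather task coroutines or wait on .delay() futures, the teardown shape is the same:

```python
import asyncio
from prefect import flow
from .http_client import close_httpx_client

@flow
async def fetch_many(urls: list[str]):
    try:
        # All of these share one client because they run on this flow's event loop
        return await asyncio.gather(*(fetch(url) for url in urls))
    finally:
        # Close the loop-local client once everything on this loop is done
        await close_httpx_client()
```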
Notes specific to Prefect and your setup

- Background tasks with .delay(): These run concurrently within the same process that's executing your flow. A loop-local client cache works well here. Avoid passing the client object as a task argument; keep the client acquisition inside the task via a helper like get_httpx_client().
- Fanning out to multiple workers: When your flow spawns work that lands on different Prefect workers (i.e., different processes/machines), each worker must create its own client. You cannot share a client instance across processes. The loop-local pattern above naturally becomes "worker-local" in that case.
- Why not a module-level singleton? If your code runs in contexts that may create new event loops (e.g., different threads, different async runners), a module-global client will eventually be accessed from a different loop than the one it was created on. That's when you get the RuntimeError. Always bind the client to the loop that will use it.
- Connection reuse: httpx only reuses connections within the same client. If each task does multiple requests, the per-task context manager is usually enough. If you need pooling across many background tasks inside the same flow run, use the loop-local cache.

Debug tip: log the current loop id to confirm what's happening:
```python
import asyncio

# Run this inside any async task/coroutine (it requires a running loop)
print("loop:", id(asyncio.get_running_loop()))
```
If you see different ids between where you create and where you use the client, you've found the culprit.

If you can share whether these tasks run only as in-process background tasks or also on separate workers via deployments, I can tailor the pattern (and cleanup/teardown) to your exact setup.

Reference: httpx AsyncClient lifecycle and concurrency notes - httpx AsyncClient docs