how can i reuse httpx client between tasks avoiding event loop is closed error. Im using threadpoolexecutor, but i wanna take advantage from keep-alive feature.
This is my implementation now:
import asyncio
import httpx
from typing import AsyncGenerator
# Tamaño máximo del pool (número de clientes concurrentes)
POOL_SIZE = 10
# Cola asincrónica que actúa como pool
_client_pool: asyncio.Queue[httpx.AsyncClient] = asyncio.Queue()
# Inicializa el pool al levantar el módulo
async def init_httpx_pool():
for _ in range(POOL_SIZE):
client = httpx.AsyncClient(timeout=httpx.Timeout(10.0, connect=5.0))
await _client_pool.put(client)
# Context manager asincrónico para usar clientes del pool
class ClientFromPool:
def __init__(self) -> None:
self.client: httpx.AsyncClient | None = None
async def __aenter__(self) -> httpx.AsyncClient:
self.client = await _client_pool.get()
return self.client
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.client and not self.client.is_closed:
await _client_pool.put(self.client)
# Llamalo así desde tus tasks Prefect
async def get_client_from_pool() -> AsyncGenerator[httpx.AsyncClient, None]:
async with ClientFromPool() as client:
yield client
# Cierre de todos los clientes del pool (por ejemplo, al apagar el proceso)
async def close_all_clients():
while not _client_pool.empty():
client = await _client_pool.get()
await client.aclose()
and my task:
@task(
retries=3, _retry_delay_seconds_=[2, 5, 10])
async def fetch_details(_url_:
str, _resource_:
str, _headers_: Dict[
str,
str]) -> httpx.Response:
logger = get_run_logger()
logger.info(f"Obteniendo detalles del recurso {resource}")
async with ClientFromPool() as client:
response = await client.get(url + resource,
headers=headers)
return response