# ask-marvin
@Marvin How can I reuse an httpx client between tasks while avoiding the "event loop is closed" error? I'm using ThreadPoolExecutor, but I want to take advantage of the keep-alive feature. This is my implementation now:

```python
import asyncio
import httpx
from typing import AsyncGenerator

# Maximum pool size (number of concurrent clients)
POOL_SIZE = 10

# Async queue that acts as the pool
_client_pool: asyncio.Queue[httpx.AsyncClient] = asyncio.Queue()

# Initialize the pool when the module starts up
async def init_httpx_pool():
    for _ in range(POOL_SIZE):
        client = httpx.AsyncClient(timeout=httpx.Timeout(10.0, connect=5.0))
        await _client_pool.put(client)

# Async context manager for borrowing a client from the pool
class ClientFromPool:
    def __init__(self) -> None:
        self.client: httpx.AsyncClient | None = None

    async def __aenter__(self) -> httpx.AsyncClient:
        self.client = await _client_pool.get()
        return self.client

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.client and not self.client.is_closed:
            await _client_pool.put(self.client)

# Call it like this from your Prefect tasks
async def get_client_from_pool() -> AsyncGenerator[httpx.AsyncClient, None]:
    async with ClientFromPool() as client:
        yield client

# Close every client in the pool (e.g. on process shutdown)
async def close_all_clients():
    while not _client_pool.empty():
        client = await _client_pool.get()
        await client.aclose()
```

and my task:

```python
from typing import Dict

from prefect import task
from prefect.logging import get_run_logger

@task(retries=3, retry_delay_seconds=[2, 5, 10])
async def fetch_details(url: str, resource: str, headers: Dict[str, str]) -> httpx.Response:
    logger = get_run_logger()
    logger.info(f"Fetching details for resource {resource}")
    async with ClientFromPool() as client:
        response = await client.get(url + resource, headers=headers)
        return response
```
Marvin thought for 102.9 seconds
Based on the research and your implementation, I can suggest an improved approach that works better with Prefect's architecture while maintaining the keep-alive feature. Here's a refactored version that addresses the event-loop issues:
```python
import asyncio
from contextlib import asynccontextmanager
from typing import Dict

import httpx
from prefect import task
from prefect.logging import get_run_logger

# Configuration
POOL_SIZE = 10
DEFAULT_TIMEOUT = httpx.Timeout(10.0, connect=5.0)

class HttpClientPool:
    def __init__(self):
        self._clients: Dict[int, httpx.AsyncClient] = {}
    
    @asynccontextmanager
    async def get_client(self):
        # Key clients by the current asyncio task's id so each task
        # execution gets its own isolated client
        task_id = id(asyncio.current_task())
        
        try:
            if task_id not in self._clients:
                self._clients[task_id] = httpx.AsyncClient(timeout=DEFAULT_TIMEOUT)
            yield self._clients[task_id]
        except Exception:
            # If any error occurs, ensure we clean up this task's client
            if task_id in self._clients:
                await self._clients[task_id].aclose()
                del self._clients[task_id]
            raise

    async def cleanup(self):
        """Close all clients in the pool"""
        for client in self._clients.values():
            await client.aclose()
        self._clients.clear()

# Create a global pool instance
client_pool = HttpClientPool()

@task(retries=3, retry_delay_seconds=[2, 5, 10])
async def fetch_details(url: str, resource: str, headers: Dict[str, str]) -> httpx.Response:
    logger = get_run_logger()
    logger.info(f"Fetching details for resource {resource}")
    
    async with client_pool.get_client() as client:
        response = await client.get(url + resource, headers=headers)
        return response

# Example flow
from prefect import flow

@flow
async def main_flow():
    try:
        # Your flow logic here
        response = await fetch_details("<https://api.example.com>", "/resource", {"Authorization": "Bearer token"})
        # Process response...
    finally:
        # Cleanup at the end of the flow
        await client_pool.cleanup()
```

Key improvements in this implementation:

1. Task-Specific Clients: Instead of a fixed pool size, we create clients per task execution, identified by the current asyncio task's id. This prevents issues with concurrent access and event-loop conflicts (a minimal repro of that failure mode follows this list).
2. Automatic Cleanup: The context manager ensures proper cleanup if any error occurs, and the `cleanup` method can be called at the end of your flow to ensure all clients are properly closed.
3. Resource Management: The `HttpClientPool` class manages the client lifecycle and ensures proper cleanup, preventing resource leaks.
4. Keep-alive Support: Since the same client is reused within a task execution, keep-alive connections are maintained for as long as the task is running.
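To make point 1 concrete, here's a minimal, Prefect-free sketch of the failure mode that per-task isolation guards against; the URL is a placeholder, and the exact traceback can vary by httpx version:

```python
import asyncio
import httpx

# Anti-pattern: one module-level client shared across event loops
client = httpx.AsyncClient()

async def fetch():
    return await client.get("https://example.com")  # placeholder URL

asyncio.run(fetch())  # works: pooled connections bind to this first loop
# asyncio.run() has now closed its loop, but the client still holds
# keep-alive connections tied to it, so a second run typically fails
# with "RuntimeError: Event loop is closed".
asyncio.run(fetch())
```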
To use this in your Prefect flow:

```python
@flow
async def your_flow():
    try:
        # Multiple calls will reuse the same client within each task
        result1 = await fetch_details(url1, resource1, headers)
        result2 = await fetch_details(url2, resource2, headers)
        # Process results...
    finally:
        # Cleanup at the end of the flow
        await client_pool.cleanup()
```

This approach:
- Avoids "event loop is closed" errors by properly managing the client lifecycle
- Maintains keep-alive connections within task executions
- Provides proper cleanup and resource management
- Works well with Prefect's task retry mechanism

Remember that if you need to make multiple requests within the same task, the client will be reused automatically, taking advantage of keep-alive connections. Cleanup is handled at the flow level via the `finally` block, ensuring no resources are leaked.
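For instance, a task that hits several endpoints can hold one pooled client for all of its requests. A hedged sketch reusing the `client_pool` defined above; `fetch_many`, `base_url`, and `paths` are hypothetical names:

```python
@task(retries=3, retry_delay_seconds=[2, 5, 10])
async def fetch_many(base_url: str, paths: list[str], headers: Dict[str, str]) -> list[httpx.Response]:
    # Every request below goes through the same client, so keep-alive
    # connections are shared across all of them
    logger = get_run_logger()
    responses = []
    async with client_pool.get_client() as client:
        for path in paths:
            logger.info(f"Fetching {path}")
            responses.append(await client.get(base_url + path, headers=headers))
    return responses
```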
If you need to customize the client configuration, you can modify the `HttpClientPool` class to accept parameters for timeouts, limits, or other HTTPX client options.
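A minimal sketch of that customization, assuming you only need to pass `timeout` and `limits` through to each `httpx.AsyncClient` (both are standard httpx options; the default values below are illustrative, not recommendations):

```python
class HttpClientPool:
    def __init__(
        self,
        timeout: httpx.Timeout = DEFAULT_TIMEOUT,
        limits: httpx.Limits | None = None,
    ):
        self._clients: Dict[int, httpx.AsyncClient] = {}
        self._timeout = timeout
        # httpx.Limits controls connection pooling and keep-alive behavior
        self._limits = limits or httpx.Limits(
            max_connections=100,
            max_keepalive_connections=20,
            keepalive_expiry=5.0,
        )

    @asynccontextmanager
    async def get_client(self):
        task_id = id(asyncio.current_task())
        try:
            if task_id not in self._clients:
                # Thread the custom options through to each new client
                self._clients[task_id] = httpx.AsyncClient(
                    timeout=self._timeout, limits=self._limits
                )
            yield self._clients[task_id]
        except Exception:
            if task_id in self._clients:
                await self._clients[task_id].aclose()
                del self._clients[task_id]
            raise

    async def cleanup(self):
        for client in self._clients.values():
            await client.aclose()
        self._clients.clear()

# Usage: override only what you need
client_pool = HttpClientPool(timeout=httpx.Timeout(30.0, connect=5.0))
```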