Carlos Perez Kuper
07/30/2025, 12:33 AM

Marvin
07/30/2025, 12:35 AM

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Dict

import httpx
from prefect import flow, task
from prefect.logging import get_run_logger

# Configuration
DEFAULT_TIMEOUT = httpx.Timeout(10.0, connect=5.0)


class HttpClientPool:
    """Hands out one httpx.AsyncClient per running asyncio task."""

    def __init__(self) -> None:
        self._clients: Dict[int, httpx.AsyncClient] = {}

    @asynccontextmanager
    async def get_client(self) -> AsyncIterator[httpx.AsyncClient]:
        # Key on the current asyncio task (not the Prefect task run ID) so that
        # concurrently running coroutines never share a client
        task_id = id(asyncio.current_task())
        if task_id not in self._clients:
            self._clients[task_id] = httpx.AsyncClient(timeout=DEFAULT_TIMEOUT)
        try:
            yield self._clients[task_id]
        except Exception:
            # On any error, close and discard this task's client so the next
            # attempt (e.g. a Prefect retry) starts with a fresh one
            client = self._clients.pop(task_id, None)
            if client is not None:
                await client.aclose()
            raise

    async def cleanup(self) -> None:
        """Close all clients in the pool."""
        for client in self._clients.values():
            await client.aclose()
        self._clients.clear()


# Create a global pool instance
client_pool = HttpClientPool()


@task(retries=3, retry_delay_seconds=[2, 5, 10])
async def fetch_details(url: str, resource: str, headers: Dict[str, str]) -> httpx.Response:
    logger = get_run_logger()
    logger.info(f"Fetching details for resource {resource}")
    async with client_pool.get_client() as client:
        return await client.get(url + resource, headers=headers)


# Example flow
@flow
async def main_flow() -> None:
    try:
        # Your flow logic here
        response = await fetch_details(
            "https://api.example.com", "/resource", {"Authorization": "Bearer token"}
        )
        # Process response...
    finally:
        # Cleanup at the end of the flow
        await client_pool.cleanup()


if __name__ == "__main__":
    asyncio.run(main_flow())
```
Key improvements in this implementation:
1. Task-Specific Clients: Instead of a fixed pool size, a client is created per asyncio task, keyed by `id(asyncio.current_task())`. Concurrently running coroutines therefore never share a client or its connections.
2. Automatic Cleanup: If an error occurs inside the `async with` block, the context manager closes and discards that task's client. Clients that finish without errors stay open until you call the `cleanup()` method at the end of your flow.
3. Resource Management: The `HttpClientPool` class tracks every client it creates, so a single `cleanup()` call closes all of them and prevents resource leaks.
4. Keep-alive Support: Repeated `get_client()` calls from the same asyncio task return the same client, so its keep-alive connections are reused for as long as that task runs.
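The keying and error-handling behavior described in the list can be checked without a network or Prefect. Below is a minimal sketch using only the standard library; `FakeClient` is a hypothetical stand-in for `httpx.AsyncClient` that only records whether `aclose()` was called, and `ClientPool` is a hypothetical copy of the pool's logic built around it.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Dict


class FakeClient:
    """Hypothetical stand-in for httpx.AsyncClient that only records closing."""

    def __init__(self) -> None:
        self.closed = False

    async def aclose(self) -> None:
        self.closed = True


class ClientPool:
    """Same keying scheme as HttpClientPool: one client per asyncio task."""

    def __init__(self) -> None:
        self._clients: Dict[int, FakeClient] = {}

    @asynccontextmanager
    async def get_client(self) -> AsyncIterator[FakeClient]:
        task_id = id(asyncio.current_task())
        if task_id not in self._clients:
            self._clients[task_id] = FakeClient()
        try:
            yield self._clients[task_id]
        except Exception:
            # Close and forget this task's client, then re-raise
            client = self._clients.pop(task_id, None)
            if client is not None:
                await client.aclose()
            raise

    async def cleanup(self) -> None:
        for client in self._clients.values():
            await client.aclose()
        self._clients.clear()


async def demo() -> dict:
    pool = ClientPool()

    # Two uses inside the same asyncio task get the same client
    async with pool.get_client() as first:
        pass
    async with pool.get_client() as second:
        pass

    # A separate asyncio task gets its own client
    async def other_task() -> FakeClient:
        async with pool.get_client() as c:
            return c

    other = await asyncio.create_task(other_task())

    # An error inside the block closes and discards that task's client
    try:
        async with pool.get_client() as failing:
            raise ValueError("simulated request failure")
    except ValueError:
        pass
    async with pool.get_client() as after_error:
        pass

    await pool.cleanup()
    return {
        "same_task_reused": first is second,
        "other_task_isolated": other is not first,
        "failed_client_closed": failing.closed,
        "fresh_after_error": after_error is not failing,
        "all_closed": other.closed and after_error.closed,
    }


print(asyncio.run(demo()))
```

Because the dictionary key is an `id()`, an entry could in principle be picked up by a later asyncio task that happens to receive the same id after the original task is garbage-collected; calling `cleanup()` at the end of every flow keeps that window small.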
To use this in your Prefect flow:
```python
@flow
async def your_flow(
    url1: str, resource1: str, url2: str, resource2: str, headers: Dict[str, str]
) -> None:
    try:
        # Each call uses the client belonging to the asyncio task it runs in
        result1 = await fetch_details(url1, resource1, headers)
        result2 = await fetch_details(url2, resource2, headers)
        # Process results...
    finally:
        # Cleanup at the end of the flow
        await client_pool.cleanup()
```
This approach:
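- Helps avoid `RuntimeError: Event loop is closed` errors by giving each asyncio task its own client and closing clients explicitly
- Maintains keep-alive connections within task executions
- Closes every client it creates, either when an error occurs or in `cleanup()`
- Works with Prefect's task retry mechanism: a failed attempt discards its client, so the retry starts with a fresh one. Note that httpx does not raise on 4xx/5xx responses unless you call `response.raise_for_status()`, so as written only exceptions such as timeouts or connection errors trigger a retry.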
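Prefect retries a task only when an attempt raises. To illustrate the schedule that `retries=3, retry_delay_seconds=[2, 5, 10]` describes (up to three retries, waiting 2, 5 and then 10 seconds before each one), here is a plain-asyncio sketch. `run_with_retries` is a hypothetical helper, not Prefect's implementation, and `flaky` is a hypothetical request that fails twice before succeeding.

```python
import asyncio
from typing import Awaitable, Callable, Sequence, Tuple, TypeVar

T = TypeVar("T")


async def run_with_retries(
    attempt: Callable[[], Awaitable[T]], delays: Sequence[float]
) -> Tuple[T, int]:
    """Hypothetical helper: call `attempt`, retrying once per entry in `delays`.

    Sleeps delays[i] seconds before retry i + 1 and re-raises the last error
    when all retries are used up. Returns (result, number_of_attempts).
    """
    for index in range(len(delays) + 1):
        try:
            return await attempt(), index + 1
        except Exception:
            if index == len(delays):
                raise  # no retries left: surface the final error
            await asyncio.sleep(delays[index])
    raise AssertionError("unreachable")


calls = 0


async def flaky() -> str:
    """Simulated request: fails twice (like a timeout), then succeeds."""
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("simulated transient failure")
    return "ok"


# Short delays so the example runs quickly; the task above uses [2, 5, 10]
result, attempts = asyncio.run(run_with_retries(flaky, [0.01, 0.01, 0.01]))
print(result, attempts)
```

In the real task, each failed attempt also leaves the pool through the `except` branch of `get_client()`, which is what gives every retry a fresh client.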
Remember that if you make multiple requests inside the same task, `get_client()` returns the same client each time, so those requests can reuse keep-alive connections. Cleanup happens at the flow level: the `finally` block calls `client_pool.cleanup()`, so no clients are left open when the flow finishes.

Marvin
07/30/2025, 12:35 AM
`HttpClientPool` class to accept parameters for timeouts, limits, or other HTTPX client options.
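One way to make the pool configurable in that direction is to pass the client class and its keyword arguments into the constructor. The sketch below is an assumption, not a documented pattern: `ConfigurableClientPool` is hypothetical, and so is `RecordingClient`, a stand-in for `httpx.AsyncClient` that stores the options it receives so the example runs with the standard library alone.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Callable, Dict


class ConfigurableClientPool:
    """Per-asyncio-task client pool whose client options are set by the caller.

    `client_factory` would normally be httpx.AsyncClient; every keyword
    argument in `client_options` is forwarded to it unchanged.
    """

    def __init__(self, client_factory: Callable[..., Any], **client_options: Any) -> None:
        self._factory = client_factory
        self._options = client_options
        self._clients: Dict[int, Any] = {}

    @asynccontextmanager
    async def get_client(self) -> AsyncIterator[Any]:
        task_id = id(asyncio.current_task())
        if task_id not in self._clients:
            # Build the client with the caller-supplied options
            self._clients[task_id] = self._factory(**self._options)
        try:
            yield self._clients[task_id]
        except Exception:
            client = self._clients.pop(task_id, None)
            if client is not None:
                await client.aclose()
            raise

    async def cleanup(self) -> None:
        for client in self._clients.values():
            await client.aclose()
        self._clients.clear()


class RecordingClient:
    """Hypothetical stand-in for httpx.AsyncClient that stores its options."""

    def __init__(self, **options: Any) -> None:
        self.options = options
        self.closed = False

    async def aclose(self) -> None:
        self.closed = True


async def demo() -> RecordingClient:
    pool = ConfigurableClientPool(RecordingClient, timeout=30.0, http2=False)
    async with pool.get_client() as client:
        pass
    await pool.cleanup()
    return client


client = asyncio.run(demo())
print(client.options, client.closed)
```

With HTTPX installed, the same class could be constructed as `ConfigurableClientPool(httpx.AsyncClient, timeout=httpx.Timeout(10.0, connect=5.0), limits=httpx.Limits(max_connections=10, max_keepalive_connections=5))`; `timeout` and `limits` are standard `httpx.AsyncClient` keyword arguments.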