Performance Tuning

This guide covers performance optimization strategies for A2A Registry deployments.

Performance Overview

A2A Registry is designed for high performance with these characteristics:

  • Low-millisecond response times for agent lookups (1ms p95 in the benchmarks below)
  • Support for thousands of concurrent requests
  • Near-linear scaling with the number of registered agents
  • Memory-efficient storage (roughly 1KB per registered agent)

Benchmarking

Performance Metrics

Operation         Requests/sec   Latency (p95)   Memory per Agent
Register Agent    5,000          2ms             ~1KB
Get Agent         10,000         1ms             -
List All Agents   1,000          10ms            -
Search Agents     500            20ms            -
Health Check      20,000         0.5ms           -

Load Testing

Use these tools to benchmark your deployment:

# Apache Bench
ab -n 1000 -c 10 http://localhost:8000/health

# wrk
wrk -t12 -c400 -d30s http://localhost:8000/health

# Custom Python load test
python scripts/load_test.py --agents 1000 --concurrent 50

Server Optimization

1. Uvicorn Configuration

Optimize the ASGI server settings:

# Production configuration (create_app is an application factory,
# so --factory is required; --max-requests and --worker-class are
# gunicorn flags, not uvicorn flags)
uvicorn a2a_registry.server:create_app \
  --factory \
  --host 0.0.0.0 \
  --port 8000 \
  --workers 4 \
  --limit-max-requests 10000 \
  --timeout-keep-alive 5
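
The same settings can be applied programmatically. A minimal sketch; the option names mirror the CLI flags above:

# run_server.py -- programmatic equivalent of the CLI invocation above
import uvicorn

if __name__ == "__main__":
    uvicorn.run(
        "a2a_registry.server:create_app",
        factory=True,              # create_app is an application factory
        host="0.0.0.0",
        port=8000,
        workers=4,                 # multiple workers require the import-string form
        limit_max_requests=10000,  # recycle workers to bound memory growth
        timeout_keep_alive=5,
    )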

2. Worker Processes

Scale with multiple workers:

# gunicorn with uvicorn workers (note the trailing () so gunicorn
# calls the application factory)
gunicorn 'a2a_registry.server:create_app()' \
  -w 4 \
  -k uvicorn.workers.UvicornWorker \
  --bind 0.0.0.0:8000 \
  --max-requests 10000 \
  --max-requests-jitter 1000 \
  --preload

3. Event Loop Optimization

# server.py optimization
import asyncio

# Use uvloop if available (optional dependency: pip install uvloop);
# a bare "import uvloop" would raise ImportError when it is missing
try:
    import uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    pass  # fall back to the default asyncio event loop

app = create_app()

Storage Optimization

Current In-Memory Storage

The current implementation is already optimized for speed:

import asyncio
from typing import Dict, Set

class OptimizedInMemoryStorage:
    def __init__(self):
        self._agents: Dict[str, AgentCard] = {}
        self._skills_index: Dict[str, Set[str]] = {}  # skill_id -> agent_ids
        self._lock = asyncio.Lock()

    async def register_agent(self, agent_id: str, agent_card: AgentCard):
        async with self._lock:
            self._agents[agent_id] = agent_card
            # Update the skill index so skill searches avoid a full scan
            for skill in agent_card.get('skills', []):
                self._skills_index.setdefault(skill['id'], set()).add(agent_id)
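
Deregistration should keep the skill index consistent. A sketch of the matching cleanup, assuming the same index layout:

    async def unregister_agent(self, agent_id: str) -> bool:
        async with self._lock:
            agent_card = self._agents.pop(agent_id, None)
            if agent_card is None:
                return False
            # Remove the agent from every skill bucket it appeared in
            for skill in agent_card.get('skills', []):
                agents = self._skills_index.get(skill['id'])
                if agents:
                    agents.discard(agent_id)
                    if not agents:
                        del self._skills_index[skill['id']]
            return True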

Search Performance

Optimize search operations:

async def search_agents_optimized(self, **criteria) -> List[AgentCard]:
    # Use the skill index for skill-based searches; treat an empty
    # skills list the same as no skill filter at all
    if criteria.get('skills'):
        candidate_ids = None  # agent IDs matching every requested skill
        for skill in criteria['skills']:
            skill_agents = self._skills_index.get(skill, set())
            if candidate_ids is None:
                candidate_ids = skill_agents.copy()
            else:
                candidate_ids &= skill_agents  # Intersection: must have every skill

        if not candidate_ids:
            return []

        candidates = [self._agents[aid] for aid in candidate_ids]
    else:
        candidates = list(self._agents.values())

    # Apply the remaining filters to the narrowed candidate set
    return [agent for agent in candidates if self._matches_criteria(agent, criteria)]
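
The _matches_criteria helper referenced above is not shown here; a minimal sketch of what it might check (the field names are assumptions):

    def _matches_criteria(self, agent: AgentCard, criteria: dict) -> bool:
        # Case-insensitive substring match on name and description;
        # skills were already narrowed via the index above
        name = criteria.get('name')
        if name and name.lower() not in agent.get('name', '').lower():
            return False
        description = criteria.get('description')
        if description and description.lower() not in agent.get('description', '').lower():
            return False
        return True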

Caching Strategies

Response Caching

Implement caching for frequently accessed data:

import time

class CachedStorage:
    """TTL-based cache in front of a storage backend.

    Note: functools.lru_cache cannot be used on async methods -- it would
    cache the coroutine object, which can only be awaited once.
    """

    def __init__(self, backend_storage, agent_ttl: int = 300, list_ttl: int = 30):
        self.backend = backend_storage
        self._cache = {}
        self._cache_ttl = {}
        self.agent_ttl = agent_ttl
        self.list_ttl = list_ttl

    async def get_agent_cached(self, agent_id: str):
        """Cache agent lookups for 5 minutes (agent_ttl)."""
        key = f"agent:{agent_id}"
        now = time.time()
        if key in self._cache and now - self._cache_ttl[key] < self.agent_ttl:
            return self._cache[key]
        agent = await self.backend.get_agent(agent_id)
        self._cache[key] = agent
        self._cache_ttl[key] = now
        return agent

    async def list_agents_cached(self):
        """Cache the full agent list for 30 seconds (list_ttl)."""
        now = time.time()
        if 'agents_list' in self._cache and now - self._cache_ttl['agents_list'] < self.list_ttl:
            return self._cache['agents_list']
        agents = await self.backend.list_agents()
        self._cache['agents_list'] = agents
        self._cache_ttl['agents_list'] = now
        return agents
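
Cached reads go stale as soon as the backend changes, so write operations should invalidate the affected entries. A sketch, assuming the backend exposes a matching register_agent:

    async def register_agent(self, agent_id: str, agent_card):
        result = await self.backend.register_agent(agent_id, agent_card)
        # Drop entries the write may have invalidated
        self._cache.pop('agents_list', None)
        self._cache.pop(f"agent:{agent_id}", None)
        return result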

Client-Side Caching

import time

class CachedRegistryClient:
    def __init__(self, base_url: str, cache_ttl: int = 300):
        self.client = A2ARegistryClient(base_url)  # the registry's HTTP client
        self.cache = {}
        self.cache_ttl = cache_ttl

    async def get_agent(self, agent_id: str):
        cache_key = f"agent:{agent_id}"
        now = time.time()

        if cache_key in self.cache:
            data, timestamp = self.cache[cache_key]
            if now - timestamp < self.cache_ttl:
                return data

        data = await self.client.get_agent(agent_id)
        self.cache[cache_key] = (data, now)
        return data
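
Usage, inside an async context ("agent-123" is a placeholder ID):

async def lookup():
    client = CachedRegistryClient("http://localhost:8000", cache_ttl=300)
    agent = await client.get_agent("agent-123")  # first call hits the network
    agent = await client.get_agent("agent-123")  # served from the local cache
    return agent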

Database Optimization (Future)

PostgreSQL Configuration

When using a database backend:

-- Indexes for fast lookups
CREATE INDEX idx_agents_name ON agents(name);
CREATE INDEX idx_agents_skills ON agents USING GIN(skills);
CREATE INDEX idx_agents_description ON agents USING GIN(to_tsvector('english', description));

-- Partial indexes for common queries
CREATE INDEX idx_active_agents ON agents(id) WHERE active = true;

Connection Pooling

import json

import asyncpg

class DatabaseStorage:
    def __init__(self, database_url: str):
        self.database_url = database_url
        self.pool = None

    async def init_pool(self):
        self.pool = await asyncpg.create_pool(
            self.database_url,
            min_size=10,
            max_size=50,
            max_queries=50000,
            max_inactive_connection_lifetime=300
        )

    async def get_agent(self, agent_id: str):
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT data FROM agents WHERE id = $1",
                agent_id
            )
            return json.loads(row['data']) if row else None
Network Optimization

HTTP/2 Support

Enable HTTP/2 for better performance:

# Using hypercorn instead of uvicorn (uvicorn does not serve HTTP/2).
# Hypercorn negotiates HTTP/2 via ALPN, which requires TLS; there is
# no --http2 flag. cert.pem/key.pem below are placeholder paths.
hypercorn a2a_registry.server:create_app \
  --bind 0.0.0.0:8000 \
  --workers 4 \
  --certfile cert.pem \
  --keyfile key.pem

Compression

Enable response compression:

from starlette.middleware.gzip import GZipMiddleware

app = create_app()
app.add_middleware(GZipMiddleware, minimum_size=1000)
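
To verify that compression is active, request a large endpoint with an Accept-Encoding header. A quick check, assuming the httpx package is installed:

import httpx

resp = httpx.get(
    "http://localhost:8000/agents",
    headers={"Accept-Encoding": "gzip"},
)
# Expect "gzip" for responses over the 1000-byte minimum_size
print(resp.headers.get("content-encoding"))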

Keep-Alive Connections

# Configure keep-alive hints (HTTP/1.1 connections are persistent
# by default; these headers are advisory)
from fastapi import Request

@app.middleware("http")
async def add_keep_alive(request: Request, call_next):
    response = await call_next(request)
    response.headers["Connection"] = "keep-alive"
    response.headers["Keep-Alive"] = "timeout=5, max=1000"
    return response

Memory Optimization

Memory Profiling

Monitor memory usage:

import gc

import psutil
from fastapi import Request

@app.middleware("http")
async def memory_monitoring(request: Request, call_next):
    # Memory before request
    process = psutil.Process()
    mem_before = process.memory_info().rss

    response = await call_next(request)

    # Memory after request
    mem_after = process.memory_info().rss
    mem_diff = mem_after - mem_before

    if mem_diff > 1024 * 1024:  # More than 1MB increase
        print(f"High memory usage for {request.url}: {mem_diff / 1024 / 1024:.2f}MB")
        gc.collect()  # Force garbage collection

    return response

Object Pool Pattern

from queue import Empty, Queue

class AgentCardPool:
    """Object pool to reduce allocation overhead"""

    def __init__(self, initial_size: int = 100):
        self._pool = Queue()
        for _ in range(initial_size):
            self._pool.put({})

    def get_agent_card(self) -> dict:
        try:
            return self._pool.get_nowait()
        except Empty:
            return {}  # Create a new dict if the pool is empty

    def return_agent_card(self, card: dict):
        card.clear()  # Reset the dictionary before reuse
        self._pool.put(card)

# Global pool instance
agent_pool = AgentCardPool()
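
Usage follows a borrow/return cycle:

# Borrow a dict, populate it, and return it for reuse
card = agent_pool.get_agent_card()
card["name"] = "example-agent"  # placeholder data
# ... use the card ...
agent_pool.return_agent_card(card)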

Monitoring and Metrics

Prometheus Metrics

import time

from fastapi import Request, Response
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Gauge, Histogram, generate_latest

# Metrics
REQUEST_COUNT = Counter('registry_requests_total', 'Total requests', ['method', 'endpoint'])
REQUEST_DURATION = Histogram('registry_request_duration_seconds', 'Request duration')
ACTIVE_AGENTS = Gauge('registry_active_agents', 'Number of active agents')
MEMORY_USAGE = Gauge('registry_memory_usage_bytes', 'Memory usage')

@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start_time = time.time()

    response = await call_next(request)

    # Record metrics
    duration = time.time() - start_time
    REQUEST_COUNT.labels(request.method, request.url.path).inc()
    REQUEST_DURATION.observe(duration)

    return response

@app.get("/metrics")
async def get_metrics():
    # CONTENT_TYPE_LATEST is the Prometheus exposition content type
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
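
ACTIVE_AGENTS and MEMORY_USAGE are declared but never set above. One way to keep them current is a background task; a sketch, assuming the asyncio/psutil imports from earlier and the storage.count_agents() helper used by the health check below:

async def update_gauges(interval: float = 15.0):
    process = psutil.Process()
    while True:
        ACTIVE_AGENTS.set(await storage.count_agents())
        MEMORY_USAGE.set(process.memory_info().rss)
        await asyncio.sleep(interval)

@app.on_event("startup")
async def start_gauge_updates():
    asyncio.create_task(update_gauges())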

Custom Health Metrics

@app.get("/health/detailed")
async def detailed_health():
    process = psutil.Process()

    # storage, start_time, calculate_rps and calculate_avg_response_time
    # are application-level helpers assumed to be defined elsewhere
    return {
        "status": "healthy",
        "agents_count": await storage.count_agents(),
        "memory_usage": process.memory_info().rss,
        "cpu_percent": process.cpu_percent(),
        "uptime": time.time() - start_time,
        "requests_per_second": calculate_rps(),
        "average_response_time": calculate_avg_response_time()
    }

Configuration Tuning

Environment Variables

# Performance tuning environment variables
export UVICORN_WORKERS=4
export UVICORN_MAX_REQUESTS=10000
export UVICORN_KEEPALIVE=5
export REGISTRY_CACHE_TTL=300
export REGISTRY_MAX_AGENTS=10000

Configuration Class

# Pydantic v2 moved BaseSettings into the pydantic-settings package
# (on Pydantic v1: from pydantic import BaseSettings)
from pydantic_settings import BaseSettings, SettingsConfigDict

class PerformanceSettings(BaseSettings):
    model_config = SettingsConfigDict(env_prefix="REGISTRY_")

    max_agents: int = 10000
    cache_ttl: int = 300
    worker_count: int = 4
    max_requests_per_worker: int = 10000
    keepalive_timeout: int = 5
    enable_compression: bool = True
    enable_metrics: bool = True

settings = PerformanceSettings()
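
Environment variables with the REGISTRY_ prefix override these defaults at startup:

# Example: with REGISTRY_CACHE_TTL=600 exported in the environment
print(settings.cache_ttl)     # -> 600 (otherwise the default, 300)
print(settings.worker_count)  # -> value of REGISTRY_WORKER_COUNT, or 4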

Load Balancing

Multiple Registry Instances

# docker-compose.yml
version: '3.8'
services:
  registry-1:
    image: a2a-registry:latest
    ports:
      - "8001:8000"
    environment:
      - REGISTRY_INSTANCE_ID=1

  registry-2:
    image: a2a-registry:latest
    ports:
      - "8002:8000"
    environment:
      - REGISTRY_INSTANCE_ID=2

  nginx:
    image: nginx:alpine
    ports:
      - "8000:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - registry-1
      - registry-2

Nginx Configuration

upstream registry_backend {
    least_conn;
    server registry-1:8000 max_fails=3 fail_timeout=30s;
    server registry-2:8000 max_fails=3 fail_timeout=30s;
}

server {
    listen 80;

    location / {
        proxy_pass http://registry_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_connect_timeout 5s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;
    }

    location /health {
        proxy_pass http://registry_backend/health;
        proxy_connect_timeout 1s;
        proxy_send_timeout 1s;
        proxy_read_timeout 1s;
    }
}

Performance Testing

Load Test Script

import asyncio
import aiohttp
import time
from typing import List

class LoadTester:
    def __init__(self, registry_url: str):
        self.url = registry_url
        self.results = []

    async def register_agent(self, session: aiohttp.ClientSession, agent_id: int):
        start = time.time()

        agent_card = {
            "name": f"test-agent-{agent_id}",
            "description": f"Load test agent {agent_id}",
            "url": f"http://localhost:300{agent_id % 10}",
            "version": "0.420.0",
            "protocol_version": "0.3.0",
            "skills": [{"id": "test", "description": "Test skill"}]
        }

        async with session.post(
            f"{self.url}/agents",
            json={"agent_card": agent_card}
        ) as response:
            duration = time.time() - start
            success = response.status == 201
            self.results.append({
                "operation": "register",
                "duration": duration,
                "success": success
            })

    async def run_load_test(self, num_agents: int, concurrency: int):
        connector = aiohttp.TCPConnector(limit=concurrency * 2)
        async with aiohttp.ClientSession(connector=connector) as session:
            # Create semaphore to limit concurrency
            semaphore = asyncio.Semaphore(concurrency)

            async def bounded_register(agent_id):
                async with semaphore:
                    await self.register_agent(session, agent_id)

            # Run load test
            start_time = time.time()
            tasks = [bounded_register(i) for i in range(num_agents)]
            await asyncio.gather(*tasks)
            total_time = time.time() - start_time

            # Calculate statistics (guard against an empty result list)
            successful = sum(1 for r in self.results if r["success"])
            avg_duration = sum(r["duration"] for r in self.results) / max(len(self.results), 1)

            print("Load Test Results:")
            print(f"  Total agents: {num_agents}")
            print(f"  Successful: {successful}")
            print(f"  Failed: {num_agents - successful}")
            print(f"  Total time: {total_time:.2f}s")
            print(f"  Requests/sec: {num_agents / total_time:.2f}")
            print(f"  Avg response time: {avg_duration * 1000:.2f}ms")

# Run load test
async def main():
    tester = LoadTester("http://localhost:8000")
    await tester.run_load_test(num_agents=1000, concurrency=50)

if __name__ == "__main__":
    asyncio.run(main())

Best Practices Summary

  1. Use multiple workers for production deployments
  2. Enable compression for larger responses
  3. Implement caching for frequently accessed data
  4. Monitor memory usage and implement cleanup
  5. Use connection pooling for database backends
  6. Set up proper health checks and monitoring
  7. Load test your deployment before production
  8. Monitor key metrics (response time, throughput, errors)
