# Logging and Monitoring

Comprehensive logging and monitoring setup for A2A Registry deployments.
## Logging Configuration

### Server Logging

A2A Registry uses Python's standard logging module with configurable levels:

```bash
# Start server with different log levels
a2a-registry serve --log-level DEBUG    # Detailed debug information
a2a-registry serve --log-level INFO     # Standard information (default)
a2a-registry serve --log-level WARNING  # Warnings and errors only
a2a-registry serve --log-level ERROR    # Errors only
```
### Structured Logging

Configure structured JSON logging for production:

```python
import logging
import json
from datetime import datetime


class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno
        }

        # Add extra fields if present
        if hasattr(record, 'agent_id'):
            log_entry['agent_id'] = record.agent_id
        if hasattr(record, 'request_id'):
            log_entry['request_id'] = record.request_id
        if hasattr(record, 'duration'):
            log_entry['duration'] = record.duration

        return json.dumps(log_entry)


# Configure logging
def setup_logging():
    handler = logging.StreamHandler()
    handler.setFormatter(JSONFormatter())

    logger = logging.getLogger("a2a_registry")
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)

    return logger


logger = setup_logging()
```
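With this in place, any call that passes context through `extra` is emitted as a single JSON line that the aggregation pipelines below can parse directly. A small illustrative call (the agent ID is a made-up example; `module`, `function`, and `line` in the output reflect wherever the call is made):

```python
# Log a registry event with extra context; the JSONFormatter above serializes
# it, together with the standard record fields, into one JSON line on stdout.
logger.info("Agent registered", extra={"agent_id": "agent-123", "duration": 0.012})
# -> {"timestamp": "...", "level": "INFO", "logger": "a2a_registry",
#     "message": "Agent registered", ..., "agent_id": "agent-123", "duration": 0.012}
```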
### Application Logging

Add detailed logging to registry operations:

```python
import logging
import time
from functools import wraps

logger = logging.getLogger("a2a_registry.storage")


def log_operation(operation_name: str):
    """Decorator to log storage operations"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            agent_id = kwargs.get('agent_id', args[1] if len(args) > 1 else 'unknown')

            logger.info(f"Starting {operation_name}", extra={
                'operation': operation_name,
                'agent_id': agent_id
            })

            try:
                result = await func(*args, **kwargs)
                duration = time.time() - start_time

                logger.info(f"Completed {operation_name}", extra={
                    'operation': operation_name,
                    'agent_id': agent_id,
                    'duration': duration,
                    'success': True
                })

                return result
            except Exception as e:
                duration = time.time() - start_time

                logger.error(f"Failed {operation_name}: {str(e)}", extra={
                    'operation': operation_name,
                    'agent_id': agent_id,
                    'duration': duration,
                    'success': False,
                    'error': str(e)
                })
                raise

        return wrapper
    return decorator


class InMemoryStorage:
    @log_operation("register_agent")
    async def register_agent(self, agent_id: str, agent_card: dict):
        # Implementation with automatic logging
        pass

    @log_operation("get_agent")
    async def get_agent(self, agent_id: str):
        # Implementation with automatic logging
        pass
```
### Request Logging Middleware

Track all API requests:

```python
import time
import uuid

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request


class RequestLoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Generate unique request ID
        request_id = str(uuid.uuid4())
        request.state.request_id = request_id

        start_time = time.time()

        # Uses the "a2a_registry" logger configured above
        logger.info("Request started", extra={
            'request_id': request_id,
            'method': request.method,
            'url': str(request.url),
            'client_ip': request.client.host,
            'user_agent': request.headers.get('user-agent', 'unknown')
        })

        try:
            response = await call_next(request)
            duration = time.time() - start_time

            logger.info("Request completed", extra={
                'request_id': request_id,
                'method': request.method,
                'url': str(request.url),
                'status_code': response.status_code,
                'duration': duration
            })

            return response
        except Exception as e:
            duration = time.time() - start_time

            logger.error("Request failed", extra={
                'request_id': request_id,
                'method': request.method,
                'url': str(request.url),
                'duration': duration,
                'error': str(e)
            })
            raise


# Add middleware to app
app.add_middleware(RequestLoggingMiddleware)
```
## Log Aggregation

### ELK Stack Integration

#### Logstash Configuration

```
# logstash.conf
input {
  file {
    path => "/var/log/a2a-registry/*.log"
    start_position => "beginning"
    codec => "json"
  }
}

filter {
  if [logger] == "a2a_registry" {
    mutate {
      add_tag => ["a2a_registry"]
    }
  }

  # Parse duration as number
  if [duration] {
    mutate {
      convert => { "duration" => "float" }
    }
  }

  # Add timestamp parsing
  date {
    match => [ "timestamp", "ISO8601" ]
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "a2a-registry-%{+YYYY.MM.dd}"
  }
  stdout {
    codec => rubydebug
  }
}
```
#### Elasticsearch Index Template

```json
{
  "index_patterns": ["a2a-registry-*"],
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "timestamp": {
        "type": "date"
      },
      "level": {
        "type": "keyword"
      },
      "operation": {
        "type": "keyword"
      },
      "agent_id": {
        "type": "keyword"
      },
      "duration": {
        "type": "float"
      },
      "status_code": {
        "type": "integer"
      },
      "method": {
        "type": "keyword"
      },
      "url": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        }
      }
    }
  }
}
```
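Apply the template before the first daily index is created. A minimal sketch using the legacy `_template` API (the template name, host, and local file name are assumptions; newer Elasticsearch versions may require the composable `_index_template` API and a wrapped `template` body instead):

```bash
# Install the index template from a local file
curl -X PUT "http://elasticsearch:9200/_template/a2a-registry" \
  -H "Content-Type: application/json" \
  -d @a2a-registry-template.json
```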
#### Kibana Dashboards

Key visualizations to create (an example filter for the error panel follows the list):
- Request Rate: Requests per minute over time
- Response Times: Average and percentile response times
- Error Rates: HTTP error status codes
- Agent Operations: Registration, discovery, search operations
- Top Agents: Most frequently accessed agents
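For instance, the Error Rates panel can be driven by a simple KQL filter over the fields mapped in the index template above (a sketch; pick the index pattern and time range in Kibana itself):

```
level : "ERROR" and logger : a2a_registry*
```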
### Fluentd Integration

```
# fluent.conf
<source>
  @type tail
  path /var/log/a2a-registry/*.log
  pos_file /var/log/fluentd/a2a-registry.log.pos
  tag a2a.registry
  format json
</source>

<filter a2a.registry>
  @type record_transformer
  <record>
    service "a2a-registry"
    environment "#{ENV['ENVIRONMENT']}"
  </record>
</filter>

<match a2a.registry>
  @type elasticsearch
  host elasticsearch
  port 9200
  index_name a2a-registry
  type_name _doc
</match>
```
## Metrics and Monitoring

### Prometheus Integration

```python
import platform
import time

from fastapi import Request, Response
from prometheus_client import Counter, Histogram, Gauge, Info, generate_latest
from starlette.middleware.base import BaseHTTPMiddleware

# Define metrics
REQUEST_COUNT = Counter(
    'registry_requests_total',
    'Total number of requests',
    ['method', 'endpoint', 'status']
)

REQUEST_DURATION = Histogram(
    'registry_request_duration_seconds',
    'Request duration in seconds',
    ['method', 'endpoint']
)

ACTIVE_AGENTS = Gauge(
    'registry_active_agents',
    'Number of active agents'
)

AGENT_OPERATIONS = Counter(
    'registry_agent_operations_total',
    'Total agent operations',
    ['operation', 'status']
)

REGISTRY_INFO = Info(
    'registry_info',
    'Registry information'
)

# Set static info
REGISTRY_INFO.info({
    'version': '0.420.0',
    'python_version': platform.python_version(),
    'platform': platform.platform()
})


# Middleware to collect metrics
class MetricsMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        start_time = time.time()
        method = request.method
        endpoint = request.url.path

        try:
            response = await call_next(request)
            duration = time.time() - start_time
            status = str(response.status_code)

            # Record metrics
            REQUEST_COUNT.labels(method, endpoint, status).inc()
            REQUEST_DURATION.labels(method, endpoint).observe(duration)

            return response
        except Exception:
            duration = time.time() - start_time
            REQUEST_COUNT.labels(method, endpoint, "500").inc()
            REQUEST_DURATION.labels(method, endpoint).observe(duration)
            raise


# Add metrics endpoint
@app.get("/metrics")
async def get_metrics():
    # Update active agents count
    agent_count = await storage.count_agents()
    ACTIVE_AGENTS.set(agent_count)

    return Response(generate_latest(), media_type="text/plain")


# Add middleware
app.add_middleware(MetricsMiddleware)
```
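Prometheus then needs a scrape job pointing at the `/metrics` endpoint. A minimal sketch (the target host, port, and scrape interval are assumptions to adapt to your deployment; the job name matches the `job="a2a-registry"` label used in the alerting rules below):

```yaml
# prometheus.yml (excerpt): scrape the registry's /metrics endpoint
scrape_configs:
  - job_name: "a2a-registry"
    scrape_interval: 15s
    metrics_path: /metrics
    static_configs:
      - targets: ["a2a-registry:8000"]
```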
### Custom Metrics Collection

```python
import asyncio
import time

import psutil


class MetricsCollector:
    def __init__(self, storage):
        self.storage = storage
        self.start_time = time.time()

    async def collect_metrics(self):
        """Collect custom application metrics"""
        metrics = {
            'uptime': time.time() - self.start_time,
            'agent_count': await self.storage.count_agents(),
            'memory_usage': psutil.Process().memory_info().rss,
            'cpu_percent': psutil.Process().cpu_percent(),
        }

        # Collect agent statistics
        agents = await self.storage.list_agents()

        # Count by transport protocol
        transport_counts = {}
        for agent in agents:
            transport = agent.get('preferred_transport', 'unknown')
            transport_counts[transport] = transport_counts.get(transport, 0) + 1
        metrics['transports'] = transport_counts

        # Count by capabilities
        capability_counts = {
            'streaming': 0,
            'push_notifications': 0,
            'state_transition_history': 0
        }
        for agent in agents:
            capabilities = agent.get('capabilities', {})
            for cap, value in capabilities.items():
                if value and cap in capability_counts:
                    capability_counts[cap] += 1
        metrics['capabilities'] = capability_counts

        return metrics


# Periodic metrics collection
async def metrics_background_task():
    collector = MetricsCollector(storage)

    while True:
        try:
            metrics = await collector.collect_metrics()

            # Update Prometheus gauges
            ACTIVE_AGENTS.set(metrics['agent_count'])

            # Log metrics for external collection
            logger.info("Metrics collected", extra={
                'metrics': metrics,
                'metric_type': 'application'
            })
        except Exception as e:
            logger.error(f"Metrics collection failed: {e}")

        await asyncio.sleep(60)  # Collect every minute


# Start background task
@app.on_event("startup")
async def start_metrics_collection():
    asyncio.create_task(metrics_background_task())
```
## Health Checks

### Basic Health Check

```python
from datetime import datetime

from fastapi.responses import JSONResponse


@app.get("/health")
async def health_check():
    """Basic health check endpoint"""
    try:
        # Test storage connectivity
        await storage.list_agents()

        return {
            "status": "healthy",
            "timestamp": datetime.utcnow().isoformat(),
            "version": "0.420.0"
        }
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return JSONResponse(
            status_code=503,
            content={
                "status": "unhealthy",
                "timestamp": datetime.utcnow().isoformat(),
                "error": str(e)
            }
        )
```
### Detailed Health Check

```python
import time
from datetime import datetime

import psutil

# Recorded once at process start so the endpoint can report uptime
start_time = time.time()


@app.get("/health/detailed")
async def detailed_health_check():
    """Comprehensive health check with metrics"""
    checks = {}
    overall_status = "healthy"

    # Storage check
    try:
        start = time.time()
        await storage.list_agents()
        checks["storage"] = {
            "status": "healthy",
            "response_time": time.time() - start
        }
    except Exception as e:
        checks["storage"] = {
            "status": "unhealthy",
            "error": str(e)
        }
        overall_status = "unhealthy"

    # Memory check
    process = psutil.Process()
    memory_usage = process.memory_info().rss
    memory_percent = process.memory_percent()

    if memory_percent > 90:
        checks["memory"] = {
            "status": "warning",
            "usage": memory_usage,
            "percent": memory_percent
        }
        if overall_status == "healthy":
            overall_status = "warning"
    else:
        checks["memory"] = {
            "status": "healthy",
            "usage": memory_usage,
            "percent": memory_percent
        }

    # Disk space check (if using file logging)
    disk_usage = psutil.disk_usage('/')
    disk_percent = (disk_usage.used / disk_usage.total) * 100

    if disk_percent > 90:
        checks["disk"] = {
            "status": "warning",
            "percent": disk_percent
        }
        if overall_status == "healthy":
            overall_status = "warning"
    else:
        checks["disk"] = {
            "status": "healthy",
            "percent": disk_percent
        }

    return {
        "status": overall_status,
        "timestamp": datetime.utcnow().isoformat(),
        "checks": checks,
        "uptime": time.time() - start_time,
        "version": "0.420.0"
    }
```
### Kubernetes Health Checks

```yaml
# kubernetes deployment with health checks
apiVersion: apps/v1
kind: Deployment
metadata:
  name: a2a-registry
spec:
  template:
    spec:
      containers:
      - name: a2a-registry
        image: a2a-registry:v0.1.5
        ports:
        - containerPort: 8000

        # Liveness probe
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
          timeoutSeconds: 5
          failureThreshold: 3

        # Readiness probe
        readinessProbe:
          httpGet:
            path: /health/detailed
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
          timeoutSeconds: 3
          failureThreshold: 3

        # Startup probe
        startupProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 10
          periodSeconds: 5
          timeoutSeconds: 3
          failureThreshold: 10
```
## Alerting

### Prometheus Alerting Rules

```yaml
# prometheus-alerts.yml
groups:
- name: a2a-registry
  rules:
  - alert: RegistryDown
    expr: up{job="a2a-registry"} == 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "A2A Registry is down"
      description: "A2A Registry has been down for more than 1 minute"

  - alert: HighErrorRate
    expr: rate(registry_requests_total{status=~"5.."}[5m]) > 0.1
    for: 2m
    labels:
      severity: warning
    annotations:
      summary: "High error rate in A2A Registry"
      description: "Error rate is {{ $value }} errors per second"

  - alert: HighResponseTime
    expr: histogram_quantile(0.95, rate(registry_request_duration_seconds_bucket[5m])) > 1.0
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "High response time in A2A Registry"
      description: "95th percentile response time is {{ $value }} seconds"

  - alert: HighMemoryUsage
    expr: process_resident_memory_bytes{job="a2a-registry"} > 1000000000
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "High memory usage in A2A Registry"
      description: "Memory usage is {{ $value | humanize }}B"
```
### Slack Alerting Integration

```python
import json
import os
import time

import aiohttp


class SlackAlerter:
    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    async def send_alert(self, level: str, message: str, details: dict = None):
        color_map = {
            "info": "#36a64f",
            "warning": "#ffcc00",
            "error": "#ff0000",
            "critical": "#8B0000"
        }

        payload = {
            "attachments": [{
                "color": color_map.get(level, "#cccccc"),
                "title": f"A2A Registry Alert ({level.upper()})",
                "text": message,
                "timestamp": int(time.time())
            }]
        }

        if details:
            payload["attachments"][0]["fields"] = [
                {"title": key, "value": str(value), "short": True}
                for key, value in details.items()
            ]

        async with aiohttp.ClientSession() as session:
            await session.post(
                self.webhook_url,
                data=json.dumps(payload),
                headers={'Content-Type': 'application/json'}
            )


# Usage in error handling
alerter = SlackAlerter(os.getenv("SLACK_WEBHOOK_URL"))


async def handle_critical_error(error: Exception, context: dict):
    await alerter.send_alert(
        level="critical",
        message=f"Critical error in A2A Registry: {str(error)}",
        details=context
    )
```
## Log Analysis

### Common Log Queries

#### Elasticsearch/Kibana Queries

```json
// High error rates
{
  "query": {
    "bool": {
      "must": [
        {"range": {"timestamp": {"gte": "now-1h"}}},
        {"term": {"level": "ERROR"}}
      ]
    }
  },
  "aggs": {
    "errors_over_time": {
      "date_histogram": {
        "field": "timestamp",
        "interval": "5m"
      }
    }
  }
}

// Slow requests
{
  "query": {
    "bool": {
      "must": [
        {"range": {"timestamp": {"gte": "now-1h"}}},
        {"range": {"duration": {"gte": 1.0}}}
      ]
    }
  },
  "sort": [{"duration": {"order": "desc"}}]
}

// Agent operation statistics
{
  "query": {
    "bool": {
      "must": [
        {"range": {"timestamp": {"gte": "now-24h"}}},
        {"exists": {"field": "operation"}}
      ]
    }
  },
  "aggs": {
    "operations": {
      "terms": {"field": "operation"},
      "aggs": {
        "avg_duration": {
          "avg": {"field": "duration"}
        }
      }
    }
  }
}
```
### Log Analysis Scripts

```python
import json
from collections import defaultdict


def analyze_logs(log_file_path: str):
    """Analyze A2A Registry logs for insights"""
    operations = defaultdict(list)
    errors = []
    slow_requests = []

    with open(log_file_path, 'r') as f:
        for line in f:
            try:
                log_entry = json.loads(line)

                # Collect operation statistics
                if 'operation' in log_entry:
                    operations[log_entry['operation']].append(log_entry)

                # Collect errors
                if log_entry.get('level') == 'ERROR':
                    errors.append(log_entry)

                # Collect slow requests
                if log_entry.get('duration', 0) > 1.0:
                    slow_requests.append(log_entry)
            except json.JSONDecodeError:
                continue

    # Generate report
    print("=== A2A Registry Log Analysis ===\n")

    # Operation statistics
    print("Operation Statistics:")
    for op, entries in operations.items():
        durations = [e.get('duration', 0) for e in entries if 'duration' in e]
        if durations:
            avg_duration = sum(durations) / len(durations)
            print(f"  {op}: {len(entries)} operations, avg {avg_duration:.3f}s")

    print(f"\nErrors: {len(errors)}")
    print(f"Slow requests (>1s): {len(slow_requests)}")

    # Top errors
    if errors:
        error_types = defaultdict(int)
        for error in errors:
            error_msg = error.get('message', 'Unknown error')
            error_types[error_msg] += 1

        print("\nTop Errors:")
        for error, count in sorted(error_types.items(), key=lambda x: x[1], reverse=True)[:5]:
            print(f"  {count}x: {error}")


if __name__ == "__main__":
    analyze_logs("/var/log/a2a-registry/app.log")
```
## Monitoring Dashboard

### Grafana Dashboard Configuration

```json
{
  "dashboard": {
    "title": "A2A Registry Monitoring",
    "panels": [
      {
        "title": "Request Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(registry_requests_total[5m])",
            "legendFormat": "{{method}} {{endpoint}}"
          }
        ]
      },
      {
        "title": "Response Time",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(registry_request_duration_seconds_bucket[5m]))",
            "legendFormat": "95th percentile"
          },
          {
            "expr": "histogram_quantile(0.50, rate(registry_request_duration_seconds_bucket[5m]))",
            "legendFormat": "50th percentile"
          }
        ]
      },
      {
        "title": "Active Agents",
        "type": "singlestat",
        "targets": [
          {
            "expr": "registry_active_agents",
            "legendFormat": "Agents"
          }
        ]
      },
      {
        "title": "Error Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(registry_requests_total{status=~\"4..|5..\"}[5m])",
            "legendFormat": "{{status}}"
          }
        ]
      }
    ]
  }
}
```
## Best Practices

- Use structured logging for easy parsing and analysis
- Include correlation IDs to track requests across services
- Log at appropriate levels to avoid noise
- Monitor key metrics such as response time, error rate, and throughput
- Set up alerting for critical conditions
- Rotate logs regularly to manage disk space (an example logrotate policy follows this list)
- Centralize logging for distributed deployments
- Add health checks for all critical components
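A minimal logrotate policy for file-based deployments (the log path matches the aggregation examples above; the daily schedule, retention count, and `copytruncate` choice are assumptions to adapt):

```
# /etc/logrotate.d/a2a-registry (sketch)
/var/log/a2a-registry/*.log {
    daily
    rotate 14
    compress
    delaycompress
    missingok
    notifempty
    copytruncate
}
```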
## Next Steps

- Set up Performance Tuning for optimization
- Review Common Issues for troubleshooting
- Check the Architecture Guide for system design