Skip to content

Agent Discovery Examples

This guide demonstrates various strategies and patterns for discovering agents in the A2A Registry.

Basic Discovery

The A2A Registry supports both JSON-RPC 2.0 (primary) and REST (secondary) protocols. All examples show JSON-RPC first as it's the default transport per the A2A Protocol v0.3.0 specification.

Search by Name

Find agents by their name using JSON-RPC:

curl -X POST http://localhost:8000/jsonrpc \
  -H "Content-Type: application/json" \
  -d '{
    "jsonrpc": "2.0",
    "method": "search_agents",
    "params": {"query": "weather"},
    "id": 1
  }'

REST alternative:

curl -X POST http://localhost:8000/agents/search \
  -H "Content-Type: application/json" \
  -d '{"query": "weather"}'

Search by Description

Find agents by keywords in their description using JSON-RPC:

curl -X POST http://localhost:8000/jsonrpc \
  -H "Content-Type: application/json" \
  -d '{
    "jsonrpc": "2.0",
    "method": "search_agents",
    "params": {"query": "natural language processing"},
    "id": 2
  }'

REST alternative:

curl -X POST http://localhost:8000/agents/search \
  -H "Content-Type: application/json" \
  -d '{"query": "natural language processing"}'

Search by Skills

Find agents with specific capabilities using JSON-RPC:

curl -X POST http://localhost:8000/jsonrpc \
  -H "Content-Type: application/json" \
  -d '{
    "jsonrpc": "2.0",
    "method": "search_agents",
    "params": {"query": "translate"},
    "id": 3
  }'

REST alternative:

curl -X POST http://localhost:8000/agents/search \
  -H "Content-Type: application/json" \
  -d '{"query": "translate"}'

Advanced Discovery Patterns

Python Discovery Client with JSON-RPC

import requests
from typing import List, Dict, Any, Optional
from dataclasses import dataclass

@dataclass
class AgentSearchCriteria:
    required_skills: List[str] = None
    preferred_skills: List[str] = None
    min_version: str = None
    max_response_time_ms: int = None
    regions: List[str] = None
    exclude_agents: List[str] = None

class AgentDiscoveryClient:
    def __init__(self, registry_url="http://localhost:8000"):
        self.registry_url = registry_url
        self.request_id = 0

    def _next_id(self) -> int:
        """Generate next JSON-RPC request ID."""
        self.request_id += 1
        return self.request_id

    def _jsonrpc_request(self, method: str, params: dict = None) -> dict:
        """Make a JSON-RPC 2.0 request."""
        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "id": self._next_id()
        }
        if params:
            payload["params"] = params

        response = requests.post(
            f"{self.registry_url}/jsonrpc",
            json=payload,
            headers={"Content-Type": "application/json"}
        )
        response.raise_for_status()
        result = response.json()

        if "error" in result:
            raise Exception(f"JSON-RPC Error: {result['error']}")

        return result.get("result", {})

    def find_agents_by_skill(self, skill_id: str) -> List[Dict[str, Any]]:
        """Find all agents that have a specific skill using JSON-RPC."""
        result = self._jsonrpc_request("search_agents", {"query": skill_id})
        return result.get("agents", [])

    def find_agents_by_skill_rest(self, skill_id: str) -> List[Dict[str, Any]]:
        """Find all agents that have a specific skill using REST (fallback)."""
        response = requests.post(
            f"{self.registry_url}/agents/search",
            json={"query": skill_id}
        )
        response.raise_for_status()
        return response.json()["agents"]

    def find_best_agent_for_task(self, 
                                required_skills: List[str],
                                task_description: str = None) -> Optional[Dict[str, Any]]:
        """Find the best agent for a specific task."""
        all_agents = self.get_all_agents()

        # Filter agents that have all required skills
        compatible_agents = []
        for agent in all_agents:
            agent_skills = {skill["id"] for skill in agent.get("skills", [])}
            if all(skill in agent_skills for skill in required_skills):
                compatible_agents.append(agent)

        if not compatible_agents:
            return None

        # Rank agents by relevance
        ranked_agents = self._rank_agents_by_relevance(
            compatible_agents, required_skills, task_description
        )

        return ranked_agents[0] if ranked_agents else None

    def find_agents_by_criteria(self, criteria: AgentSearchCriteria) -> List[Dict[str, Any]]:
        """Find agents matching complex criteria."""
        all_agents = self.get_all_agents()
        matching_agents = []

        for agent in all_agents:
            if self._matches_criteria(agent, criteria):
                matching_agents.append(agent)

        return matching_agents

    def get_all_agents(self) -> List[Dict[str, Any]]:
        """Get all registered agents using JSON-RPC."""
        result = self._jsonrpc_request("list_agents")
        return result.get("agents", [])

    def get_all_agents_rest(self) -> List[Dict[str, Any]]:
        """Get all registered agents using REST (fallback)."""
        response = requests.get(f"{self.registry_url}/agents")
        response.raise_for_status()
        return response.json()["agents"]

    def find_similar_agents(self, reference_agent_id: str) -> List[Dict[str, Any]]:
        """Find agents similar to a reference agent using JSON-RPC."""
        # Get reference agent
        try:
            result = self._jsonrpc_request("get_agent", {"agent_name": reference_agent_id})
            reference_agent = result.get("agent_card")
        except Exception:
            return []

        if not reference_agent:
            return []
        reference_skills = {skill["id"] for skill in reference_agent.get("skills", [])}

        # Find agents with overlapping skills
        all_agents = self.get_all_agents()
        similar_agents = []

        for agent in all_agents:
            if agent["name"] == reference_agent_id:
                continue

            agent_skills = {skill["id"] for skill in agent.get("skills", [])}
            overlap = len(reference_skills.intersection(agent_skills))

            if overlap > 0:
                similarity_score = overlap / len(reference_skills.union(agent_skills))
                agent["similarity_score"] = similarity_score
                similar_agents.append(agent)

        # Sort by similarity
        similar_agents.sort(key=lambda x: x["similarity_score"], reverse=True)
        return similar_agents

    def _matches_criteria(self, agent: Dict[str, Any], criteria: AgentSearchCriteria) -> bool:
        """Check if an agent matches the given criteria."""
        # Check required skills
        if criteria.required_skills:
            agent_skills = {skill["id"] for skill in agent.get("skills", [])}
            if not all(skill in agent_skills for skill in criteria.required_skills):
                return False

        # Check version requirements
        if criteria.min_version:
            from packaging import version
            if version.parse(agent.get("version", "0.0.0")) < version.parse(criteria.min_version):
                return False

        # Check response time requirements
        if criteria.max_response_time_ms:
            agent_response_time = agent.get("metadata", {}).get("average_response_time_ms")
            if agent_response_time and agent_response_time > criteria.max_response_time_ms:
                return False

        # Check region preferences
        if criteria.regions:
            agent_regions = agent.get("metadata", {}).get("preferred_regions", [])
            if not any(region in agent_regions for region in criteria.regions):
                return False

        # Check exclusion list
        if criteria.exclude_agents and agent["name"] in criteria.exclude_agents:
            return False

        return True

    def _rank_agents_by_relevance(self, 
                                 agents: List[Dict[str, Any]], 
                                 required_skills: List[str],
                                 task_description: str = None) -> List[Dict[str, Any]]:
        """Rank agents by their relevance to the task."""
        scored_agents = []

        for agent in agents:
            score = 0
            agent_skills = {skill["id"] for skill in agent.get("skills", [])}

            # Base score: number of matching skills
            matching_skills = len(set(required_skills).intersection(agent_skills))
            score += matching_skills * 10

            # Bonus for additional relevant skills
            if task_description:
                for skill in agent.get("skills", []):
                    if any(word in skill["description"].lower() 
                          for word in task_description.lower().split()):
                        score += 5

            # Performance bonuses
            metadata = agent.get("metadata", {})
            if metadata.get("average_response_time_ms", 1000) < 500:
                score += 5  # Fast response time

            if metadata.get("uptime_percentage", 0) > 99:
                score += 3  # High uptime

            agent["relevance_score"] = score
            scored_agents.append(agent)

        scored_agents.sort(key=lambda x: x["relevance_score"], reverse=True)
        return scored_agents

# Usage examples
def main():
    discovery = AgentDiscoveryClient()

    # Find agents for natural language processing
    nlp_agents = discovery.find_agents_by_skill("natural_language_processing")
    print(f"Found {len(nlp_agents)} NLP agents")

    # Find best agent for translation task
    translation_agent = discovery.find_best_agent_for_task(
        required_skills=["translate_text", "detect_language"],
        task_description="Translate document from Spanish to English"
    )
    if translation_agent:
        print(f"Best translation agent: {translation_agent['name']}")

    # Find agents with specific criteria
    criteria = AgentSearchCriteria(
        required_skills=["image_analysis"],
        min_version="2.0.0",
        max_response_time_ms=1000,
        regions=["us-east-1", "us-west-2"]
    )
    matching_agents = discovery.find_agents_by_criteria(criteria)
    print(f"Found {len(matching_agents)} agents matching criteria")

    # Find similar agents
    if nlp_agents:
        similar = discovery.find_similar_agents(nlp_agents[0]["name"])
        print(f"Found {len(similar)} similar agents")

if __name__ == "__main__":
    main()

Task-Specific Discovery

Multi-Modal Task Discovery

class TaskOrchestrator:
    def __init__(self, discovery_client: AgentDiscoveryClient):
        self.discovery = discovery_client

    def find_agents_for_complex_task(self, task_definition: Dict[str, Any]) -> Dict[str, List[Dict[str, Any]]]:
        """Find agents for a complex multi-step task."""
        required_capabilities = task_definition.get("required_capabilities", [])
        optional_capabilities = task_definition.get("optional_capabilities", [])
        constraints = task_definition.get("constraints", {})

        # Find agents for each capability
        capability_agents = {}

        for capability in required_capabilities:
            agents = self.discovery.find_agents_by_skill(capability)

            # Apply constraints
            if constraints:
                agents = self._apply_constraints(agents, constraints)

            capability_agents[capability] = agents

        # Find agents that can handle multiple capabilities
        multi_capability_agents = self._find_multi_capability_agents(
            required_capabilities, capability_agents
        )

        return {
            "single_capability": capability_agents,
            "multi_capability": multi_capability_agents,
            "orchestration_plan": self._create_orchestration_plan(
                required_capabilities, capability_agents, multi_capability_agents
            )
        }

    def _apply_constraints(self, agents: List[Dict[str, Any]], constraints: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Apply constraints to filter agents."""
        filtered_agents = []

        for agent in agents:
            metadata = agent.get("metadata", {})

            # Performance constraints
            if "max_response_time" in constraints:
                if metadata.get("average_response_time_ms", 0) > constraints["max_response_time"]:
                    continue

            # Resource constraints
            if "max_memory_usage" in constraints:
                if metadata.get("memory_usage_mb", 0) > constraints["max_memory_usage"]:
                    continue

            # Compliance constraints
            if "required_compliance" in constraints:
                agent_compliance = metadata.get("compliance", [])
                if not all(req in agent_compliance for req in constraints["required_compliance"]):
                    continue

            filtered_agents.append(agent)

        return filtered_agents

    def _find_multi_capability_agents(self, 
                                    required_capabilities: List[str],
                                    capability_agents: Dict[str, List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
        """Find agents that can handle multiple capabilities."""
        all_agents = self.discovery.get_all_agents()
        multi_capability = []

        for agent in all_agents:
            agent_skills = {skill["id"] for skill in agent.get("skills", [])}

            # Count how many required capabilities this agent can handle
            covered_capabilities = [
                cap for cap in required_capabilities
                if cap in agent_skills
            ]

            if len(covered_capabilities) > 1:
                agent["covered_capabilities"] = covered_capabilities
                agent["capability_coverage"] = len(covered_capabilities) / len(required_capabilities)
                multi_capability.append(agent)

        # Sort by capability coverage
        multi_capability.sort(key=lambda x: x["capability_coverage"], reverse=True)
        return multi_capability

    def _create_orchestration_plan(self, 
                                 required_capabilities: List[str],
                                 capability_agents: Dict[str, List[Dict[str, Any]]],
                                 multi_capability_agents: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Create an orchestration plan for the task."""
        plan = {
            "strategy": "unknown",
            "agents": [],
            "workflow": []
        }

        # Strategy 1: Single agent handles everything
        if multi_capability_agents:
            best_agent = multi_capability_agents[0]
            if best_agent["capability_coverage"] == 1.0:
                plan["strategy"] = "single_agent"
                plan["agents"] = [best_agent]
                plan["workflow"] = [
                    {
                        "step": 1,
                        "agent": best_agent["name"],
                        "capabilities": best_agent["covered_capabilities"]
                    }
                ]
                return plan

        # Strategy 2: Multi-agent pipeline
        plan["strategy"] = "multi_agent_pipeline"
        step = 1

        for capability in required_capabilities:
            agents = capability_agents.get(capability, [])
            if agents:
                best_agent = agents[0]  # Assume first is best ranked
                plan["agents"].append(best_agent)
                plan["workflow"].append({
                    "step": step,
                    "agent": best_agent["name"],
                    "capability": capability
                })
                step += 1

        return plan

# Example usage
def document_processing_example():
    discovery = AgentDiscoveryClient()
    orchestrator = TaskOrchestrator(discovery)

    # Define a complex document processing task
    task = {
        "name": "legal_document_analysis",
        "description": "Analyze legal documents for compliance and extract key information",
        "required_capabilities": [
            "pdf_text_extraction",
            "legal_entity_recognition",
            "compliance_checking",
            "document_summarization"
        ],
        "optional_capabilities": [
            "language_detection",
            "sentiment_analysis"
        ],
        "constraints": {
            "max_response_time": 5000,  # 5 seconds
            "required_compliance": ["GDPR", "SOX"],
            "preferred_regions": ["us-east-1", "eu-west-1"]
        }
    }

    # Find suitable agents
    result = orchestrator.find_agents_for_complex_task(task)

    print("Orchestration Plan:")
    print(f"Strategy: {result['orchestration_plan']['strategy']}")
    print(f"Required agents: {len(result['orchestration_plan']['agents'])}")

    for step in result['orchestration_plan']['workflow']:
        print(f"Step {step['step']}: {step['agent']} - {step.get('capability', step.get('capabilities'))}")

def main():
    document_processing_example()

if __name__ == "__main__":
    main()

Discovery with Load Balancing

import random
from typing import Dict, List, Any

class LoadBalancedDiscovery:
    def __init__(self, discovery_client: AgentDiscoveryClient):
        self.discovery = discovery_client
        self.agent_metrics = {}  # Track agent performance metrics

    def find_agent_with_load_balancing(self, 
                                     required_skill: str,
                                     load_balancing_strategy: str = "round_robin") -> Dict[str, Any]:
        """Find an agent with load balancing considerations."""
        candidate_agents = self.discovery.find_agents_by_skill(required_skill)

        if not candidate_agents:
            return None

        if load_balancing_strategy == "round_robin":
            return self._round_robin_selection(candidate_agents, required_skill)
        elif load_balancing_strategy == "least_loaded":
            return self._least_loaded_selection(candidate_agents)
        elif load_balancing_strategy == "random":
            return random.choice(candidate_agents)
        elif load_balancing_strategy == "performance_weighted":
            return self._performance_weighted_selection(candidate_agents)
        else:
            return candidate_agents[0]  # Default to first

    def _round_robin_selection(self, agents: List[Dict[str, Any]], skill: str) -> Dict[str, Any]:
        """Select agent using round-robin strategy."""
        if skill not in self.agent_metrics:
            self.agent_metrics[skill] = {"round_robin_index": 0}

        index = self.agent_metrics[skill]["round_robin_index"]
        selected_agent = agents[index % len(agents)]

        # Update index for next selection
        self.agent_metrics[skill]["round_robin_index"] = (index + 1) % len(agents)

        return selected_agent

    def _least_loaded_selection(self, agents: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Select the least loaded agent."""
        # In a real implementation, this would query actual load metrics
        # For this example, we'll use simulated metrics

        least_loaded = None
        min_load = float('inf')

        for agent in agents:
            # Simulate getting current load (0-100%)
            current_load = self._get_agent_current_load(agent["name"])

            if current_load < min_load:
                min_load = current_load
                least_loaded = agent

        return least_loaded

    def _performance_weighted_selection(self, agents: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Select agent based on performance weights."""
        weights = []

        for agent in agents:
            # Calculate weight based on performance metrics
            metadata = agent.get("metadata", {})

            # Lower response time = higher weight
            response_time = metadata.get("average_response_time_ms", 1000)
            response_weight = max(0, 1000 - response_time) / 1000

            # Higher uptime = higher weight
            uptime = metadata.get("uptime_percentage", 95) / 100

            # Lower current load = higher weight
            current_load = self._get_agent_current_load(agent["name"])
            load_weight = max(0, 100 - current_load) / 100

            # Combined weight
            total_weight = (response_weight * 0.4 + uptime * 0.3 + load_weight * 0.3)
            weights.append(total_weight)

        # Weighted random selection
        return self._weighted_random_choice(agents, weights)

    def _get_agent_current_load(self, agent_name: str) -> float:
        """Get current load percentage for an agent."""
        # In a real implementation, this would query the agent's metrics endpoint
        # For this example, return a random load between 0-100
        return random.uniform(0, 100)

    def _weighted_random_choice(self, agents: List[Dict[str, Any]], weights: List[float]) -> Dict[str, Any]:
        """Select a random agent based on weights."""
        total_weight = sum(weights)
        if total_weight == 0:
            return random.choice(agents)

        r = random.uniform(0, total_weight)
        cumulative_weight = 0

        for agent, weight in zip(agents, weights):
            cumulative_weight += weight
            if r <= cumulative_weight:
                return agent

        return agents[-1]  # Fallback

    def record_agent_performance(self, agent_name: str, response_time_ms: int, success: bool):
        """Record performance metrics for an agent."""
        if agent_name not in self.agent_metrics:
            self.agent_metrics[agent_name] = {
                "response_times": [],
                "success_rate": []
            }

        metrics = self.agent_metrics[agent_name]
        metrics["response_times"].append(response_time_ms)
        metrics["success_rate"].append(success)

        # Keep only recent metrics (last 100 requests)
        metrics["response_times"] = metrics["response_times"][-100:]
        metrics["success_rate"] = metrics["success_rate"][-100:]

# Example usage
def load_balancing_example():
    discovery = AgentDiscoveryClient()
    lb_discovery = LoadBalancedDiscovery(discovery)

    # Simulate multiple requests for the same skill
    skill = "text_classification"

    print("Load-balanced agent selection:")
    for i in range(5):
        agent = lb_discovery.find_agent_with_load_balancing(
            skill, 
            "round_robin"
        )
        if agent:
            print(f"Request {i+1}: Selected {agent['name']}")

    print("\nPerformance-weighted selection:")
    for i in range(5):
        agent = lb_discovery.find_agent_with_load_balancing(
            skill,
            "performance_weighted"
        )
        if agent:
            print(f"Request {i+1}: Selected {agent['name']}")

if __name__ == "__main__":
    load_balancing_example()

Geographic Discovery

class GeographicDiscovery:
    def __init__(self, discovery_client: AgentDiscoveryClient):
        self.discovery = discovery_client

        # Region proximity mapping (simplified)
        self.region_proximity = {
            "us-east-1": ["us-east-2", "us-west-1", "us-west-2"],
            "us-west-1": ["us-west-2", "us-east-1", "us-east-2"],
            "eu-west-1": ["eu-central-1", "eu-north-1"],
            "ap-southeast-1": ["ap-southeast-2", "ap-northeast-1"]
        }

    def find_nearest_agents(self, 
                           required_skill: str,
                           user_region: str,
                           max_distance: int = 2) -> List[Dict[str, Any]]:
        """Find agents nearest to the user's region."""
        all_agents = self.discovery.find_agents_by_skill(required_skill)
        regional_agents = []

        for agent in all_agents:
            agent_regions = agent.get("metadata", {}).get("preferred_regions", [])

            for region in agent_regions:
                distance = self._calculate_region_distance(user_region, region)
                if distance <= max_distance:
                    agent["region_distance"] = distance
                    agent["matched_region"] = region
                    regional_agents.append(agent)
                    break  # Use the closest region for this agent

        # Sort by distance
        regional_agents.sort(key=lambda x: x["region_distance"])
        return regional_agents

    def _calculate_region_distance(self, region1: str, region2: str) -> int:
        """Calculate distance between regions (simplified)."""
        if region1 == region2:
            return 0

        if region2 in self.region_proximity.get(region1, []):
            return 1

        # Check if they're in the same continent
        continent1 = region1.split("-")[0]
        continent2 = region2.split("-")[0]

        if continent1 == continent2:
            return 2

        return 3  # Different continents

    def find_multi_region_deployment(self, 
                                   required_skill: str,
                                   target_regions: List[str]) -> Dict[str, List[Dict[str, Any]]]:
        """Find agents for multi-region deployment."""
        deployment_plan = {}

        for region in target_regions:
            nearest_agents = self.find_nearest_agents(required_skill, region, max_distance=1)
            deployment_plan[region] = nearest_agents

        return deployment_plan

# Example usage
def geographic_discovery_example():
    discovery = AgentDiscoveryClient()
    geo_discovery = GeographicDiscovery(discovery)

    # Find nearest translation agents to EU region
    agents = geo_discovery.find_nearest_agents(
        "translate_text",
        "eu-west-1",
        max_distance=2
    )

    print("Nearest translation agents to EU:")
    for agent in agents:
        print(f"{agent['name']} - Distance: {agent['region_distance']} - Region: {agent['matched_region']}")

    # Plan multi-region deployment
    target_regions = ["us-east-1", "eu-west-1", "ap-southeast-1"]
    deployment = geo_discovery.find_multi_region_deployment("image_analysis", target_regions)

    print("\nMulti-region deployment plan:")
    for region, agents in deployment.items():
        print(f"{region}: {len(agents)} available agents")

if __name__ == "__main__":
    geographic_discovery_example()

This comprehensive guide covers various agent discovery patterns, from basic search to complex multi-criteria discovery with load balancing and geographic considerations.