《AI Agent智能体开发实践 邓立国 邓淇文著 五大实战案例掌握AI Agent开发 LangChain示例 人工智能技术丛书 清华大学出版社》【摘要 书评 试读】- 京东图书

多Agent协同(Multi-Agent Collaboration,MAS)是指多个具备自主决策能力的智能体(Agent)通过通信、任务分配与协同机制,共同完成复杂目标的技术体系。其核心在于突破单个Agent的能力局限,通过群体智能实现更高阶的自动化。

3.3.1  角色分工:定义不同Agent的职能

1. 多Agent系统概述

多Agent系统是由多个自主或半自主的智能Agent组成的系统,这些Agent通过协作来完成单个Agent难以完成的复杂任务。在多Agent系统中,角色分工是关键,不同的Agent承担不同的职责,通过协同工作提高整体效率。

2. Agent角色分工

1)分析师(Analyst)Agent

职责:数据收集、处理、分析和模式识别。

特点:强大的数据处理和推理能力。

2)执行者(Executor)Agent

职责:执行具体任务和操作。

特点:高效的执行能力和操作接口。

3)协调者(Coordinator)Agent

职责:任务分配、资源调度和冲突解决。

特点:全局视角和决策能力。

4)接口(Interface)Agent

职责:与用户或其他系统交互。

特点:良好的交互能力和信息转换能力。

5)监控(Monitor)Agent

职责:系统状态监测和异常检测。

特点:实时性和预警能力。

3. Python实现案例

【示例3.7】实现一个包含分析师Agent和执行者Agent的简单股票分析系统。

#1. 基础架构
from abc import ABC, abstractmethod
from typing import Dict, Any
import random
import time

class Agent(ABC):
    """Agent抽象基类"""
    def __init__(self, name: str):
        self.name = name
        self.knowledge_base = {}
    
    @abstractmethod
    def perform_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """执行任务并返回结果"""
        pass
    
    def communicate(self, message: str) -> str:
        """简单的通信方法"""
        print(f"{self.name} received: {message}")
        return f"{self.name} acknowledged: {message}"
#2. 分析师Agent实现
class AnalystAgent(Agent):
    """分析师Agent,负责数据分析"""
    def __init__(self, name: str):
        super().__init__(name)
        # 初始化分析工具或模型
        self.analysis_tools = ["趋势分析", "技术指标", "模式识别"]
    
    def perform_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """执行分析任务"""
        print(f"{self.name} is analyzing data...")
        
        # 模拟分析过程
        data = task.get("data", {})
        analysis_type = task.get("analysis_type", "趋势分析")
        
        # 模拟分析耗时
        time.sleep(1)
        
        # 生成分析结果
        result = {
            "analysis_type": analysis_type,
            "result": f"{analysis_type}结果: {random.choice(['买入', '卖出', '持有'])}",
            "confidence": random.uniform(0.5, 1.0),
            "factors_considered": random.sample(["价格", "成交量", "市场情绪", "宏观经济"], 2)
        }
        
        print(f"{self.name} completed analysis with result: {result}")
        return result
    
    def train_model(self, dataset):
        """模拟模型训练方法"""
        print(f"{self.name} is training model with {len(dataset)} samples...")
        time.sleep(2)
        return {"status": "success", "accuracy": random.uniform(0.7, 0.95)}
#3. 执行者Agent实现
class ExecutorAgent(Agent):
    """执行者Agent,负责执行交易操作"""
    def __init__(self, name: str):
        super().__init__(name)
        self.available_actions = ["买入", "卖出", "调整仓位", "取消订单"]
    
    def perform_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """执行交易任务"""
        print(f"{self.name} is executing trade...")
        
        action = task.get("action", "买入")
        symbol = task.get("symbol", "UNKNOWN")
        quantity = task.get("quantity", 0)
        
        # 验证操作是否可用
        if action not in self.available_actions:
            return {"status": "error", "message": f"Unsupported action: {action}"}
        
        # 模拟执行耗时
        time.sleep(0.5)
        
        # 生成执行结果
        result = {
            "status": "success",
            "action": action,
            "symbol": symbol,
            "quantity": quantity,
            "execution_price": round(random.uniform(100, 200), 2),
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
        }
        
        print(f"{self.name} executed {action} {quantity} shares of {symbol}")
        return result
    
    def check_balance(self):
        """检查账户余额"""
        return {
            "cash": round(random.uniform(10000, 50000), 2),
            "positions": {f"STOCK_{i}": random.randint(0, 100) for i in range(1, 4)}
        }
#4. 协调者Agent实现
class CoordinatorAgent(Agent):
    """协调者Agent,负责任务分配和协调"""
    def __init__(self, name: str):
        super().__init__(name)
        self.registered_agents = {}
    
    def register_agent(self, agent: Agent):
        """注册Agent到系统中"""
        self.registered_agents[agent.name] = agent
        print(f"Agent {agent.name} registered with coordinator {self.name}")
    
    def perform_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """分配任务给合适的Agent"""
        task_type = task.get("type", "")
        
        if "analysis" in task_type.lower():
            agent = self.registered_agents.get("Analyst")
        elif "execute" in task_type.lower():
            agent = self.registered_agents.get("Executor")
        else:
            return {"status": "error", "message": "Unknown task type"}
        
        if not agent:
            return {"status": "error", "message": "No suitable agent available"}
        
        print(f"{self.name} is delegating task to {agent.name}")
        return agent.perform_task(task)
    
    def broadcast(self, message: str):
        """广播消息给所有注册的Agent"""
        responses = {}
        for name, agent in self.registered_agents.items():
            responses[name] = agent.communicate(message)
        return responses
#5. 系统集成与运行示例
def main():
    # 创建Agent实例
    coordinator = CoordinatorAgent("Coordinator")
    analyst = AnalystAgent("Analyst")
    executor = ExecutorAgent("Executor")
    
    # 注册Agent
    coordinator.register_agent(analyst)
    coordinator.register_agent(executor)
    
    # 模拟分析任务
    print("\n=== Running Analysis Task ===")
    analysis_task = {
        "type": "stock_analysis",
        "data": {"symbol": "AAPL", "history": "1Y"},
        "analysis_type": "技术指标"
    }
    analysis_result = coordinator.perform_task(analysis_task)
    print("Analysis Result:", analysis_result)
    
    # 模拟执行任务
    print("\n=== Running Execution Task ===")
    execution_task = {
        "type": "execute_trade",
        "action": "买入",
        "symbol": "AAPL",
        "quantity": 100,
        "analysis_reference": analysis_result
    }
    execution_result = coordinator.perform_task(execution_task)
    print("Execution Result:", execution_result)
    
    # 测试广播通信
    print("\n=== Testing Broadcast Communication ===")
    broadcast_responses = coordinator.broadcast("System check at " + time.strftime("%H:%M:%S"))
    for agent, response in broadcast_responses.items():
        print(f"{agent}: {response}")

    # 检查执行者状态
    print("\n=== Checking Executor Status ===")
    print("Account Balance:", executor.check_balance())

if __name__ == "__main__":
    main()

运行代码,输出如下:

Agent Analyst registered with coordinator Coordinator
Agent Executor registered with coordinator Coordinator

=== Running Analysis Task ===
Coordinator is delegating task to Analyst
Analyst is analyzing data...
Analyst completed analysis with result: {'analysis_type': '技术指标', 'result': '技术指标结果: 买入', 'confidence': 0.878509121476005, 'factors_considered': ['价格', '成交量']}
Analysis Result: {'analysis_type': '技术指标', 'result': '技术指标结果: 买入', 'confidence': 0.878509121476005, 'factors_considered': ['价格', '成交量']}

=== Running Execution Task ===
Coordinator is delegating task to Executor
Executor is executing trade...
Executor executed 买入 100 shares of AAPL
Execution Result: {'status': 'success', 'action': '买入', 'symbol': 'AAPL', 'quantity': 100, 'execution_price': 109.74, 'timestamp': '2025-07-03 16:37:13'}

=== Testing Broadcast Communication ===
Analyst received: System check at 16:37:13
Executor received: System check at 16:37:13
Analyst: Analyst acknowledged: System check at 16:37:13
Executor: Executor acknowledged: System check at 16:37:13

=== Checking Executor Status ===
Account Balance: {'cash': 19739.02, 'positions': {'STOCK_1': 88, 'STOCK_2': 94, 'STOCK_3': 54}}

这个框架提供了多Agent系统的基本结构和实现示例,可以根据具体应用场景进行扩展和优化。

3.3.2  通信协议:基于自然语言或结构化消息

多Agent协同通信协议是实现多个智能体之间高效协作的关键技术,可以基于自然语言或结构化消息进行通信。

1. 自然语言处理

自然语言通信依赖于强大的自然语言处理(NLP)技术,包括语言理解、语义解析和语言生成。例如,使用Transformer架构的模型(如GPT)可以处理复杂的语言交互。

【示例3.8】基于DeepSeek自然语言的Agent通信示例,使用Transformers库实现语言生成和理解。

此示例在运行过程中,经常遇到无法连接到Hugging Face的模型仓库的错误,这是由于网络问题或访问限制引起的,以下是几种解决方案:

  • 检查网络连接,确保能访问Hugging Face网站。
  • 如果在国内,可以使用国内镜像站点。

提前手动下载模型到本地,然后从本地加载模型。

from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import torch
import os

# -----------------------------
# 配置:选择模型加载方式
# -----------------------------
USE_MIRROR = True  # 是否使用国内镜像
LOCAL_MODEL_PATH = "./local_deepseek_model"  # 本地模型路径,如果存在,则从本地加载
LOCAL_CLASSIFIER_PATH = "./local_classifier_model"  # 本地分类器模型路径

# 国内镜像设置
if USE_MIRROR:
    os.environ["HF_ENDPOINT"] = "https://hf-mirror.com"

# -----------------------------
# 1. 加载 DeepSeek 模型和 tokenizer
# -----------------------------

model_name = "deepseek-ai/deepseek-llm-1.3b-chat"

# 检查本地是否有模型
if os.path.exists(LOCAL_MODEL_PATH):
    print(f"从本地加载模型: {LOCAL_MODEL_PATH}")
    model_path = LOCAL_MODEL_PATH
else:
    model_path = model_name

try:
    tokenizer = AutoTokenizer.from_pretrained(
        model_path, 
        trust_remote_code=True,
        local_files_only=os.path.exists(LOCAL_MODEL_PATH)  # 如果是本地路径,则只检查本地文件
    )
    model = AutoModelForCausalLM.from_pretrained(
        model_path,
        torch_dtype=torch.bfloat16 if torch.cuda.is_available() else torch.float32,
        device_map="auto" if torch.cuda.is_available() else None,
        trust_remote_code=True,
        local_files_only=os.path.exists(LOCAL_MODEL_PATH)
    )
except Exception as e:
    print(f"模型加载失败: {e}")
    print("尝试使用更小的模型或检查网络连接")
    # 可以在这里提供备选模型
    exit(1)

# 生成管道(用于非流式生成)
generator = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    device=0 if torch.cuda.is_available() else -1
)

# -----------------------------
# 2. 意图理解
# -----------------------------

classifier_name = "uer/roberta-base-finetuned-dianping-chinese"

# 检查本地是否有分类器模型
if os.path.exists(LOCAL_CLASSIFIER_PATH):
    print(f"从本地加载分类器模型: {LOCAL_CLASSIFIER_PATH}")
    classifier_path = LOCAL_CLASSIFIER_PATH
else:
    classifier_path = classifier_name

try:
    intent_classifier = pipeline(
        "text-classification",
        model=classifier_path,
        tokenizer=classifier_path,
        local_files_only=os.path.exists(LOCAL_CLASSIFIER_PATH)
    )
except Exception as e:
    print(f"分类器模型加载失败: {e}")
    print("将仅使用关键词意图识别")

# 手动关键词意图识别(中文)
def detect_intent(text):
    if any(w in text for w in ["你好", "嗨", "早上好", "下午好"]):
        return "问候"
    elif any(w in text for w in ["帮助", "帮忙", "怎么做", "如何"]):
        return "请求帮助"
    elif any(w in text for w in ["是", "对", "没错", "提供", "信息", "有"]):
        return "提供信息"
    elif any(w in text for w in ["再见", "拜拜", "结束", "退出"]):
        return "告别"
    else:
        return "其他"

# -----------------------------
# 3. 定义支持 DeepSeek 格式的 Agent
# -----------------------------

class DeepSeekAgent:
    def __init__(self, name, model, tokenizer):
        self.name = name
        self.model = model
        self.tokenizer = tokenizer
        self.history = []  # 存储对话历史

    def send(self, message):
        print(f"\033[92m{self.name}: {message}\033[0m")  # 绿色输出
        return message

    def receive(self, message):
        print(f"\033[94m{self.name} 收到: {message}\033[0m")  # 蓝色

        # 意图识别
        intent = detect_intent(message)
        print(f"  🔍 意图识别: {intent}")

        # 构造 DeepSeek 的对话输入格式
        prompt_lines = []
        # 添加历史
        for i in range(0, len(self.history), 2):
            if i + 1 < len(self.history):
                prompt_lines.append(f"<|User|>:{self.history[i]}<|Assistant|>: {self.history[i+1]}")
            else:
                prompt_lines.append(f"<|User|>:{self.history[i]}")
        # 添加当前消息
        prompt_lines.append(f"<|User|>:{message}<|Assistant|>:")

        full_prompt = "\n".join(prompt_lines)

        try:
            # Tokenize 并生成
            inputs = self.tokenizer(full_prompt, return_tensors="pt").to(self.model.device)
            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=128,
                    temperature=0.7,
                    top_p=0.9,
                    do_sample=True,
                    pad_token_id=self.tokenizer.eos_token_id,
                    eos_token_id=self.tokenizer.eos_token_id
                )
            # 解码回复
            response = self.tokenizer.decode(outputs[0][inputs['input_ids'].shape[-1]:], skip_special_tokens=True)

            # 清理输出
            reply = response.strip().split("\n")[0]
            reply = reply.replace("<|Assistant|>:", "").strip()

            # 更新历史
            self.history.append(message)
            self.history.append(reply)

            return reply
        except Exception as e:
            print(f"生成回复时出错: {e}")
            return "抱歉,我无法生成回复。"

# -----------------------------
# 4. 创建两个 Agent
# -----------------------------

agent_a = DeepSeekAgent("AgentA", model, tokenizer)
agent_b = DeepSeekAgent("AgentB", model, tokenizer)

# -----------------------------
# 5. 开始多轮对话
# -----------------------------

print("🗣 基于 DeepSeek 的 Agent 自然语言通信启动...\n")

# AgentA 发起
msg = "你好,我是AgentA,我们可以一起讨论一个合作计划吗?"
msg = agent_a.send(msg)

try:
    for i in range(4):  # 4 轮来回
        print("-" * 50)
        reply = agent_b.receive(msg)
        msg = agent_b.send(reply)

        print("-" * 50)
        reply = agent_a.receive(msg)
        msg = agent_a.send(reply)
except KeyboardInterrupt:
    print("\n对话已手动终止")
except Exception as e:
    print(f"对话过程中出错: {e}")

运行代码,输出如下:

基于 DeepSeek 的 Agent 自然语言通信启动...

AgentA: 你好,我是AgentA,我们可以一起讨论一个合作计划吗?
--------------------------------------------------
AgentB 收到: 你好,我是AgentA,我们可以一起讨论一个合作计划吗?
    意图识别: 问候
AgentB: 当然可以,我很乐意与您合作。请告诉我您有什么计划?
--------------------------------------------------
AgentA 收到: 当然可以,我很乐意与您合作。请告诉我您有什么计划?
    意图识别: 提供信息
AgentA: 我想开发一个智能助手,能自动安排日程和发送邮件。
...

2. 基于结构化消息的通信协议

结构化消息使用预定义的格式和字段,便于机器解析和处理,提高通信效率。

常见的结构化消息格式包括:

  • FIPA ACL:Foundation for Intelligent Physical Agents的标准。
  • JSON-based:自定义JSON格式。
  • Protocol Buffers:Google的高效二进制协议。

【示例3.9】一个基于JSON格式的结构化消息通信示例。

import json

# 定义结构化消息格式
class Message:
    def __init__(self, sender, receiver, msg_type, content):
        self.sender = sender
        self.receiver = receiver
        self.msg_type = msg_type
        self.content = content

    def to_json(self):
        return json.dumps(self.__dict__)

# Agent A 发送结构化消息
agent_a_msg = Message(sender="Agent A", receiver="Agent B", msg_type="REQUEST", content="计算销售数据")
msg_json = agent_a_msg.to_json()
print("Agent A 发送:", msg_json)

# Agent B 接收并解析消息
agent_b_msg = json.loads(msg_json)
print("Agent B 接收:", agent_b_msg)

# Agent B 回复结构化消息
agent_b_response = Message(sender="Agent B", receiver="Agent A", msg_type="RESPONSE", content="销售数据已计算完成")
response_json = agent_b_response.to_json()
print("Agent B 回复:", response_json)

运行代码,输出如下:

Agent A 发送: {"sender": "Agent A", "receiver": "Agent B", "msg_type": "REQUEST", "content": "\u8ba1\u7b97\u9500\u552e\u6570\u636e"}
Agent B 接收: {'sender': 'Agent A', 'receiver': 'Agent B', 'msg_type': 'REQUEST', 'content': '计算销售数据'}
Agent B 回复: {"sender": "Agent B", "receiver": "Agent A", "msg_type": "RESPONSE", "content": "\u9500\u552e\u6570\u636e\u5df2\u8ba1\u7b97\u5b8c\u6210"}

3. 实际应用场景

(1)智能工厂:在智能工厂中,多个机器人和设备需要高效协同工作。结构化消息协议(如FIPA-ACL)被广泛用于任务分配、冲突解决和状态更新。

(2)智能助手:在多智能助手场景中,自然语言通信可用于初次交互和复杂任务的协商,而结构化消息则用于后续的高效执行。

自然语言通信适用于灵活性和适应性要求较高的场景,但计算成本较高。结构化消息通信适用于效率和可扩展性要求较高的场景,但需要预定义协议。在实际应用中,可以根据需求结合使用这两种通信方式。

3.3.3  竞争协调:拍卖机制或投票系统

1. 竞争协调的核心价值

在多智能体系统(MAS)中,当智能体目标冲突或资源有限时,需通过‌竞争协调机制‌实现高效决策。该机制需满足:

  • 公平性:避免单一智能体垄断资源。
  • 效率性:最小化协调成本,最大化系统收益。
  • 稳定性:抑制策略性操纵行为。

2. 拍卖机制:资源竞争的定价策略

1)核心原理

通过‌竞价博弈‌分配稀缺资源,以出价作为竞争信号。其运作遵循:

  • 价高者得:原则实现帕累托最优分配。
  • 显式规则:(如密封投标、公开叫价)决定最终归属。
  • 支付机制:设计(如首价/次价支付)影响竞拍策略。

2)拍卖机制实现

【示例3.10】拍卖机制常用于资源分配和任务分配场景。下面实现一个简单的英式拍卖(公开增价拍卖)。

import random
from typing import List, Dict

class EnglishAuction:
    def __init__(self, items: List[str], agents: List[str], initial_prices: Dict[str, float]):
        """
        初始化英式拍卖
        :param items: 拍卖物品列表
        :param agents: 参与拍卖的智能体列表
        :param initial_prices: 各物品的初始价格
        """
        self.items = items
        self.agents = agents
        self.current_prices = initial_prices.copy()
        self.winners = {item: None for item in items}
        self.bids = {item: [] for item in items}
        
    def agent_bid(self, agent: str, item: str, bid_price: float) -> bool:
        """
        智能体出价
        :param agent: 出价智能体
        :param item: 拍卖物品
        :param bid_price: 出价金额
        :return: 是否出价成功
        """
        if bid_price > self.current_prices[item]:
            self.current_prices[item] = bid_price
            self.bids[item].append((agent, bid_price))
            return True
        return False
    
    def close_auction(self) -> Dict[str, str]:
        """
        结束拍卖,确定获胜者
        :return: 各物品的获胜者
        """
        for item in self.items:
            if self.bids[item]:
                # 最高出价者获胜
                self.winners[item] = max(self.bids[item], key=lambda x: x[1])[0]
        return self.winners
    
    def simulate_auction(self, max_rounds: int = 10):
        """
        模拟拍卖过程
        :param max_rounds: 最大轮数
        """
        for _ in range(max_rounds):
            for agent in self.agents:
                for item in self.items:
                    # 模拟智能体随机出价
                    if random.random() > 0.7:  # 30%概率出价
                        current_price = self.current_prices[item]
                        bid_price = current_price * (1 + random.uniform(0.1, 0.5))
                        self.agent_bid(agent, item, bid_price)
            print(f"Round {_+1} current prices: {self.current_prices}")
        
        self.close_auction()
        print("Auction results:", self.winners)

# 使用示例
if __name__ == "__main__":
    items = ["task1", "task2", "resource1"]
    agents = ["agent1", "agent2", "agent3", "agent4"]
    initial_prices = {"task1": 10.0, "task2": 15.0, "resource1": 20.0}
    
    auction = EnglishAuction(items, agents, initial_prices)
    auction.simulate_auction()

运行代码,输出如下:

Round 1 current prices: {'task1': 10.0, 'task2': 15.0, 'resource1': 27.624252137343362}
Round 2 current prices: {'task1': 10.0, 'task2': 15.0, 'resource1': 27.624252137343362}
Round 3 current prices: {'task1': 14.150423418969961, 'task2': 25.598419655339914, 'resource1': 48.43936171930753}
Round 4 current prices: {'task1': 14.150423418969961, 'task2': 34.189396186994834, 'resource1': 84.86987307269457}
Round 5 current prices: {'task1': 14.150423418969961, 'task2': 44.89570715854789, 'resource1': 102.4059754336159}
Round 6 current prices: {'task1': 14.150423418969961, 'task2': 55.70639803093716, 'resource1': 265.4076490101352}
Round 7 current prices: {'task1': 16.165556762263797, 'task2': 81.8150489398106, 'resource1': 454.164529972914}
Round 8 current prices: {'task1': 17.843491163327904, 'task2': 81.8150489398106, 'resource1': 664.0285634582275}
Round 9 current prices: {'task1': 25.410115323935496, 'task2': 81.8150489398106, 'resource1': 776.2515227325367}
Round 10 current prices: {'task1': 72.47785125080446, 'task2': 158.02764874120223, 'resource1': 1088.1494800815683}
Auction results: {'task1': 'agent4', 'task2': 'agent4', 'resource1': 'agent2'}

【示例3.11】投票系统实现,投票系统常用于集体决策场景。下面实现一个多数投票和波达计数(Borda Count)投票系统。

from typing import List, Dict
from collections import defaultdict

class VotingSystem:
    def __init__(self, candidates: List[str], voters: List[str]):
        """
        初始化投票系统
        :param candidates: 候选选项
        :param voters: 投票者列表
        """
        self.candidates = candidates
        self.voters = voters
        self.ballots = []
        
    def submit_ballot(self, voter: str, ranking: List[str]) -> bool:
        """
        提交投票
        :param voter: 投票者
        :param ranking: 投票者的偏好排序
        :return: 是否提交成功
        """
        if voter not in self.voters:
            return False
        if set(ranking) != set(self.candidates):
            return False
            
        self.ballots.append((voter, ranking))
        return True
    
    def majority_vote(self) -> str:
        """
        多数投票制
        :return: 获胜者
        """
        if not self.ballots:
            return None
            
        first_choices = [ballot[1][0] for ballot in self.ballots]
        vote_counts = defaultdict(int)
        for candidate in first_choices:
            vote_counts[candidate] += 1

        winner = max(vote_counts.items(), key=lambda x: x[1])[0]
        return winner

    def borda_count(self) -> str:
        """
        波达计数法
        :return: 获胜者
        """
        if not self.ballots:
            return None

        candidate_scores = defaultdict(int)
        num_candidates = len(self.candidates)

        for _, ranking in self.ballots:
            for position, candidate in enumerate(ranking):
                # 波达计数:第一名得n-1分,第二名得n-2分,…,最后一名得0分
                candidate_scores[candidate] += (num_candidates - position - 1)

        winner = max(candidate_scores.items(), key=lambda x: x[1])[0]
        return winner

    def simulate_voting(self):
        """
        模拟投票过程
        """
        # 模拟投票者提交偏好排序
        for voter in self.voters:
            # 随机生成偏好排序
            ranking = self.candidates.copy()
            random.shuffle(ranking)
            self.submit_ballot(voter, ranking)
            print(f"{voter} votes: {ranking}")

        # 多数投票结果
        majority_winner = self.majority_vote()
        print(f"Majority vote winner: {majority_winner}")

        # 波达计数结果
        borda_winner = self.borda_count()
        print(f"Borda count winner: {borda_winner}")

# 使用示例
if __name__ == "__main__":
    import random

    candidates = ["OptionA", "OptionB", "OptionC", "OptionD"]
    voters = ["Agent1", "Agent2", "Agent3", "Agent4", "Agent5"]

    voting_system = VotingSystem(candidates, voters)
    voting_system.simulate_voting()

运行代码,输出如下:

Agent1 votes: ['OptionA', 'OptionC', 'OptionB', 'OptionD']
Agent2 votes: ['OptionB', 'OptionC', 'OptionD', 'OptionA']
Agent3 votes: ['OptionD', 'OptionB', 'OptionC', 'OptionA']
Agent4 votes: ['OptionC', 'OptionA', 'OptionB', 'OptionD']
Agent5 votes: ['OptionD', 'OptionC', 'OptionB', 'OptionA']
Majority vote winner: OptionD
Borda count winner: OptionC

【示例3.12】智能体协调框架,结合拍卖和投票的智能体协调框架示例。

import random
from typing import List, Dict
from collections import defaultdict

class EnglishAuction:
    def __init__(self, items: List[str], agents: List[str], initial_prices: Dict[str, float]):
        """
        初始化英式拍卖
        :param items: 拍卖物品列表
        :param agents: 参与拍卖的智能体列表
        :param initial_prices: 各物品的初始价格
        """
        self.items = items
        self.agents = agents
        self.current_prices = initial_prices.copy()
        self.winners = {item: None for item in items}
        self.bids = {item: [] for item in items}
        
    def agent_bid(self, agent: str, item: str, bid_price: float) -> bool:
        """
        智能体出价
        :param agent: 出价智能体
        :param item: 拍卖物品
        :param bid_price: 出价金额
        :return: 是否出价成功
        """
        if bid_price > self.current_prices[item]:
            self.current_prices[item] = bid_price
            self.bids[item].append((agent, bid_price))
            return True
        return False
    
    def close_auction(self) -> Dict[str, str]:
        """
        结束拍卖,确定获胜者
        :return: 各物品的获胜者
        """
        for item in self.items:
            if self.bids[item]:
                # 最高出价者获胜
                self.winners[item] = max(self.bids[item], key=lambda x: x[1])[0]
        return self.winners
    
    def simulate_auction(self, max_rounds: int = 10):
        """
        模拟拍卖过程
        :param max_rounds: 最大轮数
        """
        for round_num in range(max_rounds):
            for agent in self.agents:
                for item in self.items:
                    # 模拟智能体随机出价
                    if random.random() > 0.7:  # 30%概率出价
                        current_price = self.current_prices[item]
                        bid_price = current_price * (1 + random.uniform(0.1, 0.5))
                        self.agent_bid(agent, item, bid_price)
            print(f"Round {round_num+1} current prices: {self.current_prices}")
        
        self.close_auction()
        print("Auction results:", self.winners)
        return self.winners

class VotingSystem:
    def __init__(self, candidates: List[str], voters: List[str]):
        """
        初始化投票系统
        :param candidates: 候选选项
        :param voters: 投票者列表
        """
        self.candidates = candidates
        self.voters = voters
        self.ballots = []
        
    def submit_ballot(self, voter: str, ranking: List[str]) -> bool:
        """
        提交投票
        :param voter: 投票者
        :param ranking: 投票者的偏好排序
        :return: 是否提交成功
        """
        if voter not in self.voters:
            return False
        if set(ranking) != set(self.candidates):
            return False
            
        self.ballots.append((voter, ranking))
        return True
    
    def majority_vote(self) -> str:
        """
        多数投票制
        :return: 获胜者
        """
        if not self.ballots:
            return None
            
        first_choices = [ballot[1][0] for ballot in self.ballots]
        vote_counts = defaultdict(int)
        for candidate in first_choices:
            vote_counts[candidate] += 1
            
        winner = max(vote_counts.items(), key=lambda x: x[1])[0]
        return winner
    
    def borda_count(self) -> str:
        """
        波达计数法
        :return: 获胜者
        """
        if not self.ballots:
            return None
            
        candidate_scores = defaultdict(int)
        num_candidates = len(self.candidates)
        
        for _, ranking in self.ballots:
            for position, candidate in enumerate(ranking):
                # 波达计数:第一名得n-1分,第二名得n-2分,…,最后一名得0分
                candidate_scores[candidate] += (num_candidates - position - 1)
                
        winner = max(candidate_scores.items(), key=lambda x: x[1])[0]
        return winner
    
    def simulate_voting(self):
        """
        模拟投票过程
        """
        # 模拟投票者提交偏好排序
        for voter in self.voters:
            # 随机生成偏好排序
            ranking = self.candidates.copy()
            random.shuffle(ranking)
            self.submit_ballot(voter, ranking)
            print(f"{voter} votes: {ranking}")
        
        # 多数投票结果
        majority_winner = self.majority_vote()
        print(f"Majority vote winner: {majority_winner}")
        
        # 波达计数结果
        borda_winner = self.borda_count()
        print(f"Borda count winner: {borda_winner}")
        
        return majority_winner, borda_winner

class Agent:
    def __init__(self, name, resources, preferences):
        self.name = name
        self.resources = resources
        self.preferences = preferences  # {item: value}
    
    def bid_strategy(self, auction_type, item, current_bid):
        """根据拍卖类型制定出价策略"""
        if auction_type == "english":
            # 英式拍卖策略:出价比当前最高价高,但不超过自己的估值
            my_value = self.preferences.get(item, 0)
            if current_bid < my_value:
                return min(current_bid + 5, my_value)  # 简单增量策略
            return 0
        elif auction_type == "vickrey":
            # Vickrey拍卖策略:出价等于真实估值
            return self.preferences.get(item, 0)
        return 0
    
    def vote_strategy(self, voting_system, alternatives):
        """投票策略"""
        if isinstance(voting_system, MajorityVoting):
            # 多数投票:选择最偏好的选项
            return max(alternatives, key=lambda x: self.preferences.get(x, 0))
        elif isinstance(voting_system, BordaVoting):
            # Borda投票:按偏好排序
            return sorted(alternatives, key=lambda x: self.preferences.get(x, 0), reverse=True)
        return None

class MajorityVoting:
    def __init__(self, alternatives):
        """
        多数投票系统
        :param alternatives: 可选方案列表
        """
        self.alternatives = alternatives
        self.votes = {alt: 0 for alt in alternatives}
        self.voters = set()

    def cast_vote(self, voter, alternative):
        """投票"""
        if alternative not in self.alternatives:
            raise ValueError("Invalid alternative")

        self.votes[alternative] += 1
        self.voters.add(voter)

    def get_winner(self):
        """获取获胜方案"""
        if not self.voters:
            return None

        return max(self.votes.items(), key=lambda x: x[1])[0]

    def get_vote_distribution(self):
        """获取投票分布"""
        return self.votes

class BordaVoting:
    def __init__(self, alternatives):
        """
        Borda投票系统
        :param alternatives: 可选方案列表
        """
        self.alternatives = alternatives
        self.rankings = []
        self.voters = set()

    def submit_ranking(self, voter, ranking):
        """提交排序"""
        if set(ranking) != set(self.alternatives):
            raise ValueError("Invalid ranking")

        self.rankings.append((voter, ranking))
        self.voters.add(voter)

    def get_winner(self):
        """获取获胜方案"""
        if not self.voters:
            return None
            
        candidate_scores = defaultdict(int)
        num_candidates = len(self.alternatives)
        
        for _, ranking in self.rankings:
            for position, candidate in enumerate(ranking):
                candidate_scores[candidate] += (num_candidates - position - 1)
                
        return max(candidate_scores.items(), key=lambda x: x[1])[0]

class VickreyAuction:
    def __init__(self, items):
        self.items = items
        self.bids = {item: [] for item in items}
        self.winners = {item: None for item in items}
        self.winning_prices = {item: 0 for item in items}

    def submit_bid(self, item, agent, bid):
        self.bids[item].append((agent, bid))

    def determine_winner(self, item):
        if not self.bids[item]:
            return
            
        sorted_bids = sorted(self.bids[item], key=lambda x: x[1], reverse=True)
        if len(sorted_bids) >= 2:
            self.winners[item] = sorted_bids[0][0]
            self.winning_prices[item] = sorted_bids[1][1]
        elif len(sorted_bids) == 1:
            self.winners[item] = sorted_bids[0][0]
            self.winning_prices[item] = sorted_bids[0][1]

    def get_status(self, item):
        return {
            'winner': self.winners[item],
            'winning_price': self.winning_prices[item]
        }

class CoordinationFramework:
    def __init__(self, agents, items, voting_alternatives):
        self.agents = agents
        self.items = items
        self.voting_alternatives = voting_alternatives
    
    def run_auction(self, auction_type):
        """运行拍卖"""
        if auction_type == "english":
            initial_prices = {item: 0 for item in self.items}
            auction = EnglishAuction(self.items, [agent.name for agent in self.agents], initial_prices)
            
            for round_num in range(10):  # 进行10轮拍卖
                for agent in self.agents:
                    for item in self.items:
                        current_price = auction.current_prices[item]
                        bid = agent.bid_strategy("english", item, current_price)
                        if bid > 0:
                            auction.agent_bid(agent.name, item, bid)
                
                print(f"Round {round_num+1} current prices: {auction.current_prices}")
            
            auction.close_auction()
            return auction
        elif auction_type == "vickrey":
            auction = VickreyAuction(self.items)
            for item in self.items:
                for agent in self.agents:
                    bid = agent.bid_strategy("vickrey", item, 0)
                    auction.submit_bid(item, agent.name, bid)
                auction.determine_winner(item)
            return auction
        return None
    
    def run_voting(self, voting_type):
        """运行投票"""
        if voting_type == "majority":
            voting = MajorityVoting(self.voting_alternatives)
            for agent in self.agents:
                vote = agent.vote_strategy(voting, self.voting_alternatives)
                voting.cast_vote(agent.name, vote)
            return voting
        elif voting_type == "borda":
            voting = BordaVoting(self.voting_alternatives)
            for agent in self.agents:
                ranking = agent.vote_strategy(voting, self.voting_alternatives)
                voting.submit_ranking(agent.name, ranking)
            return voting
        return None

# 使用示例
if __name__ == "__main__":
    # 创建智能体
    agents = [
        Agent("agent1", 200, {"painting": 150, "book": 50, "optionA": 10, "optionB": 5}),
        Agent("agent2", 300, {"painting": 100, "book": 80, "optionA": 5, "optionB": 8}),
        Agent("agent3", 250, {"painting": 120, "book": 70, "optionA": 7, "optionB": 6})
    ]

    # 创建协调框架
    framework = CoordinationFramework(
        agents=agents,
        items=["painting", "book"],
        voting_alternatives=["optionA", "optionB"]
    )

    # 运行英式拍卖
    print("=== English Auction ===")
    english_result = framework.run_auction("english")
    print("English Auction Results:")
    for item in english_result.items:
        print(f"{item}: {english_result.winners[item]}")

    # 运行多数投票
    print("\n=== Majority Voting ===")
    majority_result = framework.run_voting("majority")
    print("Majority Voting Results:")
    print("Winner:", majority_result.get_winner())
    print("Distribution:", majority_result.get_vote_distribution())

    # 运行Borda投票
    print("\n=== Borda Voting ===")
    borda_result = framework.run_voting("borda")
    print("Borda Voting Results:")
    print("Winner:", borda_result.get_winner())

    # 运行Vickrey拍卖
    print("\n=== Vickrey Auction ===")
    vickrey_result = framework.run_auction("vickrey")
    print("Vickrey Auction Results:")
    for item in vickrey_result.items:
        status = vickrey_result.get_status(item)
        print(f"{item}: Winner={status['winner']}, Price={status['winning_price']}")

运行代码,输出如下:

=== English Auction ===
Round 1 current prices: {'painting': 15, 'book': 15}
Round 2 current prices: {'painting': 30, 'book': 30}
Round 3 current prices: {'painting': 45, 'book': 45}
Round 4 current prices: {'painting': 60, 'book': 60}
Round 5 current prices: {'painting': 75, 'book': 70}
Round 6 current prices: {'painting': 90, 'book': 75}
Round 7 current prices: {'painting': 105, 'book': 80}
Round 8 current prices: {'painting': 115, 'book': 80}
Round 9 current prices: {'painting': 120, 'book': 80}
Round 10 current prices: {'painting': 125, 'book': 80}
English Auction Results:
painting: agent1
book: agent2

=== Majority Voting ===
Majority Voting Results:
Winner: optionA
Distribution: {'optionA': 2, 'optionB': 1}

=== Borda Voting ===
Borda Voting Results:
Winner: optionA

=== Vickrey Auction ===
Vickrey Auction Results:
painting: Winner=agent1, Price=120
book: Winner=agent2, Price=70

以上代码实现了AI智能体竞争协调中的两种主要机制:拍卖机制和投票系统。这两种机制可以用于多智能体系统中的资源分配、决策制定等协调问题。你可以根据具体需求扩展这些基本实现,例如添加更复杂的出价策略、考虑预算约束、实现组合拍卖等。

Logo

中国智能体开发者社区,聚焦智能体与大模型开发,提供前沿资讯、实用工具链、开源项目及行业案例。通过技术沙龙、开发者大赛等活动,促进经验交流与协作,助力开发者快速构建创新智能应用。

更多推荐