智能体的多Agent协同
多Agent系统是由多个自主或半自主的智能Agent组成的系统,这些Agent通过协作来完成单个Agent难以完成的复杂任务。在多Agent系统中,角色分工是关键,不同的Agent承担不同的职责,通过协同工作提高整体效率。
《AI Agent智能体开发实践 邓立国 邓淇文著 五大实战案例掌握AI Agent开发 LangChain示例 人工智能技术丛书 清华大学出版社》【摘要 书评 试读】- 京东图书
多Agent协同(Multi-Agent Collaboration,MAS)是指多个具备自主决策能力的智能体(Agent)通过通信、任务分配与协同机制,共同完成复杂目标的技术体系。其核心在于突破单个Agent的能力局限,通过群体智能实现更高阶的自动化。
3.3.1 角色分工:定义不同Agent的职能
1. 多Agent系统概述
多Agent系统是由多个自主或半自主的智能Agent组成的系统,这些Agent通过协作来完成单个Agent难以完成的复杂任务。在多Agent系统中,角色分工是关键,不同的Agent承担不同的职责,通过协同工作提高整体效率。
2. Agent角色分工
1)分析师(Analyst)Agent
职责:数据收集、处理、分析和模式识别。
特点:强大的数据处理和推理能力。
2)执行者(Executor)Agent
职责:执行具体任务和操作。
特点:高效的执行能力和操作接口。
3)协调者(Coordinator)Agent
职责:任务分配、资源调度和冲突解决。
特点:全局视角和决策能力。
4)接口(Interface)Agent
职责:与用户或其他系统交互。
特点:良好的交互能力和信息转换能力。
5)监控(Monitor)Agent
职责:系统状态监测和异常检测。
特点:实时性和预警能力。
3. Python实现案例
【示例3.7】实现一个包含分析师Agent和执行者Agent的简单股票分析系统。
#1. 基础架构
from abc import ABC, abstractmethod
from typing import Dict, Any
import random
import time
class Agent(ABC):
"""Agent抽象基类"""
def __init__(self, name: str):
self.name = name
self.knowledge_base = {}
@abstractmethod
def perform_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""执行任务并返回结果"""
pass
def communicate(self, message: str) -> str:
"""简单的通信方法"""
print(f"{self.name} received: {message}")
return f"{self.name} acknowledged: {message}"
#2. 分析师Agent实现
class AnalystAgent(Agent):
"""分析师Agent,负责数据分析"""
def __init__(self, name: str):
super().__init__(name)
# 初始化分析工具或模型
self.analysis_tools = ["趋势分析", "技术指标", "模式识别"]
def perform_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""执行分析任务"""
print(f"{self.name} is analyzing data...")
# 模拟分析过程
data = task.get("data", {})
analysis_type = task.get("analysis_type", "趋势分析")
# 模拟分析耗时
time.sleep(1)
# 生成分析结果
result = {
"analysis_type": analysis_type,
"result": f"{analysis_type}结果: {random.choice(['买入', '卖出', '持有'])}",
"confidence": random.uniform(0.5, 1.0),
"factors_considered": random.sample(["价格", "成交量", "市场情绪", "宏观经济"], 2)
}
print(f"{self.name} completed analysis with result: {result}")
return result
def train_model(self, dataset):
"""模拟模型训练方法"""
print(f"{self.name} is training model with {len(dataset)} samples...")
time.sleep(2)
return {"status": "success", "accuracy": random.uniform(0.7, 0.95)}
#3. 执行者Agent实现
class ExecutorAgent(Agent):
"""执行者Agent,负责执行交易操作"""
def __init__(self, name: str):
super().__init__(name)
self.available_actions = ["买入", "卖出", "调整仓位", "取消订单"]
def perform_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""执行交易任务"""
print(f"{self.name} is executing trade...")
action = task.get("action", "买入")
symbol = task.get("symbol", "UNKNOWN")
quantity = task.get("quantity", 0)
# 验证操作是否可用
if action not in self.available_actions:
return {"status": "error", "message": f"Unsupported action: {action}"}
# 模拟执行耗时
time.sleep(0.5)
# 生成执行结果
result = {
"status": "success",
"action": action,
"symbol": symbol,
"quantity": quantity,
"execution_price": round(random.uniform(100, 200), 2),
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
}
print(f"{self.name} executed {action} {quantity} shares of {symbol}")
return result
def check_balance(self):
"""检查账户余额"""
return {
"cash": round(random.uniform(10000, 50000), 2),
"positions": {f"STOCK_{i}": random.randint(0, 100) for i in range(1, 4)}
}
#4. 协调者Agent实现
class CoordinatorAgent(Agent):
"""协调者Agent,负责任务分配和协调"""
def __init__(self, name: str):
super().__init__(name)
self.registered_agents = {}
def register_agent(self, agent: Agent):
"""注册Agent到系统中"""
self.registered_agents[agent.name] = agent
print(f"Agent {agent.name} registered with coordinator {self.name}")
def perform_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""分配任务给合适的Agent"""
task_type = task.get("type", "")
if "analysis" in task_type.lower():
agent = self.registered_agents.get("Analyst")
elif "execute" in task_type.lower():
agent = self.registered_agents.get("Executor")
else:
return {"status": "error", "message": "Unknown task type"}
if not agent:
return {"status": "error", "message": "No suitable agent available"}
print(f"{self.name} is delegating task to {agent.name}")
return agent.perform_task(task)
def broadcast(self, message: str):
"""广播消息给所有注册的Agent"""
responses = {}
for name, agent in self.registered_agents.items():
responses[name] = agent.communicate(message)
return responses
#5. 系统集成与运行示例
def main():
# 创建Agent实例
coordinator = CoordinatorAgent("Coordinator")
analyst = AnalystAgent("Analyst")
executor = ExecutorAgent("Executor")
# 注册Agent
coordinator.register_agent(analyst)
coordinator.register_agent(executor)
# 模拟分析任务
print("\n=== Running Analysis Task ===")
analysis_task = {
"type": "stock_analysis",
"data": {"symbol": "AAPL", "history": "1Y"},
"analysis_type": "技术指标"
}
analysis_result = coordinator.perform_task(analysis_task)
print("Analysis Result:", analysis_result)
# 模拟执行任务
print("\n=== Running Execution Task ===")
execution_task = {
"type": "execute_trade",
"action": "买入",
"symbol": "AAPL",
"quantity": 100,
"analysis_reference": analysis_result
}
execution_result = coordinator.perform_task(execution_task)
print("Execution Result:", execution_result)
# 测试广播通信
print("\n=== Testing Broadcast Communication ===")
broadcast_responses = coordinator.broadcast("System check at " + time.strftime("%H:%M:%S"))
for agent, response in broadcast_responses.items():
print(f"{agent}: {response}")
# 检查执行者状态
print("\n=== Checking Executor Status ===")
print("Account Balance:", executor.check_balance())
if __name__ == "__main__":
main()
运行代码,输出如下:
Agent Analyst registered with coordinator Coordinator
Agent Executor registered with coordinator Coordinator
=== Running Analysis Task ===
Coordinator is delegating task to Analyst
Analyst is analyzing data...
Analyst completed analysis with result: {'analysis_type': '技术指标', 'result': '技术指标结果: 买入', 'confidence': 0.878509121476005, 'factors_considered': ['价格', '成交量']}
Analysis Result: {'analysis_type': '技术指标', 'result': '技术指标结果: 买入', 'confidence': 0.878509121476005, 'factors_considered': ['价格', '成交量']}
=== Running Execution Task ===
Coordinator is delegating task to Executor
Executor is executing trade...
Executor executed 买入 100 shares of AAPL
Execution Result: {'status': 'success', 'action': '买入', 'symbol': 'AAPL', 'quantity': 100, 'execution_price': 109.74, 'timestamp': '2025-07-03 16:37:13'}
=== Testing Broadcast Communication ===
Analyst received: System check at 16:37:13
Executor received: System check at 16:37:13
Analyst: Analyst acknowledged: System check at 16:37:13
Executor: Executor acknowledged: System check at 16:37:13
=== Checking Executor Status ===
Account Balance: {'cash': 19739.02, 'positions': {'STOCK_1': 88, 'STOCK_2': 94, 'STOCK_3': 54}}
这个框架提供了多Agent系统的基本结构和实现示例,可以根据具体应用场景进行扩展和优化。
3.3.2 通信协议:基于自然语言或结构化消息
多Agent协同通信协议是实现多个智能体之间高效协作的关键技术,可以基于自然语言或结构化消息进行通信。
1. 自然语言处理
自然语言通信依赖于强大的自然语言处理(NLP)技术,包括语言理解、语义解析和语言生成。例如,使用Transformer架构的模型(如GPT)可以处理复杂的语言交互。
【示例3.8】基于DeepSeek自然语言的Agent通信示例,使用Transformers库实现语言生成和理解。
此示例在运行过程中,经常遇到无法连接到Hugging Face的模型仓库的错误,这是由于网络问题或访问限制引起的,以下是几种解决方案:
- 检查网络连接,确保能访问Hugging Face网站。
- 如果在国内,可以使用国内镜像站点。
提前手动下载模型到本地,然后从本地加载模型。
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import torch
import os
# -----------------------------
# 配置:选择模型加载方式
# -----------------------------
USE_MIRROR = True # 是否使用国内镜像
LOCAL_MODEL_PATH = "./local_deepseek_model" # 本地模型路径,如果存在,则从本地加载
LOCAL_CLASSIFIER_PATH = "./local_classifier_model" # 本地分类器模型路径
# 国内镜像设置
if USE_MIRROR:
os.environ["HF_ENDPOINT"] = "https://hf-mirror.com"
# -----------------------------
# 1. 加载 DeepSeek 模型和 tokenizer
# -----------------------------
model_name = "deepseek-ai/deepseek-llm-1.3b-chat"
# 检查本地是否有模型
if os.path.exists(LOCAL_MODEL_PATH):
print(f"从本地加载模型: {LOCAL_MODEL_PATH}")
model_path = LOCAL_MODEL_PATH
else:
model_path = model_name
try:
tokenizer = AutoTokenizer.from_pretrained(
model_path,
trust_remote_code=True,
local_files_only=os.path.exists(LOCAL_MODEL_PATH) # 如果是本地路径,则只检查本地文件
)
model = AutoModelForCausalLM.from_pretrained(
model_path,
torch_dtype=torch.bfloat16 if torch.cuda.is_available() else torch.float32,
device_map="auto" if torch.cuda.is_available() else None,
trust_remote_code=True,
local_files_only=os.path.exists(LOCAL_MODEL_PATH)
)
except Exception as e:
print(f"模型加载失败: {e}")
print("尝试使用更小的模型或检查网络连接")
# 可以在这里提供备选模型
exit(1)
# 生成管道(用于非流式生成)
generator = pipeline(
"text-generation",
model=model,
tokenizer=tokenizer,
device=0 if torch.cuda.is_available() else -1
)
# -----------------------------
# 2. 意图理解
# -----------------------------
classifier_name = "uer/roberta-base-finetuned-dianping-chinese"
# 检查本地是否有分类器模型
if os.path.exists(LOCAL_CLASSIFIER_PATH):
print(f"从本地加载分类器模型: {LOCAL_CLASSIFIER_PATH}")
classifier_path = LOCAL_CLASSIFIER_PATH
else:
classifier_path = classifier_name
try:
intent_classifier = pipeline(
"text-classification",
model=classifier_path,
tokenizer=classifier_path,
local_files_only=os.path.exists(LOCAL_CLASSIFIER_PATH)
)
except Exception as e:
print(f"分类器模型加载失败: {e}")
print("将仅使用关键词意图识别")
# 手动关键词意图识别(中文)
def detect_intent(text):
if any(w in text for w in ["你好", "嗨", "早上好", "下午好"]):
return "问候"
elif any(w in text for w in ["帮助", "帮忙", "怎么做", "如何"]):
return "请求帮助"
elif any(w in text for w in ["是", "对", "没错", "提供", "信息", "有"]):
return "提供信息"
elif any(w in text for w in ["再见", "拜拜", "结束", "退出"]):
return "告别"
else:
return "其他"
# -----------------------------
# 3. 定义支持 DeepSeek 格式的 Agent
# -----------------------------
class DeepSeekAgent:
def __init__(self, name, model, tokenizer):
self.name = name
self.model = model
self.tokenizer = tokenizer
self.history = [] # 存储对话历史
def send(self, message):
print(f"\033[92m{self.name}: {message}\033[0m") # 绿色输出
return message
def receive(self, message):
print(f"\033[94m{self.name} 收到: {message}\033[0m") # 蓝色
# 意图识别
intent = detect_intent(message)
print(f" 🔍 意图识别: {intent}")
# 构造 DeepSeek 的对话输入格式
prompt_lines = []
# 添加历史
for i in range(0, len(self.history), 2):
if i + 1 < len(self.history):
prompt_lines.append(f"<|User|>:{self.history[i]}<|Assistant|>: {self.history[i+1]}")
else:
prompt_lines.append(f"<|User|>:{self.history[i]}")
# 添加当前消息
prompt_lines.append(f"<|User|>:{message}<|Assistant|>:")
full_prompt = "\n".join(prompt_lines)
try:
# Tokenize 并生成
inputs = self.tokenizer(full_prompt, return_tensors="pt").to(self.model.device)
with torch.no_grad():
outputs = self.model.generate(
**inputs,
max_new_tokens=128,
temperature=0.7,
top_p=0.9,
do_sample=True,
pad_token_id=self.tokenizer.eos_token_id,
eos_token_id=self.tokenizer.eos_token_id
)
# 解码回复
response = self.tokenizer.decode(outputs[0][inputs['input_ids'].shape[-1]:], skip_special_tokens=True)
# 清理输出
reply = response.strip().split("\n")[0]
reply = reply.replace("<|Assistant|>:", "").strip()
# 更新历史
self.history.append(message)
self.history.append(reply)
return reply
except Exception as e:
print(f"生成回复时出错: {e}")
return "抱歉,我无法生成回复。"
# -----------------------------
# 4. 创建两个 Agent
# -----------------------------
agent_a = DeepSeekAgent("AgentA", model, tokenizer)
agent_b = DeepSeekAgent("AgentB", model, tokenizer)
# -----------------------------
# 5. 开始多轮对话
# -----------------------------
print("🗣 基于 DeepSeek 的 Agent 自然语言通信启动...\n")
# AgentA 发起
msg = "你好,我是AgentA,我们可以一起讨论一个合作计划吗?"
msg = agent_a.send(msg)
try:
for i in range(4): # 4 轮来回
print("-" * 50)
reply = agent_b.receive(msg)
msg = agent_b.send(reply)
print("-" * 50)
reply = agent_a.receive(msg)
msg = agent_a.send(reply)
except KeyboardInterrupt:
print("\n对话已手动终止")
except Exception as e:
print(f"对话过程中出错: {e}")
运行代码,输出如下:
基于 DeepSeek 的 Agent 自然语言通信启动...
AgentA: 你好,我是AgentA,我们可以一起讨论一个合作计划吗?
--------------------------------------------------
AgentB 收到: 你好,我是AgentA,我们可以一起讨论一个合作计划吗?
意图识别: 问候
AgentB: 当然可以,我很乐意与您合作。请告诉我您有什么计划?
--------------------------------------------------
AgentA 收到: 当然可以,我很乐意与您合作。请告诉我您有什么计划?
意图识别: 提供信息
AgentA: 我想开发一个智能助手,能自动安排日程和发送邮件。
...
2. 基于结构化消息的通信协议
结构化消息使用预定义的格式和字段,便于机器解析和处理,提高通信效率。
常见的结构化消息格式包括:
- FIPA ACL:Foundation for Intelligent Physical Agents的标准。
- JSON-based:自定义JSON格式。
- Protocol Buffers:Google的高效二进制协议。
【示例3.9】一个基于JSON格式的结构化消息通信示例。
import json
# 定义结构化消息格式
class Message:
def __init__(self, sender, receiver, msg_type, content):
self.sender = sender
self.receiver = receiver
self.msg_type = msg_type
self.content = content
def to_json(self):
return json.dumps(self.__dict__)
# Agent A 发送结构化消息
agent_a_msg = Message(sender="Agent A", receiver="Agent B", msg_type="REQUEST", content="计算销售数据")
msg_json = agent_a_msg.to_json()
print("Agent A 发送:", msg_json)
# Agent B 接收并解析消息
agent_b_msg = json.loads(msg_json)
print("Agent B 接收:", agent_b_msg)
# Agent B 回复结构化消息
agent_b_response = Message(sender="Agent B", receiver="Agent A", msg_type="RESPONSE", content="销售数据已计算完成")
response_json = agent_b_response.to_json()
print("Agent B 回复:", response_json)
运行代码,输出如下:
Agent A 发送: {"sender": "Agent A", "receiver": "Agent B", "msg_type": "REQUEST", "content": "\u8ba1\u7b97\u9500\u552e\u6570\u636e"}
Agent B 接收: {'sender': 'Agent A', 'receiver': 'Agent B', 'msg_type': 'REQUEST', 'content': '计算销售数据'}
Agent B 回复: {"sender": "Agent B", "receiver": "Agent A", "msg_type": "RESPONSE", "content": "\u9500\u552e\u6570\u636e\u5df2\u8ba1\u7b97\u5b8c\u6210"}
3. 实际应用场景
(1)智能工厂:在智能工厂中,多个机器人和设备需要高效协同工作。结构化消息协议(如FIPA-ACL)被广泛用于任务分配、冲突解决和状态更新。
(2)智能助手:在多智能助手场景中,自然语言通信可用于初次交互和复杂任务的协商,而结构化消息则用于后续的高效执行。
自然语言通信适用于灵活性和适应性要求较高的场景,但计算成本较高。结构化消息通信适用于效率和可扩展性要求较高的场景,但需要预定义协议。在实际应用中,可以根据需求结合使用这两种通信方式。
3.3.3 竞争协调:拍卖机制或投票系统
1. 竞争协调的核心价值
在多智能体系统(MAS)中,当智能体目标冲突或资源有限时,需通过竞争协调机制实现高效决策。该机制需满足:
- 公平性:避免单一智能体垄断资源。
- 效率性:最小化协调成本,最大化系统收益。
- 稳定性:抑制策略性操纵行为。
2. 拍卖机制:资源竞争的定价策略
1)核心原理
通过竞价博弈分配稀缺资源,以出价作为竞争信号。其运作遵循:
- 价高者得:原则实现帕累托最优分配。
- 显式规则:(如密封投标、公开叫价)决定最终归属。
- 支付机制:设计(如首价/次价支付)影响竞拍策略。
2)拍卖机制实现
【示例3.10】拍卖机制常用于资源分配和任务分配场景。下面实现一个简单的英式拍卖(公开增价拍卖)。
import random
from typing import List, Dict
class EnglishAuction:
def __init__(self, items: List[str], agents: List[str], initial_prices: Dict[str, float]):
"""
初始化英式拍卖
:param items: 拍卖物品列表
:param agents: 参与拍卖的智能体列表
:param initial_prices: 各物品的初始价格
"""
self.items = items
self.agents = agents
self.current_prices = initial_prices.copy()
self.winners = {item: None for item in items}
self.bids = {item: [] for item in items}
def agent_bid(self, agent: str, item: str, bid_price: float) -> bool:
"""
智能体出价
:param agent: 出价智能体
:param item: 拍卖物品
:param bid_price: 出价金额
:return: 是否出价成功
"""
if bid_price > self.current_prices[item]:
self.current_prices[item] = bid_price
self.bids[item].append((agent, bid_price))
return True
return False
def close_auction(self) -> Dict[str, str]:
"""
结束拍卖,确定获胜者
:return: 各物品的获胜者
"""
for item in self.items:
if self.bids[item]:
# 最高出价者获胜
self.winners[item] = max(self.bids[item], key=lambda x: x[1])[0]
return self.winners
def simulate_auction(self, max_rounds: int = 10):
"""
模拟拍卖过程
:param max_rounds: 最大轮数
"""
for _ in range(max_rounds):
for agent in self.agents:
for item in self.items:
# 模拟智能体随机出价
if random.random() > 0.7: # 30%概率出价
current_price = self.current_prices[item]
bid_price = current_price * (1 + random.uniform(0.1, 0.5))
self.agent_bid(agent, item, bid_price)
print(f"Round {_+1} current prices: {self.current_prices}")
self.close_auction()
print("Auction results:", self.winners)
# 使用示例
if __name__ == "__main__":
items = ["task1", "task2", "resource1"]
agents = ["agent1", "agent2", "agent3", "agent4"]
initial_prices = {"task1": 10.0, "task2": 15.0, "resource1": 20.0}
auction = EnglishAuction(items, agents, initial_prices)
auction.simulate_auction()
运行代码,输出如下:
Round 1 current prices: {'task1': 10.0, 'task2': 15.0, 'resource1': 27.624252137343362}
Round 2 current prices: {'task1': 10.0, 'task2': 15.0, 'resource1': 27.624252137343362}
Round 3 current prices: {'task1': 14.150423418969961, 'task2': 25.598419655339914, 'resource1': 48.43936171930753}
Round 4 current prices: {'task1': 14.150423418969961, 'task2': 34.189396186994834, 'resource1': 84.86987307269457}
Round 5 current prices: {'task1': 14.150423418969961, 'task2': 44.89570715854789, 'resource1': 102.4059754336159}
Round 6 current prices: {'task1': 14.150423418969961, 'task2': 55.70639803093716, 'resource1': 265.4076490101352}
Round 7 current prices: {'task1': 16.165556762263797, 'task2': 81.8150489398106, 'resource1': 454.164529972914}
Round 8 current prices: {'task1': 17.843491163327904, 'task2': 81.8150489398106, 'resource1': 664.0285634582275}
Round 9 current prices: {'task1': 25.410115323935496, 'task2': 81.8150489398106, 'resource1': 776.2515227325367}
Round 10 current prices: {'task1': 72.47785125080446, 'task2': 158.02764874120223, 'resource1': 1088.1494800815683}
Auction results: {'task1': 'agent4', 'task2': 'agent4', 'resource1': 'agent2'}
【示例3.11】投票系统实现,投票系统常用于集体决策场景。下面实现一个多数投票和波达计数(Borda Count)投票系统。
from typing import List, Dict
from collections import defaultdict
class VotingSystem:
def __init__(self, candidates: List[str], voters: List[str]):
"""
初始化投票系统
:param candidates: 候选选项
:param voters: 投票者列表
"""
self.candidates = candidates
self.voters = voters
self.ballots = []
def submit_ballot(self, voter: str, ranking: List[str]) -> bool:
"""
提交投票
:param voter: 投票者
:param ranking: 投票者的偏好排序
:return: 是否提交成功
"""
if voter not in self.voters:
return False
if set(ranking) != set(self.candidates):
return False
self.ballots.append((voter, ranking))
return True
def majority_vote(self) -> str:
"""
多数投票制
:return: 获胜者
"""
if not self.ballots:
return None
first_choices = [ballot[1][0] for ballot in self.ballots]
vote_counts = defaultdict(int)
for candidate in first_choices:
vote_counts[candidate] += 1
winner = max(vote_counts.items(), key=lambda x: x[1])[0]
return winner
def borda_count(self) -> str:
"""
波达计数法
:return: 获胜者
"""
if not self.ballots:
return None
candidate_scores = defaultdict(int)
num_candidates = len(self.candidates)
for _, ranking in self.ballots:
for position, candidate in enumerate(ranking):
# 波达计数:第一名得n-1分,第二名得n-2分,…,最后一名得0分
candidate_scores[candidate] += (num_candidates - position - 1)
winner = max(candidate_scores.items(), key=lambda x: x[1])[0]
return winner
def simulate_voting(self):
"""
模拟投票过程
"""
# 模拟投票者提交偏好排序
for voter in self.voters:
# 随机生成偏好排序
ranking = self.candidates.copy()
random.shuffle(ranking)
self.submit_ballot(voter, ranking)
print(f"{voter} votes: {ranking}")
# 多数投票结果
majority_winner = self.majority_vote()
print(f"Majority vote winner: {majority_winner}")
# 波达计数结果
borda_winner = self.borda_count()
print(f"Borda count winner: {borda_winner}")
# 使用示例
if __name__ == "__main__":
import random
candidates = ["OptionA", "OptionB", "OptionC", "OptionD"]
voters = ["Agent1", "Agent2", "Agent3", "Agent4", "Agent5"]
voting_system = VotingSystem(candidates, voters)
voting_system.simulate_voting()
运行代码,输出如下:
Agent1 votes: ['OptionA', 'OptionC', 'OptionB', 'OptionD']
Agent2 votes: ['OptionB', 'OptionC', 'OptionD', 'OptionA']
Agent3 votes: ['OptionD', 'OptionB', 'OptionC', 'OptionA']
Agent4 votes: ['OptionC', 'OptionA', 'OptionB', 'OptionD']
Agent5 votes: ['OptionD', 'OptionC', 'OptionB', 'OptionA']
Majority vote winner: OptionD
Borda count winner: OptionC
【示例3.12】智能体协调框架,结合拍卖和投票的智能体协调框架示例。
import random
from typing import List, Dict
from collections import defaultdict
class EnglishAuction:
def __init__(self, items: List[str], agents: List[str], initial_prices: Dict[str, float]):
"""
初始化英式拍卖
:param items: 拍卖物品列表
:param agents: 参与拍卖的智能体列表
:param initial_prices: 各物品的初始价格
"""
self.items = items
self.agents = agents
self.current_prices = initial_prices.copy()
self.winners = {item: None for item in items}
self.bids = {item: [] for item in items}
def agent_bid(self, agent: str, item: str, bid_price: float) -> bool:
"""
智能体出价
:param agent: 出价智能体
:param item: 拍卖物品
:param bid_price: 出价金额
:return: 是否出价成功
"""
if bid_price > self.current_prices[item]:
self.current_prices[item] = bid_price
self.bids[item].append((agent, bid_price))
return True
return False
def close_auction(self) -> Dict[str, str]:
"""
结束拍卖,确定获胜者
:return: 各物品的获胜者
"""
for item in self.items:
if self.bids[item]:
# 最高出价者获胜
self.winners[item] = max(self.bids[item], key=lambda x: x[1])[0]
return self.winners
def simulate_auction(self, max_rounds: int = 10):
"""
模拟拍卖过程
:param max_rounds: 最大轮数
"""
for round_num in range(max_rounds):
for agent in self.agents:
for item in self.items:
# 模拟智能体随机出价
if random.random() > 0.7: # 30%概率出价
current_price = self.current_prices[item]
bid_price = current_price * (1 + random.uniform(0.1, 0.5))
self.agent_bid(agent, item, bid_price)
print(f"Round {round_num+1} current prices: {self.current_prices}")
self.close_auction()
print("Auction results:", self.winners)
return self.winners
class VotingSystem:
def __init__(self, candidates: List[str], voters: List[str]):
"""
初始化投票系统
:param candidates: 候选选项
:param voters: 投票者列表
"""
self.candidates = candidates
self.voters = voters
self.ballots = []
def submit_ballot(self, voter: str, ranking: List[str]) -> bool:
"""
提交投票
:param voter: 投票者
:param ranking: 投票者的偏好排序
:return: 是否提交成功
"""
if voter not in self.voters:
return False
if set(ranking) != set(self.candidates):
return False
self.ballots.append((voter, ranking))
return True
def majority_vote(self) -> str:
"""
多数投票制
:return: 获胜者
"""
if not self.ballots:
return None
first_choices = [ballot[1][0] for ballot in self.ballots]
vote_counts = defaultdict(int)
for candidate in first_choices:
vote_counts[candidate] += 1
winner = max(vote_counts.items(), key=lambda x: x[1])[0]
return winner
def borda_count(self) -> str:
"""
波达计数法
:return: 获胜者
"""
if not self.ballots:
return None
candidate_scores = defaultdict(int)
num_candidates = len(self.candidates)
for _, ranking in self.ballots:
for position, candidate in enumerate(ranking):
# 波达计数:第一名得n-1分,第二名得n-2分,…,最后一名得0分
candidate_scores[candidate] += (num_candidates - position - 1)
winner = max(candidate_scores.items(), key=lambda x: x[1])[0]
return winner
def simulate_voting(self):
"""
模拟投票过程
"""
# 模拟投票者提交偏好排序
for voter in self.voters:
# 随机生成偏好排序
ranking = self.candidates.copy()
random.shuffle(ranking)
self.submit_ballot(voter, ranking)
print(f"{voter} votes: {ranking}")
# 多数投票结果
majority_winner = self.majority_vote()
print(f"Majority vote winner: {majority_winner}")
# 波达计数结果
borda_winner = self.borda_count()
print(f"Borda count winner: {borda_winner}")
return majority_winner, borda_winner
class Agent:
def __init__(self, name, resources, preferences):
self.name = name
self.resources = resources
self.preferences = preferences # {item: value}
def bid_strategy(self, auction_type, item, current_bid):
"""根据拍卖类型制定出价策略"""
if auction_type == "english":
# 英式拍卖策略:出价比当前最高价高,但不超过自己的估值
my_value = self.preferences.get(item, 0)
if current_bid < my_value:
return min(current_bid + 5, my_value) # 简单增量策略
return 0
elif auction_type == "vickrey":
# Vickrey拍卖策略:出价等于真实估值
return self.preferences.get(item, 0)
return 0
def vote_strategy(self, voting_system, alternatives):
"""投票策略"""
if isinstance(voting_system, MajorityVoting):
# 多数投票:选择最偏好的选项
return max(alternatives, key=lambda x: self.preferences.get(x, 0))
elif isinstance(voting_system, BordaVoting):
# Borda投票:按偏好排序
return sorted(alternatives, key=lambda x: self.preferences.get(x, 0), reverse=True)
return None
class MajorityVoting:
def __init__(self, alternatives):
"""
多数投票系统
:param alternatives: 可选方案列表
"""
self.alternatives = alternatives
self.votes = {alt: 0 for alt in alternatives}
self.voters = set()
def cast_vote(self, voter, alternative):
"""投票"""
if alternative not in self.alternatives:
raise ValueError("Invalid alternative")
self.votes[alternative] += 1
self.voters.add(voter)
def get_winner(self):
"""获取获胜方案"""
if not self.voters:
return None
return max(self.votes.items(), key=lambda x: x[1])[0]
def get_vote_distribution(self):
"""获取投票分布"""
return self.votes
class BordaVoting:
def __init__(self, alternatives):
"""
Borda投票系统
:param alternatives: 可选方案列表
"""
self.alternatives = alternatives
self.rankings = []
self.voters = set()
def submit_ranking(self, voter, ranking):
"""提交排序"""
if set(ranking) != set(self.alternatives):
raise ValueError("Invalid ranking")
self.rankings.append((voter, ranking))
self.voters.add(voter)
def get_winner(self):
"""获取获胜方案"""
if not self.voters:
return None
candidate_scores = defaultdict(int)
num_candidates = len(self.alternatives)
for _, ranking in self.rankings:
for position, candidate in enumerate(ranking):
candidate_scores[candidate] += (num_candidates - position - 1)
return max(candidate_scores.items(), key=lambda x: x[1])[0]
class VickreyAuction:
def __init__(self, items):
self.items = items
self.bids = {item: [] for item in items}
self.winners = {item: None for item in items}
self.winning_prices = {item: 0 for item in items}
def submit_bid(self, item, agent, bid):
self.bids[item].append((agent, bid))
def determine_winner(self, item):
if not self.bids[item]:
return
sorted_bids = sorted(self.bids[item], key=lambda x: x[1], reverse=True)
if len(sorted_bids) >= 2:
self.winners[item] = sorted_bids[0][0]
self.winning_prices[item] = sorted_bids[1][1]
elif len(sorted_bids) == 1:
self.winners[item] = sorted_bids[0][0]
self.winning_prices[item] = sorted_bids[0][1]
def get_status(self, item):
return {
'winner': self.winners[item],
'winning_price': self.winning_prices[item]
}
class CoordinationFramework:
def __init__(self, agents, items, voting_alternatives):
self.agents = agents
self.items = items
self.voting_alternatives = voting_alternatives
def run_auction(self, auction_type):
"""运行拍卖"""
if auction_type == "english":
initial_prices = {item: 0 for item in self.items}
auction = EnglishAuction(self.items, [agent.name for agent in self.agents], initial_prices)
for round_num in range(10): # 进行10轮拍卖
for agent in self.agents:
for item in self.items:
current_price = auction.current_prices[item]
bid = agent.bid_strategy("english", item, current_price)
if bid > 0:
auction.agent_bid(agent.name, item, bid)
print(f"Round {round_num+1} current prices: {auction.current_prices}")
auction.close_auction()
return auction
elif auction_type == "vickrey":
auction = VickreyAuction(self.items)
for item in self.items:
for agent in self.agents:
bid = agent.bid_strategy("vickrey", item, 0)
auction.submit_bid(item, agent.name, bid)
auction.determine_winner(item)
return auction
return None
def run_voting(self, voting_type):
"""运行投票"""
if voting_type == "majority":
voting = MajorityVoting(self.voting_alternatives)
for agent in self.agents:
vote = agent.vote_strategy(voting, self.voting_alternatives)
voting.cast_vote(agent.name, vote)
return voting
elif voting_type == "borda":
voting = BordaVoting(self.voting_alternatives)
for agent in self.agents:
ranking = agent.vote_strategy(voting, self.voting_alternatives)
voting.submit_ranking(agent.name, ranking)
return voting
return None
# 使用示例
if __name__ == "__main__":
# 创建智能体
agents = [
Agent("agent1", 200, {"painting": 150, "book": 50, "optionA": 10, "optionB": 5}),
Agent("agent2", 300, {"painting": 100, "book": 80, "optionA": 5, "optionB": 8}),
Agent("agent3", 250, {"painting": 120, "book": 70, "optionA": 7, "optionB": 6})
]
# 创建协调框架
framework = CoordinationFramework(
agents=agents,
items=["painting", "book"],
voting_alternatives=["optionA", "optionB"]
)
# 运行英式拍卖
print("=== English Auction ===")
english_result = framework.run_auction("english")
print("English Auction Results:")
for item in english_result.items:
print(f"{item}: {english_result.winners[item]}")
# 运行多数投票
print("\n=== Majority Voting ===")
majority_result = framework.run_voting("majority")
print("Majority Voting Results:")
print("Winner:", majority_result.get_winner())
print("Distribution:", majority_result.get_vote_distribution())
# 运行Borda投票
print("\n=== Borda Voting ===")
borda_result = framework.run_voting("borda")
print("Borda Voting Results:")
print("Winner:", borda_result.get_winner())
# 运行Vickrey拍卖
print("\n=== Vickrey Auction ===")
vickrey_result = framework.run_auction("vickrey")
print("Vickrey Auction Results:")
for item in vickrey_result.items:
status = vickrey_result.get_status(item)
print(f"{item}: Winner={status['winner']}, Price={status['winning_price']}")
运行代码,输出如下:
=== English Auction ===
Round 1 current prices: {'painting': 15, 'book': 15}
Round 2 current prices: {'painting': 30, 'book': 30}
Round 3 current prices: {'painting': 45, 'book': 45}
Round 4 current prices: {'painting': 60, 'book': 60}
Round 5 current prices: {'painting': 75, 'book': 70}
Round 6 current prices: {'painting': 90, 'book': 75}
Round 7 current prices: {'painting': 105, 'book': 80}
Round 8 current prices: {'painting': 115, 'book': 80}
Round 9 current prices: {'painting': 120, 'book': 80}
Round 10 current prices: {'painting': 125, 'book': 80}
English Auction Results:
painting: agent1
book: agent2
=== Majority Voting ===
Majority Voting Results:
Winner: optionA
Distribution: {'optionA': 2, 'optionB': 1}
=== Borda Voting ===
Borda Voting Results:
Winner: optionA
=== Vickrey Auction ===
Vickrey Auction Results:
painting: Winner=agent1, Price=120
book: Winner=agent2, Price=70
以上代码实现了AI智能体竞争协调中的两种主要机制:拍卖机制和投票系统。这两种机制可以用于多智能体系统中的资源分配、决策制定等协调问题。你可以根据具体需求扩展这些基本实现,例如添加更复杂的出价策略、考虑预算约束、实现组合拍卖等。

更多推荐
所有评论(0)