互联网大厂面试题100道-美团拼多多篇
互联网大厂面试题精选摘要 编程基础篇 高效配送路径算法:基于贪心算法和K-means聚类的TSP变种解法,考虑优先级、距离和时间约束 数据结构应用:包含堆、树、图等15道经典算法题,涵盖动态规划和贪心算法 多线程编程:10道语言特性题,涉及Java/Python并发控制和网络编程 系统设计篇 O2O核心系统:10道题目聚焦位置服务、订单系统和智能调度 电商解决方案:8道题覆盖商品管理、库存系统和促
·
互联网大厂经典面试题100道 - 美团、拼多多篇
📋 总体提纲
🎯 第一部分:编程基础 (30题)
- 数据结构与算法 (15题):堆、树、图、动态规划、贪心算法
- 编程语言 (10题):Java/Python/C++、多线程、网络编程
- 设计模式 (5题):策略、模板、观察者、工厂、单例
🔧 第二部分:系统设计 (25题)
- O2O系统 (10题):位置服务、商家管理、订单系统、配送调度
- 电商系统 (8题):商品管理、库存系统、价格策略、促销活动
- 推荐系统 (7题):协同过滤、内容推荐、实时推荐、A/B测试
🌐 第三部分:公司特色题目 (45题)
- 美团特色 (25题):外卖配送、商家入驻、用户增长、数据分析
- 拼多多特色 (20题):社交电商、拼团模式、农产品上行、百亿补贴
📝 详细题目与答案
🎯 第一部分:编程基础 (30题)
数据结构与算法 (15题)
1. 实现一个高效的配送路径规划算法
import heapq
import math
from typing import List, Tuple, Dict, Optional
from dataclasses import dataclass
import time
@dataclass
class Location:
"""位置信息"""
lat: float # 纬度
lng: float # 经度
address: str = ""
def distance_to(self, other: 'Location') -> float:
"""计算两点间距离(使用Haversine公式)"""
R = 6371 # 地球半径(公里)
lat1, lon1 = math.radians(self.lat), math.radians(self.lng)
lat2, lon2 = math.radians(other.lat), math.radians(other.lng)
dlat = lat2 - lat1
dlon = lon2 - lon1
a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
c = 2 * math.asin(math.sqrt(a))
return R * c
@dataclass
class DeliveryOrder:
"""配送订单"""
order_id: str
restaurant: Location
customer: Location
priority: int = 1 # 优先级,数字越小优先级越高
preparation_time: int = 0 # 准备时间(分钟)
deadline: Optional[float] = None # 截止时间(时间戳)
class DeliveryRouteOptimizer:
"""配送路径优化器"""
def __init__(self, depot: Location):
self.depot = depot # 配送中心位置
self.speed = 20 # 平均配送速度(公里/小时)
def optimize_single_route(self, orders: List[DeliveryOrder]) -> List[DeliveryOrder]:
"""优化单个骑手的配送路径(TSP变种)"""
if not orders:
return []
# 使用贪心算法的改进版本
unvisited = orders.copy()
route = []
current_location = self.depot
while unvisited:
best_order = None
best_score = float('inf')
for order in unvisited:
# 计算从当前位置到餐厅的距离
to_restaurant = current_location.distance_to(order.restaurant)
# 计算从餐厅到客户的距离
to_customer = order.restaurant.distance_to(order.customer)
# 综合评分:距离 + 优先级 + 时间约束
distance_score = (to_restaurant + to_customer) / self.speed * 60 # 转换为分钟
priority_score = order.priority * 10 # 优先级权重
time_score = 0
if order.deadline:
current_time = time.time()
estimated_arrival = current_time + distance_score + order.preparation_time
if estimated_arrival > order.deadline:
time_score = (estimated_arrival - order.deadline) * 100 # 超时惩罚
total_score = distance_score + priority_score + time_score
if total_score < best_score:
best_score = total_score
best_order = order
if best_order:
route.append(best_order)
unvisited.remove(best_order)
current_location = best_order.customer
return route
def optimize_multiple_routes(self, orders: List[DeliveryOrder],
num_riders: int) -> List[List[DeliveryOrder]]:
"""优化多个骑手的配送路径(车辆路径问题)"""
if not orders:
return []
# 按优先级和地理位置聚类
clusters = self._cluster_orders(orders, num_riders)
# 为每个聚类优化路径
routes = []
for cluster in clusters:
route = self.optimize_single_route(cluster)
if route:
routes.append(route)
return routes
def _cluster_orders(self, orders: List[DeliveryOrder],
num_clusters: int) -> List[List[DeliveryOrder]]:
"""使用K-means聚类订单"""
if len(orders) <= num_clusters:
return [[order] for order in orders]
# 简化的K-means实现
import random
# 随机选择初始中心点
centers = random.sample(orders, num_clusters)
clusters = [[] for _ in range(num_clusters)]
for _ in range(10): # 迭代次数
# 清空聚类
clusters = [[] for _ in range(num_clusters)]
# 分配订单到最近的聚类
for order in orders:
min_dist = float('inf')
best_cluster = 0
for i, center in enumerate(centers):
dist = order.restaurant.distance_to(center.restaurant)
if dist < min_dist:
min_dist = dist
best_cluster = i
clusters[best_cluster].append(order)
# 更新聚类中心
for i, cluster in enumerate(clusters):
if cluster:
# 计算平均位置作为新中心
avg_lat = sum(order.restaurant.lat for order in cluster) / len(cluster)
avg_lng = sum(order.restaurant.lng for order in cluster) / len(cluster)
# 找到最接近平均位置的订单作为新中心
min_dist = float('inf')
best_order = None
for order in cluster:
dist = math.sqrt((order.restaurant.lat - avg_lat)**2 +
(order.restaurant.lng - avg_lng)**2)
if dist < min_dist:
min_dist = dist
best_order = order
if best_order:
centers[i] = best_order
return clusters
def calculate_route_distance(self, route: List[DeliveryOrder]) -> float:
"""计算路径总距离"""
if not route:
return 0.0
total_distance = 0.0
current_location = self.depot
for order in route:
total_distance += current_location.distance_to(order.restaurant)
total_distance += order.restaurant.distance_to(order.customer)
current_location = order.customer
# 返回配送中心
total_distance += current_location.distance_to(self.depot)
return total_distance
def estimate_delivery_time(self, route: List[DeliveryOrder]) -> float:
"""估算配送时间(分钟)"""
if not route:
return 0.0
total_time = 0.0
current_location = self.depot
for order in route:
# 到餐厅的时间
travel_time = current_location.distance_to(order.restaurant) / self.speed * 60
total_time += travel_time
# 等待准备时间
total_time += max(0, order.preparation_time - total_time)
# 到客户的时间
travel_time = order.restaurant.distance_to(order.customer) / self.speed * 60
total_time += travel_time
current_location = order.customer
# 返回配送中心的时间
total_time += current_location.distance_to(self.depot) / self.speed * 60
return total_time
class RealTimeRouteOptimizer:
"""实时路径优化器"""
def __init__(self, optimizer: DeliveryRouteOptimizer):
self.optimizer = optimizer
self.active_routes: Dict[str, List[DeliveryOrder]] = {} # rider_id -> route
self.pending_orders: List[DeliveryOrder] = []
self.completed_orders: List[str] = []
def add_order(self, order: DeliveryOrder):
"""添加新订单"""
self.pending_orders.append(order)
self._rebalance_routes()
def complete_order(self, rider_id: str, order_id: str):
"""完成订单"""
if rider_id in self.active_routes:
route = self.active_routes[rider_id]
self.active_routes[rider_id] = [o for o in route if o.order_id != order_id]
self.completed_orders.append(order_id)
self._rebalance_routes()
def _rebalance_routes(self):
"""重新平衡路径"""
# 收集所有未完成的订单
all_orders = self.pending_orders.copy()
for route in self.active_routes.values():
all_orders.extend(route)
# 重新优化路径
num_riders = len(self.active_routes) + max(1, len(self.pending_orders) // 5)
new_routes = self.optimizer.optimize_multiple_routes(all_orders, num_riders)
# 更新路径
self.active_routes.clear()
self.pending_orders.clear()
for i, route in enumerate(new_routes):
rider_id = f"rider_{i+1}"
self.active_routes[rider_id] = route
def get_route_for_rider(self, rider_id: str) -> List[DeliveryOrder]:
"""获取骑手的配送路径"""
return self.active_routes.get(rider_id, [])
def get_stats(self) -> Dict:
"""获取统计信息"""
total_orders = len(self.completed_orders) + len(self.pending_orders)
for route in self.active_routes.values():
total_orders += len(route)
return {
'total_orders': total_orders,
'completed_orders': len(self.completed_orders),
'pending_orders': len(self.pending_orders),
'active_riders': len(self.active_routes),
'orders_per_rider': {
rider_id: len(route)
for rider_id, route in self.active_routes.items()
}
}
# 使用示例
if __name__ == "__main__":
# 创建配送中心
depot = Location(39.9042, 116.4074, "北京市朝阳区")
# 创建路径优化器
optimizer = DeliveryRouteOptimizer(depot)
# 创建一些测试订单
orders = [
DeliveryOrder(
"order_1",
Location(39.9100, 116.4100, "餐厅A"),
Location(39.9150, 116.4150, "客户1"),
priority=1,
preparation_time=15
),
DeliveryOrder(
"order_2",
Location(39.9050, 116.4050, "餐厅B"),
Location(39.9200, 116.4200, "客户2"),
priority=2,
preparation_time=10
),
DeliveryOrder(
"order_3",
Location(39.9000, 116.4000, "餐厅C"),
Location(39.9250, 116.4250, "客户3"),
priority=1,
preparation_time=20
),
DeliveryOrder(
"order_4",
Location(39.9120, 116.4080, "餐厅D"),
Location(39.9180, 116.4120, "客户4"),
priority=3,
preparation_time=5
),
]
# 优化单个骑手路径
single_route = optimizer.optimize_single_route(orders)
print("单个骑手优化路径:")
for i, order in enumerate(single_route):
print(f" {i+1}. {order.order_id}: {order.restaurant.address} -> {order.customer.address}")
distance = optimizer.calculate_route_distance(single_route)
time_needed = optimizer.estimate_delivery_time(single_route)
print(f"总距离: {distance:.2f}公里")
print(f"预计时间: {time_needed:.2f}分钟")
# 优化多个骑手路径
print("\n多个骑手优化路径:")
multi_routes = optimizer.optimize_multiple_routes(orders, 2)
for i, route in enumerate(multi_routes):
print(f"骑手 {i+1}:")
for j, order in enumerate(route):
print(f" {j+1}. {order.order_id}")
# 实时优化测试
print("\n实时优化测试:")
real_time_optimizer = RealTimeRouteOptimizer(optimizer)
# 添加订单
for order in orders[:2]:
real_time_optimizer.add_order(order)
print("初始状态:", real_time_optimizer.get_stats())
# 添加更多订单
for order in orders[2:]:
real_time_optimizer.add_order(order)
print("添加订单后:", real_time_optimizer.get_stats())
# 完成订单
real_time_optimizer.complete_order("rider_1", "order_1")
print("完成订单后:", real_time_optimizer.get_stats())
2. 实现一个商品推荐算法
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple, Set
from collections import defaultdict, Counter
import math
import time
from dataclasses import dataclass
@dataclass
class User:
user_id: str
age: int
gender: str
location: str
interests: List[str]
@dataclass
class Product:
product_id: str
category: str
brand: str
price: float
tags: List[str]
rating: float
sales_count: int
@dataclass
class Interaction:
user_id: str
product_id: str
action: str # view, click, purchase, cart, favorite
timestamp: float
rating: Optional[float] = None
class CollaborativeFilteringRecommender:
"""协同过滤推荐器"""
def __init__(self, min_interactions: int = 5):
self.min_interactions = min_interactions
self.user_item_matrix = None
self.user_similarity = None
self.item_similarity = None
self.user_mapping = {}
self.item_mapping = {}
self.reverse_user_mapping = {}
self.reverse_item_mapping = {}
def fit(self, interactions: List[Interaction]):
"""训练协同过滤模型"""
# 构建用户-物品交互矩阵
user_item_data = defaultdict(lambda: defaultdict(float))
for interaction in interactions:
weight = self._get_action_weight(interaction.action)
if interaction.rating:
weight = interaction.rating
user_item_data[interaction.user_id][interaction.product_id] += weight
# 创建用户和物品映射
users = list(user_item_data.keys())
items = set()
for user_data in user_item_data.values():
items.update(user_data.keys())
items = list(items)
self.user_mapping = {user: idx for idx, user in enumerate(users)}
self.item_mapping = {item: idx for idx, item in enumerate(items)}
self.reverse_user_mapping = {idx: user for user, idx in self.user_mapping.items()}
self.reverse_item_mapping = {idx: item for item, idx in self.item_mapping.items()}
# 构建矩阵
n_users = len(users)
n_items = len(items)
self.user_item_matrix = np.zeros((n_users, n_items))
for user_id, user_data in user_item_data.items():
user_idx = self.user_mapping[user_id]
for product_id, weight in user_data.items():
item_idx = self.item_mapping[product_id]
self.user_item_matrix[user_idx, item_idx] = weight
# 计算相似度
self._compute_similarities()
def _get_action_weight(self, action: str) -> float:
"""获取行为权重"""
weights = {
'view': 1.0,
'click': 2.0,
'cart': 3.0,
'favorite': 4.0,
'purchase': 5.0
}
return weights.get(action, 1.0)
def _compute_similarities(self):
"""计算用户和物品相似度"""
# 用户相似度(余弦相似度)
self.user_similarity = np.zeros((len(self.user_mapping), len(self.user_mapping)))
for i in range(len(self.user_mapping)):
for j in range(i, len(self.user_mapping)):
if i == j:
self.user_similarity[i, j] = 1.0
else:
sim = self._cosine_similarity(self.user_item_matrix[i], self.user_item_matrix[j])
self.user_similarity[i, j] = sim
self.user_similarity[j, i] = sim
# 物品相似度
self.item_similarity = np.zeros((len(self.item_mapping), len(self.item_mapping)))
for i in range(len(self.item_mapping)):
for j in range(i, len(self.item_mapping)):
if i == j:
self.item_similarity[i, j] = 1.0
else:
sim = self._cosine_similarity(self.user_item_matrix[:, i], self.user_item_matrix[:, j])
self.item_similarity[i, j] = sim
self.item_similarity[j, i] = sim
def _cosine_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
"""计算余弦相似度"""
dot_product = np.dot(vec1, vec2)
norm1 = np.linalg.norm(vec1)
norm2 = np.linalg.norm(vec2)
if norm1 == 0 or norm2 == 0:
return 0.0
return dot_product / (norm1 * norm2)
def recommend_for_user(self, user_id: str, top_k: int = 10) -> List[Tuple[str, float]]:
"""为用户推荐物品"""
if user_id not in self.user_mapping:
return []
user_idx = self.user_mapping[user_id]
user_vector = self.user_item_matrix[user_idx]
# 找到相似用户
similar_users = np.argsort(self.user_similarity[user_idx])[::-1][1:21] # 前20个相似用户
# 计算推荐分数
item_scores = defaultdict(float)
for similar_user_idx in similar_users:
similarity = self.user_similarity[user_idx, similar_user_idx]
similar_user_vector = self.user_item_matrix[similar_user_idx]
for item_idx, rating in enumerate(similar_user_vector):
if rating > 0 and user_vector[item_idx] == 0: # 用户未交互过的物品
item_scores[item_idx] += similarity * rating
# 排序并返回top-k
sorted_items = sorted(item_scores.items(), key=lambda x: x[1], reverse=True)
recommendations = []
for item_idx, score in sorted_items[:top_k]:
product_id = self.reverse_item_mapping[item_idx]
recommendations.append((product_id, score))
return recommendations
def recommend_similar_items(self, product_id: str, top_k: int = 10) -> List[Tuple[str, float]]:
"""推荐相似物品"""
if product_id not in self.item_mapping:
return []
item_idx = self.item_mapping[product_id]
item_similarities = self.item_similarity[item_idx]
# 获取最相似的物品
similar_indices = np.argsort(item_similarities)[::-1][1:top_k+1]
recommendations = []
for similar_idx in similar_indices:
similar_product_id = self.reverse_item_mapping[similar_idx]
similarity = item_similarities[similar_idx]
recommendations.append((similar_product_id, similarity))
return recommendations
class ContentBasedRecommender:
"""基于内容的推荐器"""
def __init__(self):
self.product_features = {}
self.user_profiles = {}
self.feature_weights = {
'category': 0.3,
'brand': 0.2,
'price_range': 0.2,
'tags': 0.3
}
def fit(self, users: List[User], products: List[Product], interactions: List[Interaction]):
"""训练基于内容的模型"""
# 构建物品特征向量
for product in products:
self.product_features[product.product_id] = self._extract_product_features(product)
# 构建用户画像
user_interactions = defaultdict(list)
for interaction in interactions:
user_interactions[interaction.user_id].append(interaction)
for user in users:
self.user_profiles[user.user_id] = self._build_user_profile(
user, user_interactions[user.user_id]
)
def _extract_product_features(self, product: Product) -> Dict:
"""提取物品特征"""
features = {
'category': product.category,
'brand': product.brand,
'price_range': self._get_price_range(product.price),
'tags': set(product.tags),
'rating': product.rating,
'sales_count': product.sales_count
}
return features
def _get_price_range(self, price: float) -> str:
"""获取价格区间"""
if price < 50:
return "low"
elif price < 200:
return "medium"
else:
return "high"
def _build_user_profile(self, user: User, interactions: List[Interaction]) -> Dict:
"""构建用户画像"""
profile = {
'age_group': self._get_age_group(user.age),
'gender': user.gender,
'location': user.location,
'interests': set(user.interests),
'preferred_categories': Counter(),
'preferred_brands': Counter(),
'preferred_price_ranges': Counter(),
'preferred_tags': Counter()
}
# 基于交互历史更新偏好
for interaction in interactions:
if interaction.product_id in self.product_features:
features = self.product_features[interaction.product_id]
weight = self._get_action_weight(interaction.action)
profile['preferred_categories'][features['category']] += weight
profile['preferred_brands'][features['brand']] += weight
profile['preferred_price_ranges'][features['price_range']] += weight
for tag in features['tags']:
profile['preferred_tags'][tag] += weight
return profile
def _get_age_group(self, age: int) -> str:
"""获取年龄组"""
if age < 18:
return "teen"
elif age < 30:
return "young_adult"
elif age < 50:
return "adult"
else:
return "senior"
def _get_action_weight(self, action: str) -> float:
"""获取行为权重"""
weights = {
'view': 1.0,
'click': 2.0,
'cart': 3.0,
'favorite': 4.0,
'purchase': 5.0
}
return weights.get(action, 1.0)
def recommend_for_user(self, user_id: str, top_k: int = 10) -> List[Tuple[str, float]]:
"""为用户推荐物品"""
if user_id not in self.user_profiles:
return []
user_profile = self.user_profiles[user_id]
item_scores = []
for product_id, features in self.product_features.items():
score = self._calculate_content_score(user_profile, features)
item_scores.append((product_id, score))
# 排序并返回top-k
item_scores.sort(key=lambda x: x[1], reverse=True)
return item_scores[:top_k]
def _calculate_content_score(self, user_profile: Dict, product_features: Dict) -> float:
"""计算内容匹配分数"""
score = 0.0
# 类别匹配
category_score = user_profile['preferred_categories'].get(product_features['category'], 0)
score += category_score * self.feature_weights['category']
# 品牌匹配
brand_score = user_profile['preferred_brands'].get(product_features['brand'], 0)
score += brand_score * self.feature_weights['brand']
# 价格区间匹配
price_score = user_profile['preferred_price_ranges'].get(product_features['price_range'], 0)
score += price_score * self.feature_weights['price_range']
# 标签匹配
tag_score = 0
for tag in product_features['tags']:
tag_score += user_profile['preferred_tags'].get(tag, 0)
score += tag_score * self.feature_weights['tags']
# 考虑物品质量和热度
score += product_features['rating'] * 0.1
score += min(product_features['sales_count'] / 1000, 1) * 0.1
return score
class HybridRecommender:
"""混合推荐器"""
def __init__(self, cf_weight: float = 0.6, content_weight: float = 0.4):
self.cf_recommender = CollaborativeFilteringRecommender()
self.content_recommender = ContentBasedRecommender()
self.cf_weight = cf_weight
self.content_weight = content_weight
def fit(self, users: List[User], products: List[Product], interactions: List[Interaction]):
"""训练混合推荐模型"""
self.cf_recommender.fit(interactions)
self.content_recommender.fit(users, products, interactions)
def recommend_for_user(self, user_id: str, top_k: int = 10) -> List[Tuple[str, float]]:
"""混合推荐"""
# 协同过滤推荐
cf_recommendations = self.cf_recommender.recommend_for_user(user_id, top_k * 2)
cf_scores = {item_id: score for item_id, score in cf_recommendations}
# 基于内容推荐
content_recommendations = self.content_recommender.recommend_for_user(user_id, top_k * 2)
content_scores = {item_id: score for item_id, score in content_recommendations}
# 合并分数
combined_scores = defaultdict(float)
all_items = set(cf_scores.keys()) | set(content_scores.keys())
for item_id in all_items:
cf_score = cf_scores.get(item_id, 0)
content_score = content_scores.get(item_id, 0)
# 归一化分数
cf_score = min(cf_score / 5.0, 1.0) # 假设CF分数在0-5范围
content_score = min(content_score / 10.0, 1.0) # 假设内容分数在0-10范围
combined_scores[item_id] = (cf_score * self.cf_weight +
content_score * self.content_weight)
# 排序并返回top-k
sorted_recommendations = sorted(combined_scores.items(), key=lambda x: x[1], reverse=True)
return sorted_recommendations[:top_k]
class RecommendationSystem:
"""推荐系统主类"""
def __init__(self):
self.hybrid_recommender = HybridRecommender()
self.users = {}
self.products = {}
self.interactions = []
self.is_trained = False
def add_user(self, user: User):
"""添加用户"""
self.users[user.user_id] = user
def add_product(self, product: Product):
"""添加商品"""
self.products[product.product_id] = product
def add_interaction(self, interaction: Interaction):
"""添加交互"""
self.interactions.append(interaction)
self.is_trained = False
def train(self):
"""训练推荐模型"""
users_list = list(self.users.values())
products_list = list(self.products.values())
self.hybrid_recommender.fit(users_list, products_list, self.interactions)
self.is_trained = True
def recommend(self, user_id: str, top_k: int = 10) -> List[Tuple[str, float]]:
"""获取推荐"""
if not self.is_trained:
self.train()
return self.hybrid_recommender.recommend_for_user(user_id, top_k)
def recommend_similar_products(self, product_id: str, top_k: int = 10) -> List[Tuple[str, float]]:
"""推荐相似商品"""
if not self.is_trained:
self.train()
return self.hybrid_recommender.cf_recommender.recommend_similar_items(product_id, top_k)
def get_user_profile(self, user_id: str) -> Dict:
"""获取用户画像"""
return self.hybrid_recommender.content_recommender.user_profiles.get(user_id, {})
# 使用示例
if __name__ == "__main__":
# 创建推荐系统
rec_system = RecommendationSystem()
# 添加用户
users = [
User("user_1", 25, "male", "北京", ["电子产品", "运动"]),
User("user_2", 30, "female", "上海", ["服装", "美妆"]),
User("user_3", 28, "male", "广州", ["食品", "家居"])
]
for user in users:
rec_system.add_user(user)
# 添加商品
products = [
Product("product_1", "电子产品", "Apple", 5999, ["手机", "5G"], 4.8, 10000),
Product("product_2", "服装", "Nike", 599, ["运动鞋", "舒适"], 4.5, 5000),
Product("product_3", "美妆", "Lancome", 299, ["口红", "保湿"], 4.6, 3000),
Product("product_4", "食品", "三只松鼠", 99, ["坚果", "健康"], 4.7, 8000),
Product("product_5", "家居", "IKEA", 199, ["收纳", "简约"], 4.4, 2000)
]
for product in products:
rec_system.add_product(product)
# 添加交互
interactions = [
Interaction("user_1", "product_1", "view", time.time() - 3600),
Interaction("user_1", "product_1", "click", time.time() - 3000),
Interaction("user_1", "product_2", "view", time.time() - 2400),
Interaction("user_2", "product_3", "view", time.time() - 1800),
Interaction("user_2", "product_3", "purchase", time.time() - 1200),
Interaction("user_3", "product_4", "view", time.time() - 600),
Interaction("user_3", "product_5", "click", time.time() - 300)
]
for interaction in interactions:
rec_system.add_interaction(interaction)
# 训练模型
rec_system.train()
# 获取推荐
print("为user_1的推荐:")
recommendations = rec_system.recommend("user_1", top_k=3)
for product_id, score in recommendations:
product = rec_system.products[product_id]
print(f" {product.name} ({product.category}): {score:.3f}")
print("\n相似商品推荐 (product_1):")
similar_products = rec_system.recommend_similar_products("product_1", top_k=3)
for product_id, similarity in similar_products:
product = rec_system.products[product_id]
print(f" {product.name}: {similarity:.3f}")
# 查看用户画像
print("\nuser_1的用户画像:")
profile = rec_system.get_user_profile("user_1")
print(f" 偏好类别: {dict(profile.get('preferred_categories', []))}")
print(f" 偏好品牌: {dict(profile.get('preferred_brands', []))}")
print(f" 偏好价格区间: {dict(profile.get('preferred_price_ranges', []))}")
3. 实现一个库存管理系统
import threading
import time
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
from collections import defaultdict
import heapq
import json
class InventoryStatus(Enum):
IN_STOCK = "in_stock"
LOW_STOCK = "low_stock"
OUT_OF_STOCK = "out_of_stock"
DISCONTINUED = "discontinued"
@dataclass
class Product:
product_id: str
name: str
category: str
price: float
cost: float
supplier: str
min_stock_level: int = 10
max_stock_level: int = 100
@dataclass
class StockItem:
product_id: str
quantity: int
reserved_quantity: int = 0
available_quantity: int = field(init=False)
last_updated: float = field(default_factory=time.time)
def __post_init__(self):
self.available_quantity = self.quantity - self.reserved_quantity
def update_available_quantity(self):
"""更新可用数量"""
self.available_quantity = max(0, self.quantity - self.reserved_quantity)
self.last_updated = time.time()
def get_status(self, min_stock_level: int) -> InventoryStatus:
"""获取库存状态"""
if self.quantity == 0:
return InventoryStatus.OUT_OF_STOCK
elif self.quantity <= min_stock_level:
return InventoryStatus.LOW_STOCK
else:
return InventoryStatus.IN_STOCK
@dataclass
class StockTransaction:
transaction_id: str
product_id: str
transaction_type: str # 'in', 'out', 'reserve', 'release'
quantity: int
timestamp: float = field(default_factory=time.time)
reference_id: Optional[str] = None
reason: str = ""
class InventoryManager:
"""库存管理器"""
def __init__(self):
self.products: Dict[str, Product] = {}
self.stock_items: Dict[str, StockItem] = {}
self.transactions: List[StockTransaction] = []
self.lock = threading.RLock()
# 预警系统
self.low_stock_threshold = 0.2 # 低库存阈值比例
self.out_of_stock_alerts = set()
self.low_stock_alerts = set()
# 统计信息
self.stats = {
'total_products': 0,
'total_value': 0.0,
'low_stock_count': 0,
'out_of_stock_count': 0
}
def add_product(self, product: Product):
"""添加商品"""
with self.lock:
self.products[product.product_id] = product
if product.product_id not in self.stock_items:
self.stock_items[product.product_id] = StockItem(product.product_id, 0)
self._update_stats()
def add_stock(self, product_id: str, quantity: int, reason: str = "入库") -> bool:
"""增加库存"""
with self.lock:
if product_id not in self.products:
return False
stock_item = self.stock_items[product_id]
stock_item.quantity += quantity
stock_item.update_available_quantity()
# 记录交易
transaction = StockTransaction(
f"IN_{int(time.time())}_{product_id}",
product_id, 'in', quantity, reason=reason
)
self.transactions.append(transaction)
# 检查预警
self._check_stock_alerts(product_id)
self._update_stats()
return True
def remove_stock(self, product_id: str, quantity: int,
reason: str = "出库") -> bool:
"""减少库存"""
with self.lock:
if product_id not in self.stock_items:
return False
stock_item = self.stock_items[product_id]
if stock_item.available_quantity < quantity:
return False
stock_item.quantity -= quantity
stock_item.update_available_quantity()
# 记录交易
transaction = StockTransaction(
f"OUT_{int(time.time())}_{product_id}",
product_id, 'out', quantity, reason=reason
)
self.transactions.append(transaction)
# 检查预警
self._check_stock_alerts(product_id)
self._update_stats()
return True
def reserve_stock(self, product_id: str, quantity: int,
reference_id: str) -> bool:
"""预留库存"""
with self.lock:
if product_id not in self.stock_items:
return False
stock_item = self.stock_items[product_id]
if stock_item.available_quantity < quantity:
return False
stock_item.reserved_quantity += quantity
stock_item.update_available_quantity()
# 记录交易
transaction = StockTransaction(
f"RESERVE_{int(time.time())}_{product_id}",
product_id, 'reserve', quantity, reference_id=reference_id
)
self.transactions.append(transaction)
return True
def release_stock(self, product_id: str, quantity: int,
reference_id: str) -> bool:
"""释放预留库存"""
with self.lock:
if product_id not in self.stock_items:
return False
stock_item = self.stock_items[product_id]
if stock_item.reserved_quantity < quantity:
return False
stock_item.reserved_quantity -= quantity
stock_item.update_available_quantity()
# 记录交易
transaction = StockTransaction(
f"RELEASE_{int(time.time())}_{product_id}",
product_id, 'release', quantity, reference_id=reference_id
)
self.transactions.append(transaction)
return True
def confirm_reserved_stock(self, product_id: str, quantity: int,
reference_id: str) -> bool:
"""确认预留库存(实际出库)"""
with self.lock:
if product_id not in self.stock_items:
return False
stock_item = self.stock_items[product_id]
if stock_item.reserved_quantity < quantity:
return False
# 先释放预留,再减少库存
stock_item.reserved_quantity -= quantity
stock_item.quantity -= quantity
stock_item.update_available_quantity()
# 记录交易
transaction = StockTransaction(
f"CONFIRM_{int(time.time())}_{product_id}",
product_id, 'out', quantity, reference_id=reference_id,
reason="确认预留库存"
)
self.transactions.append(transaction)
# 检查预警
self._check_stock_alerts(product_id)
self._update_stats()
return True
def get_stock_level(self, product_id: str) -> Optional[StockItem]:
"""获取库存水平"""
with self.lock:
return self.stock_items.get(product_id)
def get_stock_status(self, product_id: str) -> Optional[InventoryStatus]:
"""获取库存状态"""
with self.lock:
if product_id not in self.stock_items or product_id not in self.products:
return None
stock_item = self.stock_items[product_id]
product = self.products[product_id]
return stock_item.get_status(product.min_stock_level)
def get_low_stock_products(self) -> List[Tuple[str, StockItem]]:
"""获取低库存商品"""
with self.lock:
low_stock = []
for product_id, stock_item in self.stock_items.items():
if product_id in self.products:
product = self.products[product_id]
status = stock_item.get_status(product.min_stock_level)
if status == InventoryStatus.LOW_STOCK:
low_stock.append((product_id, stock_item))
return low_stock
def get_out_of_stock_products(self) -> List[Tuple[str, StockItem]]:
"""获取缺货商品"""
with self.lock:
out_of_stock = []
for product_id, stock_item in self.stock_items.items():
if stock_item.quantity == 0:
out_of_stock.append((product_id, stock_item))
return out_of_stock
def get_transaction_history(self, product_id: str = None,
start_time: float = None,
end_time: float = None) -> List[StockTransaction]:
"""获取交易历史"""
with self.lock:
transactions = self.transactions.copy()
# 过滤条件
if product_id:
transactions = [t for t in transactions if t.product_id == product_id]
if start_time:
transactions = [t for t in transactions if t.timestamp >= start_time]
if end_time:
transactions = [t for t in transactions if t.timestamp <= end_time]
return sorted(transactions, key=lambda x: x.timestamp, reverse=True)
def _check_stock_alerts(self, product_id: str):
"""检查库存预警"""
if product_id not in self.stock_items or product_id not in self.products:
return
stock_item = self.stock_items[product_id]
product = self.products[product_id]
status = stock_item.get_status(product.min_stock_level)
if status == InventoryStatus.OUT_OF_STOCK:
self.out_of_stock_alerts.add(product_id)
elif status == InventoryStatus.LOW_STOCK:
self.low_stock_alerts.add(product_id)
else:
# 移除预警
self.out_of_stock_alerts.discard(product_id)
self.low_stock_alerts.discard(product_id)
def _update_stats(self):
"""更新统计信息"""
self.stats['total_products'] = len(self.products)
self.stats['total_value'] = 0.0
self.stats['low_stock_count'] = 0
self.stats['out_of_stock_count'] = 0
for product_id, stock_item in self.stock_items.items():
if product_id in self.products:
product = self.products[product_id]
self.stats['total_value'] += stock_item.quantity * product.cost
status = stock_item.get_status(product.min_stock_level)
if status == InventoryStatus.LOW_STOCK:
self.stats['low_stock_count'] += 1
elif status == InventoryStatus.OUT_OF_STOCK:
self.stats['out_of_stock_count'] += 1
def get_stats(self) -> Dict:
"""获取统计信息"""
with self.lock:
return self.stats.copy()
def get_alerts(self) -> Dict:
"""获取预警信息"""
with self.lock:
return {
'out_of_stock': list(self.out_of_stock_alerts),
'low_stock': list(self.low_stock_alerts)
}
class AutoReplenishmentSystem:
"""自动补货系统"""
def __init__(self, inventory_manager: InventoryManager):
self.inventory_manager = inventory_manager
self.replenishment_rules = {}
self.replenishment_queue = []
self.is_running = False
self.worker_thread = None
def add_replenishment_rule(self, product_id: str,
reorder_point: int,
reorder_quantity: int,
max_order_quantity: int = 1000):
"""添加补货规则"""
self.replenishment_rules[product_id] = {
'reorder_point': reorder_point,
'reorder_quantity': reorder_quantity,
'max_order_quantity': max_order_quantity
}
def start_monitoring(self, check_interval: int = 60):
"""开始监控"""
self.is_running = True
self.worker_thread = threading.Thread(
target=self._monitoring_loop,
args=(check_interval,),
daemon=True
)
self.worker_thread.start()
def stop_monitoring(self):
"""停止监控"""
self.is_running = False
if self.worker_thread:
self.worker_thread.join(timeout=5)
def _monitoring_loop(self, check_interval: int):
"""监控循环"""
while self.is_running:
try:
self._check_replenishment()
time.sleep(check_interval)
except Exception as e:
print(f"补货监控出错: {e}")
time.sleep(10)
def _check_replenishment(self):
"""检查补货需求"""
for product_id, rule in self.replenishment_rules.items():
stock_item = self.inventory_manager.get_stock_level(product_id)
if stock_item and stock_item.quantity <= rule['reorder_point']:
# 计算补货数量
needed_quantity = rule['reorder_quantity']
# 考虑当前预留库存
total_needed = needed_quantity + stock_item.reserved_quantity
# 限制最大订单数量
order_quantity = min(total_needed, rule['max_order_quantity'])
# 创建补货订单
replenishment_order = {
'product_id': product_id,
'quantity': order_quantity,
'timestamp': time.time(),
'priority': 'high' if stock_item.quantity == 0 else 'normal'
}
self.replenishment_queue.append(replenishment_order)
print(f"生成补货订单: {replenishment_order}")
def get_replenishment_queue(self) -> List[Dict]:
"""获取补货队列"""
return self.replenishment_queue.copy()
def process_replenishment(self, order_index: int) -> bool:
"""处理补货订单"""
if order_index >= len(self.replenishment_queue):
return False
order = self.replenishment_queue.pop(order_index)
# 执行补货
success = self.inventory_manager.add_stock(
order['product_id'],
order['quantity'],
reason="自动补货"
)
if success:
print(f"补货成功: {order['product_id']} x {order['quantity']}")
else:
print(f"补货失败: {order['product_id']}")
return success
# 使用示例
if __name__ == "__main__":
# 创建库存管理器
inventory = InventoryManager()
# 添加商品
products = [
Product("p001", "iPhone 13", "电子产品", 5999, 4000, "Apple", 5, 50),
Product("p002", "运动鞋", "服装", 599, 300, "Nike", 10, 100),
Product("p003", "口红", "美妆", 299, 150, "Lancome", 20, 200),
]
for product in products:
inventory.add_product(product)
# 初始库存
inventory.add_stock("p001", 30, "初始入库")
inventory.add_stock("p002", 15, "初始入库")
inventory.add_stock("p003", 25, "初始入库")
# 查看库存状态
print("初始库存状态:")
for product_id in ["p001", "p002", "p003"]:
stock = inventory.get_stock_level(product_id)
status = inventory.get_stock_status(product_id)
print(f" {product_id}: 数量={stock.quantity}, 可用={stock.available_quantity}, 状态={status.value}")
# 预留库存
print("\n预留库存:")
inventory.reserve_stock("p001", 5, "order_001")
inventory.reserve_stock("p002", 8, "order_002")
for product_id in ["p001", "p002"]:
stock = inventory.get_stock_level(product_id)
print(f" {product_id}: 数量={stock.quantity}, 预留={stock.reserved_quantity}, 可用={stock.available_quantity}")
# 确认预留库存
print("\n确认预留库存:")
inventory.confirm_reserved_stock("p001", 3, "order_001")
stock = inventory.get_stock_level("p001")
print(f" p001: 数量={stock.quantity}, 预留={stock.reserved_quantity}, 可用={stock.available_quantity}")
# 释放预留库存
inventory.release_stock("p002", 5, "order_002")
stock = inventory.get_stock_level("p002")
print(f" p002: 数量={stock.quantity}, 预留={stock.reserved_quantity}, 可用={stock.available_quantity}")
# 查看预警
print("\n库存预警:")
alerts = inventory.get_alerts()
print(f" 缺货商品: {alerts['out_of_stock']}")
print(f" 低库存商品: {alerts['low_stock']}")
# 查看统计信息
print("\n统计信息:")
stats = inventory.get_stats()
for key, value in stats.items():
print(f" {key}: {value}")
# 自动补货系统
print("\n自动补货系统:")
auto_replenish = AutoReplenishmentSystem(inventory)
# 添加补货规则
auto_replenish.add_replenishment_rule("p001", 10, 20, 50)
auto_replenish.add_replenishment_rule("p002", 5, 15, 30)
# 消耗一些库存
inventory.remove_stock("p001", 20, "销售")
inventory.remove_stock("p002", 8, "销售")
# 检查补货需求
auto_replenish._check_replenishment()
# 查看补货队列
queue = auto_replenish.get_replenishment_queue()
print(f"补货队列: {queue}")
# 处理补货
if queue:
auto_replenish.process_replenishment(0)
# 查看最终库存
print("\n最终库存状态:")
for product_id in ["p001", "p002", "p003"]:
stock = inventory.get_stock_level(product_id)
status = inventory.get_stock_status(product_id)
print(f" {product_id}: 数量={stock.quantity}, 可用={stock.available_quantity}, 状态={status.value}")
[继续下一部分…]
🔧 第二部分:系统设计 (25题)
O2O系统 (10题)
4. 实现一个商家评分系统
import time
import threading
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
from collections import defaultdict
import statistics
import math
class RatingType(Enum):
FOOD_QUALITY = "food_quality"
SERVICE = "service"
ENVIRONMENT = "environment"
DELIVERY = "delivery"
VALUE = "value"
@dataclass
class Rating:
rating_id: str
user_id: str
merchant_id: str
order_id: str
overall_score: float # 总体评分 1-5
detailed_scores: Dict[RatingType, float] = field(default_factory=dict)
comment: str = ""
images: List[str] = field(default_factory=list)
timestamp: float = field(default_factory=time.time)
helpful_count: int = 0
is_verified: bool = False
@dataclass
class Merchant:
merchant_id: str
name: str
category: str
location: str
registration_time: float = field(default_factory=time.time)
class RatingSystem:
"""商家评分系统"""
def __init__(self):
self.merchants: Dict[str, Merchant] = {}
self.ratings: Dict[str, Rating] = {}
self.merchant_ratings: Dict[str, List[str]] = defaultdict(list) # merchant_id -> rating_ids
self.user_ratings: Dict[str, List[str]] = defaultdict(list) # user_id -> rating_ids
# 评分统计缓存
self.rating_stats_cache: Dict[str, Dict] = {}
self.cache_lock = threading.RLock()
self.cache_ttl = 300 # 缓存5分钟
# 反垃圾评分系统
self.spam_detector = SpamDetector()
# 权重配置
self.rating_weights = {
'recency': 0.3, # 时间权重
'quality': 0.4, # 评分质量权重
'user_credibility': 0.3 # 用户可信度权重
}
def add_merchant(self, merchant: Merchant):
"""添加商家"""
self.merchants[merchant.merchant_id] = merchant
def add_rating(self, rating: Rating) -> bool:
"""添加评分"""
# 验证评分
if not self._validate_rating(rating):
return False
# 检查是否为垃圾评分
if self.spam_detector.is_spam(rating):
return False
# 保存评分
self.ratings[rating.rating_id] = rating
self.merchant_ratings[rating.merchant_id].append(rating.rating_id)
self.user_ratings[rating.user_id].append(rating.rating_id)
# 清除缓存
self._clear_cache(rating.merchant_id)
return True
def _validate_rating(self, rating: Rating) -> bool:
"""验证评分有效性"""
# 检查评分范围
if not (1 <= rating.overall_score <= 5):
return False
# 检查详细评分
for score_type, score in rating.detailed_scores.items():
if not (1 <= score <= 5):
return False
# 检查商家是否存在
if rating.merchant_id not in self.merchants:
return False
# 检查用户是否已经对该订单评分
for rating_id in self.user_ratings[rating.user_id]:
existing_rating = self.ratings[rating_id]
if existing_rating.order_id == rating.order_id:
return False # 同一订单不能重复评分
return True
def get_merchant_rating_stats(self, merchant_id: str) -> Dict:
"""获取商家评分统计"""
# 检查缓存
cached_stats = self._get_cached_stats(merchant_id)
if cached_stats:
return cached_stats
if merchant_id not in self.merchant_ratings:
return self._empty_stats()
rating_ids = self.merchant_ratings[merchant_id]
if not rating_ids:
return self._empty_stats()
ratings = [self.ratings[rating_id] for rating_id in rating_ids]
# 计算基础统计
overall_scores = [r.overall_score for r in ratings]
overall_avg = statistics.mean(overall_scores)
overall_median = statistics.median(overall_scores)
# 计算详细评分统计
detailed_stats = {}
for rating_type in RatingType:
scores = []
for r in ratings:
if rating_type in r.detailed_scores:
scores.append(r.detailed_scores[rating_type])
if scores:
detailed_stats[rating_type.value] = {
'average': statistics.mean(scores),
'median': statistics.median(scores),
'count': len(scores)
}
# 计算加权评分
weighted_score = self._calculate_weighted_score(ratings)
# 计算评分分布
score_distribution = self._calculate_score_distribution(overall_scores)
# 计算趋势
trend = self._calculate_rating_trend(ratings)
stats = {
'overall_average': overall_avg,
'overall_median': overall_median,
'weighted_score': weighted_score,
'total_ratings': len(ratings),
'detailed_scores': detailed_stats,
'score_distribution': score_distribution,
'trend': trend,
'last_updated': time.time()
}
# 缓存结果
self._cache_stats(merchant_id, stats)
return stats
def _calculate_weighted_score(self, ratings: List[Rating]) -> float:
"""计算加权评分"""
if not ratings:
return 0.0
current_time = time.time()
weighted_sum = 0.0
total_weight = 0.0
for rating in ratings:
# 时间权重(越近的评分权重越高)
days_ago = (current_time - rating.timestamp) / (24 * 3600)
recency_weight = math.exp(-days_ago / 30) # 30天衰减
# 质量权重(有详细评分和评论的权重更高)
quality_weight = 1.0
if rating.detailed_scores:
quality_weight += 0.2
if rating.comment and len(rating.comment) > 10:
quality_weight += 0.1
if rating.images:
quality_weight += 0.1
# 用户可信度权重(基于用户历史评分)
user_credibility = self._get_user_credibility(rating.user_id)
# 总权重
total_weight = (
self.rating_weights['recency'] * recency_weight +
self.rating_weights['quality'] * quality_weight +
self.rating_weights['user_credibility'] * user_credibility
)
weighted_sum += rating.overall_score * total_weight
total_weight += total_weight
return weighted_sum / total_weight if total_weight > 0 else 0.0
def _get_user_credibility(self, user_id: str) -> float:
"""获取用户可信度"""
if user_id not in self.user_ratings:
return 0.5 # 新用户默认可信度
rating_ids = self.user_ratings[user_id]
rating_count = len(rating_ids)
# 基于评分数量计算可信度
if rating_count < 3:
return 0.3
elif rating_count < 10:
return 0.5
elif rating_count < 50:
return 0.7
else:
return 0.9
def _calculate_score_distribution(self, scores: List[float]) -> Dict[int, int]:
"""计算评分分布"""
distribution = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0}
for score in scores:
distribution[int(score)] += 1
return distribution
def _calculate_rating_trend(self, ratings: List[Rating]) -> str:
"""计算评分趋势"""
if len(ratings) < 10:
return "insufficient_data"
# 按时间排序
sorted_ratings = sorted(ratings, key=lambda x: x.timestamp)
# 比较最近30天和前30天的平均分
current_time = time.time()
thirty_days_ago = current_time - 30 * 24 * 3600
sixty_days_ago = current_time - 60 * 24 * 3600
recent_scores = [r.overall_score for r in sorted_ratings if r.timestamp >= thirty_days_ago]
previous_scores = [r.overall_score for r in sorted_ratings
if sixty_days_ago <= r.timestamp < thirty_days_ago]
if not recent_scores or not previous_scores:
return "insufficient_data"
recent_avg = statistics.mean(recent_scores)
previous_avg = statistics.mean(previous_scores)
if recent_avg > previous_avg + 0.2:
return "improving"
elif recent_avg < previous_avg - 0.2:
return "declining"
else:
return "stable"
def get_top_merchants(self, category: str = None,
limit: int = 10) -> List[Tuple[str, float]]:
"""获取评分最高的商家"""
merchant_scores = []
for merchant_id, merchant in self.merchants.items():
if category and merchant.category != category:
continue
stats = self.get_merchant_rating_stats(merchant_id)
weighted_score = stats['weighted_score']
# 只考虑有足够评分的商家
if stats['total_ratings'] >= 5:
merchant_scores.append((merchant_id, weighted_score))
# 排序并返回top-k
merchant_scores.sort(key=lambda x: x[1], reverse=True)
return merchant_scores[:limit]
def mark_rating_helpful(self, rating_id: str, user_id: str) -> bool:
"""标记评分有用"""
if rating_id not in self.ratings:
return False
rating = self.ratings[rating_id]
rating.helpful_count += 1
# 清除相关缓存
self._clear_cache(rating.merchant_id)
return True
def verify_rating(self, rating_id: str) -> bool:
"""验证评分(管理员功能)"""
if rating_id not in self.ratings:
return False
rating = self.ratings[rating_id]
rating.is_verified = True
# 清除相关缓存
self._clear_cache(rating.merchant_id)
return True
def _empty_stats(self) -> Dict:
"""空统计"""
return {
'overall_average': 0.0,
'overall_median': 0.0,
'weighted_score': 0.0,
'total_ratings': 0,
'detailed_scores': {},
'score_distribution': {1: 0, 2: 0, 3: 0, 4: 0, 5: 0},
'trend': 'no_data',
'last_updated': time.time()
}
def _cache_stats(self, merchant_id: str, stats: Dict):
"""缓存统计结果"""
with self.cache_lock:
self.rating_stats_cache[merchant_id] = {
'stats': stats,
'timestamp': time.time()
}
def _get_cached_stats(self, merchant_id: str) -> Optional[Dict]:
"""获取缓存的统计结果"""
with self.cache_lock:
cached = self.rating_stats_cache.get(merchant_id)
if cached and time.time() - cached['timestamp'] < self.cache_ttl:
return cached['stats']
return None
def _clear_cache(self, merchant_id: str):
"""清除缓存"""
with self.cache_lock:
self.rating_stats_cache.pop(merchant_id, None)
class SpamDetector:
"""垃圾评分检测器"""
def __init__(self):
self.suspicious_patterns = [
'刷单',
'好评返现',
'五星好评',
'默认好评'
]
self.user_rating_frequency = defaultdict(list)
self.merchant_rating_frequency = defaultdict(list)
def is_spam(self, rating: Rating) -> bool:
"""检测是否为垃圾评分"""
# 检查评论内容
if self._contains_suspicious_content(rating.comment):
return True
# 检查评分频率
if self._is_abnormal_frequency(rating):
return True
# 检查评分模式
if self._is_abnormal_pattern(rating):
return True
return False
def _contains_suspicious_content(self, comment: str) -> bool:
"""检查是否包含可疑内容"""
comment_lower = comment.lower()
for pattern in self.suspicious_patterns:
if pattern in comment_lower:
return True
return False
def _is_abnormal_frequency(self, rating: Rating) -> bool:
"""检查评分频率是否异常"""
current_time = time.time()
user_ratings = self.user_rating_frequency[rating.user_id]
# 清理过期记录(24小时前)
user_ratings[:] = [t for t in user_ratings if current_time - t < 24 * 3600]
# 添加当前评分
user_ratings.append(current_time)
# 检查24小时内评分次数
if len(user_ratings) > 20: # 24小时内超过20次评分
return True
return False
def _is_abnormal_pattern(self, rating: Rating) -> bool:
"""检查评分模式是否异常"""
# 检查是否总是给5分
# 这里需要更复杂的逻辑,暂时简化
return False
# 使用示例
if __name__ == "__main__":
# 创建评分系统
rating_system = RatingSystem()
# 添加商家
merchants = [
Merchant("m001", "川味小厨", "川菜", "朝阳区"),
Merchant("m002", "粤式茶餐厅", "粤菜", "海淀区"),
Merchant("m003", "日式拉面馆", "日料", "东城区"),
]
for merchant in merchants:
rating_system.add_merchant(merchant)
# 添加评分
ratings = [
Rating("r001", "user_001", "m001", "order_001", 4.5,
{RatingType.FOOD_QUALITY: 4.0, RatingType.SERVICE: 5.0,
RatingType.DELIVERY: 4.5}, "味道不错,服务很好"),
Rating("r002", "user_002", "m001", "order_002", 3.5,
{RatingType.FOOD_QUALITY: 3.0, RatingType.SERVICE: 4.0}, "一般般"),
Rating("r003", "user_003", "m002", "order_003", 5.0,
{RatingType.FOOD_QUALITY: 5.0, RatingType.SERVICE: 5.0}, "非常满意"),
Rating("r004", "user_004", "m002", "order_004", 4.0,
{RatingType.FOOD_QUALITY: 4.0, RatingType.DELIVERY: 4.0}, "还不错"),
Rating("r005", "user_005", "m003", "order_005", 4.8,
{RatingType.FOOD_QUALITY: 5.0, RatingType.SERVICE: 4.5}, "拉面很正宗"),
]
for rating in ratings:
rating_system.add_rating(rating)
# 查看商家评分统计
print("商家评分统计:")
for merchant_id in ["m001", "m002", "m003"]:
stats = rating_system.get_merchant_rating_stats(merchant_id)
merchant = rating_system.merchants[merchant_id]
print(f"\n{merchant.name}:")
print(f" 总体平均分: {stats['overall_average']:.2f}")
print(f" 加权评分: {stats['weighted_score']:.2f}")
print(f" 评分数量: {stats['total_ratings']}")
print(f" 评分趋势: {stats['trend']}")
print(f" 评分分布: {stats['score_distribution']}")
# 获取top商家
print("\nTop商家:")
top_merchants = rating_system.get_top_merchants(limit=3)
for merchant_id, score in top_merchants:
merchant = rating_system.merchants[merchant_id]
print(f" {merchant.name}: {score:.2f}")
# 标记评分有用
rating_system.mark_rating_helpful("r001", "user_006")
# 验证评分
rating_system.verify_rating("r003")
# 查看更新后的统计
print("\n更新后的m001统计:")
stats = rating_system.get_merchant_rating_stats("m001")
print(f" 加权评分: {stats['weighted_score']:.2f}")
更多推荐
所有评论(0)