互联网大厂经典面试题100道 - 美团、拼多多篇

📋 总体提纲

🎯 第一部分:编程基础 (30题)

  • 数据结构与算法 (15题):堆、树、图、动态规划、贪心算法
  • 编程语言 (10题):Java/Python/C++、多线程、网络编程
  • 设计模式 (5题):策略、模板、观察者、工厂、单例

🔧 第二部分:系统设计 (25题)

  • O2O系统 (10题):位置服务、商家管理、订单系统、配送调度
  • 电商系统 (8题):商品管理、库存系统、价格策略、促销活动
  • 推荐系统 (7题):协同过滤、内容推荐、实时推荐、A/B测试

🌐 第三部分:公司特色题目 (45题)

  • 美团特色 (25题):外卖配送、商家入驻、用户增长、数据分析
  • 拼多多特色 (20题):社交电商、拼团模式、农产品上行、百亿补贴

📝 详细题目与答案

🎯 第一部分:编程基础 (30题)

数据结构与算法 (15题)

1. 实现一个高效的配送路径规划算法

import heapq
import math
from typing import List, Tuple, Dict, Optional
from dataclasses import dataclass
import time

@dataclass
class Location:
    """A geographic point given as latitude/longitude in decimal degrees.

    Attributes:
        lat: Latitude in degrees.
        lng: Longitude in degrees.
        address: Free-text label; not used in any calculation.
    """
    lat: float  # latitude
    lng: float  # longitude
    address: str = ""

    def distance_to(self, other: 'Location') -> float:
        """Return the great-circle distance to ``other`` in kilometres.

        Uses the haversine formula on a sphere with a radius of 6371 km.
        """
        earth_radius_km = 6371

        lat1, lon1 = math.radians(self.lat), math.radians(self.lng)
        lat2, lon2 = math.radians(other.lat), math.radians(other.lng)

        half_dlat = (lat2 - lat1) / 2
        half_dlon = (lon2 - lon1) / 2

        a = (math.sin(half_dlat) ** 2
             + math.cos(lat1) * math.cos(lat2) * math.sin(half_dlon) ** 2)
        # Rounding can push `a` marginally outside [0, 1] for (near-)antipodal
        # points, which would make sqrt/asin raise ValueError.
        a = min(1.0, max(0.0, a))
        c = 2 * math.asin(math.sqrt(a))

        return earth_radius_km * c

@dataclass
class DeliveryOrder:
    """A delivery order with a restaurant location and a customer location."""
    order_id: str
    restaurant: Location
    customer: Location
    priority: int = 1  # priority; a smaller number means a higher priority
    preparation_time: int = 0  # preparation time (minutes)
    deadline: Optional[float] = None  # deadline (timestamp)

class DeliveryRouteOptimizer:
    """Greedy route planner for riders that start from a single depot.

    Every order is served as one leg: ride to the restaurant, then to the
    customer. Distances come from ``Location.distance_to`` and are turned
    into minutes with a fixed average speed.
    """
    def __init__(self, depot: "Location"):
        self.depot = depot  # location every route starts from
        self.speed = 20  # average riding speed (km/h)

    def optimize_single_route(self, orders: List["DeliveryOrder"]) -> List["DeliveryOrder"]:
        """Order one rider's deliveries with a greedy heuristic (TSP variant).

        Starting at the depot, repeatedly picks the remaining order with the
        lowest score (see ``_score_order``) and continues from that order's
        customer location.

        Args:
            orders: Orders to sequence; the list itself is not modified.

        Returns:
            The same orders in visiting sequence ([] for empty input).
        """
        if not orders:
            return []

        remaining = list(orders)
        route = []
        position = self.depot
        now = time.time()  # one reference clock for the whole pass

        while remaining:
            # min() always yields an element, so every iteration shrinks
            # `remaining` even if a score is NaN (the old `< best_score`
            # scan could select nothing and loop forever).
            chosen = min(
                remaining,
                key=lambda order: self._score_order(position, order, now),
            )
            route.append(chosen)
            remaining.remove(chosen)
            position = chosen.customer

        return route

    def _score_order(self, origin: "Location", order: "DeliveryOrder", now: float) -> float:
        """Return the greedy cost of serving ``order`` next (lower is better).

        The cost is travel minutes plus ``priority * 10`` plus a penalty of
        100 per minute by which the order is estimated to miss its deadline.
        """
        travel_minutes = (
            origin.distance_to(order.restaurant)
            + order.restaurant.distance_to(order.customer)
        ) / self.speed * 60
        score = travel_minutes + order.priority * 10

        # `is not None` so that a deadline of 0 is still honoured.
        if order.deadline is not None:
            # The deadline is an epoch timestamp in seconds, while travel and
            # preparation are in minutes: convert before comparing.
            eta = now + (travel_minutes + order.preparation_time) * 60
            late_seconds = eta - order.deadline
            if late_seconds > 0:
                score += late_seconds / 60 * 100  # lateness penalty

        return score

    def optimize_multiple_routes(self, orders: List["DeliveryOrder"],
                                 num_riders: int) -> List[List["DeliveryOrder"]]:
        """Split orders among riders and optimise each rider's route.

        Args:
            orders: All orders to deliver.
            num_riders: Maximum number of routes to produce.

        Returns:
            One non-empty route per used rider; may be fewer than
            ``num_riders`` because empty clusters are dropped.

        Raises:
            ValueError: If there are orders but ``num_riders`` is below 1.
        """
        if not orders:
            return []

        routes = []
        for cluster in self._cluster_orders(orders, num_riders):
            route = self.optimize_single_route(cluster)
            if route:
                routes.append(route)

        return routes

    def _cluster_orders(self, orders: List["DeliveryOrder"],
                        num_clusters: int) -> List[List["DeliveryOrder"]]:
        """Group orders by restaurant location.

        This is a simplified k-means in which every centre is an actual
        order (the member closest to the cluster's mean position). Initial
        centres are drawn at random, so results may differ between calls.

        Raises:
            ValueError: If ``num_clusters`` is below 1 while there are more
                orders than clusters.
        """
        if len(orders) <= num_clusters:
            return [[order] for order in orders]
        if num_clusters < 1:
            raise ValueError("num_clusters must be at least 1")

        import random

        centers = random.sample(orders, num_clusters)
        clusters = [[] for _ in range(num_clusters)]

        for _ in range(10):  # fixed number of refinement passes
            clusters = [[] for _ in range(num_clusters)]

            # Assignment step: attach each order to its nearest centre.
            for order in orders:
                nearest = min(
                    range(num_clusters),
                    key=lambda i: order.restaurant.distance_to(centers[i].restaurant),
                )
                clusters[nearest].append(order)

            # Update step: an empty cluster keeps its previous centre.
            for i, cluster in enumerate(clusters):
                if cluster:
                    centers[i] = self._central_order(cluster)

        return clusters

    @staticmethod
    def _central_order(cluster: List["DeliveryOrder"]) -> "DeliveryOrder":
        """Return the order whose restaurant is closest to the cluster mean.

        Closeness is measured as plain Euclidean distance in degrees.
        """
        avg_lat = sum(order.restaurant.lat for order in cluster) / len(cluster)
        avg_lng = sum(order.restaurant.lng for order in cluster) / len(cluster)
        return min(
            cluster,
            key=lambda order: math.hypot(order.restaurant.lat - avg_lat,
                                         order.restaurant.lng - avg_lng),
        )

    def calculate_route_distance(self, route: List["DeliveryOrder"]) -> float:
        """Return the total length of ``route``, including the ride back to the depot."""
        if not route:
            return 0.0

        total_distance = 0.0
        position = self.depot

        for order in route:
            total_distance += position.distance_to(order.restaurant)
            total_distance += order.restaurant.distance_to(order.customer)
            position = order.customer

        # Ride back to the depot.
        total_distance += position.distance_to(self.depot)

        return total_distance

    def estimate_delivery_time(self, route: List["DeliveryOrder"]) -> float:
        """Estimate the duration of ``route`` in minutes, including the return leg.

        ``preparation_time`` is measured from the start of the route: the
        rider only waits at a restaurant for the part of the preparation
        that has not already elapsed.
        """
        if not route:
            return 0.0

        total_time = 0.0
        position = self.depot

        for order in route:
            # Ride to the restaurant.
            total_time += position.distance_to(order.restaurant) / self.speed * 60

            # Wait for whatever preparation time is still outstanding.
            total_time += max(0, order.preparation_time - total_time)

            # Ride to the customer.
            total_time += order.restaurant.distance_to(order.customer) / self.speed * 60

            position = order.customer

        # Ride back to the depot.
        total_time += position.distance_to(self.depot) / self.speed * 60

        return total_time

class RealTimeRouteOptimizer:
    """实时路径优化器"""
    def __init__(self, optimizer: DeliveryRouteOptimizer):
        self.optimizer = optimizer
        self.active_routes: Dict[str, List[DeliveryOrder]] = {}  # rider_id -> route
        self.pending_orders: List[DeliveryOrder] = []
        self.completed_orders: List[str] = []
    
    def add_order(self, order: DeliveryOrder):
        """添加新订单"""
        self.pending_orders.append(order)
        self._rebalance_routes()
    
    def complete_order(self, rider_id: str, order_id: str):
        """完成订单"""
        if rider_id in self.active_routes:
            route = self.active_routes[rider_id]
            self.active_routes[rider_id] = [o for o in route if o.order_id != order_id]
            self.completed_orders.append(order_id)
        
        self._rebalance_routes()
    
    def _rebalance_routes(self):
        """重新平衡路径"""
        # 收集所有未完成的订单
        all_orders = self.pending_orders.copy()
        for route in self.active_routes.values():
            all_orders.extend(route)
        
        # 重新优化路径
        num_riders = len(self.active_routes) + max(1, len(self.pending_orders) // 5)
        new_routes = self.optimizer.optimize_multiple_routes(all_orders, num_riders)
        
        # 更新路径
        self.active_routes.clear()
        self.pending_orders.clear()
        
        for i, route in enumerate(new_routes):
            rider_id = f"rider_{i+1}"
            self.active_routes[rider_id] = route
    
    def get_route_for_rider(self, rider_id: str) -> List[DeliveryOrder]:
        """获取骑手的配送路径"""
        return self.active_routes.get(rider_id, [])
    
    def get_stats(self) -> Dict:
        """获取统计信息"""
        total_orders = len(self.completed_orders) + len(self.pending_orders)
        for route in self.active_routes.values():
            total_orders += len(route)
        
        return {
            'total_orders': total_orders,
            'completed_orders': len(self.completed_orders),
            'pending_orders': len(self.pending_orders),
            'active_riders': len(self.active_routes),
            'orders_per_rider': {
                rider_id: len(route) 
                for rider_id, route in self.active_routes.items()
            }
        }

# Usage example
if __name__ == "__main__":
    # Depot and the optimizer built around it
    depot = Location(39.9042, 116.4074, "北京市朝阳区")
    optimizer = DeliveryRouteOptimizer(depot)

    # Sample orders: (order id, restaurant, customer, priority, preparation minutes)
    order_specs = [
        ("order_1", (39.9100, 116.4100, "餐厅A"), (39.9150, 116.4150, "客户1"), 1, 15),
        ("order_2", (39.9050, 116.4050, "餐厅B"), (39.9200, 116.4200, "客户2"), 2, 10),
        ("order_3", (39.9000, 116.4000, "餐厅C"), (39.9250, 116.4250, "客户3"), 1, 20),
        ("order_4", (39.9120, 116.4080, "餐厅D"), (39.9180, 116.4120, "客户4"), 3, 5),
    ]
    orders = [
        DeliveryOrder(
            order_id,
            Location(*restaurant),
            Location(*customer),
            priority=priority,
            preparation_time=prep_minutes,
        )
        for order_id, restaurant, customer, priority, prep_minutes in order_specs
    ]

    # Route for a single rider
    single_route = optimizer.optimize_single_route(orders)
    print("单个骑手优化路径:")
    for position, stop in enumerate(single_route, start=1):
        print(f"  {position}. {stop.order_id}: {stop.restaurant.address} -> {stop.customer.address}")

    distance = optimizer.calculate_route_distance(single_route)
    time_needed = optimizer.estimate_delivery_time(single_route)
    print(f"总距离: {distance:.2f}公里")
    print(f"预计时间: {time_needed:.2f}分钟")

    # Routes for several riders
    print("\n多个骑手优化路径:")
    multi_routes = optimizer.optimize_multiple_routes(orders, 2)
    for rider_number, rider_route in enumerate(multi_routes, start=1):
        print(f"骑手 {rider_number}:")
        for position, stop in enumerate(rider_route, start=1):
            print(f"  {position}. {stop.order_id}")

    # Real-time re-planning
    print("\n实时优化测试:")
    real_time_optimizer = RealTimeRouteOptimizer(optimizer)
    first_batch, second_batch = orders[:2], orders[2:]

    for order in first_batch:
        real_time_optimizer.add_order(order)
    print("初始状态:", real_time_optimizer.get_stats())

    for order in second_batch:
        real_time_optimizer.add_order(order)
    print("添加订单后:", real_time_optimizer.get_stats())

    # Complete one order
    real_time_optimizer.complete_order("rider_1", "order_1")
    print("完成订单后:", real_time_optimizer.get_stats())

2. 实现一个商品推荐算法

import math
import time
from collections import Counter, defaultdict
from dataclasses import dataclass
from typing import Dict, List, Optional, Set, Tuple

import numpy as np
import pandas as pd

@dataclass
class User:
    """Profile data of a user of the recommendation system."""
    user_id: str
    age: int
    gender: str
    location: str
    interests: List[str]  # free-form interest labels

@dataclass
class Product:
    """Catalogue data of a product. Note that there is no ``name`` field."""
    product_id: str
    category: str
    brand: str
    price: float
    tags: List[str]
    rating: float
    sales_count: int

@dataclass
class Interaction:
    """One recorded action of a user on a product."""
    user_id: str
    product_id: str
    action: str  # view, click, purchase, cart, favorite
    timestamp: float
    rating: Optional[float] = None  # explicit user rating, if one was given

class CollaborativeFilteringRecommender:
    """Memory-based collaborative filtering on a dense user-item matrix.

    ``fit`` builds the matrix from interactions and pre-computes cosine
    similarities between all users and between all items.
    """
    def __init__(self, min_interactions: int = 5):
        # NOTE: stored for callers, but no method of this class applies it.
        self.min_interactions = min_interactions
        self.user_item_matrix = None
        self.user_similarity = None
        self.item_similarity = None
        self.user_mapping = {}  # user_id -> row index
        self.item_mapping = {}  # product_id -> column index
        self.reverse_user_mapping = {}
        self.reverse_item_mapping = {}

    def fit(self, interactions: List["Interaction"]):
        """Build the user-item matrix and the similarity matrices.

        Each interaction contributes its explicit ``rating`` when one is
        set, otherwise the weight of its action; contributions for the same
        user/product pair are summed.
        """
        user_item_data = defaultdict(lambda: defaultdict(float))

        for interaction in interactions:
            # `is not None`, so that an explicit rating of 0 is not replaced
            # by the action weight.
            if interaction.rating is not None:
                weight = interaction.rating
            else:
                weight = self._get_action_weight(interaction.action)

            user_item_data[interaction.user_id][interaction.product_id] += weight

        users = list(user_item_data)
        # First-seen order (instead of set order) keeps column indices, and
        # therefore tie-breaking between equal scores, reproducible.
        items = list(dict.fromkeys(
            product_id
            for user_data in user_item_data.values()
            for product_id in user_data
        ))

        self.user_mapping = {user: idx for idx, user in enumerate(users)}
        self.item_mapping = {item: idx for idx, item in enumerate(items)}
        self.reverse_user_mapping = {idx: user for user, idx in self.user_mapping.items()}
        self.reverse_item_mapping = {idx: item for item, idx in self.item_mapping.items()}

        self.user_item_matrix = np.zeros((len(users), len(items)))
        for user_id, user_data in user_item_data.items():
            user_idx = self.user_mapping[user_id]
            for product_id, weight in user_data.items():
                self.user_item_matrix[user_idx, self.item_mapping[product_id]] = weight

        self._compute_similarities()

    def _get_action_weight(self, action: str) -> float:
        """Return the implicit-feedback weight of an action (1.0 if unknown)."""
        weights = {
            'view': 1.0,
            'click': 2.0,
            'cart': 3.0,
            'favorite': 4.0,
            'purchase': 5.0
        }
        return weights.get(action, 1.0)

    def _compute_similarities(self):
        """Compute user-user and item-item cosine similarity matrices."""
        self.user_similarity = self._pairwise_cosine(self.user_item_matrix)
        self.item_similarity = self._pairwise_cosine(self.user_item_matrix.T)

    @staticmethod
    def _pairwise_cosine(rows: np.ndarray) -> np.ndarray:
        """Return the cosine similarity between every pair of rows.

        Pairs involving an all-zero row get 0.0; the diagonal is always 1.0.
        """
        norms = np.linalg.norm(rows, axis=1)
        dot_products = rows @ rows.T
        denominators = np.outer(norms, norms)
        similarity = np.divide(
            dot_products,
            denominators,
            out=np.zeros_like(dot_products, dtype=float),
            where=denominators != 0,
        )
        np.fill_diagonal(similarity, 1.0)
        return similarity

    def _cosine_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
        """Return the cosine similarity of two vectors (0.0 if either is all zero)."""
        dot_product = np.dot(vec1, vec2)
        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)

        if norm1 == 0 or norm2 == 0:
            return 0.0

        return dot_product / (norm1 * norm2)

    def recommend_for_user(self, user_id: str, top_k: int = 10) -> List[Tuple[str, float]]:
        """Recommend products the user has not interacted with yet.

        Scores each unseen product by the similarity-weighted sum of the
        values that the 20 most similar other users gave it.

        Returns:
            Up to ``top_k`` ``(product_id, score)`` pairs, best first;
            [] for an unknown user.
        """
        if user_id not in self.user_mapping:
            return []

        user_idx = self.user_mapping[user_id]
        user_vector = self.user_item_matrix[user_idx]
        similarities = self.user_similarity[user_idx]

        # Drop the user explicitly instead of assuming they sort first:
        # other users may tie at similarity 1.0.
        ranked = np.argsort(-similarities, kind="stable")
        neighbours = [int(idx) for idx in ranked if idx != user_idx][:20]

        unseen = user_vector == 0
        item_scores = defaultdict(float)

        for neighbour_idx in neighbours:
            similarity = similarities[neighbour_idx]
            neighbour_vector = self.user_item_matrix[neighbour_idx]

            for item_idx in np.flatnonzero((neighbour_vector > 0) & unseen):
                item_scores[int(item_idx)] += similarity * neighbour_vector[item_idx]

        best = sorted(item_scores.items(), key=lambda pair: pair[1], reverse=True)
        return [
            (self.reverse_item_mapping[item_idx], float(score))
            for item_idx, score in best[:max(top_k, 0)]
        ]

    def recommend_similar_items(self, product_id: str, top_k: int = 10) -> List[Tuple[str, float]]:
        """Return the products most similar to ``product_id``.

        Returns:
            Up to ``top_k`` ``(product_id, similarity)`` pairs, best first,
            never containing ``product_id`` itself; [] for an unknown product.
        """
        if product_id not in self.item_mapping:
            return []

        item_idx = self.item_mapping[product_id]
        item_similarities = self.item_similarity[item_idx]

        # Another product can tie with the diagonal's 1.0, so the product
        # itself is filtered out by index rather than by sort position.
        ranked = np.argsort(-item_similarities, kind="stable")
        others = [int(idx) for idx in ranked if idx != item_idx][:max(top_k, 0)]

        return [
            (self.reverse_item_mapping[idx], float(item_similarities[idx]))
            for idx in others
        ]

class ContentBasedRecommender:
    """Content-based recommender that matches products to per-user preference counters."""
    def __init__(self):
        self.product_features = {}  # product_id -> feature dict
        self.user_profiles = {}  # user_id -> profile dict
        # Relative weight of each matched feature in the final score
        self.feature_weights = dict(category=0.3, brand=0.2, price_range=0.2, tags=0.3)

    def fit(self, users: List[User], products: List[Product], interactions: List[Interaction]):
        """Extract product features, then build one profile per user.

        Products must be processed first: profiles are derived from the
        features of the products each user interacted with.
        """
        for item in products:
            self.product_features[item.product_id] = self._extract_product_features(item)

        # Group the interaction log by user
        history = defaultdict(list)
        for event in interactions:
            history[event.user_id].append(event)

        self.user_profiles.update(
            (person.user_id, self._build_user_profile(person, history[person.user_id]))
            for person in users
        )

    def _extract_product_features(self, product: Product) -> Dict:
        """Return the feature dict used for matching and scoring a product."""
        return {
            'category': product.category,
            'brand': product.brand,
            'price_range': self._get_price_range(product.price),
            'tags': set(product.tags),
            'rating': product.rating,
            'sales_count': product.sales_count
        }

    def _get_price_range(self, price: float) -> str:
        """Bucket a price: below 50 is "low", below 200 is "medium", otherwise "high"."""
        if price < 50:
            return "low"
        if price < 200:
            return "medium"
        return "high"

    def _build_user_profile(self, user: User, interactions: List[Interaction]) -> Dict:
        """Build a profile from user attributes plus weighted interaction history."""
        categories, brands = Counter(), Counter()
        price_ranges, tags = Counter(), Counter()

        for event in interactions:
            features = self.product_features.get(event.product_id)
            if features is None:
                # Interaction with a product that is not in the catalogue
                continue

            weight = self._get_action_weight(event.action)
            categories[features['category']] += weight
            brands[features['brand']] += weight
            price_ranges[features['price_range']] += weight
            for tag in features['tags']:
                tags[tag] += weight

        return {
            'age_group': self._get_age_group(user.age),
            'gender': user.gender,
            'location': user.location,
            'interests': set(user.interests),
            'preferred_categories': categories,
            'preferred_brands': brands,
            'preferred_price_ranges': price_ranges,
            'preferred_tags': tags
        }

    def _get_age_group(self, age: int) -> str:
        """Bucket an age into teen (<18), young_adult (<30), adult (<50) or senior."""
        if age < 18:
            return "teen"
        if age < 30:
            return "young_adult"
        if age < 50:
            return "adult"
        return "senior"

    def _get_action_weight(self, action: str) -> float:
        """Return the weight of an action type; unknown actions weigh 1.0."""
        return {
            'view': 1.0,
            'click': 2.0,
            'cart': 3.0,
            'favorite': 4.0,
            'purchase': 5.0
        }.get(action, 1.0)

    def recommend_for_user(self, user_id: str, top_k: int = 10) -> List[Tuple[str, float]]:
        """Score every known product for the user and return the best ``top_k``.

        Returns [] when no profile exists for ``user_id``.
        """
        profile = self.user_profiles.get(user_id)
        if profile is None:
            return []

        scored = [
            (product_id, self._calculate_content_score(profile, features))
            for product_id, features in self.product_features.items()
        ]
        ranking = sorted(scored, key=lambda pair: pair[1], reverse=True)
        return ranking[:top_k]

    def _calculate_content_score(self, user_profile: Dict, product_features: Dict) -> float:
        """Return how well a product matches a profile (higher is better)."""
        weights = self.feature_weights

        category_hits = user_profile['preferred_categories'].get(product_features['category'], 0)
        brand_hits = user_profile['preferred_brands'].get(product_features['brand'], 0)
        price_hits = user_profile['preferred_price_ranges'].get(product_features['price_range'], 0)

        tag_hits = 0
        preferred_tags = user_profile['preferred_tags']
        for tag in product_features['tags']:
            tag_hits += preferred_tags.get(tag, 0)

        # Quality and popularity bonuses; the sales bonus is capped at 1000 sales
        popularity = min(product_features['sales_count'] / 1000, 1)

        return (
            0.0
            + category_hits * weights['category']
            + brand_hits * weights['brand']
            + price_hits * weights['price_range']
            + tag_hits * weights['tags']
            + product_features['rating'] * 0.1
            + popularity * 0.1
        )

class HybridRecommender:
    """Blends collaborative-filtering and content-based scores with fixed weights."""
    def __init__(self, cf_weight: float = 0.6, content_weight: float = 0.4):
        self.cf_weight = cf_weight
        self.content_weight = content_weight
        self.cf_recommender = CollaborativeFilteringRecommender()
        self.content_recommender = ContentBasedRecommender()

    def fit(self, users: List[User], products: List[Product], interactions: List[Interaction]):
        """Train both underlying recommenders on the same data."""
        self.cf_recommender.fit(interactions)
        self.content_recommender.fit(users, products, interactions)

    def recommend_for_user(self, user_id: str, top_k: int = 10) -> List[Tuple[str, float]]:
        """Return up to ``top_k`` ``(product_id, blended_score)`` pairs, best first.

        Each recommender is asked for ``top_k * 2`` candidates; a product
        missing from one of the two lists counts as 0 for that source.
        """
        candidate_count = top_k * 2
        cf_scores = dict(self.cf_recommender.recommend_for_user(user_id, candidate_count))
        content_scores = dict(self.content_recommender.recommend_for_user(user_id, candidate_count))

        blended = {}
        for item_id in set(cf_scores.keys()) | set(content_scores.keys()):
            # Scale to at most 1.0: CF scores are assumed to lie in 0-5 and
            # content scores in 0-10; anything larger is capped.
            cf_part = min(cf_scores.get(item_id, 0) / 5.0, 1.0)
            content_part = min(content_scores.get(item_id, 0) / 10.0, 1.0)
            blended[item_id] = (cf_part * self.cf_weight +
                                content_part * self.content_weight)

        ranking = sorted(blended.items(), key=lambda pair: pair[1], reverse=True)
        return ranking[:top_k]

class RecommendationSystem:
    """Facade that stores users, products and interactions and trains lazily.

    Any change to the stored data marks the model as stale; the next call
    to ``recommend`` or ``recommend_similar_products`` retrains it first.
    """
    def __init__(self):
        self.hybrid_recommender = HybridRecommender()
        self.users = {}  # user_id -> User
        self.products = {}  # product_id -> Product
        self.interactions = []
        self.is_trained = False

    def add_user(self, user: "User"):
        """Add or replace a user and mark the model as stale."""
        self.users[user.user_id] = user
        # User profiles are built during training, so a model trained before
        # this call knows nothing about the user.
        self.is_trained = False

    def add_product(self, product: "Product"):
        """Add or replace a product and mark the model as stale."""
        self.products[product.product_id] = product
        # Product features are extracted during training as well.
        self.is_trained = False

    def add_interaction(self, interaction: "Interaction"):
        """Record an interaction and mark the model as stale."""
        self.interactions.append(interaction)
        self.is_trained = False

    def train(self):
        """Train the hybrid recommender on everything stored so far."""
        users_list = list(self.users.values())
        products_list = list(self.products.values())

        self.hybrid_recommender.fit(users_list, products_list, self.interactions)
        self.is_trained = True

    def recommend(self, user_id: str, top_k: int = 10) -> List[Tuple[str, float]]:
        """Return up to ``top_k`` ``(product_id, score)`` pairs for the user."""
        if not self.is_trained:
            self.train()

        return self.hybrid_recommender.recommend_for_user(user_id, top_k)

    def recommend_similar_products(self, product_id: str, top_k: int = 10) -> List[Tuple[str, float]]:
        """Return up to ``top_k`` ``(product_id, similarity)`` pairs for the product."""
        if not self.is_trained:
            self.train()

        return self.hybrid_recommender.cf_recommender.recommend_similar_items(product_id, top_k)

    def get_user_profile(self, user_id: str) -> Dict:
        """Return the profile from the last training run ({} if there is none).

        Unlike the recommend methods, this does not trigger training.
        """
        return self.hybrid_recommender.content_recommender.user_profiles.get(user_id, {})

# Usage example
if __name__ == "__main__":
    # Create the recommendation system
    rec_system = RecommendationSystem()

    # Register users
    users = [
        User("user_1", 25, "male", "北京", ["电子产品", "运动"]),
        User("user_2", 30, "female", "上海", ["服装", "美妆"]),
        User("user_3", 28, "male", "广州", ["食品", "家居"])
    ]

    for user in users:
        rec_system.add_user(user)

    # Register products
    products = [
        Product("product_1", "电子产品", "Apple", 5999, ["手机", "5G"], 4.8, 10000),
        Product("product_2", "服装", "Nike", 599, ["运动鞋", "舒适"], 4.5, 5000),
        Product("product_3", "美妆", "Lancome", 299, ["口红", "保湿"], 4.6, 3000),
        Product("product_4", "食品", "三只松鼠", 99, ["坚果", "健康"], 4.7, 8000),
        Product("product_5", "家居", "IKEA", 199, ["收纳", "简约"], 4.4, 2000)
    ]

    for product in products:
        rec_system.add_product(product)

    # Record interactions
    interactions = [
        Interaction("user_1", "product_1", "view", time.time() - 3600),
        Interaction("user_1", "product_1", "click", time.time() - 3000),
        Interaction("user_1", "product_2", "view", time.time() - 2400),
        Interaction("user_2", "product_3", "view", time.time() - 1800),
        Interaction("user_2", "product_3", "purchase", time.time() - 1200),
        Interaction("user_3", "product_4", "view", time.time() - 600),
        Interaction("user_3", "product_5", "click", time.time() - 300)
    ]

    for interaction in interactions:
        rec_system.add_interaction(interaction)

    # Train the model
    rec_system.train()

    # Recommendations for one user.
    # Product has no `name` field here, so the product id is used as label.
    print("为user_1的推荐:")
    recommendations = rec_system.recommend("user_1", top_k=3)
    for product_id, score in recommendations:
        product = rec_system.products[product_id]
        print(f"  {product.product_id} ({product.category}): {score:.3f}")

    print("\n相似商品推荐 (product_1):")
    similar_products = rec_system.recommend_similar_products("product_1", top_k=3)
    for product_id, similarity in similar_products:
        product = rec_system.products[product_id]
        print(f"  {product.product_id}: {similarity:.3f}")

    # Inspect the user profile
    print("\nuser_1的用户画像:")
    profile = rec_system.get_user_profile("user_1")
    print(f"  偏好类别: {dict(profile.get('preferred_categories', []))}")
    print(f"  偏好品牌: {dict(profile.get('preferred_brands', []))}")
    print(f"  偏好价格区间: {dict(profile.get('preferred_price_ranges', []))}")

3. 实现一个库存管理系统

import threading
import time
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
from collections import defaultdict
import heapq
import json

class InventoryStatus(Enum):
    """Stock-level states a product can be reported in."""
    IN_STOCK = "in_stock"
    LOW_STOCK = "low_stock"
    OUT_OF_STOCK = "out_of_stock"
    DISCONTINUED = "discontinued"

@dataclass
class Product:
    """Master data of a product tracked by the inventory system."""
    product_id: str
    name: str
    category: str
    price: float
    cost: float
    supplier: str
    # presumably the threshold handed to StockItem.get_status -- TODO confirm against the caller
    min_stock_level: int = 10
    max_stock_level: int = 100

@dataclass
class StockItem:
    """On-hand and reserved quantity of one product.

    ``available_quantity`` is derived (on-hand minus reserved, never below
    0). It is not refreshed automatically: call
    ``update_available_quantity`` after changing ``quantity`` or
    ``reserved_quantity``.
    """
    product_id: str
    quantity: int
    reserved_quantity: int = 0
    available_quantity: int = field(init=False)
    last_updated: float = field(default_factory=time.time)

    def __post_init__(self):
        # Same clamp as update_available_quantity, so that an over-reserved
        # item starts at 0 rather than at a negative availability.
        self.available_quantity = max(0, self.quantity - self.reserved_quantity)

    def update_available_quantity(self):
        """Recompute ``available_quantity`` and stamp ``last_updated``."""
        self.available_quantity = max(0, self.quantity - self.reserved_quantity)
        self.last_updated = time.time()

    def get_status(self, min_stock_level: int) -> "InventoryStatus":
        """Classify the on-hand quantity against ``min_stock_level``.

        Reserved units are not taken into account.
        """
        # `<= 0` so that a negative quantity is not reported as LOW_STOCK.
        if self.quantity <= 0:
            return InventoryStatus.OUT_OF_STOCK
        elif self.quantity <= min_stock_level:
            return InventoryStatus.LOW_STOCK
        else:
            return InventoryStatus.IN_STOCK

@dataclass
class StockTransaction:
    """One entry of the stock movement log."""
    transaction_id: str
    product_id: str
    transaction_type: str  # 'in', 'out', 'reserve', 'release'
    quantity: int
    timestamp: float = field(default_factory=time.time)  # creation time (epoch seconds)
    reference_id: Optional[str] = None
    reason: str = ""

class InventoryManager:
    """In-memory inventory ledger guarded by a re-entrant lock.

    Keeps the product catalogue, one stock item per product, a transaction
    log, the current alert sets and summary statistics. Every public method
    holds ``self.lock`` for its whole duration.

    Project types in method signatures are written as string annotations so
    they do not have to be resolvable when the class body is executed.
    """

    def __init__(self):
        self.products: Dict[str, Product] = {}
        self.stock_items: Dict[str, StockItem] = {}
        self.transactions: List[StockTransaction] = []
        self.lock = threading.RLock()

        # Alerting state.
        # NOTE(review): low_stock_threshold is not read by any method of this
        # class; the low-stock decision uses Product.min_stock_level.
        self.low_stock_threshold = 0.2
        self.out_of_stock_alerts = set()
        self.low_stock_alerts = set()

        # Summary statistics, recomputed by _update_stats().
        self.stats = {
            'total_products': 0,
            'total_value': 0.0,
            'low_stock_count': 0,
            'out_of_stock_count': 0
        }

        # Monotonic counter appended to transaction ids so that two
        # operations on the same product within one second stay distinct.
        self._transaction_seq = 0

    @staticmethod
    def _is_valid_quantity(quantity: int) -> bool:
        """Return True when ``quantity`` is a strictly positive amount."""
        return quantity > 0

    def _record_transaction(self, prefix: str, product_id: str,
                            transaction_type: str, quantity: int,
                            reference_id: Optional[str] = None,
                            reason: str = "") -> None:
        """Append one entry to the transaction log (caller holds the lock)."""
        self._transaction_seq += 1
        transaction = StockTransaction(
            f"{prefix}_{int(time.time())}_{product_id}_{self._transaction_seq}",
            product_id, transaction_type, quantity,
            reference_id=reference_id, reason=reason
        )
        self.transactions.append(transaction)

    def add_product(self, product: "Product"):
        """Register (or replace) a product.

        A stock item with quantity 0 is created the first time a product id
        is seen; an existing stock item is kept. Alerts and statistics are
        refreshed so they agree with the new catalogue entry.
        """
        with self.lock:
            self.products[product.product_id] = product

            if product.product_id not in self.stock_items:
                self.stock_items[product.product_id] = StockItem(product.product_id, 0)

            self._check_stock_alerts(product.product_id)
            self._update_stats()

    def add_stock(self, product_id: str, quantity: int, reason: str = "入库") -> bool:
        """Increase the on-hand quantity of a registered product.

        Returns:
            False when the product is unknown or ``quantity`` is not
            positive; True after the stock was increased and logged.
        """
        with self.lock:
            if product_id not in self.products:
                return False
            if not self._is_valid_quantity(quantity):
                return False

            stock_item = self.stock_items[product_id]
            stock_item.quantity += quantity
            stock_item.update_available_quantity()

            self._record_transaction("IN", product_id, 'in', quantity, reason=reason)
            self._check_stock_alerts(product_id)
            self._update_stats()

            return True

    def remove_stock(self, product_id: str, quantity: int,
                    reason: str = "出库") -> bool:
        """Decrease the on-hand quantity by ``quantity`` unreserved units.

        Returns:
            False when the product has no stock item, ``quantity`` is not
            positive, or fewer than ``quantity`` units are available
            (reserved units cannot be removed this way); True otherwise.
        """
        with self.lock:
            if product_id not in self.stock_items:
                return False
            if not self._is_valid_quantity(quantity):
                return False

            stock_item = self.stock_items[product_id]
            if stock_item.available_quantity < quantity:
                return False

            stock_item.quantity -= quantity
            stock_item.update_available_quantity()

            self._record_transaction("OUT", product_id, 'out', quantity, reason=reason)
            self._check_stock_alerts(product_id)
            self._update_stats()

            return True

    def reserve_stock(self, product_id: str, quantity: int,
                      reference_id: str) -> bool:
        """Reserve ``quantity`` available units for ``reference_id``.

        Returns:
            False when the product has no stock item, ``quantity`` is not
            positive, or not enough units are available; True otherwise.
        """
        with self.lock:
            if product_id not in self.stock_items:
                return False
            if not self._is_valid_quantity(quantity):
                return False

            stock_item = self.stock_items[product_id]
            if stock_item.available_quantity < quantity:
                return False

            stock_item.reserved_quantity += quantity
            stock_item.update_available_quantity()

            self._record_transaction("RESERVE", product_id, 'reserve', quantity,
                                     reference_id=reference_id)
            return True

    def release_stock(self, product_id: str, quantity: int,
                     reference_id: str) -> bool:
        """Return ``quantity`` reserved units to the available pool.

        Returns:
            False when the product has no stock item, ``quantity`` is not
            positive, or fewer than ``quantity`` units are reserved.
        """
        with self.lock:
            if product_id not in self.stock_items:
                return False
            if not self._is_valid_quantity(quantity):
                return False

            stock_item = self.stock_items[product_id]
            if stock_item.reserved_quantity < quantity:
                return False

            stock_item.reserved_quantity -= quantity
            stock_item.update_available_quantity()

            self._record_transaction("RELEASE", product_id, 'release', quantity,
                                     reference_id=reference_id)
            return True

    def confirm_reserved_stock(self, product_id: str, quantity: int,
                               reference_id: str) -> bool:
        """Turn ``quantity`` reserved units into an actual outbound movement.

        Both the reservation and the on-hand quantity are reduced, and the
        movement is logged as an ``'out'`` transaction.

        Returns:
            False when the product has no stock item, ``quantity`` is not
            positive, or fewer than ``quantity`` units are reserved.
        """
        with self.lock:
            if product_id not in self.stock_items:
                return False
            if not self._is_valid_quantity(quantity):
                return False

            stock_item = self.stock_items[product_id]
            if stock_item.reserved_quantity < quantity:
                return False

            # Drop the reservation and the physical stock together.
            stock_item.reserved_quantity -= quantity
            stock_item.quantity -= quantity
            stock_item.update_available_quantity()

            self._record_transaction("CONFIRM", product_id, 'out', quantity,
                                     reference_id=reference_id,
                                     reason="确认预留库存")
            self._check_stock_alerts(product_id)
            self._update_stats()

            return True

    def get_stock_level(self, product_id: str) -> "Optional[StockItem]":
        """Return the live stock item for ``product_id`` or None.

        The returned object is the manager's own instance, not a copy.
        """
        with self.lock:
            return self.stock_items.get(product_id)

    def get_stock_status(self, product_id: str) -> "Optional[InventoryStatus]":
        """Return the stock status, or None for an unknown product."""
        with self.lock:
            stock_item = self.stock_items.get(product_id)
            product = self.products.get(product_id)
            if stock_item is None or product is None:
                return None

            return stock_item.get_status(product.min_stock_level)

    def get_low_stock_products(self) -> "List[Tuple[str, StockItem]]":
        """Return ``(product_id, stock_item)`` pairs whose status is LOW_STOCK."""
        with self.lock:
            return [
                (product_id, stock_item)
                for product_id, stock_item in self.stock_items.items()
                if product_id in self.products
                and stock_item.get_status(self.products[product_id].min_stock_level)
                == InventoryStatus.LOW_STOCK
            ]

    def get_out_of_stock_products(self) -> "List[Tuple[str, StockItem]]":
        """Return ``(product_id, stock_item)`` pairs with nothing on hand."""
        with self.lock:
            return [
                (product_id, stock_item)
                for product_id, stock_item in self.stock_items.items()
                if stock_item.quantity <= 0
            ]

    def get_transaction_history(self, product_id: Optional[str] = None,
                               start_time: Optional[float] = None,
                               end_time: Optional[float] = None) -> "List[StockTransaction]":
        """Return logged transactions, newest first.

        Args:
            product_id: Keep only this product's transactions. ``None`` or
                an empty string disables the filter.
            start_time: Inclusive lower timestamp bound; ``None`` disables it.
            end_time: Inclusive upper timestamp bound; ``None`` disables it.
        """
        with self.lock:
            selected = [
                t for t in self.transactions
                if (not product_id or t.product_id == product_id)
                # Compare against None so that a bound of 0 is honoured.
                and (start_time is None or t.timestamp >= start_time)
                and (end_time is None or t.timestamp <= end_time)
            ]

            return sorted(selected, key=lambda x: x.timestamp, reverse=True)

    def _check_stock_alerts(self, product_id: str):
        """Move ``product_id`` into the alert set matching its status.

        A product is kept in at most one alert set: entering one removes it
        from the other, and a healthy stock level removes it from both.
        """
        if product_id not in self.stock_items or product_id not in self.products:
            return

        stock_item = self.stock_items[product_id]
        product = self.products[product_id]
        status = stock_item.get_status(product.min_stock_level)

        if status == InventoryStatus.OUT_OF_STOCK:
            self.out_of_stock_alerts.add(product_id)
            self.low_stock_alerts.discard(product_id)
        elif status == InventoryStatus.LOW_STOCK:
            self.low_stock_alerts.add(product_id)
            self.out_of_stock_alerts.discard(product_id)
        else:
            self.out_of_stock_alerts.discard(product_id)
            self.low_stock_alerts.discard(product_id)

    def _update_stats(self):
        """Recompute ``self.stats`` from the current products and stock."""
        total_value = 0.0
        low_stock_count = 0
        out_of_stock_count = 0

        for product_id, stock_item in self.stock_items.items():
            product = self.products.get(product_id)
            if product is None:
                continue

            total_value += stock_item.quantity * product.cost

            status = stock_item.get_status(product.min_stock_level)
            if status == InventoryStatus.LOW_STOCK:
                low_stock_count += 1
            elif status == InventoryStatus.OUT_OF_STOCK:
                out_of_stock_count += 1

        # Update in place so the dict object held in self.stats is unchanged.
        self.stats.update({
            'total_products': len(self.products),
            'total_value': total_value,
            'low_stock_count': low_stock_count,
            'out_of_stock_count': out_of_stock_count
        })

    def get_stats(self) -> Dict:
        """Return a shallow copy of the summary statistics."""
        with self.lock:
            return self.stats.copy()

    def get_alerts(self) -> Dict:
        """Return the current alert sets as lists under 'out_of_stock' and 'low_stock'."""
        with self.lock:
            return {
                'out_of_stock': list(self.out_of_stock_alerts),
                'low_stock': list(self.low_stock_alerts)
            }

class AutoReplenishmentSystem:
    """Generates and applies replenishment orders for low-stock products.

    Rules are checked either on demand or from a background daemon thread
    started by ``start_monitoring``. Because that thread shares the rule
    table and the order queue with callers, both are guarded by an internal
    lock.
    """

    def __init__(self, inventory_manager: "InventoryManager"):
        self.inventory_manager = inventory_manager
        self.replenishment_rules = {}
        self.replenishment_queue = []
        self.is_running = False
        self.worker_thread = None
        # Guards replenishment_rules and replenishment_queue.
        self._lock = threading.RLock()
        # Lets stop_monitoring() wake the worker out of its wait immediately.
        self._stop_event = threading.Event()

    def add_replenishment_rule(self, product_id: str,
                              reorder_point: int,
                              reorder_quantity: int,
                              max_order_quantity: int = 1000):
        """Add or replace the replenishment rule for ``product_id``.

        Args:
            product_id: Product the rule applies to.
            reorder_point: An order is generated when the on-hand quantity
                is at or below this value.
            reorder_quantity: Base quantity to order.
            max_order_quantity: Upper bound for a single order.
        """
        with self._lock:
            self.replenishment_rules[product_id] = {
                'reorder_point': reorder_point,
                'reorder_quantity': reorder_quantity,
                'max_order_quantity': max_order_quantity
            }

    def start_monitoring(self, check_interval: int = 60):
        """Start the background check loop; a no-op if it is already running.

        Args:
            check_interval: Seconds to wait between two checks.
        """
        if self.worker_thread is not None and self.worker_thread.is_alive():
            return

        self._stop_event.clear()
        self.is_running = True
        self.worker_thread = threading.Thread(
            target=self._monitoring_loop,
            args=(check_interval,),
            daemon=True
        )
        self.worker_thread.start()

    def stop_monitoring(self):
        """Ask the background loop to stop and wait up to 5 seconds for it."""
        self.is_running = False
        self._stop_event.set()
        if self.worker_thread:
            self.worker_thread.join(timeout=5)

    def _monitoring_loop(self, check_interval: int):
        """Run checks until stopped, backing off 10 seconds after an error."""
        while self.is_running:
            try:
                self._check_replenishment()
                delay = check_interval
            except Exception as e:
                # Keep the monitor alive; report and retry after a short pause.
                print(f"补货监控出错: {e}")
                delay = 10

            # Event.wait returns True as soon as stop_monitoring() sets the
            # event, so shutdown does not have to sit out a full interval.
            if self._stop_event.wait(delay):
                break

    def _check_replenishment(self):
        """Queue one replenishment order per product that needs restocking.

        A product that already has a pending order in the queue is skipped,
        so repeated checks do not pile up duplicate orders.
        """
        with self._lock:
            pending = {order['product_id'] for order in self.replenishment_queue}

            for product_id, rule in list(self.replenishment_rules.items()):
                if product_id in pending:
                    continue

                stock_item = self.inventory_manager.get_stock_level(product_id)
                if stock_item is None or stock_item.quantity > rule['reorder_point']:
                    continue

                # Order the base quantity plus whatever is already reserved,
                # capped by the rule's maximum order size.
                total_needed = rule['reorder_quantity'] + stock_item.reserved_quantity
                order_quantity = min(total_needed, rule['max_order_quantity'])

                replenishment_order = {
                    'product_id': product_id,
                    'quantity': order_quantity,
                    'timestamp': time.time(),
                    'priority': 'high' if stock_item.quantity == 0 else 'normal'
                }

                self.replenishment_queue.append(replenishment_order)
                pending.add(product_id)
                print(f"生成补货订单: {replenishment_order}")

    def get_replenishment_queue(self) -> List[Dict]:
        """Return a shallow copy of the pending replenishment orders."""
        with self._lock:
            return self.replenishment_queue.copy()

    def process_replenishment(self, order_index: int) -> bool:
        """Remove the order at ``order_index`` from the queue and apply it.

        Returns:
            False when ``order_index`` is outside ``0 .. len(queue) - 1`` or
            the inventory manager rejects the stock increase; True when the
            stock was added. The order is removed from the queue either way
            once the index is valid.
        """
        with self._lock:
            if not 0 <= order_index < len(self.replenishment_queue):
                return False
            order = self.replenishment_queue.pop(order_index)

        success = self.inventory_manager.add_stock(
            order['product_id'],
            order['quantity'],
            reason="自动补货"
        )

        if success:
            print(f"补货成功: {order['product_id']} x {order['quantity']}")
        else:
            print(f"补货失败: {order['product_id']}")

        return success

# Usage example: walks through stocking, reserving, confirming, releasing,
# alerting and automatic replenishment, printing the state after each step.
if __name__ == "__main__":
    # Create the inventory manager
    inventory = InventoryManager()

    # Register products (positional args: id, name, category, price, cost,
    # supplier, min_stock_level, max_stock_level)
    products = [
        Product("p001", "iPhone 13", "电子产品", 5999, 4000, "Apple", 5, 50),
        Product("p002", "运动鞋", "服装", 599, 300, "Nike", 10, 100),
        Product("p003", "口红", "美妆", 299, 150, "Lancome", 20, 200),
    ]

    for product in products:
        inventory.add_product(product)

    # Initial stock
    inventory.add_stock("p001", 30, "初始入库")
    inventory.add_stock("p002", 15, "初始入库")
    inventory.add_stock("p003", 25, "初始入库")

    # Show the stock status
    print("初始库存状态:")
    for product_id in ["p001", "p002", "p003"]:
        stock = inventory.get_stock_level(product_id)
        status = inventory.get_stock_status(product_id)
        print(f"  {product_id}: 数量={stock.quantity}, 可用={stock.available_quantity}, 状态={status.value}")

    # Reserve stock for two orders
    print("\n预留库存:")
    inventory.reserve_stock("p001", 5, "order_001")
    inventory.reserve_stock("p002", 8, "order_002")

    for product_id in ["p001", "p002"]:
        stock = inventory.get_stock_level(product_id)
        print(f"  {product_id}: 数量={stock.quantity}, 预留={stock.reserved_quantity}, 可用={stock.available_quantity}")

    # Confirm part of a reservation (actual outbound movement)
    print("\n确认预留库存:")
    inventory.confirm_reserved_stock("p001", 3, "order_001")

    stock = inventory.get_stock_level("p001")
    print(f"  p001: 数量={stock.quantity}, 预留={stock.reserved_quantity}, 可用={stock.available_quantity}")

    # Release part of a reservation
    inventory.release_stock("p002", 5, "order_002")
    stock = inventory.get_stock_level("p002")
    print(f"  p002: 数量={stock.quantity}, 预留={stock.reserved_quantity}, 可用={stock.available_quantity}")

    # Show alerts
    print("\n库存预警:")
    alerts = inventory.get_alerts()
    print(f"  缺货商品: {alerts['out_of_stock']}")
    print(f"  低库存商品: {alerts['low_stock']}")

    # Show statistics
    print("\n统计信息:")
    stats = inventory.get_stats()
    for key, value in stats.items():
        print(f"  {key}: {value}")

    # Automatic replenishment
    print("\n自动补货系统:")
    auto_replenish = AutoReplenishmentSystem(inventory)

    # Add replenishment rules (product id, reorder point, reorder quantity,
    # max order quantity)
    auto_replenish.add_replenishment_rule("p001", 10, 20, 50)
    auto_replenish.add_replenishment_rule("p002", 5, 15, 30)

    # Consume some stock
    inventory.remove_stock("p001", 20, "销售")
    inventory.remove_stock("p002", 8, "销售")

    # Run one replenishment check directly instead of starting the
    # background monitoring thread
    auto_replenish._check_replenishment()

    # Show the replenishment queue
    queue = auto_replenish.get_replenishment_queue()
    print(f"补货队列: {queue}")

    # Process the first queued order, if any
    if queue:
        auto_replenish.process_replenishment(0)

    # Show the final stock status
    print("\n最终库存状态:")
    for product_id in ["p001", "p002", "p003"]:
        stock = inventory.get_stock_level(product_id)
        status = inventory.get_stock_status(product_id)
        print(f"  {product_id}: 数量={stock.quantity}, 可用={stock.available_quantity}, 状态={status.value}")

[继续下一部分…]


🔧 第二部分:系统设计 (25题)

O2O系统 (10题)

4. 实现一个商家评分系统

import time
import threading
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
from collections import defaultdict
import statistics
import math

class RatingType(Enum):
    """Dimensions a rating can score in addition to the overall score."""
    FOOD_QUALITY = "food_quality"
    SERVICE = "service"
    ENVIRONMENT = "environment"
    DELIVERY = "delivery"
    VALUE = "value"

@dataclass
class Rating:
    """A single user rating of a merchant for one order."""
    rating_id: str
    user_id: str
    merchant_id: str
    order_id: str
    overall_score: float  # overall score, 1-5
    # Optional per-dimension scores; RatingSystem also requires each to be 1-5.
    detailed_scores: Dict[RatingType, float] = field(default_factory=dict)
    comment: str = ""
    images: List[str] = field(default_factory=list)
    # Creation time as a Unix timestamp (time.time()).
    timestamp: float = field(default_factory=time.time)
    # Incremented by RatingSystem.mark_rating_helpful.
    helpful_count: int = 0
    # Set to True by RatingSystem.verify_rating.
    is_verified: bool = False

@dataclass
class Merchant:
    """Basic merchant record used by the rating system."""
    merchant_id: str
    name: str
    # Used by RatingSystem.get_top_merchants to filter by category.
    category: str
    location: str
    # Registration time as a Unix timestamp (time.time()).
    registration_time: float = field(default_factory=time.time)

class RatingSystem:
    """In-memory merchant rating store with cached per-merchant statistics.

    Ratings are validated and screened by a ``SpamDetector`` before they are
    stored. Statistics are computed on demand and cached per merchant for
    ``cache_ttl`` seconds; only the cache is protected by a lock.

    Project types in method signatures are written as string annotations so
    they do not have to be resolvable when the class body is executed.
    """

    def __init__(self):
        self.merchants: Dict[str, Merchant] = {}
        self.ratings: Dict[str, Rating] = {}
        self.merchant_ratings: Dict[str, List[str]] = defaultdict(list)  # merchant_id -> rating_ids
        self.user_ratings: Dict[str, List[str]] = defaultdict(list)  # user_id -> rating_ids

        # Statistics cache: merchant_id -> {'stats': ..., 'timestamp': ...}
        self.rating_stats_cache: Dict[str, Dict] = {}
        self.cache_lock = threading.RLock()
        self.cache_ttl = 300  # seconds (5 minutes)

        # Spam screening
        self.spam_detector = SpamDetector()

        # Relative importance of the three per-rating weight components.
        self.rating_weights = {
            'recency': 0.3,
            'quality': 0.4,
            'user_credibility': 0.3
        }

    def add_merchant(self, merchant: "Merchant"):
        """Register (or replace) a merchant."""
        self.merchants[merchant.merchant_id] = merchant

    def add_rating(self, rating: "Rating") -> bool:
        """Store ``rating`` if it is valid and not flagged as spam.

        Returns:
            True when the rating was stored; False when validation failed or
            the spam detector rejected it.
        """
        if not self._validate_rating(rating):
            return False

        if self.spam_detector.is_spam(rating):
            return False

        self.ratings[rating.rating_id] = rating
        self.merchant_ratings[rating.merchant_id].append(rating.rating_id)
        self.user_ratings[rating.user_id].append(rating.rating_id)

        # The merchant's cached statistics are now stale.
        self._clear_cache(rating.merchant_id)

        return True

    def _validate_rating(self, rating: "Rating") -> bool:
        """Check score ranges, merchant existence and one-rating-per-order.

        Returns:
            False when the overall or any detailed score is outside 1-5, the
            merchant is unknown, or the user already rated the same order.
        """
        if not (1 <= rating.overall_score <= 5):
            return False

        if any(not (1 <= score <= 5) for score in rating.detailed_scores.values()):
            return False

        if rating.merchant_id not in self.merchants:
            return False

        # .get() instead of indexing: user_ratings is a defaultdict, and
        # indexing would create an empty entry for every user we merely look
        # up, including users whose rating ends up rejected.
        for rating_id in self.user_ratings.get(rating.user_id, ()):
            if self.ratings[rating_id].order_id == rating.order_id:
                return False

        return True

    def get_merchant_rating_stats(self, merchant_id: str) -> Dict:
        """Return rating statistics for a merchant, using the cache if fresh.

        The result contains 'overall_average', 'overall_median',
        'weighted_score', 'total_ratings', 'detailed_scores',
        'score_distribution', 'trend' and 'last_updated'. A merchant without
        ratings gets the zeroed structure from ``_empty_stats``.
        """
        cached_stats = self._get_cached_stats(merchant_id)
        if cached_stats is not None:
            return cached_stats

        if merchant_id not in self.merchant_ratings:
            return self._empty_stats()

        rating_ids = self.merchant_ratings[merchant_id]
        if not rating_ids:
            return self._empty_stats()

        ratings = [self.ratings[rating_id] for rating_id in rating_ids]
        overall_scores = [r.overall_score for r in ratings]

        # Per-dimension statistics, only for dimensions that were scored.
        detailed_stats = {}
        for rating_type in RatingType:
            scores = [r.detailed_scores[rating_type] for r in ratings
                      if rating_type in r.detailed_scores]
            if scores:
                detailed_stats[rating_type.value] = {
                    'average': statistics.mean(scores),
                    'median': statistics.median(scores),
                    'count': len(scores)
                }

        stats = {
            'overall_average': statistics.mean(overall_scores),
            'overall_median': statistics.median(overall_scores),
            'weighted_score': self._calculate_weighted_score(ratings),
            'total_ratings': len(ratings),
            'detailed_scores': detailed_stats,
            'score_distribution': self._calculate_score_distribution(overall_scores),
            'trend': self._calculate_rating_trend(ratings),
            'last_updated': time.time()
        }

        self._cache_stats(merchant_id, stats)

        return stats

    def _calculate_weighted_score(self, ratings: "List[Rating]") -> float:
        """Return the weighted average of the overall scores.

        Each rating's weight combines recency (exponential decay with a
        30-day scale), quality (bonus for detailed scores, a comment longer
        than 10 characters, and images) and the author's credibility, mixed
        according to ``self.rating_weights``. Returns 0.0 for no ratings.
        """
        if not ratings:
            return 0.0

        current_time = time.time()
        weighted_sum = 0.0
        total_weight = 0.0

        for rating in ratings:
            # Newer ratings weigh more; a future timestamp counts as "now".
            days_ago = max(0.0, (current_time - rating.timestamp) / (24 * 3600))
            recency_weight = math.exp(-days_ago / 30)

            quality_weight = 1.0
            if rating.detailed_scores:
                quality_weight += 0.2
            if rating.comment and len(rating.comment) > 10:
                quality_weight += 0.1
            if rating.images:
                quality_weight += 0.1

            user_credibility = self._get_user_credibility(rating.user_id)

            # Per-rating weight; kept separate from the running total so the
            # accumulator is not overwritten on every iteration.
            weight = (
                self.rating_weights['recency'] * recency_weight +
                self.rating_weights['quality'] * quality_weight +
                self.rating_weights['user_credibility'] * user_credibility
            )

            weighted_sum += rating.overall_score * weight
            total_weight += weight

        return weighted_sum / total_weight if total_weight > 0 else 0.0

    def _get_user_credibility(self, user_id: str) -> float:
        """Return a credibility factor based on how many ratings the user has.

        Unknown users get 0.5; otherwise 0.3 (<3 ratings), 0.5 (<10),
        0.7 (<50) or 0.9.
        """
        if user_id not in self.user_ratings:
            return 0.5

        rating_count = len(self.user_ratings[user_id])

        if rating_count < 3:
            return 0.3
        elif rating_count < 10:
            return 0.5
        elif rating_count < 50:
            return 0.7
        else:
            return 0.9

    def _calculate_score_distribution(self, scores: List[float]) -> Dict[int, int]:
        """Count scores per star bucket; fractional scores are truncated."""
        distribution = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0}
        for score in scores:
            distribution[int(score)] += 1
        return distribution

    def _calculate_rating_trend(self, ratings: "List[Rating]") -> str:
        """Compare the last 30 days with the 30 days before them.

        Returns:
            "insufficient_data" with fewer than 10 ratings or an empty
            window; otherwise "improving" / "declining" when the recent
            average differs by more than 0.2, else "stable".
        """
        if len(ratings) < 10:
            return "insufficient_data"

        current_time = time.time()
        thirty_days_ago = current_time - 30 * 24 * 3600
        sixty_days_ago = current_time - 60 * 24 * 3600

        recent_scores = [r.overall_score for r in ratings
                         if r.timestamp >= thirty_days_ago]
        previous_scores = [r.overall_score for r in ratings
                           if sixty_days_ago <= r.timestamp < thirty_days_ago]

        if not recent_scores or not previous_scores:
            return "insufficient_data"

        recent_avg = statistics.mean(recent_scores)
        previous_avg = statistics.mean(previous_scores)

        if recent_avg > previous_avg + 0.2:
            return "improving"
        elif recent_avg < previous_avg - 0.2:
            return "declining"
        else:
            return "stable"

    def get_top_merchants(self, category: Optional[str] = None,
                         limit: int = 10) -> List[Tuple[str, float]]:
        """Return up to ``limit`` ``(merchant_id, weighted_score)`` pairs, best first.

        Args:
            category: When given (and non-empty), only merchants of this
                category are considered.
            limit: Maximum number of entries to return.

        Merchants with fewer than 5 ratings are excluded.
        """
        merchant_scores = []

        for merchant_id, merchant in self.merchants.items():
            if category and merchant.category != category:
                continue

            stats = self.get_merchant_rating_stats(merchant_id)

            if stats['total_ratings'] >= 5:
                merchant_scores.append((merchant_id, stats['weighted_score']))

        merchant_scores.sort(key=lambda x: x[1], reverse=True)
        return merchant_scores[:limit]

    def mark_rating_helpful(self, rating_id: str, user_id: str) -> bool:
        """Increment the helpful counter of a rating.

        ``user_id`` is accepted but not used, so repeated votes by the same
        user are not detected. Returns False for an unknown rating.
        """
        if rating_id not in self.ratings:
            return False

        rating = self.ratings[rating_id]
        rating.helpful_count += 1

        self._clear_cache(rating.merchant_id)

        return True

    def verify_rating(self, rating_id: str) -> bool:
        """Mark a rating as verified; returns False for an unknown rating."""
        if rating_id not in self.ratings:
            return False

        rating = self.ratings[rating_id]
        rating.is_verified = True

        self._clear_cache(rating.merchant_id)

        return True

    def _empty_stats(self) -> Dict:
        """Return the statistics structure for a merchant without ratings."""
        return {
            'overall_average': 0.0,
            'overall_median': 0.0,
            'weighted_score': 0.0,
            'total_ratings': 0,
            'detailed_scores': {},
            'score_distribution': {1: 0, 2: 0, 3: 0, 4: 0, 5: 0},
            'trend': 'no_data',
            'last_updated': time.time()
        }

    def _cache_stats(self, merchant_id: str, stats: Dict):
        """Store ``stats`` in the cache together with the current time."""
        with self.cache_lock:
            self.rating_stats_cache[merchant_id] = {
                'stats': stats,
                'timestamp': time.time()
            }

    def _get_cached_stats(self, merchant_id: str) -> Optional[Dict]:
        """Return cached statistics younger than ``cache_ttl``, else None."""
        with self.cache_lock:
            cached = self.rating_stats_cache.get(merchant_id)
            if cached and time.time() - cached['timestamp'] < self.cache_ttl:
                return cached['stats']
            return None

    def _clear_cache(self, merchant_id: str):
        """Drop the cached statistics of one merchant, if present."""
        with self.cache_lock:
            self.rating_stats_cache.pop(merchant_id, None)

class SpamDetector:
    """Heuristic filter that flags ratings which look like spam."""
    def __init__(self):
        # Phrases whose presence in a comment marks the rating as spam.
        self.suspicious_patterns = ['刷单', '好评返现', '五星好评', '默认好评']

        # Per-key lists of submission timestamps.
        self.user_rating_frequency = defaultdict(list)
        self.merchant_rating_frequency = defaultdict(list)

    def is_spam(self, rating: Rating) -> bool:
        """Return True as soon as one check (content, frequency, pattern) fires."""
        # `or` short-circuits, so the checks run in this order and later
        # ones are skipped once a rating has been flagged.
        return (
            self._contains_suspicious_content(rating.comment)
            or self._is_abnormal_frequency(rating)
            or self._is_abnormal_pattern(rating)
        )

    def _contains_suspicious_content(self, comment: str) -> bool:
        """Return True when the lower-cased comment contains a suspicious phrase."""
        lowered = comment.lower()
        return any(phrase in lowered for phrase in self.suspicious_patterns)

    def _is_abnormal_frequency(self, rating: Rating) -> bool:
        """Record this submission and flag users exceeding 20 in 24 hours."""
        now = time.time()
        history = self.user_rating_frequency[rating.user_id]

        # Keep only submissions from the last 24 hours, then add this one.
        history[:] = [stamp for stamp in history if now - stamp < 24 * 3600]
        history.append(now)

        return len(history) > 20

    def _is_abnormal_pattern(self, rating: Rating) -> bool:
        """Placeholder for score-pattern analysis; currently never flags."""
        # A real implementation (e.g. detecting users who always give five
        # stars) needs more elaborate logic; simplified for now.
        return False

# Usage example: registers merchants, submits ratings and prints the
# resulting statistics.
if __name__ == "__main__":
    # Create the rating system
    rating_system = RatingSystem()

    # Register merchants (positional args: id, name, category, location)
    merchants = [
        Merchant("m001", "川味小厨", "川菜", "朝阳区"),
        Merchant("m002", "粤式茶餐厅", "粤菜", "海淀区"),
        Merchant("m003", "日式拉面馆", "日料", "东城区"),
    ]

    for merchant in merchants:
        rating_system.add_merchant(merchant)

    # Submit ratings (positional args: rating id, user id, merchant id,
    # order id, overall score, detailed scores, comment)
    ratings = [
        Rating("r001", "user_001", "m001", "order_001", 4.5,
               {RatingType.FOOD_QUALITY: 4.0, RatingType.SERVICE: 5.0, 
                RatingType.DELIVERY: 4.5}, "味道不错,服务很好"),
        Rating("r002", "user_002", "m001", "order_002", 3.5,
               {RatingType.FOOD_QUALITY: 3.0, RatingType.SERVICE: 4.0}, "一般般"),
        Rating("r003", "user_003", "m002", "order_003", 5.0,
               {RatingType.FOOD_QUALITY: 5.0, RatingType.SERVICE: 5.0}, "非常满意"),
        Rating("r004", "user_004", "m002", "order_004", 4.0,
               {RatingType.FOOD_QUALITY: 4.0, RatingType.DELIVERY: 4.0}, "还不错"),
        Rating("r005", "user_005", "m003", "order_005", 4.8,
               {RatingType.FOOD_QUALITY: 5.0, RatingType.SERVICE: 4.5}, "拉面很正宗"),
    ]

    for rating in ratings:
        rating_system.add_rating(rating)

    # Show per-merchant rating statistics
    print("商家评分统计:")
    for merchant_id in ["m001", "m002", "m003"]:
        stats = rating_system.get_merchant_rating_stats(merchant_id)
        merchant = rating_system.merchants[merchant_id]
        print(f"\n{merchant.name}:")
        print(f"  总体平均分: {stats['overall_average']:.2f}")
        print(f"  加权评分: {stats['weighted_score']:.2f}")
        print(f"  评分数量: {stats['total_ratings']}")
        print(f"  评分趋势: {stats['trend']}")
        print(f"  评分分布: {stats['score_distribution']}")

    # Top merchants; get_top_merchants only lists merchants with at least
    # 5 ratings, so this sample data yields an empty list
    print("\nTop商家:")
    top_merchants = rating_system.get_top_merchants(limit=3)
    for merchant_id, score in top_merchants:
        merchant = rating_system.merchants[merchant_id]
        print(f"  {merchant.name}: {score:.2f}")

    # Mark a rating as helpful
    rating_system.mark_rating_helpful("r001", "user_006")

    # Verify a rating
    rating_system.verify_rating("r003")

    # Show the statistics again after the updates
    print("\n更新后的m001统计:")
    stats = rating_system.get_merchant_rating_stats("m001")
    print(f"  加权评分: {stats['weighted_score']:.2f}")
Logo

中国智能体开发者社区,聚焦智能体与大模型开发,提供前沿资讯、实用工具链、开源项目及行业案例。通过技术沙龙、开发者大赛等活动,促进经验交流与协作,助力开发者快速构建创新智能应用。

更多推荐