📁 项目结构总览

llm-toolkit/
├── llm_toolkit/                 # 主包目录
│   ├── __init__.py
│   ├── core/                    # 核心模块
│   │   ├── __init__.py
│   │   ├── llm_manager.py      # LLM管理器
│   │   ├── prompt_manager.py   # Prompt模板管理
│   │   ├── conversation.py     # 对话管理
│   │   └── cost_tracker.py     # 成本追踪
│   ├── rag/                     # RAG模块
│   │   ├── __init__.py
│   │   ├── retriever.py        # 检索器
│   │   ├── embeddings.py       # 嵌入向量
│   │   ├── vector_store.py     # 向量数据库
│   │   └── document_loader.py  # 文档加载器
│   ├── agents/                  # 智能代理
│   │   ├── __init__.py
│   │   ├── base_agent.py
│   │   ├── tool_agent.py
│   │   └── react_agent.py
│   ├── chains/                  # LangChain链
│   │   ├── __init__.py
│   │   ├── qa_chain.py
│   │   ├── summarization.py
│   │   └── translation.py
│   ├── utils/                   # 工具函数
│   │   ├── __init__.py
│   │   ├── text_splitter.py
│   │   ├── validators.py
│   │   └── formatters.py
│   └── ui/                      # 用户界面
│       ├── __init__.py
│       ├── streamlit_app.py
│       └── gradio_app.py
├── tests/                       # 测试
├── examples/                    # 示例
├── docs/                        # 文档
├── config/                      # 配置
├── scripts/                     # 脚本
├── requirements.txt
├── setup.py
├── README.md
└── .env.example

第一部分:核心配置文件

1. setup.py - 项目安装配置

# -*- coding: utf-8 -*-
"""
LLM Toolkit - 大语言模型工具包
Setup Configuration
"""

from setuptools import setup, find_packages
import os

# Load the long description (shown on PyPI) from the README file.
with open("README.md", "r", encoding="utf-8") as readme_file:
    long_description = readme_file.read()

# Collect runtime dependencies, skipping blank lines and comment lines.
with open("requirements.txt", "r", encoding="utf-8") as req_file:
    requirements = []
    for raw_line in req_file:
        stripped = raw_line.strip()
        if stripped and not raw_line.startswith("#"):
            requirements.append(stripped)

setup(
    # Package identity
    name="llm-toolkit",
    version="1.0.0",
    author="DREAMVFIA",
    author_email="contact@dreamvfia.com",
    description="企业级LLM应用开发加速器",
    long_description=long_description,
    long_description_content_type="text/markdown",
    url="https://github.com/dreamvfia/llm-toolkit",
    # Automatically discover all packages under llm_toolkit/
    packages=find_packages(),
    classifiers=[
        "Development Status :: 4 - Beta",
        "Intended Audience :: Developers",
        "Topic :: Software Development :: Libraries :: Python Modules",
        "Topic :: Scientific/Engineering :: Artificial Intelligence",
        "License :: OSI Approved :: MIT License",
        "Programming Language :: Python :: 3",
        "Programming Language :: Python :: 3.8",
        "Programming Language :: Python :: 3.9",
        "Programming Language :: Python :: 3.10",
        "Programming Language :: Python :: 3.11",
    ],
    python_requires=">=3.8",
    # Core dependencies come straight from requirements.txt (parsed above).
    install_requires=requirements,
    extras_require={
        "dev": [
            "pytest>=7.0.0",
            "pytest-cov>=4.0.0",
            "black>=23.0.0",
            "flake8>=6.0.0",
            "mypy>=1.0.0",
            "isort>=5.12.0",
        ],
        "ui": [
            "streamlit>=1.28.0",
            "gradio>=4.0.0",
        ],
        # NOTE(review): "all" omits most of the "dev" extras (pytest-cov,
        # black, flake8, mypy, isort) — confirm whether it should be the
        # union of "dev" and "ui".
        "all": [
            "pytest>=7.0.0",
            "streamlit>=1.28.0",
            "gradio>=4.0.0",
        ],
    },
    # Console entry points installed alongside the package.
    entry_points={
        "console_scripts": [
            "llm-toolkit=llm_toolkit.cli:main",
            "llm-ui=llm_toolkit.ui.streamlit_app:main",
        ],
    },
    # Ship non-Python data files (configs and prompt templates) in the wheel.
    include_package_data=True,
    package_data={
        "llm_toolkit": [
            "config/*.yaml",
            "prompts/*.txt",
        ],
    },
    zip_safe=False,
)

2. requirements.txt - 依赖包

# LLM Toolkit Requirements

# Core LLM Libraries
langchain>=0.1.0
langchain-community>=0.0.10
langchain-openai>=0.0.2
openai>=1.6.0
anthropic>=0.8.0
google-generativeai>=0.3.0

# Vector Databases
chromadb>=0.4.22
faiss-cpu>=1.7.4
pinecone-client>=3.0.0
qdrant-client>=1.7.0

# Embeddings
sentence-transformers>=2.2.2
tiktoken>=0.5.2

# Document Processing
pypdf>=3.17.0
python-docx>=1.1.0
python-pptx>=0.6.23
openpyxl>=3.1.2
beautifulsoup4>=4.12.0
lxml>=4.9.0
unstructured>=0.11.0

# Data Processing
pandas>=2.0.0
numpy>=1.24.0

# API & Web
fastapi>=0.109.0
uvicorn>=0.27.0
httpx>=0.26.0
requests>=2.31.0

# Utilities
python-dotenv>=1.0.0
pydantic>=2.5.0
pyyaml>=6.0.1
jinja2>=3.1.2
tenacity>=8.2.3

# Monitoring & Logging
loguru>=0.7.2
prometheus-client>=0.19.0

# Caching
redis>=5.0.0
diskcache>=5.6.3

# Optional UI (install with pip install llm-toolkit[ui])
# streamlit>=1.28.0
# gradio>=4.0.0

# Development (install with pip install llm-toolkit[dev])
# pytest>=7.0.0
# pytest-cov>=4.0.0
# black>=23.0.0
# flake8>=6.0.0
# mypy>=1.0.0
# isort>=5.12.0

3. .env.example - 环境变量示例

# LLM Toolkit Environment Variables

# ==========================================
# LLM Provider API Keys
# ==========================================

# OpenAI
OPENAI_API_KEY=sk-your-openai-api-key-here
OPENAI_ORG_ID=org-your-org-id-here

# Anthropic (Claude)
ANTHROPIC_API_KEY=sk-ant-your-anthropic-key-here

# Google (Gemini)
GOOGLE_API_KEY=your-google-api-key-here

# Azure OpenAI
AZURE_OPENAI_API_KEY=your-azure-key-here
AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
AZURE_OPENAI_API_VERSION=2024-02-01

# Cohere
COHERE_API_KEY=your-cohere-key-here

# HuggingFace
HUGGINGFACE_API_KEY=hf_your-huggingface-key-here

# ==========================================
# Vector Database Configuration
# ==========================================

# ChromaDB
CHROMA_HOST=localhost
CHROMA_PORT=8000
CHROMA_PERSIST_DIRECTORY=./data/chroma

# Pinecone
PINECONE_API_KEY=your-pinecone-api-key-here
PINECONE_ENVIRONMENT=us-west1-gcp

# Qdrant
QDRANT_URL=http://localhost:6333
QDRANT_API_KEY=your-qdrant-api-key-here

# ==========================================
# Application Configuration
# ==========================================

# Environment
ENVIRONMENT=development
DEBUG=true

# API Configuration
API_HOST=0.0.0.0
API_PORT=8000
API_WORKERS=4

# Default LLM Settings
DEFAULT_LLM_PROVIDER=openai
DEFAULT_MODEL=gpt-4-turbo-preview
DEFAULT_TEMPERATURE=0.7
DEFAULT_MAX_TOKENS=2000

# RAG Settings
CHUNK_SIZE=1000
CHUNK_OVERLAP=200
TOP_K_RESULTS=5
SIMILARITY_THRESHOLD=0.7

# Embedding Settings
EMBEDDING_MODEL=text-embedding-3-small
EMBEDDING_DIMENSION=1536

# ==========================================
# Cost Tracking
# ==========================================

# Enable cost tracking
ENABLE_COST_TRACKING=true
COST_TRACKING_DB=sqlite:///./data/costs.db

# Budget limits (USD)
DAILY_BUDGET_LIMIT=100.0
MONTHLY_BUDGET_LIMIT=3000.0

# ==========================================
# Caching
# ==========================================

# Enable caching
ENABLE_CACHE=true
CACHE_TYPE=redis  # redis, disk, memory

# Redis Cache
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASSWORD=

# Disk Cache
DISK_CACHE_DIR=./data/cache
DISK_CACHE_SIZE_LIMIT=1073741824  # 1GB

# Cache TTL (seconds)
CACHE_TTL=3600

# ==========================================
# Logging
# ==========================================

LOG_LEVEL=INFO
LOG_FILE=./logs/llm-toolkit.log
LOG_MAX_SIZE=10485760  # 10MB
LOG_BACKUP_COUNT=5

# ==========================================
# Security
# ==========================================

# API Security
API_KEY=your-api-key-for-securing-endpoints
ENABLE_AUTH=false

# Rate Limiting
RATE_LIMIT_ENABLED=true
RATE_LIMIT_REQUESTS=100
RATE_LIMIT_PERIOD=60  # seconds

# ==========================================
# Monitoring
# ==========================================

# Prometheus
ENABLE_METRICS=false
METRICS_PORT=9090

# Sentry
SENTRY_DSN=

# ==========================================
# UI Configuration
# ==========================================

# Streamlit
STREAMLIT_SERVER_PORT=8501
STREAMLIT_SERVER_ADDRESS=0.0.0.0

# Gradio
GRADIO_SERVER_PORT=7860
GRADIO_SERVER_NAME=0.0.0.0

# ==========================================
# Advanced Settings
# ==========================================

# Conversation Memory
MAX_CONVERSATION_HISTORY=10
CONVERSATION_MEMORY_TYPE=buffer  # buffer, summary, window

# Document Processing
MAX_FILE_SIZE=52428800  # 50MB
SUPPORTED_FILE_TYPES=pdf,docx,txt,md,csv,xlsx

# Retry Settings
MAX_RETRIES=3
RETRY_DELAY=1.0

# Timeout Settings
REQUEST_TIMEOUT=30.0
STREAM_TIMEOUT=60.0

4. README.md - 项目说明

# 🚀 LLM Toolkit - 大语言模型工具包

[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![Python 3.8+](https://img.shields.io/badge/python-3.8+-blue.svg)](https://www.python.org/downloads/)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)

**企业级LLM应用开发加速器** - 让AI应用开发更简单、更高效、更经济

---

## ✨ 核心特性

### 🎯 Prompt模板管理
- 📝 可视化Prompt设计器
- 🔄 版本控制和历史记录
- 📊 A/B测试和性能对比
- 🌐 多语言模板支持
- 💾 模板库和分享

### 🗄️ 向量数据库集成
- 🔌 支持多种向量数据库(ChromaDB, Pinecone, Qdrant, FAISS)
- 🚀 自动化嵌入向量生成
- 🔍 高效相似度搜索
- 📈 可扩展的索引管理
- 🔄 实时数据同步

### 🧠 RAG (检索增强生成)
- 📚 智能文档加载和分块
- 🎯 上下文相关性排序
- 🔗 多文档源整合
- 💡 自动引用和溯源
- ⚡ 流式响应支持

### 💬 对话管理
- 🗨️ 多轮对话上下文维护
- 🧠 智能记忆管理
- 👥 多用户会话隔离
- 📊 对话历史分析
- 🔄 对话状态持久化

### 💰 成本优化
- 📊 实时成本追踪
- 💵 预算管理和告警
- 🎯 智能模型选择
- 📉 Token使用优化
- 📈 成本分析报告

---

## 🎯 快速开始

### 安装

```bash
# 基础安装
pip install llm-toolkit

# 包含UI组件
pip install llm-toolkit[ui]

# 开发环境
pip install llm-toolkit[dev]

# 完整安装
pip install llm-toolkit[all]

配置

# 复制环境变量模板
cp .env.example .env

# 编辑配置文件,添加API密钥
nano .env

基础使用

from llm_toolkit import LLMManager, PromptManager, RAGPipeline

# 初始化LLM管理器
llm = LLMManager(provider="openai", model="gpt-4-turbo-preview")

# 简单对话
response = llm.chat("你好,请介绍一下自己")
print(response)

# 使用Prompt模板
prompt_mgr = PromptManager()
prompt = prompt_mgr.get_template("summarization")
response = llm.chat(prompt.format(text="长文本内容..."))

# RAG检索增强生成
rag = RAGPipeline(llm=llm)
rag.add_documents("./docs")
response = rag.query("什么是RAG?")
print(response)

📖 详细文档

核心组件

1. LLM管理器
from llm_toolkit.core import LLMManager

# 初始化
llm = LLMManager(
    provider="openai",  # openai, anthropic, google, azure
    model="gpt-4-turbo-preview",
    temperature=0.7,
    max_tokens=2000
)

# 简单调用
response = llm.chat("你的问题")

# 流式响应
for chunk in llm.stream("你的问题"):
    print(chunk, end="", flush=True)

# 批量处理
responses = llm.batch_chat(["问题1", "问题2", "问题3"])
2. Prompt模板管理
from llm_toolkit.core import PromptManager

# 初始化
pm = PromptManager()

# 创建模板
pm.create_template(
    name="product_description",
    template="为以下产品生成吸引人的描述:\n产品名称:{product_name}\n特点:{features}",
    variables=["product_name", "features"],
    category="marketing"
)

# 使用模板
prompt = pm.get_template("product_description")
text = prompt.format(
    product_name="智能手表",
    features="心率监测、GPS定位、防水"
)

# 版本管理
pm.create_version("product_description", "v2", new_template="...")
pm.rollback("product_description", "v1")
3. RAG检索增强生成
from llm_toolkit.rag import RAGPipeline

# 初始化
rag = RAGPipeline(
    llm=llm,
    vector_store="chromadb",
    embedding_model="text-embedding-3-small"
)

# 添加文档
rag.add_documents(
    source="./docs",
    file_types=["pdf", "docx", "txt"],
    chunk_size=1000,
    chunk_overlap=200
)

# 查询
response = rag.query(
    question="什么是向量数据库?",
    top_k=5,
    return_sources=True
)

print(response.answer)
print(response.sources)
4. 对话管理
from llm_toolkit.core import ConversationManager

# 初始化
conv = ConversationManager(
    llm=llm,
    memory_type="buffer",  # buffer, summary, window
    max_history=10
)

# 多轮对话
conv.add_message("user", "你好")
response1 = conv.chat()

conv.add_message("user", "我刚才说了什么?")
response2 = conv.chat()  # 会记住上下文

# 保存和加载对话
conv.save("conversation_001.json")
conv.load("conversation_001.json")
5. 成本追踪
from llm_toolkit.core import CostTracker

# 初始化
tracker = CostTracker(
    daily_limit=100.0,
    monthly_limit=3000.0
)

# 自动追踪(集成到LLM调用中)
llm = LLMManager(cost_tracker=tracker)
response = llm.chat("你的问题")

# 查看成本
print(f"今日成本: ${tracker.get_daily_cost():.4f}")
print(f"本月成本: ${tracker.get_monthly_cost():.4f}")

# 成本分析
report = tracker.generate_report(period="monthly")
print(report)

🎨 UI界面

Streamlit界面

# 启动Streamlit UI
llm-ui

# 或直接运行
streamlit run llm_toolkit/ui/streamlit_app.py

Gradio界面

from llm_toolkit.ui import launch_gradio

# 启动Gradio界面
launch_gradio(
    share=True,  # 创建公共链接
    port=7860
)

🔧 高级功能

智能代理 (Agents)

from llm_toolkit.agents import ToolAgent

# 创建工具代理
agent = ToolAgent(
    llm=llm,
    tools=["calculator", "web_search", "python_repl"]
)

# 执行任务
result = agent.run("计算 (123 + 456) * 789 并搜索结果的含义")

自定义链 (Chains)

from llm_toolkit.chains import QAChain, SummarizationChain

# 问答链
qa_chain = QAChain(llm=llm, rag=rag)
answer = qa_chain.run("你的问题")

# 摘要链
summary_chain = SummarizationChain(llm=llm, strategy="map_reduce")
summary = summary_chain.run(long_text)

批量处理

from llm_toolkit.utils import BatchProcessor

# 批量处理
processor = BatchProcessor(
    llm=llm,
    batch_size=10,
    max_workers=4
)

results = processor.process(
    inputs=["问题1", "问题2", ...],
    template="answer_question"
)

📊 性能优化

缓存策略

from llm_toolkit import LLMManager

# 启用缓存
llm = LLMManager(
    enable_cache=True,
    cache_type="redis",  # redis, disk, memory
    cache_ttl=3600
)

# 相同问题会从缓存返回
response1 = llm.chat("什么是AI?")  # API调用
response2 = llm.chat("什么是AI?")  # 缓存返回

并发控制

from llm_toolkit.utils import RateLimiter

# 速率限制
limiter = RateLimiter(
    max_requests=100,
    period=60  # 60秒内最多100次请求
)

llm = LLMManager(rate_limiter=limiter)

🔐 安全性

API密钥管理

from llm_toolkit.utils import SecretManager

# 安全存储API密钥
secrets = SecretManager()
secrets.set("OPENAI_API_KEY", "sk-...")

# 自动加载
llm = LLMManager()  # 自动从环境变量或密钥管理器加载

内容过滤

from llm_toolkit.utils import ContentFilter

# 内容安全检查
filter = ContentFilter()
is_safe = filter.check(user_input)

if is_safe:
    response = llm.chat(user_input)

📈 监控和日志

日志配置

from llm_toolkit.utils import setup_logging

# 配置日志
setup_logging(
    level="INFO",
    log_file="./logs/llm-toolkit.log",
    rotation="10 MB"
)

Prometheus指标

from llm_toolkit.monitoring import enable_metrics

# 启用Prometheus指标
enable_metrics(port=9090)

# 访问 http://localhost:9090/metrics

🧪 测试

# 运行所有测试
pytest

# 运行特定测试
pytest tests/test_llm_manager.py

# 生成覆盖率报告
pytest --cov=llm_toolkit --cov-report=html

由 DREAMVFIA 用 ❤️ 打造


---

### 5. `config/default.yaml` - 默认配置

```yaml
# LLM Toolkit Default Configuration

# Application Settings
app:
  name: "LLM Toolkit"
  version: "1.0.0"
  environment: "development"
  debug: true

# LLM Provider Settings
llm:
  default_provider: "openai"
  default_model: "gpt-4-turbo-preview"
  default_temperature: 0.7
  default_max_tokens: 2000
  
  providers:
    openai:
      models:
        - gpt-4-turbo-preview
        - gpt-4
        - gpt-3.5-turbo
        - gpt-3.5-turbo-16k
      pricing:
        gpt-4-turbo-preview:
          input: 0.01
          output: 0.03
        gpt-3.5-turbo:
          input: 0.0005
          output: 0.0015
    
    anthropic:
      models:
        - claude-3-opus-20240229
        - claude-3-sonnet-20240229
        - claude-3-haiku-20240307
      pricing:
        claude-3-opus-20240229:
          input: 0.015
          output: 0.075
    
    google:
      models:
        - gemini-pro
        - gemini-pro-vision
      pricing:
        gemini-pro:
          input: 0.00025
          output: 0.0005

# Vector Database Settings
vector_store:
  default: "chromadb"
  
  chromadb:
    host: "localhost"
    port: 8000
    persist_directory: "./data/chroma"
    collection_name: "default"
  
  pinecone:
    environment: "us-west1-gcp"
    index_name: "llm-toolkit"
    dimension: 1536
  
  qdrant:
    url: "http://localhost:6333"
    collection_name: "llm-toolkit"
    dimension: 1536

# Embedding Settings
embeddings:
  default_model: "text-embedding-3-small"
  dimension: 1536
  batch_size: 100
  
  models:
    text-embedding-3-small:
      dimension: 1536
      pricing: 0.00002
    text-embedding-3-large:
      dimension: 3072
      pricing: 0.00013
    text-embedding-ada-002:
      dimension: 1536
      pricing: 0.0001

# RAG Settings
rag:
  chunk_size: 1000
  chunk_overlap: 200
  top_k: 5
  similarity_threshold: 0.7
  rerank: false
  
  text_splitter:
    type: "recursive"  # recursive, character, token
    separators: ["\n\n", "\n", " ", ""]

# Conversation Settings
conversation:
  memory_type: "buffer"  # buffer, summary, window
  max_history: 10
  summary_threshold: 5
  
  window_size: 5

# Cost Tracking
cost_tracking:
  enabled: true
  database: "sqlite:///./data/costs.db"
  
  limits:
    daily: 100.0
    monthly: 3000.0
  
  alerts:
    enabled: true
    thresholds:
      warning: 0.8
      critical: 0.95

# Caching
cache:
  enabled: true
  type: "redis"  # redis, disk, memory
  ttl: 3600
  
  redis:
    host: "localhost"
    port: 6379
    db: 0
  
  disk:
    directory: "./data/cache"
    size_limit: 1073741824  # 1GB

# Logging
logging:
  level: "INFO"
  format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
  
  file:
    enabled: true
    path: "./logs/llm-toolkit.log"
    max_size: 10485760  # 10MB
    backup_count: 5
  
  console:
    enabled: true

# API Settings
api:
  host: "0.0.0.0"
  port: 8000
  workers: 4
  reload: false
  
  cors:
    enabled: true
    origins: ["*"]
  
  rate_limiting:
    enabled: true
    requests: 100
    period: 60

# Security
security:
  enable_auth: false
  api_key: ""
  
  content_filter:
    enabled: true
    blocked_patterns: []

# Monitoring
monitoring:
  prometheus:
    enabled: false
    port: 9090
  
  sentry:
    enabled: false
    dsn: ""

# UI Settings
ui:
  streamlit:
    port: 8501
    theme: "light"
  
  gradio:
    port: 7860
    share: false

# Document Processing
document:
  max_file_size: 52428800  # 50MB
  supported_types:
    - pdf
    - docx
    - txt
    - md
    - csv
    - xlsx
  
  ocr:
    enabled: false
    language: "eng"

# Advanced Settings
advanced:
  retry:
    max_attempts: 3
    delay: 1.0
    exponential_backoff: true
  
  timeout:
    request: 30.0
    stream: 60.0
  
  batch:
    size: 10
    max_workers: 4

LLM Toolkit - 第二部分:核心模块实现

6. llm_toolkit/__init__.py - 主包初始化

# -*- coding: utf-8 -*-
"""
LLM Toolkit - 大语言模型工具包
Enterprise-grade LLM Application Development Accelerator

Author: DREAMVFIA
License: MIT
"""

__version__ = "1.0.0"
__author__ = "DREAMVFIA"
__email__ = "contact@dreamvfia.com"

from llm_toolkit.core.llm_manager import LLMManager
from llm_toolkit.core.prompt_manager import PromptManager
from llm_toolkit.core.conversation import ConversationManager
from llm_toolkit.core.cost_tracker import CostTracker
from llm_toolkit.rag import RAGPipeline

__all__ = [
    "LLMManager",
    "PromptManager",
    "ConversationManager",
    "CostTracker",
    "RAGPipeline",
]


def get_version():
    """Return the package version string."""
    return __version__


def setup():
    """Initialize the toolkit's working environment.

    Creates the directories the toolkit writes to at runtime and loads
    environment variables from a local .env file (if present).
    """
    from pathlib import Path

    # Directories the toolkit expects to exist (relative to the CWD).
    directories = [
        "data",
        "data/chroma",
        "data/cache",
        "logs",
        "prompts",
        "models",
    ]

    for directory in directories:
        Path(directory).mkdir(parents=True, exist_ok=True)

    # Pull API keys and settings from a local .env file, if one exists.
    from dotenv import load_dotenv
    load_dotenv()

    print(f"LLM Toolkit v{__version__} initialized successfully!")


# Auto-setup on import: runs setup() unless AUTO_SETUP=false is set.
# NOTE(review): this creates directories and prints as an import side
# effect — confirm that is intended for library consumers.
import os
if os.getenv("AUTO_SETUP", "true").lower() == "true":
    setup()

7. llm_toolkit/core/__init__.py

# -*- coding: utf-8 -*-
"""
Core Module - 核心模块
"""

from llm_toolkit.core.llm_manager import LLMManager
from llm_toolkit.core.prompt_manager import PromptManager
from llm_toolkit.core.conversation import ConversationManager
from llm_toolkit.core.cost_tracker import CostTracker

__all__ = [
    "LLMManager",
    "PromptManager",
    "ConversationManager",
    "CostTracker",
]

8. llm_toolkit/core/llm_manager.py - LLM管理器

# -*- coding: utf-8 -*-
"""
LLM Manager - LLM管理器
支持多种LLM提供商的统一接口
"""

import os
from typing import Optional, List, Dict, Any, Generator, Union
from dataclasses import dataclass
import time
from loguru import logger

try:
    from langchain_openai import ChatOpenAI
    from langchain_anthropic import ChatAnthropic
    from langchain_google_genai import ChatGoogleGenerativeAI
except ImportError:
    logger.warning("LangChain providers not fully installed. Some features may be unavailable.")

from langchain.schema import HumanMessage, AIMessage, SystemMessage
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler


@dataclass
class LLMResponse:
    """Structured result of a single LLM call."""
    content: str                  # generated text
    model: str                    # model name used for the call
    provider: str                 # provider key, e.g. "openai"
    tokens_used: Dict[str, int]   # {"input": ..., "output": ..., "total": ...}
    cost: float                   # estimated cost in USD
    latency: float                # wall-clock seconds for the call
    metadata: Dict[str, Any]      # extra data, e.g. the raw provider response


class LLMManager:
    """
    Unified manager for chat-style LLM calls across providers.

    Supported providers:
    - OpenAI (GPT-4, GPT-3.5)
    - Anthropic (Claude)
    - Google (Gemini)
    - Azure OpenAI
    """

    # Pricing (USD per 1K tokens), used for local cost estimation.
    PRICING = {
        "openai": {
            "gpt-4-turbo-preview": {"input": 0.01, "output": 0.03},
            "gpt-4": {"input": 0.03, "output": 0.06},
            "gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
            "gpt-3.5-turbo-16k": {"input": 0.003, "output": 0.004},
        },
        "anthropic": {
            "claude-3-opus-20240229": {"input": 0.015, "output": 0.075},
            "claude-3-sonnet-20240229": {"input": 0.003, "output": 0.015},
            "claude-3-haiku-20240307": {"input": 0.00025, "output": 0.00125},
        },
        "google": {
            "gemini-pro": {"input": 0.00025, "output": 0.0005},
            "gemini-pro-vision": {"input": 0.00025, "output": 0.0005},
        }
    }

    def __init__(
        self,
        provider: str = "openai",
        model: Optional[str] = None,
        api_key: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 2000,
        streaming: bool = False,
        cost_tracker: Optional[Any] = None,
        **kwargs
    ):
        """
        Initialize the LLM manager.

        Args:
            provider: LLM provider (openai, anthropic, google, azure).
            model: Model name; the provider's default is used when omitted.
            api_key: API key; falls back to the provider's environment variable.
            temperature: Sampling temperature (0-1).
            max_tokens: Maximum number of tokens to generate.
            streaming: Whether to stream tokens to stdout via a callback.
            cost_tracker: Optional cost-tracker instance.
            **kwargs: Extra keyword arguments forwarded to the LangChain client.

        Raises:
            ValueError: If the provider is unsupported or no API key is found.
        """
        self.provider = provider.lower()
        self.temperature = temperature
        self.max_tokens = max_tokens
        self.streaming = streaming
        self.cost_tracker = cost_tracker
        self.kwargs = kwargs

        # Fall back to the provider's default model when none is given.
        if model is None:
            model = self._get_default_model()
        self.model = model

        # Explicit key wins; otherwise look up the provider's env variable.
        if api_key:
            self.api_key = api_key
        else:
            self.api_key = self._get_api_key()

        # Build the underlying LangChain chat client.
        self.llm = self._initialize_llm()

        logger.info(f"LLM Manager initialized: {self.provider}/{self.model}")

    def _get_default_model(self) -> str:
        """Return the default model name for the current provider."""
        defaults = {
            "openai": "gpt-4-turbo-preview",
            "anthropic": "claude-3-sonnet-20240229",
            "google": "gemini-pro",
            "azure": "gpt-4-turbo-preview",
        }
        return defaults.get(self.provider, "gpt-3.5-turbo")

    def _get_api_key(self) -> str:
        """Read the provider's API key from its environment variable.

        Raises:
            ValueError: If the provider is unknown or the variable is unset.
        """
        key_mapping = {
            "openai": "OPENAI_API_KEY",
            "anthropic": "ANTHROPIC_API_KEY",
            "google": "GOOGLE_API_KEY",
            "azure": "AZURE_OPENAI_API_KEY",
        }

        env_var = key_mapping.get(self.provider)
        if not env_var:
            raise ValueError(f"Unsupported provider: {self.provider}")

        api_key = os.getenv(env_var)
        if not api_key:
            raise ValueError(f"API key not found: {env_var}")

        return api_key

    def _initialize_llm(self):
        """Instantiate the LangChain chat client for the current provider.

        Raises:
            ValueError: If the provider is unsupported.
        """
        # Stream tokens to stdout when streaming mode is enabled.
        callbacks = [StreamingStdOutCallbackHandler()] if self.streaming else None

        if self.provider == "openai":
            return ChatOpenAI(
                model=self.model,
                openai_api_key=self.api_key,
                temperature=self.temperature,
                max_tokens=self.max_tokens,
                streaming=self.streaming,
                callbacks=callbacks,
                **self.kwargs
            )

        elif self.provider == "anthropic":
            return ChatAnthropic(
                model=self.model,
                anthropic_api_key=self.api_key,
                temperature=self.temperature,
                max_tokens=self.max_tokens,
                streaming=self.streaming,
                callbacks=callbacks,
                **self.kwargs
            )

        elif self.provider == "google":
            return ChatGoogleGenerativeAI(
                model=self.model,
                google_api_key=self.api_key,
                temperature=self.temperature,
                max_output_tokens=self.max_tokens,
                streaming=self.streaming,
                callbacks=callbacks,
                **self.kwargs
            )

        else:
            raise ValueError(f"Unsupported provider: {self.provider}")

    def chat(
        self,
        message: str,
        system_prompt: Optional[str] = None,
        **kwargs
    ) -> Union[str, "LLMResponse"]:
        """
        Send a single chat message and return the response.

        Args:
            message: User message.
            system_prompt: Optional system prompt.
            **kwargs: Extra arguments. The control flag
                ``return_full_response=True`` makes this method return an
                LLMResponse; all other kwargs are forwarded to the provider.

        Returns:
            Response text, or an LLMResponse when return_full_response=True.
        """
        start_time = time.time()

        # Pop our own control flag so it is NOT forwarded to the provider
        # call (previously it leaked into self.llm.invoke(**kwargs)).
        return_full_response = kwargs.pop("return_full_response", False)

        # Build the message list.
        messages = []
        if system_prompt:
            messages.append(SystemMessage(content=system_prompt))
        messages.append(HumanMessage(content=message))

        try:
            response = self.llm.invoke(messages, **kwargs)
            content = response.content

            # Estimate usage and cost locally (heuristic, not exact).
            tokens_used = self._estimate_tokens(message, content)
            cost = self._calculate_cost(tokens_used)
            latency = time.time() - start_time

            # Record the spend if a tracker was attached.
            if self.cost_tracker:
                self.cost_tracker.track(
                    provider=self.provider,
                    model=self.model,
                    tokens=tokens_used,
                    cost=cost
                )

            if return_full_response:
                return LLMResponse(
                    content=content,
                    model=self.model,
                    provider=self.provider,
                    tokens_used=tokens_used,
                    cost=cost,
                    latency=latency,
                    metadata={"response": response}
                )

            return content

        except Exception as e:
            logger.error(f"LLM call failed: {e}")
            raise

    def stream(
        self,
        message: str,
        system_prompt: Optional[str] = None,
        **kwargs
    ) -> Generator[str, None, None]:
        """
        Stream the response as it is generated.

        Args:
            message: User message.
            system_prompt: Optional system prompt.
            **kwargs: Extra arguments forwarded to the provider.

        Yields:
            Response text fragments.
        """
        # Build the message list.
        messages = []
        if system_prompt:
            messages.append(SystemMessage(content=system_prompt))
        messages.append(HumanMessage(content=message))

        try:
            for chunk in self.llm.stream(messages, **kwargs):
                if hasattr(chunk, 'content'):
                    yield chunk.content

        except Exception as e:
            logger.error(f"LLM streaming failed: {e}")
            raise

    def batch_chat(
        self,
        messages: List[str],
        system_prompt: Optional[str] = None,
        **kwargs
    ) -> List[str]:
        """
        Process a batch of messages sequentially.

        Failures are captured per message: a failing item yields an
        "Error: ..." string instead of aborting the whole batch.

        Args:
            messages: List of user messages.
            system_prompt: Optional system prompt shared by all messages.
            **kwargs: Extra arguments forwarded to chat().

        Returns:
            List of responses, one per input message.
        """
        responses = []

        for message in messages:
            try:
                response = self.chat(message, system_prompt, **kwargs)
                responses.append(response)
            except Exception as e:
                logger.error(f"Batch processing failed for message: {message[:50]}... Error: {e}")
                responses.append(f"Error: {str(e)}")

        return responses

    def _estimate_tokens(self, input_text: str, output_text: str) -> Dict[str, int]:
        """
        Roughly estimate token usage from character counts.

        Args:
            input_text: Input text.
            output_text: Output text.

        Returns:
            Dict with "input", "output" and "total" token counts.
        """
        # Crude heuristic: 1 token ≈ 4 characters.
        input_tokens = len(input_text) // 4
        output_tokens = len(output_text) // 4

        return {
            "input": input_tokens,
            "output": output_tokens,
            "total": input_tokens + output_tokens
        }

    def _calculate_cost(self, tokens: Dict[str, int]) -> float:
        """
        Compute the estimated cost from token counts and the PRICING table.

        Args:
            tokens: Token usage dict ("input"/"output" keys).

        Returns:
            Estimated cost in USD (0.0 if the model has no pricing entry).
        """
        pricing = self.PRICING.get(self.provider, {}).get(self.model, {})

        if not pricing:
            logger.warning(f"Pricing not found for {self.provider}/{self.model}")
            return 0.0

        input_cost = (tokens["input"] / 1000) * pricing.get("input", 0)
        output_cost = (tokens["output"] / 1000) * pricing.get("output", 0)

        return input_cost + output_cost

    def get_available_models(self) -> List[str]:
        """Return the model names with pricing entries for this provider."""
        return list(self.PRICING.get(self.provider, {}).keys())

    def switch_model(self, model: str):
        """Switch to another model of the same provider and rebuild the client.

        Raises:
            ValueError: If the model is not available for this provider.
        """
        if model not in self.get_available_models():
            raise ValueError(f"Model {model} not available for provider {self.provider}")

        self.model = model
        self.llm = self._initialize_llm()
        logger.info(f"Switched to model: {model}")

    def __repr__(self) -> str:
        return f"LLMManager(provider={self.provider}, model={self.model})"

9. llm_toolkit/core/prompt_manager.py - Prompt模板管理器

# -*- coding: utf-8 -*-
"""
Prompt Manager - Prompt模板管理器
提供Prompt模板的创建、管理、版本控制功能
"""

import json
import os
from dataclasses import asdict, dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from jinja2 import Template
from loguru import logger


@dataclass
class PromptTemplate:
    """A named, versioned prompt template rendered with Jinja2."""
    name: str                     # unique template name
    template: str                 # template body (Jinja2 syntax)
    variables: List[str]          # variables required by format()
    category: str = "general"     # grouping category
    description: str = ""
    version: str = "v1"
    created_at: str = ""          # ISO timestamp, filled in on creation
    updated_at: str = ""          # ISO timestamp, filled in on creation
    # default_factory avoids a shared mutable default; an explicit None is
    # still accepted and normalized in __post_init__ for backward compat.
    metadata: Optional[Dict[str, Any]] = field(default_factory=dict)

    def __post_init__(self):
        # Stamp creation/update times and normalize metadata.
        if not self.created_at:
            self.created_at = datetime.now().isoformat()
        if not self.updated_at:
            self.updated_at = datetime.now().isoformat()
        if self.metadata is None:
            self.metadata = {}

    def format(self, **kwargs) -> str:
        """
        Render the template with the given variables.

        Args:
            **kwargs: Template variables.

        Returns:
            The rendered text.

        Raises:
            ValueError: If any declared variable is missing from kwargs.
        """
        # Fail fast on missing declared variables before rendering.
        missing_vars = set(self.variables) - set(kwargs.keys())
        if missing_vars:
            raise ValueError(f"Missing required variables: {missing_vars}")

        # Render with Jinja2.
        template = Template(self.template)
        return template.render(**kwargs)

    def to_dict(self) -> Dict[str, Any]:
        """Return the template as a plain dict (for JSON serialization)."""
        return asdict(self)


class PromptManager:
    """
    Prompt template manager.

    Features:
    - Create and manage prompt templates
    - Version control
    - Template categories
    - Import / export
    - A/B-testing support
    """

    def __init__(self, storage_path: str = "./prompts"):
        """
        Initialize the prompt manager.

        Args:
            storage_path: Directory where templates are persisted as JSON.
        """
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)

        # name -> current template; name -> full version history
        self.templates: Dict[str, PromptTemplate] = {}
        self.versions: Dict[str, List[PromptTemplate]] = {}

        # Load previously persisted templates
        self._load_templates()

        logger.info(f"Prompt Manager initialized with {len(self.templates)} templates")

    def create_template(
        self,
        name: str,
        template: str,
        variables: List[str],
        category: str = "general",
        description: str = "",
        **metadata
    ) -> PromptTemplate:
        """
        Create a new template.

        Args:
            name: Template name (must be unique).
            template: Jinja2 template source.
            variables: Variables the template requires.
            category: Category label.
            description: Human-readable description.
            **metadata: Extra metadata stored on the template.

        Returns:
            The created PromptTemplate.

        Raises:
            ValueError: If a template with this name already exists.
        """
        if name in self.templates:
            logger.warning(f"Template {name} already exists. Use update_template() to modify.")
            raise ValueError(f"Template {name} already exists")

        prompt_template = PromptTemplate(
            name=name,
            template=template,
            variables=variables,
            category=category,
            description=description,
            metadata=metadata
        )

        self.templates[name] = prompt_template
        self.versions[name] = [prompt_template]

        # Persist to disk
        self._save_template(prompt_template)

        logger.info(f"Created template: {name}")
        return prompt_template

    def get_template(self, name: str, version: Optional[str] = None) -> PromptTemplate:
        """
        Retrieve a template.

        Args:
            name: Template name.
            version: Specific version label (optional; defaults to current).

        Returns:
            The requested PromptTemplate.

        Raises:
            ValueError: If the template or the requested version is missing.
        """
        if name not in self.templates:
            raise ValueError(f"Template {name} not found")

        if version:
            # Look up a specific historical version
            versions = self.versions.get(name, [])
            for template in versions:
                if template.version == version:
                    return template
            raise ValueError(f"Version {version} not found for template {name}")

        return self.templates[name]

    def update_template(
        self,
        name: str,
        template: Optional[str] = None,
        variables: Optional[List[str]] = None,
        category: Optional[str] = None,
        description: Optional[str] = None,
        create_version: bool = True,
        **metadata
    ) -> PromptTemplate:
        """
        Update a template, optionally creating a new version.

        Args:
            name: Template name.
            template: New template source (None keeps the current one).
            variables: New variable list (None keeps the current one).
            category: New category (None keeps the current one).
            description: New description (None keeps the current one).
            create_version: If True, append a new version; otherwise mutate
                the current version in place.
            **metadata: Metadata merged into the template's metadata.

        Returns:
            The updated PromptTemplate.

        Raises:
            ValueError: If the template does not exist.
        """
        if name not in self.templates:
            raise ValueError(f"Template {name} not found")

        current = self.templates[name]

        if create_version:
            # Version numbers are 1-based and monotonically increasing
            version_num = len(self.versions.get(name, [])) + 1
            new_version = f"v{version_num}"

            # `is None` checks (not truthiness) so an explicit empty string
            # or empty list genuinely overrides the current value.
            new_template = PromptTemplate(
                name=name,
                template=current.template if template is None else template,
                variables=current.variables if variables is None else variables,
                category=current.category if category is None else category,
                description=current.description if description is None else description,
                version=new_version,
                metadata={**current.metadata, **metadata}
            )

            # setdefault guards against a missing history entry for `name`
            self.versions.setdefault(name, []).append(new_template)
            self.templates[name] = new_template
        else:
            # Mutate the current version in place
            if template is not None:
                current.template = template
            if variables is not None:
                current.variables = variables
            if category is not None:
                current.category = category
            if description is not None:
                current.description = description
            current.metadata.update(metadata)
            current.updated_at = datetime.now().isoformat()

        # Persist
        self._save_template(self.templates[name])

        logger.info(f"Updated template: {name}")
        return self.templates[name]

    def delete_template(self, name: str):
        """Delete a template (file on disk and in-memory state).

        Raises:
            ValueError: If the template does not exist.
        """
        if name not in self.templates:
            raise ValueError(f"Template {name} not found")

        # Remove the persisted file
        template_file = self.storage_path / f"{name}.json"
        if template_file.exists():
            template_file.unlink()

        # Remove in-memory state
        del self.templates[name]
        if name in self.versions:
            del self.versions[name]

        logger.info(f"Deleted template: {name}")

    def list_templates(
        self,
        category: Optional[str] = None
    ) -> List[PromptTemplate]:
        """
        List all current templates.

        Args:
            category: Optional category filter.

        Returns:
            List of templates (current versions only).
        """
        templates = list(self.templates.values())

        if category:
            templates = [t for t in templates if t.category == category]

        return templates

    def get_categories(self) -> List[str]:
        """Return all distinct categories, sorted alphabetically."""
        categories = set(t.category for t in self.templates.values())
        return sorted(list(categories))

    def get_versions(self, name: str) -> List[PromptTemplate]:
        """Return every stored version of a template.

        Raises:
            ValueError: If the template does not exist.
        """
        if name not in self.versions:
            raise ValueError(f"Template {name} not found")

        return self.versions[name]

    def rollback(self, name: str, version: str):
        """Make the given historical version the current one."""
        template = self.get_template(name, version)
        self.templates[name] = template
        self._save_template(template)

        logger.info(f"Rolled back template {name} to version {version}")

    def export_template(self, name: str, output_path: str):
        """Write the current version of a template to a JSON file."""
        template = self.get_template(name)

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(template.to_dict(), f, indent=2, ensure_ascii=False)

        logger.info(f"Exported template {name} to {output_path}")

    def import_template(self, input_path: str) -> PromptTemplate:
        """Load a template from a JSON file and register it.

        The imported template becomes the current version and is appended
        to the version history.
        """
        with open(input_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        template = PromptTemplate(**data)
        self.templates[template.name] = template

        self.versions.setdefault(template.name, []).append(template)

        self._save_template(template)

        logger.info(f"Imported template {template.name}")
        return template

    def _save_template(self, template: PromptTemplate):
        """Persist a template (current version plus full history) to disk."""
        template_file = self.storage_path / f"{template.name}.json"

        # Persist the complete version history alongside the current version
        versions_data = [v.to_dict() for v in self.versions.get(template.name, [])]

        with open(template_file, 'w', encoding='utf-8') as f:
            json.dump({
                "current": template.to_dict(),
                "versions": versions_data
            }, f, indent=2, ensure_ascii=False)

    def _load_templates(self):
        """Load all persisted templates from the storage directory.

        A corrupt file is logged and skipped rather than aborting startup.
        """
        for template_file in self.storage_path.glob("*.json"):
            try:
                with open(template_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)

                # Current version
                current = PromptTemplate(**data["current"])
                self.templates[current.name] = current

                # Full version history
                versions = [PromptTemplate(**v) for v in data.get("versions", [])]
                self.versions[current.name] = versions

            except Exception as e:
                logger.error(f"Failed to load template from {template_file}: {e}")

    def create_builtin_templates(self):
        """Register the built-in starter templates (skips existing names)."""
        builtin_templates = [
            {
                "name": "summarization",
                "template": "请总结以下文本的主要内容:\n\n{{text}}\n\n总结:",
                "variables": ["text"],
                "category": "text_processing",
                "description": "文本摘要模板"
            },
            {
                "name": "translation",
                "template": "请将以下{{source_lang}}文本翻译成{{target_lang}}:\n\n{{text}}\n\n翻译:",
                "variables": ["source_lang", "target_lang", "text"],
                "category": "text_processing",
                "description": "文本翻译模板"
            },
            {
                "name": "qa",
                "template": "基于以下上下文回答问题:\n\n上下文:{{context}}\n\n问题:{{question}}\n\n答案:",
                "variables": ["context", "question"],
                "category": "question_answering",
                "description": "问答模板"
            },
            {
                "name": "code_generation",
                "template": "请用{{language}}编写代码实现以下功能:\n\n{{description}}\n\n代码:",
                "variables": ["language", "description"],
                "category": "coding",
                "description": "代码生成模板"
            },
            {
                "name": "sentiment_analysis",
                "template": "分析以下文本的情感倾向(正面/负面/中性):\n\n{{text}}\n\n情感:",
                "variables": ["text"],
                "category": "analysis",
                "description": "情感分析模板"
            },
        ]

        for template_data in builtin_templates:
            try:
                if template_data["name"] not in self.templates:
                    self.create_template(**template_data)
            except Exception as e:
                logger.error(f"Failed to create builtin template {template_data['name']}: {e}")

        logger.info("Built-in templates created")

    def __repr__(self) -> str:
        return f"PromptManager(templates={len(self.templates)}, categories={len(self.get_categories())})"

10. llm_toolkit/core/conversation.py - 对话管理器

# -*- coding: utf-8 -*-
"""
Conversation Manager - 对话管理器
管理多轮对话的上下文和历史记录
"""

import json
from typing import List, Dict, Optional, Any
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from loguru import logger

from langchain.schema import HumanMessage, AIMessage, SystemMessage
from langchain.memory import (
    ConversationBufferMemory,
    ConversationSummaryMemory,
    ConversationBufferWindowMemory
)


@dataclass
class Message:
    """A single chat message with role, content, timestamp and metadata."""
    role: str      # one of: "system", "user", "assistant"
    content: str
    timestamp: str = ""  # ISO-8601; auto-filled in __post_init__ when empty
    # Optional rather than a bare Dict default: None is replaced with {}
    # in __post_init__ (a mutable dataclass default would be shared).
    metadata: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        if not self.timestamp:
            self.timestamp = datetime.now().isoformat()
        if self.metadata is None:
            self.metadata = {}

    def to_dict(self) -> Dict[str, Any]:
        """Return the message as a plain dict (dataclasses.asdict)."""
        return asdict(self)

    def to_langchain_message(self):
        """
        Convert to the corresponding LangChain message object.

        Returns:
            SystemMessage, HumanMessage or AIMessage depending on role.

        Raises:
            ValueError: If the role is not one of the known roles.
        """
        if self.role == "system":
            return SystemMessage(content=self.content)
        elif self.role == "user":
            return HumanMessage(content=self.content)
        elif self.role == "assistant":
            return AIMessage(content=self.content)
        else:
            raise ValueError(f"Unknown role: {self.role}")


class ConversationManager:
    """
    Conversation manager.

    Features:
    - Multi-turn context management
    - Conversation history
    - Multiple memory strategies
    - Persistence (save/load)
    - Conversation analytics
    """

    def __init__(
        self,
        llm: Any,
        memory_type: str = "buffer",
        max_history: int = 10,
        session_id: Optional[str] = None,
        system_prompt: Optional[str] = None,
        **kwargs
    ):
        """
        Initialize the conversation manager.

        Args:
            llm: LLM wrapper instance (expected to expose a `.llm`
                LangChain model attribute and a `.chat()` method).
            memory_type: Memory strategy ("buffer", "summary", "window").
            max_history: Maximum number of user/assistant exchanges kept.
            session_id: Session ID (auto-generated when omitted).
            system_prompt: Optional system prompt prepended to the history.
            **kwargs: Extra arguments forwarded to the memory constructor.
        """
        self.llm = llm
        self.memory_type = memory_type
        self.max_history = max_history
        self.session_id = session_id or self._generate_session_id()
        self.system_prompt = system_prompt
        self.kwargs = kwargs

        self.messages: List[Message] = []

        # Initialize memory before any add_message() call so the memory
        # attribute always exists when messages are appended.
        self.memory = self._initialize_memory()

        if system_prompt:
            self.add_message("system", system_prompt)

        logger.info(f"Conversation Manager initialized: session={self.session_id}, memory={memory_type}")

    def _generate_session_id(self) -> str:
        """Generate a timestamp-based session ID."""
        return f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    def _initialize_memory(self):
        """Build the LangChain memory object for the configured strategy.

        Raises:
            ValueError: If memory_type is not one of buffer/summary/window.
        """
        if self.memory_type == "buffer":
            return ConversationBufferMemory(
                return_messages=True,
                **self.kwargs
            )
        elif self.memory_type == "summary":
            # Summary memory needs an LLM to produce the running summary
            return ConversationSummaryMemory(
                llm=self.llm.llm,
                return_messages=True,
                **self.kwargs
            )
        elif self.memory_type == "window":
            return ConversationBufferWindowMemory(
                k=self.max_history,
                return_messages=True,
                **self.kwargs
            )
        else:
            raise ValueError(f"Unknown memory type: {self.memory_type}")

    def add_message(
        self,
        role: str,
        content: str,
        **metadata
    ) -> Message:
        """
        Append a message to the history and mirror it into memory.

        Args:
            role: "system", "user" or "assistant".
            content: Message text.
            **metadata: Extra metadata stored on the message.

        Returns:
            The created Message.
        """
        message = Message(
            role=role,
            content=content,
            metadata=metadata
        )

        self.messages.append(message)

        # Only user/assistant turns go into the LangChain memory
        if role == "user":
            self.memory.chat_memory.add_user_message(content)
        elif role == "assistant":
            self.memory.chat_memory.add_ai_message(content)

        # Trim to max_history exchanges (user + assistant pairs) while
        # always preserving system messages — a plain tail slice could
        # silently drop the system prompt once the history grows.
        limit = self.max_history * 2
        non_system = [m for m in self.messages if m.role != "system"]
        if len(non_system) > limit:
            system_messages = [m for m in self.messages if m.role == "system"]
            self.messages = system_messages + non_system[-limit:]

        return message

    def chat(
        self,
        message: Optional[str] = None,
        **kwargs
    ) -> str:
        """
        Send a message and return the assistant's reply.

        Args:
            message: User message (if None, the existing history is replayed
                as-is, i.e. the last user message is answered).
            **kwargs: Extra arguments forwarded to the LLM invoke call.

        Returns:
            The assistant's reply text.
        """
        # Record the new user turn, if any
        if message:
            self.add_message("user", message)

        # Convert the full history into LangChain message objects
        messages = [msg.to_langchain_message() for msg in self.messages]

        try:
            response = self.llm.llm.invoke(messages, **kwargs)
            assistant_message = response.content

            # Record the assistant turn
            self.add_message("assistant", assistant_message)

            return assistant_message

        except Exception as e:
            logger.error(f"Chat failed: {e}")
            raise

    def stream_chat(
        self,
        message: Optional[str] = None,
        **kwargs
    ):
        """
        Streaming chat.

        Args:
            message: User message (optional, as in chat()).
            **kwargs: Extra arguments forwarded to the LLM stream call.

        Yields:
            Response chunks as they arrive.
        """
        if message:
            self.add_message("user", message)

        messages = [msg.to_langchain_message() for msg in self.messages]

        full_response = ""
        try:
            for chunk in self.llm.llm.stream(messages, **kwargs):
                if hasattr(chunk, 'content'):
                    full_response += chunk.content
                    yield chunk.content

            # Record the assembled reply once the stream completes
            self.add_message("assistant", full_response)

        except Exception as e:
            logger.error(f"Stream chat failed: {e}")
            raise

    def get_history(
        self,
        limit: Optional[int] = None,
        role: Optional[str] = None
    ) -> List[Message]:
        """
        Return the conversation history.

        Args:
            limit: Maximum number of (most recent) messages to return.
            role: Optional role filter applied before the limit.

        Returns:
            The matching messages, oldest first.
        """
        messages = self.messages

        if role:
            messages = [msg for msg in messages if msg.role == role]

        if limit:
            messages = messages[-limit:]

        return messages

    def clear_history(self):
        """Clear the history and memory, re-adding the system prompt."""
        self.messages = []
        self.memory.clear()

        if self.system_prompt:
            self.add_message("system", self.system_prompt)

        logger.info(f"Cleared conversation history: {self.session_id}")

    def save(self, filepath: str):
        """
        Save the conversation to a JSON file.

        Args:
            filepath: Destination path (parent dirs are created).
        """
        data = {
            "session_id": self.session_id,
            "memory_type": self.memory_type,
            "max_history": self.max_history,
            "system_prompt": self.system_prompt,
            "messages": [msg.to_dict() for msg in self.messages],
            "created_at": self.messages[0].timestamp if self.messages else "",
            "updated_at": self.messages[-1].timestamp if self.messages else ""
        }

        Path(filepath).parent.mkdir(parents=True, exist_ok=True)

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

        logger.info(f"Saved conversation to {filepath}")

    def load(self, filepath: str):
        """
        Load a conversation from a JSON file, replacing the current state.

        Args:
            filepath: Path of a file previously written by save().
        """
        with open(filepath, 'r', encoding='utf-8') as f:
            data = json.load(f)

        self.session_id = data["session_id"]
        self.memory_type = data["memory_type"]
        self.max_history = data["max_history"]
        self.system_prompt = data.get("system_prompt")

        # Rebuild memory from scratch so stale entries from the previous
        # session (and a possibly different memory_type) are discarded.
        self.memory = self._initialize_memory()

        self.messages = []
        for msg_data in data["messages"]:
            message = Message(**msg_data)
            self.messages.append(message)

            # Replay user/assistant turns into the fresh memory
            if message.role == "user":
                self.memory.chat_memory.add_user_message(message.content)
            elif message.role == "assistant":
                self.memory.chat_memory.add_ai_message(message.content)

        logger.info(f"Loaded conversation from {filepath}")

    def get_summary(self) -> str:
        """Return a summary of the conversation.

        Uses the summary memory's buffer when available; otherwise asks
        the LLM to summarize the transcript.
        """
        if self.memory_type == "summary":
            return self.memory.buffer
        else:
            conversation_text = "\n".join([
                f"{msg.role}: {msg.content}"
                for msg in self.messages
            ])

            summary_prompt = f"请简要总结以下对话的主要内容:\n\n{conversation_text}\n\n摘要:"
            summary = self.llm.chat(summary_prompt)

            return summary

    def get_statistics(self) -> Dict[str, Any]:
        """Return per-role message counts, average lengths and duration."""
        user_messages = [msg for msg in self.messages if msg.role == "user"]
        assistant_messages = [msg for msg in self.messages if msg.role == "assistant"]

        return {
            "session_id": self.session_id,
            "total_messages": len(self.messages),
            "user_messages": len(user_messages),
            "assistant_messages": len(assistant_messages),
            "avg_user_length": sum(len(msg.content) for msg in user_messages) / len(user_messages) if user_messages else 0,
            "avg_assistant_length": sum(len(msg.content) for msg in assistant_messages) / len(assistant_messages) if assistant_messages else 0,
            "duration": self._calculate_duration(),
        }

    def _calculate_duration(self) -> str:
        """Elapsed time between the first and last message, as a string."""
        if len(self.messages) < 2:
            return "0 seconds"

        start = datetime.fromisoformat(self.messages[0].timestamp)
        end = datetime.fromisoformat(self.messages[-1].timestamp)
        duration = end - start

        return str(duration)

    def __repr__(self) -> str:
        return f"ConversationManager(session={self.session_id}, messages={len(self.messages)})"

LLM Toolkit - 第三部分:成本追踪与RAG模块


第三部分:成本追踪器与RAG模块

11. llm_toolkit/core/cost_tracker.py - 成本追踪器

# -*- coding: utf-8 -*-
"""
Cost Tracker - 成本追踪器
追踪和管理LLM使用成本
"""

import sqlite3
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from pathlib import Path
import json
from loguru import logger


@dataclass
class CostRecord:
    """One LLM call's token usage and cost, as stored in SQLite."""
    id: Optional[int] = None          # DB row id (None before insertion)
    timestamp: str = ""               # ISO-8601; auto-filled when empty
    provider: str = ""
    model: str = ""
    input_tokens: int = 0
    output_tokens: int = 0
    total_tokens: int = 0             # derived from input+output when 0
    input_cost: float = 0.0
    output_cost: float = 0.0
    total_cost: float = 0.0           # derived from input+output when 0
    session_id: str = ""
    metadata: str = "{}"              # JSON-encoded string

    def __post_init__(self):
        # Fill derived fields only when the caller left them at their
        # zero/empty defaults.
        if not self.timestamp:
            self.timestamp = datetime.now().isoformat()
        if not self.total_tokens:
            self.total_tokens = self.input_tokens + self.output_tokens
        if not self.total_cost:
            self.total_cost = self.input_cost + self.output_cost

    def to_dict(self) -> Dict[str, Any]:
        """Return the record as a dict, decoding the metadata JSON."""
        data = asdict(self)
        metadata = data['metadata']
        if isinstance(metadata, str):
            data['metadata'] = json.loads(metadata)
        return data


class CostTracker:
    """
    成本追踪器
    
    功能:
    - 实时成本追踪
    - 预算管理和告警
    - 成本分析和报告
    - 多维度统计
    - 成本优化建议
    """
    
    def __init__(
        self,
        db_path: str = "./data/costs.db",
        daily_limit: Optional[float] = None,
        monthly_limit: Optional[float] = None,
        alert_threshold: float = 0.8,
        enable_alerts: bool = True
    ):
        """
        Set up the cost tracker and its backing SQLite database.

        Args:
            db_path: Location of the SQLite database file.
            daily_limit: Daily budget cap in USD (None = unlimited).
            monthly_limit: Monthly budget cap in USD (None = unlimited).
            alert_threshold: Fraction of a budget (0-1) at which to warn.
            enable_alerts: Whether budget checks run after each track().
        """
        # Budget configuration
        self.daily_limit = daily_limit
        self.monthly_limit = monthly_limit
        self.alert_threshold = alert_threshold
        self.enable_alerts = enable_alerts

        # Make sure the database directory exists, then create the schema
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._initialize_database()

        logger.info(f"Cost Tracker initialized: daily_limit=${daily_limit}, monthly_limit=${monthly_limit}")
    
    def _initialize_database(self):
        """Create the cost_records table and its indexes (idempotent).

        Uses CREATE ... IF NOT EXISTS throughout, so it is safe to call on
        every startup. The connection is closed in a finally block so a
        failing statement cannot leak the handle.
        """
        conn = sqlite3.connect(str(self.db_path))
        try:
            cursor = conn.cursor()

            # Cost record table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS cost_records (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    provider TEXT NOT NULL,
                    model TEXT NOT NULL,
                    input_tokens INTEGER NOT NULL,
                    output_tokens INTEGER NOT NULL,
                    total_tokens INTEGER NOT NULL,
                    input_cost REAL NOT NULL,
                    output_cost REAL NOT NULL,
                    total_cost REAL NOT NULL,
                    session_id TEXT,
                    metadata TEXT
                )
            """)

            # Indexes for the time-range and per-model queries
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_timestamp 
                ON cost_records(timestamp)
            """)

            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_provider_model 
                ON cost_records(provider, model)
            """)

            conn.commit()
        finally:
            conn.close()

        logger.info("Cost tracking database initialized")
    
    def track(
        self,
        provider: str,
        model: str,
        tokens: Dict[str, int],
        cost: float,
        session_id: str = "",
        **metadata
    ) -> CostRecord:
        """
        Record the cost of one LLM call.

        Args:
            provider: Provider name (e.g. "openai").
            model: Model identifier.
            tokens: Token counts; keys "input", "output" and optionally "total".
            cost: Total cost of the call in USD.
            session_id: Optional session identifier.
            **metadata: Extra key/values stored as JSON.

        Returns:
            The persisted CostRecord.
        """
        input_tokens = tokens.get("input", 0)
        output_tokens = tokens.get("output", 0)
        # Fall back to input+output when the caller did not supply "total",
        # so the proportional cost split below stays consistent with
        # total_cost (previously a missing "total" produced 0/0 splits).
        split_total = tokens.get("total", 0) or (input_tokens + output_tokens)

        if split_total > 0:
            input_cost = cost * input_tokens / split_total
            output_cost = cost * output_tokens / split_total
        else:
            input_cost = 0.0
            output_cost = 0.0

        record = CostRecord(
            provider=provider,
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            total_tokens=tokens.get("total", 0),
            input_cost=input_cost,
            output_cost=output_cost,
            total_cost=cost,
            session_id=session_id,
            metadata=json.dumps(metadata)
        )

        # Persist, then optionally check the budget limits
        self._save_record(record)

        if self.enable_alerts:
            self._check_budget()

        return record
    
    def _save_record(self, record: CostRecord):
        """Insert a cost record into the database.

        The connection is closed in a finally block so a failing INSERT
        cannot leak the handle.
        """
        conn = sqlite3.connect(str(self.db_path))
        try:
            cursor = conn.cursor()

            cursor.execute("""
                INSERT INTO cost_records (
                    timestamp, provider, model,
                    input_tokens, output_tokens, total_tokens,
                    input_cost, output_cost, total_cost,
                    session_id, metadata
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                record.timestamp,
                record.provider,
                record.model,
                record.input_tokens,
                record.output_tokens,
                record.total_tokens,
                record.input_cost,
                record.output_cost,
                record.total_cost,
                record.session_id,
                record.metadata
            ))

            conn.commit()
        finally:
            conn.close()
    
    def get_daily_cost(self, date: Optional[datetime] = None) -> float:
        """
        Total cost accrued on a given calendar day.

        Args:
            date: The day to query (defaults to today).

        Returns:
            Total cost in USD for that day.
        """
        day = date if date is not None else datetime.now()

        # [midnight, next midnight) in local time
        day_start = day.replace(hour=0, minute=0, second=0, microsecond=0)
        day_end = day_start + timedelta(days=1)

        return self._get_cost_in_range(day_start.isoformat(), day_end.isoformat())
    
    def get_monthly_cost(self, year: Optional[int] = None, month: Optional[int] = None) -> float:
        """
        Total cost accrued in a given calendar month.

        Args:
            year: Year to query (defaults to the current year).
            month: Month to query, 1-12 (defaults to the current month).

        Returns:
            Total cost in USD for that month.
        """
        today = datetime.now()
        y = year or today.year
        m = month or today.month

        first_day = datetime(y, m, 1)
        # First day of the following month (handles the December rollover)
        next_month_first = datetime(y + 1, 1, 1) if m == 12 else datetime(y, m + 1, 1)

        return self._get_cost_in_range(first_day.isoformat(), next_month_first.isoformat())
    
    def _get_cost_in_range(self, start: str, end: str) -> float:
        """Sum total_cost for records with start <= timestamp < end.

        Args:
            start: ISO-8601 lower bound (inclusive).
            end: ISO-8601 upper bound (exclusive).

        Returns:
            The summed cost in USD (0.0 when no rows match).
        """
        conn = sqlite3.connect(str(self.db_path))
        try:
            cursor = conn.cursor()

            cursor.execute("""
                SELECT SUM(total_cost)
                FROM cost_records
                WHERE timestamp >= ? AND timestamp < ?
            """, (start, end))

            result = cursor.fetchone()[0]
        finally:
            # Close even on query failure so the handle is not leaked
            conn.close()

        # SUM() yields NULL (None) when no rows match
        return result or 0.0
    
    def _check_budget(self):
        """检查预算并发出告警

        Compares today's / this month's accumulated spend against the
        configured limits and logs an error at >=100% usage or a warning
        at >= alert_threshold. Limits that are None (or 0) are skipped by
        the truthiness check, which also avoids dividing by zero.
        """
        # 检查每日预算
        if self.daily_limit:
            daily_cost = self.get_daily_cost()
            daily_usage = daily_cost / self.daily_limit
            
            if daily_usage >= 1.0:
                logger.error(f"⚠️ DAILY BUDGET EXCEEDED! Used: ${daily_cost:.4f} / ${self.daily_limit:.2f}")
            elif daily_usage >= self.alert_threshold:
                logger.warning(f"⚠️ Daily budget alert: {daily_usage:.1%} used (${daily_cost:.4f} / ${self.daily_limit:.2f})")
        
        # 检查每月预算
        if self.monthly_limit:
            monthly_cost = self.get_monthly_cost()
            monthly_usage = monthly_cost / self.monthly_limit
            
            if monthly_usage >= 1.0:
                logger.error(f"⚠️ MONTHLY BUDGET EXCEEDED! Used: ${monthly_cost:.4f} / ${self.monthly_limit:.2f}")
            elif monthly_usage >= self.alert_threshold:
                logger.warning(f"⚠️ Monthly budget alert: {monthly_usage:.1%} used (${monthly_cost:.4f} / ${self.monthly_limit:.2f})")
    
    def get_statistics(
        self,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        group_by: str = "provider"
    ) -> Dict[str, Any]:
        """
        Aggregate cost statistics over a time range.

        Args:
            start_date: Range start (defaults to 30 days ago).
            end_date: Range end (defaults to now).
            group_by: Aggregation dimension: "provider", "model" or "date".

        Returns:
            Dict with the period, the grouped rows, and overall totals.

        Raises:
            ValueError: If group_by is not a supported dimension.
        """
        # Validate before opening the connection so an invalid argument
        # cannot leak a database handle.
        if group_by not in ("provider", "model", "date"):
            raise ValueError(f"Invalid group_by: {group_by}")

        if start_date is None:
            start_date = datetime.now() - timedelta(days=30)
        if end_date is None:
            end_date = datetime.now()

        conn = sqlite3.connect(str(self.db_path))
        try:
            cursor = conn.cursor()

            if group_by == "provider":
                cursor.execute("""
                    SELECT 
                        provider,
                        COUNT(*) as calls,
                        SUM(total_tokens) as total_tokens,
                        SUM(total_cost) as total_cost,
                        AVG(total_cost) as avg_cost
                    FROM cost_records
                    WHERE timestamp >= ? AND timestamp <= ?
                    GROUP BY provider
                    ORDER BY total_cost DESC
                """, (start_date.isoformat(), end_date.isoformat()))

                columns = ["provider", "calls", "total_tokens", "total_cost", "avg_cost"]

            elif group_by == "model":
                cursor.execute("""
                    SELECT 
                        provider,
                        model,
                        COUNT(*) as calls,
                        SUM(total_tokens) as total_tokens,
                        SUM(total_cost) as total_cost,
                        AVG(total_cost) as avg_cost
                    FROM cost_records
                    WHERE timestamp >= ? AND timestamp <= ?
                    GROUP BY provider, model
                    ORDER BY total_cost DESC
                """, (start_date.isoformat(), end_date.isoformat()))

                columns = ["provider", "model", "calls", "total_tokens", "total_cost", "avg_cost"]

            else:  # group_by == "date"
                cursor.execute("""
                    SELECT 
                        DATE(timestamp) as date,
                        COUNT(*) as calls,
                        SUM(total_tokens) as total_tokens,
                        SUM(total_cost) as total_cost
                    FROM cost_records
                    WHERE timestamp >= ? AND timestamp <= ?
                    GROUP BY DATE(timestamp)
                    ORDER BY date DESC
                """, (start_date.isoformat(), end_date.isoformat()))

                columns = ["date", "calls", "total_tokens", "total_cost"]

            results = cursor.fetchall()
        finally:
            # Close even on query failure so the handle is not leaked
            conn.close()

        # Convert rows into a list of dicts
        data = [dict(zip(columns, row)) for row in results]

        return {
            "period": {
                "start": start_date.isoformat(),
                "end": end_date.isoformat()
            },
            "group_by": group_by,
            "data": data,
            "summary": {
                "total_calls": sum(item.get("calls", 0) for item in data),
                "total_tokens": sum(item.get("total_tokens", 0) for item in data),
                "total_cost": sum(item.get("total_cost", 0) for item in data)
            }
        }
    
    def generate_report(
        self,
        period: str = "monthly",
        format: str = "text"
    ) -> str:
        """
        生成成本报告
        
        Args:
            period: 周期 (daily, weekly, monthly)
            format: 格式 (text, json, html)
        
        Returns:
            报告内容
        """
        now = datetime.now()
        
        if period == "daily":
            start_date = now.replace(hour=0, minute=0, second=0, microsecond=0)
            end_date = now
            title = f"Daily Cost Report - {now.strftime('%Y-%m-%d')}"
        elif period == "weekly":
            start_date = now - timedelta(days=7)
            end_date = now
            title = f"Weekly Cost Report - {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}"
        elif period == "monthly":
            start_date = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
            end_date = now
            title = f"Monthly Cost Report - {now.strftime('%Y-%m')}"
        else:
            raise ValueError(f"Invalid period: {period}")
        
        # 获取统计数据
        stats_by_provider = self.get_statistics(start_date, end_date, "provider")
        stats_by_model = self.get_statistics(start_date, end_date, "model")
        
        if format == "json":
            return json.dumps({
                "title": title,
                "period": period,
                "by_provider": stats_by_provider,
                "by_model": stats_by_model
            }, indent=2)
        
        elif format == "text":
            report = f"""
{'='*80}
{title}
{'='*80}

Summary:
--------
Total Calls: {stats_by_provider['summary']['total_calls']:,}
Total Tokens: {stats_by_provider['summary']['total_tokens']:,}
Total Cost: ${stats_by_provider['summary']['total_cost']:.4f}

By Provider:
------------
"""
            for item in stats_by_provider['data']:
                report += f"""
{item['provider']}:
  Calls: {item['calls']:,}
  Tokens: {item['total_tokens']:,}
  Cost: ${item['total_cost']:.4f}
  Avg Cost/Call: ${item['avg_cost']:.4f}
"""
            
            report += "\nBy Model:\n"
            report += "-" * 80 + "\n"
            for item in stats_by_model['data']:
                report += f"{item['provider']}/{item['model']}: ${item['total_cost']:.4f} ({item['calls']} calls)\n"
            
            report += "=" * 80 + "\n"
            
            return report
        
        else:
            raise ValueError(f"Invalid format: {format}")
    
    def get_optimization_suggestions(self) -> List[str]:
        """Analyze the trailing 30 days of usage and return cost-saving tips."""
        tips: List[str] = []

        # Per-provider/model usage over the last 30 days.
        usage = self.get_statistics(
            start_date=datetime.now() - timedelta(days=30),
            end_date=datetime.now(),
            group_by="model"
        )

        # Suggest cheaper drop-in model substitutions.
        for row in usage["data"]:
            per_call = row["avg_cost"]

            if row["provider"] == "openai" and row["model"] == "gpt-4":
                tips.append(
                    f"考虑使用 gpt-3.5-turbo 替代 gpt-4,可节省约 95% 成本 "
                    f"(当前平均成本: ${per_call:.4f}/call)"
                )

            if row["provider"] == "anthropic" and row["model"].startswith("claude-3-opus"):
                tips.append(
                    f"考虑使用 claude-3-sonnet 替代 claude-3-opus,可节省约 80% 成本 "
                    f"(当前平均成本: ${per_call:.4f}/call)"
                )

        # High call volume suggests response caching would pay off.
        calls = usage["summary"]["total_calls"]
        if calls > 100:
            tips.append(
                f"启用缓存可以减少重复查询的成本 (当前总调用: {calls})"
            )

        # Long prompts inflate per-call token usage.
        mean_tokens = usage["summary"]["total_tokens"] / calls if calls > 0 else 0
        if mean_tokens > 2000:
            tips.append(
                f"考虑优化Prompt长度,当前平均使用 {mean_tokens:.0f} tokens/call"
            )

        return tips
    
    def export_records(
        self,
        output_path: str,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        format: str = "csv"
    ):
        """
        Export cost records to a file.

        Args:
            output_path: Destination file path.
            start_date: Start of the export window (defaults to 30 days ago).
            end_date: End of the export window (defaults to now).
            format: Output format ("csv" or "json").

        Raises:
            ValueError: If format is not "csv" or "json".
        """
        if start_date is None:
            start_date = datetime.now() - timedelta(days=30)
        if end_date is None:
            end_date = datetime.now()

        conn = sqlite3.connect(str(self.db_path))
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT * FROM cost_records
                WHERE timestamp >= ? AND timestamp <= ?
                ORDER BY timestamp DESC
            """, (start_date.isoformat(), end_date.isoformat()))

            records = cursor.fetchall()
            columns = [desc[0] for desc in cursor.description]
        finally:
            # Close the connection even if the query fails (previously leaked).
            conn.close()

        if format == "csv":
            import csv
            with open(output_path, 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)
                writer.writerow(columns)
                writer.writerows(records)

        elif format == "json":
            data = [dict(zip(columns, record)) for record in records]
            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)

        else:
            raise ValueError(f"Invalid format: {format}")

        logger.info(f"Exported {len(records)} records to {output_path}")
    
    def __repr__(self) -> str:
        """Debug summary showing today's and the current month's spend."""
        return (
            f"CostTracker(daily=${self.get_daily_cost():.4f}, "
            f"monthly=${self.get_monthly_cost():.4f})"
        )

12. llm_toolkit/rag/__init__.py - RAG模块初始化

# -*- coding: utf-8 -*-
"""
RAG Module - Retrieval-Augmented Generation module.
"""

from llm_toolkit.rag.retriever import RAGRetriever
from llm_toolkit.rag.embeddings import EmbeddingManager
from llm_toolkit.rag.vector_store import VectorStoreManager
from llm_toolkit.rag.document_loader import DocumentLoader

__all__ = [
    "RAGRetriever",
    "EmbeddingManager",
    "VectorStoreManager",
    "DocumentLoader",
]

# The RAG pipeline is optional; only export it when it actually imports.
# (Previously it was listed in __all__ unconditionally, which broke
# `from llm_toolkit.rag import *` whenever the import failed.)
try:
    from llm_toolkit.rag.pipeline import RAGPipeline
except ImportError:
    pass
else:
    __all__.append("RAGPipeline")

13. llm_toolkit/rag/embeddings.py - 嵌入向量管理器

# -*- coding: utf-8 -*-
"""
Embedding Manager - 嵌入向量管理器
管理文本嵌入向量的生成和缓存
"""

import os
from typing import List, Dict, Optional, Any
import numpy as np
from loguru import logger

try:
    from langchain_openai import OpenAIEmbeddings
    from langchain_community.embeddings import HuggingFaceEmbeddings
except ImportError:
    logger.warning("Embedding libraries not fully installed")


class EmbeddingManager:
    """
    Embedding manager.

    Features:
    - Multiple embedding providers/models
    - Batched embedding generation
    - In-memory embedding cache
    - Cost estimation helpers
    """

    # Model catalog: output vector dimension and USD cost per 1K tokens.
    MODELS = {
        "openai": {
            "text-embedding-3-small": {
                "dimension": 1536,
                "cost_per_1k": 0.00002
            },
            "text-embedding-3-large": {
                "dimension": 3072,
                "cost_per_1k": 0.00013
            },
            "text-embedding-ada-002": {
                "dimension": 1536,
                "cost_per_1k": 0.0001
            }
        },
        "huggingface": {
            "all-MiniLM-L6-v2": {
                "dimension": 384,
                "cost_per_1k": 0.0  # free (runs locally)
            },
            "all-mpnet-base-v2": {
                "dimension": 768,
                "cost_per_1k": 0.0
            }
        }
    }

    def __init__(
        self,
        provider: str = "openai",
        model: str = "text-embedding-3-small",
        api_key: Optional[str] = None,
        batch_size: int = 100,
        enable_cache: bool = True,
        **kwargs
    ):
        """
        Initialize the embedding manager.

        Args:
            provider: Provider name ("openai", "huggingface").
            model: Model name (must exist in MODELS for the provider).
            api_key: API key; falls back to environment variables if omitted.
            batch_size: Number of texts embedded per backend call.
            enable_cache: Whether to cache embeddings in memory.
            **kwargs: Extra arguments forwarded to the backend client.

        Raises:
            ValueError: If the provider/model combination is unknown.
        """
        self.provider = provider.lower()
        self.model = model
        self.batch_size = batch_size
        self.enable_cache = enable_cache
        self.kwargs = kwargs

        # Explicit key wins over the environment.
        if api_key:
            self.api_key = api_key
        else:
            self.api_key = self._get_api_key()

        # Validates provider/model and exposes dimension + pricing.
        self.model_config = self._get_model_config()

        # Backend embedding client (LangChain embeddings object).
        self.embeddings = self._initialize_embeddings()

        # text -> embedding cache (unbounded; cleared via clear_cache()).
        self.cache: Dict[str, List[float]] = {}

        logger.info(f"Embedding Manager initialized: {self.provider}/{self.model}")

    def _get_api_key(self) -> Optional[str]:
        """Read the provider's API key from the environment (OpenAI only)."""
        if self.provider == "openai":
            return os.getenv("OPENAI_API_KEY")
        return None

    def _get_model_config(self) -> Dict[str, Any]:
        """Look up the model's dimension/cost config, validating the pair."""
        provider_models = self.MODELS.get(self.provider, {})
        if self.model not in provider_models:
            raise ValueError(f"Model {self.model} not found for provider {self.provider}")
        return provider_models[self.model]

    def _initialize_embeddings(self):
        """Construct the backend embeddings client for the provider."""
        if self.provider == "openai":
            return OpenAIEmbeddings(
                model=self.model,
                openai_api_key=self.api_key,
                **self.kwargs
            )

        elif self.provider == "huggingface":
            return HuggingFaceEmbeddings(
                model_name=self.model,
                **self.kwargs
            )

        else:
            raise ValueError(f"Unsupported provider: {self.provider}")

    def embed_text(self, text: str) -> List[float]:
        """
        Generate the embedding for a single text.

        Args:
            text: Input text.

        Returns:
            The embedding vector.
        """
        # Cache hit: skip the backend call entirely.
        if self.enable_cache and text in self.cache:
            return self.cache[text]

        embedding = self.embeddings.embed_query(text)

        if self.enable_cache:
            self.cache[text] = embedding

        return embedding

    def embed_texts(self, texts: List[str]) -> List[List[float]]:
        """
        Generate embeddings for a list of texts, batched and cache-aware.

        Args:
            texts: Input texts.

        Returns:
            Embedding vectors, in the same order as `texts`.
        """
        embeddings: List[List[float]] = []

        # Process in fixed-size batches to respect backend limits.
        for start in range(0, len(texts), self.batch_size):
            batch = texts[start:start + self.batch_size]

            # Pre-size the batch result so each embedding lands at its
            # original position (the old code rebuilt order with repeated
            # list.insert, which was quadratic and fragile).
            results: List[Optional[List[float]]] = [None] * len(batch)
            uncached_texts: List[str] = []
            uncached_indices: List[int] = []

            for idx, text in enumerate(batch):
                if self.enable_cache and text in self.cache:
                    results[idx] = self.cache[text]
                else:
                    uncached_texts.append(text)
                    uncached_indices.append(idx)

            # One backend call for everything the cache missed.
            if uncached_texts:
                new_embeddings = self.embeddings.embed_documents(uncached_texts)
                for idx, embedding in zip(uncached_indices, new_embeddings):
                    if self.enable_cache:
                        self.cache[batch[idx]] = embedding
                    results[idx] = embedding

            embeddings.extend(results)

        return embeddings

    def get_dimension(self) -> int:
        """Return the embedding vector dimension for the configured model."""
        return self.model_config["dimension"]

    def calculate_similarity(
        self,
        embedding1: List[float],
        embedding2: List[float]
    ) -> float:
        """
        Compute the cosine similarity of two embedding vectors.

        Args:
            embedding1: First vector.
            embedding2: Second vector.

        Returns:
            Cosine similarity (1.0 = identical direction).
        """
        vec1 = np.array(embedding1)
        vec2 = np.array(embedding2)

        similarity = np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))

        return float(similarity)

    def estimate_cost(self, num_texts: int, avg_length: int = 500) -> float:
        """
        Estimate the USD cost of embedding a corpus.

        Args:
            num_texts: Number of texts.
            avg_length: Average text length in characters.

        Returns:
            Estimated cost in USD.
        """
        # Rough heuristic: 1 token ~= 4 characters.
        total_tokens = num_texts * (avg_length / 4)

        cost_per_1k = self.model_config["cost_per_1k"]
        total_cost = (total_tokens / 1000) * cost_per_1k

        return total_cost

    def clear_cache(self):
        """Drop every cached embedding."""
        self.cache.clear()
        logger.info("Embedding cache cleared")

    def get_cache_stats(self) -> Dict[str, Any]:
        """Return cache size and whether caching is enabled."""
        return {
            "cache_size": len(self.cache),
            "cache_enabled": self.enable_cache
        }

    def __repr__(self) -> str:
        return f"EmbeddingManager(provider={self.provider}, model={self.model}, dimension={self.get_dimension()})"

14. llm_toolkit/rag/vector_store.py - 向量数据库管理器

# -*- coding: utf-8 -*-
"""
Vector Store Manager - 向量数据库管理器
管理多种向量数据库的统一接口
"""

import os
from typing import List, Dict, Optional, Any, Tuple
from loguru import logger

try:
    from langchain_community.vectorstores import Chroma, FAISS, Qdrant, Pinecone
    from langchain.schema import Document
except ImportError:
    logger.warning("Vector store libraries not fully installed")


class VectorStoreManager:
    """
    向量数据库管理器
    
    支持的向量数据库:
    - ChromaDB
    - FAISS
    - Pinecone
    - Qdrant
    """
    
    def __init__(
        self,
        store_type: str = "chromadb",
        embedding_manager: Any = None,
        collection_name: str = "default",
        **kwargs
    ):
        """
        初始化向量数据库管理器
        
        Args:
            store_type: 数据库类型 (chromadb, faiss, pinecone, qdrant)
            embedding_manager: 嵌入管理器实例
            collection_name: 集合名称
            **kwargs: 其他参数
        """
        self.store_type = store_type.lower()
        self.embedding_manager = embedding_manager
        self.collection_name = collection_name
        self.kwargs = kwargs
        
        # 初始化向量数据库
        self.vector_store = self._initialize_vector_store()
        
        logger.info(f"Vector Store Manager initialized: {self.store_type}")
    
    def _initialize_vector_store(self):
        """初始化向量数据库"""
        if self.store_type == "chromadb":
            return self._initialize_chroma()
        elif self.store_type == "faiss":
            return self._initialize_faiss()
        elif self.store_type == "pinecone":
            return self._initialize_pinecone()
        elif self.store_type == "qdrant":
            return self._initialize_qdrant()
        else:
            raise ValueError(f"Unsupported vector store: {self.store_type}")
    
    def _initialize_chroma(self):
        """初始化ChromaDB"""
        persist_directory = self.kwargs.get("persist_directory", "./data/chroma")
        
        # 如果已有数据,加载现有集合
        try:
            vector_store = Chroma(
                collection_name=self.collection_name,
                embedding_function=self.embedding_manager.embeddings,
                persist_directory=persist_directory
            )
            logger.info(f"Loaded existing ChromaDB collection: {self.collection_name}")
        except:
            # 创建新集合
            vector_store = Chroma(
                collection_name=self.collection_name,
                embedding_function=self.embedding_manager.embeddings,
                persist_directory=persist_directory
            )
            logger.info(f"Created new ChromaDB collection: {self.collection_name}")
        
        return vector_store
    
    def _initialize_faiss(self):
        """初始化FAISS"""
        index_path = self.kwargs.get("index_path", "./data/faiss")
        
        # 尝试加载现有索引
        try:
            vector_store = FAISS.load_local(
                index_path,
                self.embedding_manager.embeddings
            )
            logger.info("Loaded existing FAISS index")
        except:
            # 创建新索引(需要先添加文档)
            vector_store = None
            logger.info("FAISS index will be created when documents are added")
        
        return vector_store
    
    def _initialize_pinecone(self):
        """初始化Pinecone"""
        import pinecone
        
        api_key = self.kwargs.get("api_key") or os.getenv("PINECONE_API_KEY")
        environment = self.kwargs.get("environment") or os.getenv("PINECONE_ENVIRONMENT")
        index_name = self.kwargs.get("index_name", self.collection_name)
        
        # 初始化Pinecone
        pinecone.init(api_key=api_key, environment=environment)
        
        # 检查索引是否存在
        if index_name not in pinecone.list_indexes():
            # 创建索引
            pinecone.create_index(
                name=index_name,
                dimension=self.embedding_manager.get_dimension(),
                metric="cosine"
            )
            logger.info(f"Created Pinecone index: {index_name}")
        
        # 连接到索引
        vector_store = Pinecone.from_existing_index(
            index_name=index_name,
            embedding=self.embedding_manager.embeddings
        )
        
        return vector_store
    
    def _initialize_qdrant(self):
        """初始化Qdrant"""
        from qdrant_client import QdrantClient
        
        url = self.kwargs.get("url") or os.getenv("QDRANT_URL", "http://localhost:6333")
        api_key = self.kwargs.get("api_key") or os.getenv("QDRANT_API_KEY")
        
        # 初始化客户端
        client = QdrantClient(url=url, api_key=api_key)
        
        # 创建或连接集合
        vector_store = Qdrant(
            client=client,
            collection_name=self.collection_name,
            embeddings=self.embedding_manager.embeddings
        )
        
        return vector_store
    
    def add_documents(
        self,
        documents: List[Document],
        **kwargs
    ) -> List[str]:
        """
        添加文档到向量数据库
        
        Args:
            documents: 文档列表
            **kwargs: 其他参数
        
        Returns:
            文档ID列表
        """
        if self.store_type == "faiss" and self.vector_store is None:
            # 为FAISS创建初始索引
            self.vector_store = FAISS.from_documents(
                documents,
                self.embedding_manager.embeddings
            )
            ids = [str(i) for i in range(len(documents))]
        else:
            ids = self.vector_store.add_documents(documents, **kwargs)
        
        logger.info(f"Added {len(documents)} documents to vector store")
        return ids
    
    def add_texts(
        self,
        texts: List[str],
        metadatas: Optional[List[Dict]] = None,
        **kwargs
    ) -> List[str]:
        """
        添加文本到向量数据库
        
        Args:
            texts: 文本列表
            metadatas: 元数据列表
            **kwargs: 其他参数
        
        Returns:
            文档ID列表
        """
        if self.store_type == "faiss" and self.vector_store is None:
            # 为FAISS创建初始索引
            self.vector_store = FAISS.from_texts(
                texts,
                self.embedding_manager.embeddings,
                metadatas=metadatas
            )
            ids = [str(i) for i in range(len(texts))]
        else:
            ids = self.vector_store.add_texts(texts, metadatas=metadatas, **kwargs)
        
        logger.info(f"Added {len(texts)} texts to vector store")
        return ids
    
    def similarity_search(
        self,
        query: str,
        k: int = 5,
        filter: Optional[Dict] = None,
        **kwargs
    ) -> List[Document]:
        """
        相似度搜索
        
        Args:
            query: 查询文本
            k: 返回结果数量
            filter: 过滤条件
            **kwargs: 其他参数
        
        Returns:
            相关文档列表
        """
        if self.vector_store is None:
            logger.warning("Vector store is empty")
            return []
        
        results = self.vector_store.similarity_search(
            query,
            k=k,
            filter=filter,
            **kwargs
        )
        
        return results
    
    def similarity_search_with_score(
        self,
        query: str,
        k: int = 5,
        filter: Optional[Dict] = None,
        **kwargs
    ) -> List[Tuple[Document, float]]:
        """
        带分数的相似度搜索
        
        Args:
            query: 查询文本
            k: 返回结果数量
            filter: 过滤条件
            **kwargs: 其他参数
        
        Returns:
            (文档, 分数) 元组列表
        """
        if self.vector_store is None:
            logger.warning("Vector store is empty")
            return []
        
        results = self.vector_store.similarity_search_with_score(
            query,
            k=k,
            filter=filter,
            **kwargs
        )
        
        return results
    
    def delete(self, ids: List[str]):
        """删除文档"""
        if hasattr(self.vector_store, 'delete'):
            self.vector_store.delete(ids)
            logger.info(f"Deleted {len(ids)} documents")
        else:
            logger.warning(f"Delete operation not supported for {self.store_type}")
    
    def persist(self):
        """持久化数据"""
        if self.store_type == "chromadb":
            self.vector_store.persist()
            logger.info("ChromaDB persisted")
        elif self.store_type == "faiss":
            index_path = self.kwargs.get("index_path", "./data/faiss")
            self.vector_store.save_local(index_path)
            logger.info(f"FAISS index saved to {index_path}")
        else:
            logger.info(f"Persistence handled automatically for {self.store_type}")
    
    def get_stats(self) -> Dict[str, Any]:
        """获取统计信息"""
        stats = {
            "store_type": self.store_type,
            "collection_name": self.collection_name
        }
        
        # 尝试获取文档数量
        try:
            if self.store_type == "chromadb":
                stats["document_count"] = self.vector_store._collection.count()
            elif hasattr(self.vector_store, 'index'):
                stats["document_count"] = self.vector_store.index.ntotal
        except:
            stats["document_count"] = "unknown"
        
        return stats
    
    def __repr__(self) -> str:
        return f"VectorStoreManager(type={self.store_type}, collection={self.collection_name})"

LLM Toolkit - 第四部分:文档加载器与RAG流水线


第四部分:文档加载器与RAG流水线

15. llm_toolkit/rag/document_loader.py - 文档加载器

# -*- coding: utf-8 -*-
"""
Document Loader - 文档加载器
支持多种文档格式的加载和处理
"""

import os
from typing import List, Dict, Optional, Any
from pathlib import Path
from loguru import logger

from langchain.schema import Document
from langchain.text_splitter import (
    RecursiveCharacterTextSplitter,
    CharacterTextSplitter,
    TokenTextSplitter
)

try:
    from langchain_community.document_loaders import (
        PyPDFLoader,
        TextLoader,
        UnstructuredWordDocumentLoader,
        UnstructuredPowerPointLoader,
        CSVLoader,
        UnstructuredExcelLoader,
        UnstructuredMarkdownLoader,
        DirectoryLoader
    )
except ImportError:
    logger.warning("Document loader libraries not fully installed")


class DocumentLoader:
    """
    Document loader.

    Features:
    - Multiple document formats (PDF, text, Markdown, Office, CSV, Excel)
    - Automatic text chunking via a configurable splitter
    - Provenance metadata stamping (source path, file type)
    - Batch loading of files, directories, raw texts, and URLs
    """

    # Supported file extensions mapped to logical document types.
    SUPPORTED_TYPES = {
        ".pdf": "pdf",
        ".txt": "text",
        ".md": "markdown",
        ".docx": "word",
        ".doc": "word",
        ".pptx": "powerpoint",
        ".ppt": "powerpoint",
        ".csv": "csv",
        ".xlsx": "excel",
        ".xls": "excel",
    }

    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        text_splitter_type: str = "recursive",
        **kwargs
    ):
        """
        Initialize the document loader.

        Args:
            chunk_size: Maximum size of each text chunk.
            chunk_overlap: Overlap between consecutive chunks.
            text_splitter_type: Splitter type ("recursive", "character", "token").
            **kwargs: Extra arguments forwarded to the text splitter.

        Raises:
            ValueError: If text_splitter_type is unknown.
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.text_splitter_type = text_splitter_type
        self.kwargs = kwargs

        # Built once; reused by every load_* method.
        self.text_splitter = self._initialize_text_splitter()

        logger.info(f"Document Loader initialized: chunk_size={chunk_size}, overlap={chunk_overlap}")

    def _initialize_text_splitter(self):
        """Build the text splitter matching text_splitter_type."""
        if self.text_splitter_type == "recursive":
            return RecursiveCharacterTextSplitter(
                chunk_size=self.chunk_size,
                chunk_overlap=self.chunk_overlap,
                separators=["\n\n", "\n", " ", ""],
                **self.kwargs
            )
        elif self.text_splitter_type == "character":
            return CharacterTextSplitter(
                chunk_size=self.chunk_size,
                chunk_overlap=self.chunk_overlap,
                **self.kwargs
            )
        elif self.text_splitter_type == "token":
            return TokenTextSplitter(
                chunk_size=self.chunk_size,
                chunk_overlap=self.chunk_overlap,
                **self.kwargs
            )
        else:
            raise ValueError(f"Unknown text splitter type: {self.text_splitter_type}")

    def load_file(
        self,
        file_path: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> List[Document]:
        """
        Load a single file and split it into chunks.

        Args:
            file_path: Path to the file.
            metadata: Extra metadata merged into every produced chunk.

        Returns:
            List of chunked Documents.

        Raises:
            FileNotFoundError: If the file does not exist.
            ValueError: If the extension is not in SUPPORTED_TYPES.
        """
        file_path = Path(file_path)

        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        # Map the extension (lowercased) to a logical file type.
        file_ext = file_path.suffix.lower()
        if file_ext not in self.SUPPORTED_TYPES:
            raise ValueError(f"Unsupported file type: {file_ext}")

        file_type = self.SUPPORTED_TYPES[file_ext]

        # Load and chunk; errors are logged then re-raised to the caller.
        try:
            documents = self._load_by_type(str(file_path), file_type)

            # Stamp provenance metadata onto every chunk.
            for doc in documents:
                doc.metadata["source"] = str(file_path)
                doc.metadata["file_type"] = file_type
                if metadata:
                    doc.metadata.update(metadata)

            logger.info(f"Loaded {len(documents)} documents from {file_path}")
            return documents

        except Exception as e:
            logger.error(f"Failed to load {file_path}: {e}")
            raise

    def _load_by_type(self, file_path: str, file_type: str) -> List[Document]:
        """Select the backend loader for the file type, load, then chunk."""
        if file_type == "pdf":
            loader = PyPDFLoader(file_path)
        elif file_type == "text":
            loader = TextLoader(file_path, encoding="utf-8")
        elif file_type == "markdown":
            loader = UnstructuredMarkdownLoader(file_path)
        elif file_type == "word":
            loader = UnstructuredWordDocumentLoader(file_path)
        elif file_type == "powerpoint":
            loader = UnstructuredPowerPointLoader(file_path)
        elif file_type == "csv":
            loader = CSVLoader(file_path)
        elif file_type == "excel":
            loader = UnstructuredExcelLoader(file_path)
        else:
            raise ValueError(f"Unsupported file type: {file_type}")

        documents = loader.load()

        # Split into chunks when a splitter is configured.
        if self.text_splitter:
            documents = self.text_splitter.split_documents(documents)

        return documents

    def load_directory(
        self,
        directory_path: str,
        file_types: Optional[List[str]] = None,
        recursive: bool = True,
        metadata: Optional[Dict[str, Any]] = None
    ) -> List[Document]:
        """
        Load every supported document under a directory.

        Args:
            directory_path: Directory to scan.
            file_types: Logical types to include (e.g. ['pdf', 'text']);
                all supported types when None.
            recursive: Whether to descend into subdirectories.
            metadata: Extra metadata merged into every produced chunk.

        Returns:
            List of chunked Documents. Files that fail to load are logged
            and skipped rather than aborting the whole scan.

        Raises:
            FileNotFoundError: If the directory does not exist.
        """
        directory_path = Path(directory_path)

        if not directory_path.exists():
            raise FileNotFoundError(f"Directory not found: {directory_path}")

        all_documents = []

        # Translate logical types back into the extensions to glob for.
        if file_types:
            extensions = [ext for ext, ftype in self.SUPPORTED_TYPES.items() if ftype in file_types]
        else:
            extensions = list(self.SUPPORTED_TYPES.keys())

        # Walk the tree once per extension; failures are per-file.
        pattern = "**/*" if recursive else "*"
        for ext in extensions:
            for file_path in directory_path.glob(f"{pattern}{ext}"):
                if file_path.is_file():
                    try:
                        documents = self.load_file(str(file_path), metadata)
                        all_documents.extend(documents)
                    except Exception as e:
                        logger.error(f"Failed to load {file_path}: {e}")

        logger.info(f"Loaded {len(all_documents)} documents from {directory_path}")
        return all_documents

    def load_text(
        self,
        text: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> List[Document]:
        """
        Wrap a raw string as Document chunks.

        Args:
            text: Text content.
            metadata: Metadata attached to the document.

        Returns:
            List of chunked Documents.
        """
        document = Document(
            page_content=text,
            metadata=metadata or {}
        )

        # Split into chunks when a splitter is configured.
        if self.text_splitter:
            documents = self.text_splitter.split_documents([document])
        else:
            documents = [document]

        return documents

    def load_texts(
        self,
        texts: List[str],
        metadatas: Optional[List[Dict[str, Any]]] = None
    ) -> List[Document]:
        """
        Wrap multiple raw strings as Document chunks.

        Args:
            texts: Text contents.
            metadatas: Per-text metadata, matched by position; missing
                entries default to {}.

        Returns:
            List of chunked Documents.
        """
        all_documents = []

        for i, text in enumerate(texts):
            metadata = metadatas[i] if metadatas and i < len(metadatas) else {}
            documents = self.load_text(text, metadata)
            all_documents.extend(documents)

        return all_documents

    def load_url(
        self,
        url: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> List[Document]:
        """
        Fetch and chunk the content of a web page.

        Args:
            url: URL to fetch.
            metadata: Extra metadata merged into every produced chunk.

        Returns:
            List of chunked Documents; `source` metadata is set to the URL.
        """
        try:
            from langchain_community.document_loaders import WebBaseLoader

            loader = WebBaseLoader(url)
            documents = loader.load()

            # Split into chunks when a splitter is configured.
            if self.text_splitter:
                documents = self.text_splitter.split_documents(documents)

            # Stamp provenance metadata onto every chunk.
            for doc in documents:
                doc.metadata["source"] = url
                if metadata:
                    doc.metadata.update(metadata)

            logger.info(f"Loaded {len(documents)} documents from {url}")
            return documents

        except Exception as e:
            logger.error(f"Failed to load URL {url}: {e}")
            raise

    def get_supported_types(self) -> List[str]:
        """Return the distinct logical file types (unordered)."""
        return list(set(self.SUPPORTED_TYPES.values()))

    def __repr__(self) -> str:
        return f"DocumentLoader(chunk_size={self.chunk_size}, splitter={self.text_splitter_type})"


class AdvancedDocumentLoader(DocumentLoader):
    """
    Advanced document loader.

    Extra features on top of DocumentLoader:
    - OCR support (pytesseract + Pillow)
    - table extraction (flag only in this class)
    - image processing
    - custom parsers
    """
    
    def __init__(
        self,
        enable_ocr: bool = False,
        extract_tables: bool = False,
        extract_images: bool = False,
        **kwargs
    ):
        """
        Initialize the advanced document loader.

        Args:
            enable_ocr: enable OCR text extraction from images
            extract_tables: extract tables (stored flag; not used by this class
                directly — presumably consumed by subclasses/callers, confirm)
            extract_images: extract images (stored flag, same note as above)
            **kwargs: forwarded to DocumentLoader.__init__
        """
        super().__init__(**kwargs)
        
        self.enable_ocr = enable_ocr
        self.extract_tables = extract_tables
        self.extract_images = extract_images
        
        # Fix: always define the attribute so checks such as
        # `loader.ocr_available` never raise AttributeError when OCR is off.
        self.ocr_available = False
        
        if enable_ocr:
            self._initialize_ocr()
    
    def _initialize_ocr(self):
        """Probe the optional OCR dependencies and record availability."""
        try:
            import pytesseract  # noqa: F401 -- availability probe only
            from PIL import Image  # noqa: F401
            self.ocr_available = True
            logger.info("OCR initialized")
        except ImportError:
            logger.warning("OCR libraries not installed. Install pytesseract and Pillow.")
            self.ocr_available = False
    
    def load_image(
        self,
        image_path: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> List[Document]:
        """
        Extract text from an image via OCR.

        Args:
            image_path: path to the image file
            metadata: extra metadata merged into the resulting documents

        Returns:
            List of (optionally split) documents with the OCR'd text.

        Raises:
            RuntimeError: when OCR is disabled or its libraries are missing.
            Exception: re-raises OCR/IO failures after logging them.
        """
        if not self.enable_ocr or not self.ocr_available:
            raise RuntimeError("OCR is not enabled or available")
        
        try:
            import pytesseract
            from PIL import Image
            
            # Open the image and run OCR on it.
            image = Image.open(image_path)
            text = pytesseract.image_to_string(image)
            
            # Wrap the recognized text in a Document, tagging its origin.
            document = Document(
                page_content=text,
                metadata={
                    "source": image_path,
                    "file_type": "image",
                    **(metadata or {})
                }
            )
            
            # Chunk the text when a splitter is configured.
            if self.text_splitter:
                documents = self.text_splitter.split_documents([document])
            else:
                documents = [document]
            
            logger.info(f"Extracted text from image: {image_path}")
            return documents
        
        except Exception as e:
            logger.error(f"Failed to process image {image_path}: {e}")
            raise

16. llm_toolkit/rag/retriever.py - RAG检索器

# -*- coding: utf-8 -*-
"""
RAG Retriever - RAG检索器
实现高级检索策略
"""

from typing import List, Dict, Optional, Any, Tuple
from loguru import logger

from langchain.schema import Document
from langchain.retrievers import (
    ContextualCompressionRetriever,
    MultiQueryRetriever
)
from langchain.retrievers.document_compressors import LLMChainExtractor


class RAGRetriever:
    """
    RAG retriever.

    Features:
    - basic similarity retrieval
    - multi-query retrieval
    - contextual compression
    - LLM-based reranking
    - hybrid retrieval

    NOTE(review): threshold filtering assumes the vector store returns
    similarity scores where HIGHER means more similar; stores that return
    distances (lower is better) would need the comparison inverted —
    confirm against VectorStoreManager.

    Annotations referencing ``Document`` are quoted so the class can be
    defined even when langchain is not importable at class-creation time.
    """
    
    def __init__(
        self,
        vector_store_manager: Any,
        llm_manager: Optional[Any] = None,
        top_k: int = 5,
        similarity_threshold: float = 0.7,
        enable_rerank: bool = False,
        enable_compression: bool = False,
        **kwargs
    ):
        """
        Initialize the RAG retriever.

        Args:
            vector_store_manager: vector store manager; must expose
                ``similarity_search_with_score(query=..., k=..., filter=...)``
            llm_manager: LLM manager used by the advanced strategies
            top_k: number of results to return
            similarity_threshold: minimum score for a hit to be kept
            enable_rerank: rerank hits with the LLM after retrieval
            enable_compression: compress context with the LLM
            **kwargs: extra options, stored for strategy use
        """
        self.vector_store_manager = vector_store_manager
        self.llm_manager = llm_manager
        self.top_k = top_k
        self.similarity_threshold = similarity_threshold
        self.enable_rerank = enable_rerank
        self.enable_compression = enable_compression
        self.kwargs = kwargs
        
        logger.info(f"RAG Retriever initialized: top_k={top_k}, threshold={similarity_threshold}")
    
    def retrieve(
        self,
        query: str,
        top_k: Optional[int] = None,
        filter: Optional[Dict] = None,
        **kwargs
    ) -> List["Document"]:
        """
        Basic similarity retrieval.

        Args:
            query: query text
            top_k: number of results (defaults to ``self.top_k``)
            filter: metadata filter forwarded to the vector store
            **kwargs: forwarded to the vector store search

        Returns:
            Relevant documents, in the order the store returned them
            (optionally reranked by the LLM).
        """
        k = top_k or self.top_k
        
        # Over-fetch so that threshold filtering still leaves enough hits.
        results = self.vector_store_manager.similarity_search_with_score(
            query=query,
            k=k * 2,
            filter=filter,
            **kwargs
        )
        
        # Drop low-similarity hits, then cap at k.
        filtered_results = [
            (doc, score) for doc, score in results
            if score >= self.similarity_threshold
        ][:k]
        
        documents = [doc for doc, score in filtered_results]
        
        # Optional LLM-based reranking.
        if self.enable_rerank and self.llm_manager:
            documents = self._rerank_documents(query, documents)
        
        return documents
    
    def retrieve_with_scores(
        self,
        query: str,
        top_k: Optional[int] = None,
        filter: Optional[Dict] = None,
        **kwargs
    ) -> List[Tuple["Document", float]]:
        """
        Similarity retrieval that also returns the scores.

        Args:
            query: query text
            top_k: number of results (defaults to ``self.top_k``)
            filter: metadata filter forwarded to the vector store
            **kwargs: forwarded to the vector store search

        Returns:
            List of (document, score) tuples above the threshold.
        """
        k = top_k or self.top_k
        
        results = self.vector_store_manager.similarity_search_with_score(
            query=query,
            k=k * 2,
            filter=filter,
            **kwargs
        )
        
        # Threshold filter, then cap at k.
        filtered_results = [
            (doc, score) for doc, score in results
            if score >= self.similarity_threshold
        ][:k]
        
        return filtered_results
    
    def multi_query_retrieve(
        self,
        query: str,
        num_queries: int = 3,
        top_k: Optional[int] = None,
        **kwargs
    ) -> List["Document"]:
        """
        Multi-query retrieval: rephrase the query several ways, retrieve for
        each variant, and de-duplicate the union of results.

        Args:
            query: original query
            num_queries: number of query variants to generate
            top_k: number of results to return
            **kwargs: forwarded to :meth:`retrieve`

        Returns:
            De-duplicated relevant documents (by exact page content).
        """
        if not self.llm_manager:
            logger.warning("LLM manager not available, falling back to basic retrieval")
            return self.retrieve(query, top_k, **kwargs)
        
        queries = self._generate_query_variants(query, num_queries)
        
        all_documents = []
        seen_contents = set()
        
        for q in queries:
            docs = self.retrieve(q, top_k, **kwargs)
            
            # De-duplicate on the exact chunk text.
            for doc in docs:
                if doc.page_content not in seen_contents:
                    all_documents.append(doc)
                    seen_contents.add(doc.page_content)
        
        k = top_k or self.top_k
        return all_documents[:k]
    
    def _generate_query_variants(
        self,
        query: str,
        num_variants: int = 3
    ) -> List[str]:
        """Ask the LLM for query rephrasings; always includes the original.

        Falls back to ``[query]`` if the LLM call fails. Assumes the LLM
        answers with one variant per line — malformed answers simply yield
        fewer/odd variants.
        """
        prompt = f"""给定以下问题,生成{num_variants}个不同的查询变体,用于检索相关文档。
每个变体应该从不同角度表达相同的信息需求。

原始问题: {query}

请只返回{num_variants}个查询变体,每行一个:"""
        
        try:
            response = self.llm_manager.chat(prompt)
            variants = [line.strip() for line in response.strip().split('\n') if line.strip()]
            
            # Make sure the original query is always among the candidates.
            if query not in variants:
                variants = [query] + variants
            
            return variants[:num_variants + 1]
        
        except Exception as e:
            logger.error(f"Failed to generate query variants: {e}")
            return [query]
    
    def contextual_compression_retrieve(
        self,
        query: str,
        top_k: Optional[int] = None,
        **kwargs
    ) -> List["Document"]:
        """
        Retrieve, then compress each hit down to its query-relevant part.

        Args:
            query: query text
            top_k: number of results to return
            **kwargs: forwarded to :meth:`retrieve`

        Returns:
            Compressed relevant documents.
        """
        if not self.llm_manager:
            logger.warning("LLM manager not available, falling back to basic retrieval")
            return self.retrieve(query, top_k, **kwargs)
        
        # Over-fetch, since compression may make some hits useless.
        documents = self.retrieve(query, top_k=(top_k or self.top_k) * 2, **kwargs)
        
        compressed_docs = self._compress_documents(query, documents)
        
        k = top_k or self.top_k
        return compressed_docs[:k]
    
    def _compress_documents(
        self,
        query: str,
        documents: List["Document"]
    ) -> List["Document"]:
        """Ask the LLM to extract the query-relevant part of each document.

        A document whose compression call fails is kept uncompressed.
        """
        compressed_docs = []
        
        for doc in documents:
            prompt = f"""给定以下问题和文档内容,提取与问题最相关的部分。
保留关键信息,去除无关内容。

问题: {query}

文档内容:
{doc.page_content}

相关内容:"""
            
            try:
                compressed_content = self.llm_manager.chat(prompt)
                
                compressed_doc = Document(
                    page_content=compressed_content,
                    metadata={
                        **doc.metadata,
                        "compressed": True,
                        "original_length": len(doc.page_content)
                    }
                )
                
                compressed_docs.append(compressed_doc)
            
            except Exception as e:
                logger.error(f"Failed to compress document: {e}")
                compressed_docs.append(doc)
        
        return compressed_docs
    
    def _rerank_documents(
        self,
        query: str,
        documents: List["Document"]
    ) -> List["Document"]:
        """Rerank documents by an LLM-judged 0-10 relevance score.

        Best-effort: a document whose scoring call fails (or whose answer is
        not a parseable number) keeps a neutral default score of 5.0.
        """
        if not documents:
            return documents
        
        scored_docs = []
        
        for doc in documents:
            prompt = f"""评估以下文档与问题的相关性,给出0-10的分数。
只返回数字分数。

问题: {query}

文档: {doc.page_content[:500]}...

相关性分数:"""
            
            try:
                response = self.llm_manager.chat(prompt)
                score = float(response.strip())
                scored_docs.append((doc, score))
            except Exception as e:
                # Fix: was a bare `except:` that also swallowed
                # KeyboardInterrupt/SystemExit; keep best-effort fallback
                # but log the failure and let non-Exception signals through.
                logger.warning(f"Rerank scoring failed, using default score: {e}")
                scored_docs.append((doc, 5.0))
        
        # Highest LLM score first.
        scored_docs.sort(key=lambda x: x[1], reverse=True)
        
        return [doc for doc, score in scored_docs]
    
    def hybrid_retrieve(
        self,
        query: str,
        top_k: Optional[int] = None,
        use_multi_query: bool = True,
        use_compression: bool = True,
        **kwargs
    ) -> List["Document"]:
        """
        Hybrid retrieval strategy: optional multi-query expansion followed by
        optional contextual compression (both require an LLM manager).

        Args:
            query: query text
            top_k: number of results to return
            use_multi_query: expand into multiple query variants
            use_compression: compress retrieved context
            **kwargs: forwarded to the underlying retrieval call

        Returns:
            Relevant documents.
        """
        if use_multi_query and self.llm_manager:
            documents = self.multi_query_retrieve(query, top_k=top_k, **kwargs)
        else:
            documents = self.retrieve(query, top_k=top_k, **kwargs)
        
        if use_compression and self.llm_manager:
            documents = self._compress_documents(query, documents)
        
        return documents
    
    def __repr__(self) -> str:
        return f"RAGRetriever(top_k={self.top_k}, threshold={self.similarity_threshold})"

17. llm_toolkit/rag/pipeline.py - RAG完整流水线

# -*- coding: utf-8 -*-
"""
RAG Pipeline - RAG完整流水线
整合所有RAG组件的端到端流水线
"""

from typing import List, Dict, Optional, Any
from dataclasses import dataclass
from loguru import logger

from langchain.schema import Document

from llm_toolkit.core.llm_manager import LLMManager
from llm_toolkit.rag.embeddings import EmbeddingManager
from llm_toolkit.rag.vector_store import VectorStoreManager
from llm_toolkit.rag.document_loader import DocumentLoader
from llm_toolkit.rag.retriever import RAGRetriever


@dataclass
class RAGResponse:
    """Result of a single RAG query, as produced by RAGPipeline.query()."""
    # Generated answer text.
    answer: str
    # Source documents the answer was grounded on (may be empty).
    sources: List[Document]
    # The original user question.
    query: str
    # Diagnostics, e.g. found_sources / num_sources / context_length.
    metadata: Dict[str, Any]


class RAGPipeline:
    """
    End-to-end RAG pipeline.

    Facade wiring together the LLM manager, embedding manager, vector
    store, document loader and retriever.

    Features:
    - end-to-end RAG flow (ingest -> retrieve -> generate)
    - document management
    - smart retrieval
    - answer generation
    - source tracking
    """
    
    def __init__(
        self,
        llm: Optional[LLMManager] = None,
        embedding_provider: str = "openai",
        embedding_model: str = "text-embedding-3-small",
        vector_store: str = "chromadb",
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        top_k: int = 5,
        **kwargs
    ):
        """
        Initialize the RAG pipeline.

        Args:
            llm: LLM manager; a default-configured one is created when None
            embedding_provider: embedding provider name
            embedding_model: embedding model name
            vector_store: vector store backend type
            chunk_size: text chunk size (characters)
            chunk_overlap: overlap between adjacent chunks
            top_k: number of documents retrieved per query
            **kwargs: forwarded to VectorStoreManager
        """
        # LLM: default-construct when the caller supplies none.
        if llm is None:
            self.llm = LLMManager()
        else:
            self.llm = llm
        
        # Embedding manager.
        self.embedding_manager = EmbeddingManager(
            provider=embedding_provider,
            model=embedding_model
        )
        
        # Vector store (shares the embedding manager).
        self.vector_store_manager = VectorStoreManager(
            store_type=vector_store,
            embedding_manager=self.embedding_manager,
            **kwargs
        )
        
        # Document loader / splitter.
        self.document_loader = DocumentLoader(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap
        )
        
        # Retriever on top of the vector store; uses the LLM for
        # advanced strategies (multi-query, compression, rerank).
        self.retriever = RAGRetriever(
            vector_store_manager=self.vector_store_manager,
            llm_manager=self.llm,
            top_k=top_k
        )
        
        logger.info("RAG Pipeline initialized successfully")
    
    def add_documents(
        self,
        source: str,
        file_types: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> int:
        """
        Add documents to the knowledge base.

        Args:
            source: path to a file or a directory
            file_types: file-type filter (directory ingestion only)
            metadata: metadata attached to the loaded documents
            **kwargs: forwarded to the directory loader

        Returns:
            Number of document chunks added.

        Raises:
            ValueError: when *source* is neither a file nor a directory.
        """
        import os
        
        # Dispatch on file vs. directory source.
        if os.path.isfile(source):
            documents = self.document_loader.load_file(source, metadata)
        elif os.path.isdir(source):
            documents = self.document_loader.load_directory(
                source,
                file_types=file_types,
                metadata=metadata,
                **kwargs
            )
        else:
            raise ValueError(f"Invalid source: {source}")
        
        # Index into the vector store.
        self.vector_store_manager.add_documents(documents)
        
        # Persist so the index survives restarts.
        self.vector_store_manager.persist()
        
        logger.info(f"Added {len(documents)} documents to knowledge base")
        return len(documents)
    
    def add_texts(
        self,
        texts: List[str],
        metadatas: Optional[List[Dict[str, Any]]] = None,
        **kwargs
    ) -> int:
        """
        Add raw texts to the knowledge base.

        Args:
            texts: text strings to ingest
            metadatas: optional per-text metadata list
            **kwargs: unused; accepted for interface symmetry

        Returns:
            Number of document chunks added.
        """
        documents = self.document_loader.load_texts(texts, metadatas)
        self.vector_store_manager.add_documents(documents)
        self.vector_store_manager.persist()
        
        logger.info(f"Added {len(documents)} text documents to knowledge base")
        return len(documents)
    
    def add_url(
        self,
        url: str,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> int:
        """
        Add the content behind a URL to the knowledge base.

        Args:
            url: address to fetch
            metadata: metadata attached to the loaded documents
            **kwargs: unused; accepted for interface symmetry

        Returns:
            Number of document chunks added.
        """
        documents = self.document_loader.load_url(url, metadata)
        self.vector_store_manager.add_documents(documents)
        self.vector_store_manager.persist()
        
        logger.info(f"Added {len(documents)} documents from URL: {url}")
        return len(documents)
    
    def query(
        self,
        question: str,
        top_k: Optional[int] = None,
        return_sources: bool = True,
        use_hybrid: bool = False,
        **kwargs
    ) -> RAGResponse:
        """
        Query the knowledge base: retrieve, then generate an answer.

        Args:
            question: user question
            top_k: number of documents to retrieve
            return_sources: include source documents in the response
            use_hybrid: use the retriever's hybrid strategy
            **kwargs: forwarded to the retriever

        Returns:
            RAGResponse with answer, sources, query and diagnostics. When no
            source passes the similarity threshold, a fixed "not found"
            answer is returned and metadata["found_sources"] is False.
        """
        # Retrieve the relevant documents.
        if use_hybrid:
            sources = self.retriever.hybrid_retrieve(question, top_k=top_k, **kwargs)
        else:
            sources = self.retriever.retrieve(question, top_k=top_k, **kwargs)
        
        if not sources:
            return RAGResponse(
                answer="抱歉,我在知识库中没有找到相关信息。",
                sources=[],
                query=question,
                metadata={"found_sources": False}
            )
        
        # Build the grounding context.
        context = self._build_context(sources)
        
        # Generate the answer from the context.
        answer = self._generate_answer(question, context)
        
        return RAGResponse(
            answer=answer,
            sources=sources if return_sources else [],
            query=question,
            metadata={
                "found_sources": True,
                "num_sources": len(sources),
                "context_length": len(context)
            }
        )
    
    def _build_context(self, documents: List[Document]) -> str:
        """Concatenate documents into a numbered, source-tagged context block."""
        context_parts = []
        
        for i, doc in enumerate(documents, 1):
            source = doc.metadata.get("source", "Unknown")
            content = doc.page_content
            
            context_parts.append(f"[来源 {i}: {source}]\n{content}")
        
        return "\n\n".join(context_parts)
    
    def _generate_answer(self, question: str, context: str) -> str:
        """Ask the LLM to answer *question* grounded in *context*."""
        prompt = f"""基于以下上下文回答问题。如果上下文中没有相关信息,请明确说明。

上下文:
{context}

问题: {question}

请提供详细且准确的答案,并在适当时引用来源编号:"""
        
        answer = self.llm.chat(prompt)
        return answer
    
    def stream_query(
        self,
        question: str,
        top_k: Optional[int] = None,
        **kwargs
    ):
        """
        Streaming variant of :meth:`query`.

        Args:
            question: user question
            top_k: number of documents to retrieve
            **kwargs: forwarded to the retriever

        Yields:
            Answer fragments as produced by the LLM; a single fixed
            "not found" message when retrieval returns nothing.
        """
        # Retrieve the grounding documents first.
        sources = self.retriever.retrieve(question, top_k=top_k, **kwargs)
        
        if not sources:
            yield "抱歉,我在知识库中没有找到相关信息。"
            return
        
        # Build the grounding context.
        context = self._build_context(sources)
        
        # Stream the generated answer.
        prompt = f"""基于以下上下文回答问题。

上下文:
{context}

问题: {question}

答案:"""
        
        for chunk in self.llm.stream(prompt):
            yield chunk
    
    def get_statistics(self) -> Dict[str, Any]:
        """Return a snapshot of vector store, embedding and retriever config."""
        return {
            "vector_store": self.vector_store_manager.get_stats(),
            "embedding": {
                "provider": self.embedding_manager.provider,
                "model": self.embedding_manager.model,
                "dimension": self.embedding_manager.get_dimension()
            },
            "retriever": {
                "top_k": self.retriever.top_k,
                "threshold": self.retriever.similarity_threshold
            }
        }
    
    def clear_knowledge_base(self):
        """Clear the knowledge base (stub: depends on the vector store backend)."""
        # Implementation depends on the concrete vector store.
        logger.warning("Clear knowledge base - implementation depends on vector store")
    
    def __repr__(self) -> str:
        return f"RAGPipeline(llm={self.llm.model}, vector_store={self.vector_store_manager.store_type})"

LLM Toolkit - 第五部分:工具函数、UI界面与示例


第五部分:工具函数模块

18. llm_toolkit/utils/__init__.py

# -*- coding: utf-8 -*-
"""
Utils Module - 工具函数模块
"""

from llm_toolkit.utils.text_splitter import SmartTextSplitter
from llm_toolkit.utils.validators import InputValidator
from llm_toolkit.utils.formatters import ResponseFormatter

__all__ = [
    "SmartTextSplitter",
    "InputValidator",
    "ResponseFormatter",
]

19. llm_toolkit/utils/text_splitter.py - 智能文本分割器

# -*- coding: utf-8 -*-
"""
Smart Text Splitter - 智能文本分割器
提供多种文本分割策略
"""

import re
from typing import Any, List, Optional

from loguru import logger


class SmartTextSplitter:
    """
    Smart text splitter.

    Features:
    - splitting along natural separators
    - context preservation via configurable overlap
    - boundary detection for Chinese and Latin punctuation
    """
    
    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        separators: Optional[List[str]] = None
    ):
        """
        Initialize the splitter.

        Args:
            chunk_size: maximum characters per chunk
            chunk_overlap: characters carried over between adjacent chunks
            separators: boundary strings tried in order; the defaults cover
                paragraph, line and CJK/Latin sentence punctuation
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.separators = separators or ["\n\n", "\n", "。", "!", "?", ". ", "! ", "? ", " "]
    
    def split_text(self, text: str) -> List[str]:
        """
        Split *text* into chunks of (roughly) at most ``chunk_size`` chars.

        Args:
            text: input text

        Returns:
            List of stripped text chunks.
        """
        # Short inputs need no splitting at all.
        if len(text) <= self.chunk_size:
            return [text]
        
        pieces = self._split_by_separators(text)
        
        chunks: List[str] = []
        buf = ""
        
        for piece in pieces:
            if len(buf) + len(piece) <= self.chunk_size:
                # Still room in the current chunk.
                buf += piece
                continue
            
            # Flush the current chunk, then start a new one — seeded with
            # the tail of the previous chunk when overlap is configured.
            if buf:
                chunks.append(buf.strip())
            if self.chunk_overlap > 0 and buf:
                buf = buf[-self.chunk_overlap:] + piece
            else:
                buf = piece
        
        # Flush whatever is left.
        if buf:
            chunks.append(buf.strip())
        
        return chunks
    
    def _split_by_separators(self, text: str) -> List[str]:
        """Split *text* on each separator in turn, keeping the separators."""
        segments = [text]
        
        for sep in self.separators:
            expanded = []
            for seg in segments:
                if sep not in seg:
                    expanded.append(seg)
                    continue
                pieces = seg.split(sep)
                # Re-attach the separator to the piece it terminated.
                expanded.extend(piece + sep for piece in pieces[:-1])
                expanded.append(pieces[-1])
            segments = expanded
        
        return [s for s in segments if s.strip()]
    
    def split_by_sentences(self, text: str) -> List[str]:
        """Split into sentences on Chinese/English terminal punctuation."""
        # The capturing group keeps the punctuation as separate entries.
        fragments = re.split(r'([。!?.!?]+)', text)
        
        sentences = []
        # Pair each sentence body with the punctuation that follows it.
        for body, punct in zip(fragments[0::2], fragments[1::2]):
            merged = body + punct
            if merged.strip():
                sentences.append(merged.strip())
        
        # An odd-length split leaves a trailing fragment with no punctuation.
        if len(fragments) % 2 == 1 and fragments[-1].strip():
            sentences.append(fragments[-1].strip())
        
        return sentences
    
    def split_by_paragraphs(self, text: str) -> List[str]:
        """Split on blank lines, dropping empty paragraphs."""
        return [part.strip() for part in text.split('\n\n') if part.strip()]


class SemanticTextSplitter(SmartTextSplitter):
    """
    Semantic text splitter.

    Groups adjacent sentences into chunks based on embedding similarity.
    """
    
    def __init__(
        self,
        embedding_manager: Optional[Any] = None,
        similarity_threshold: float = 0.8,
        **kwargs
    ):
        """
        Initialize the semantic splitter.

        Fix: ``Any`` was used in the annotation without being imported from
        ``typing`` in this module, which raised NameError at class-definition
        time (see the updated typing import at the top of the file).

        Args:
            embedding_manager: embedding manager; when None, splitting falls
                back to the parent's separator-based strategy
            similarity_threshold: minimum similarity to keep a sentence in
                the current chunk
            **kwargs: forwarded to SmartTextSplitter.__init__
        """
        super().__init__(**kwargs)
        self.embedding_manager = embedding_manager
        self.similarity_threshold = similarity_threshold
    
    def split_text(self, text: str) -> List[str]:
        """Split *text* into semantically coherent chunks.

        Returns:
            Chunks of sentences grouped by embedding similarity; falls back
            to the basic splitter when no embedding manager is configured.
        """
        if not self.embedding_manager:
            logger.warning("Embedding manager not available, using basic splitting")
            return super().split_text(text)
        
        # Sentence-level units first.
        sentences = self.split_by_sentences(text)
        
        if len(sentences) <= 1:
            return sentences
        
        # One embedding per sentence.
        embeddings = self.embedding_manager.embed_texts(sentences)
        
        chunks = []
        current_chunk = [sentences[0]]
        # NOTE(review): the anchor embedding stays fixed at the chunk's first
        # sentence rather than tracking the growing chunk — confirm this is
        # the intended grouping behavior.
        current_embedding = embeddings[0]
        
        for i in range(1, len(sentences)):
            similarity = self.embedding_manager.calculate_similarity(
                current_embedding,
                embeddings[i]
            )
            
            # Keep extending the chunk while it stays similar and small enough.
            chunk_text = ' '.join(current_chunk + [sentences[i]])
            if similarity >= self.similarity_threshold and len(chunk_text) <= self.chunk_size:
                current_chunk.append(sentences[i])
            else:
                # Seal the current chunk and start a new one at this sentence.
                chunks.append(' '.join(current_chunk))
                current_chunk = [sentences[i]]
                current_embedding = embeddings[i]
        
        # Seal the final chunk.
        if current_chunk:
            chunks.append(' '.join(current_chunk))
        
        return chunks

20. llm_toolkit/utils/validators.py - 输入验证器

# -*- coding: utf-8 -*-
"""
Input Validator - 输入验证器
验证和清理用户输入
"""

import re
from typing import Optional, List
from loguru import logger


class InputValidator:
    """
    User-input validator.

    Features:
    - input length validation
    - content safety checks
    - format validation (email / URL)
    - profanity filtering
    """
    
    def __init__(
        self,
        max_length: int = 10000,
        min_length: int = 1,
        blocked_patterns: Optional[List[str]] = None,
        enable_profanity_filter: bool = False
    ):
        """
        Initialize the validator.

        Args:
            max_length: maximum accepted character count
            min_length: minimum accepted character count
            blocked_patterns: regex patterns that reject input when matched
            enable_profanity_filter: turn on the profanity screen
        """
        self.max_length = max_length
        self.min_length = min_length
        self.blocked_patterns = blocked_patterns or []
        self.enable_profanity_filter = enable_profanity_filter
        
        # Example profanity list; empty until a real source is wired in.
        self.profanity_list = self._load_profanity_list()
    
    def _load_profanity_list(self) -> List[str]:
        """Load the profanity word list (stub: should come from file/DB)."""
        return []
    
    def validate(self, text: str) -> tuple[bool, Optional[str]]:
        """
        Validate user input.

        Args:
            text: input text

        Returns:
            (is_valid, error_message) — the message is None when valid.
        """
        length = len(text)
        
        # Length gates first.
        if length < self.min_length:
            return False, f"输入太短,最少需要{self.min_length}个字符"
        if length > self.max_length:
            return False, f"输入太长,最多允许{self.max_length}个字符"
        
        # Whitespace-only input is rejected.
        if not text.strip():
            return False, "输入不能为空"
        
        # Caller-supplied blocked regexes (case-insensitive).
        for blocked in self.blocked_patterns:
            if re.search(blocked, text, re.IGNORECASE):
                return False, f"输入包含禁止的内容"
        
        # Optional profanity screen.
        if self.enable_profanity_filter and self._contains_profanity(text):
            return False, "输入包含不当内容"
        
        return True, None
    
    def _contains_profanity(self, text: str) -> bool:
        """Case-insensitive substring check against the profanity list."""
        lowered = text.lower()
        return any(word.lower() in lowered for word in self.profanity_list)
    
    def sanitize(self, text: str) -> str:
        """
        Clean up user input.

        Collapses whitespace, strips unusual symbols (word characters, CJK
        and basic punctuation are kept) and truncates to ``max_length``.

        Args:
            text: input text

        Returns:
            Sanitized text.
        """
        # Collapse runs of whitespace into single spaces.
        text = re.sub(r'\s+', ' ', text)
        
        # Drop anything outside word chars, whitespace, CJK, and punctuation.
        text = re.sub(r'[^\w\s\u4e00-\u9fff.,!?;:,。!?;:]', '', text)
        
        # Hard cap on length.
        if len(text) > self.max_length:
            text = text[:self.max_length]
        
        return text.strip()
    
    def validate_email(self, email: str) -> bool:
        """Return True when *email* looks like a well-formed address."""
        matched = re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email)
        return matched is not None
    
    def validate_url(self, url: str) -> bool:
        """Return True when *url* looks like an http(s) URL."""
        matched = re.match(r'^https?://[^\s/$.?#].[^\s]*$', url)
        return matched is not None


class PromptInjectionDetector:
    """
    Prompt-injection detector.

    Flags inputs that look like attempts to override the system prompt.
    """
    
    # Known override/jailbreak phrasings, matched case-insensitively.
    INJECTION_PATTERNS = [
        r'ignore\s+previous\s+instructions',
        r'disregard\s+all\s+previous',
        r'forget\s+everything',
        r'you\s+are\s+now',
        r'new\s+instructions',
        r'system\s*:\s*',
        r'admin\s+mode',
        r'developer\s+mode',
    ]
    
    def __init__(self, sensitivity: float = 0.7):
        """
        Initialize the detector.

        Args:
            sensitivity: fraction (0-1) of the pattern list that must match
                before the input is flagged
        """
        self.sensitivity = sensitivity
    
    def detect(self, text: str) -> tuple[bool, float, List[str]]:
        """
        Scan *text* for injection attempts.

        Args:
            text: input text

        Returns:
            (is_injection, confidence, matched_patterns) where confidence is
            the fraction of known patterns that matched.
        """
        lowered = text.lower()
        matches = [p for p in self.INJECTION_PATTERNS if re.search(p, lowered)]
        
        confidence = len(matches) / len(self.INJECTION_PATTERNS)
        is_injection = confidence >= self.sensitivity
        
        if is_injection:
            logger.warning(f"Potential prompt injection detected: {matches}")
        
        return is_injection, confidence, matches

21. llm_toolkit/utils/formatters.py - 响应格式化器

# -*- coding: utf-8 -*-
"""
Response Formatter - 响应格式化器
格式化LLM响应
"""

import re
import json
from typing import Any, Dict, List, Optional
from datetime import datetime


class ResponseFormatter:
    """
    LLM-response formatter.

    Features:
    - Markdown normalization
    - JSON formatting
    - code-block extraction
    - citation formatting
    """
    
    @staticmethod
    def format_markdown(text: str) -> str:
        """Normalize Markdown code fences and list-marker spacing."""
        # Normalize code-fence openers.
        text = re.sub(r'```(\w+)?\n', r'```\1\n', text)
        # Normalize list-item spacing (numbered, *, -).
        return re.sub(r'^(\d+\.|\*|-)\s+', r'\1 ', text, flags=re.MULTILINE)
    
    @staticmethod
    def extract_code_blocks(text: str) -> List[Dict[str, str]]:
        """Pull fenced code blocks out of *text* as {language, code} dicts."""
        found = re.findall(r'```(\w+)?\n(.*?)```', text, re.DOTALL)
        return [
            {"language": lang or "text", "code": body.strip()}
            for lang, body in found
        ]
    
    @staticmethod
    def format_json(data: Any, indent: int = 2) -> str:
        """Serialize *data* as pretty-printed, non-ASCII-escaped JSON."""
        return json.dumps(data, indent=indent, ensure_ascii=False)
    
    @staticmethod
    def format_list(items: List[str], style: str = "bullet") -> str:
        """
        Render *items* as a list.

        Args:
            items: list entries
            style: "bullet", "numbered", or anything else for plain lines

        Returns:
            Newline-joined formatted list.
        """
        if style == "numbered":
            lines = [f"{i+1}. {item}" for i, item in enumerate(items)]
        elif style == "bullet":
            lines = [f"• {item}" for item in items]
        else:
            lines = list(items)
        return "\n".join(lines)
    
    @staticmethod
    def format_table(data: List[Dict[str, Any]]) -> str:
        """Render a list of homogeneous dicts as a Markdown table.

        Column order comes from the first row; missing keys render empty.
        """
        if not data:
            return ""
        
        columns = list(data[0].keys())
        
        lines = [
            "| " + " | ".join(columns) + " |",
            "| " + " | ".join(["---"] * len(columns)) + " |",
        ]
        lines.extend(
            "| " + " | ".join(str(entry.get(col, "")) for col in columns) + " |"
            for entry in data
        )
        return "\n".join(lines)
    
    @staticmethod
    def add_citations(text: str, sources: List[str]) -> str:
        """Append a numbered source list to *text*."""
        footer = "".join(f"{i}. {source}\n" for i, source in enumerate(sources, 1))
        return text + "\n\n**参考来源:**\n" + footer
    
    @staticmethod
    def truncate(text: str, max_length: int = 100, suffix: str = "...") -> str:
        """Shorten *text* to *max_length* characters, ending with *suffix*."""
        if len(text) <= max_length:
            return text
        cut = max_length - len(suffix)
        return text[:cut] + suffix
    
    @staticmethod
    def highlight_keywords(text: str, keywords: List[str]) -> str:
        """Bold each keyword (matches case-insensitively; the replacement
        uses the casing given in *keywords*)."""
        for keyword in keywords:
            matcher = re.compile(re.escape(keyword), re.IGNORECASE)
            text = matcher.sub(f"**{keyword}**", text)
        return text


class ConversationFormatter:
    """Formats and exports chat histories."""

    # Role -> display label used when rendering a transcript.
    _ROLE_LABELS = {
        "user": "👤 **用户**",
        "assistant": "🤖 **助手**",
        "system": "⚙️ **系统**",
    }

    @staticmethod
    def format_conversation(messages: List[Dict[str, str]]) -> str:
        """Render messages as a Markdown transcript.

        Messages whose role is not user/assistant/system are skipped.
        """
        parts = []
        for message in messages:
            label = ConversationFormatter._ROLE_LABELS.get(
                message.get("role", "unknown")
            )
            if label is None:
                continue
            stamp = message.get("timestamp", "")
            body = message.get("content", "")
            parts.append(f"{label} ({stamp}):\n{body}\n")
        return "\n".join(parts)

    @staticmethod
    def export_conversation(
        messages: List[Dict[str, str]],
        format: str = "markdown"
    ) -> str:
        """
        Export a conversation to a serializable string.

        Args:
            messages: Message dicts with role/content (and optionally
                timestamp) keys.
            format: One of "markdown", "json", "text".

        Returns:
            The exported content.

        Raises:
            ValueError: For an unknown *format*.
        """
        if format == "json":
            return json.dumps(messages, indent=2, ensure_ascii=False)

        if format == "markdown":
            head = f"# 对话记录\n\n生成时间: {datetime.now().isoformat()}\n\n---\n\n"
            return head + ConversationFormatter.format_conversation(messages)

        if format == "text":
            rendered = [
                f"{m.get('role', 'unknown').upper()}: {m.get('content', '')}\n"
                for m in messages
            ]
            return "\n".join(rendered)

        raise ValueError(f"Unsupported format: {format}")

第六部分:Streamlit UI界面

22. llm_toolkit/ui/streamlit_app.py - Streamlit应用

# -*- coding: utf-8 -*-
"""
Streamlit UI - LLM Toolkit Web界面
"""

import streamlit as st
import os
from pathlib import Path
import json
from datetime import datetime

# Configure the Streamlit page (must run before any other st.* call)
st.set_page_config(
    page_title="LLM Toolkit",
    page_icon="🤖",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Import the toolkit's core building blocks; abort the app with a
# user-facing install hint if the package is not available.
try:
    from llm_toolkit import (
        LLMManager,
        PromptManager,
        ConversationManager,
        CostTracker,
        RAGPipeline
    )
except ImportError:
    st.error("请先安装 LLM Toolkit: pip install llm-toolkit")
    st.stop()


def init_session_state():
    """Populate st.session_state with the keys the app relies on."""
    defaults = {
        "llm": None,
        "conversation": None,
        "rag": None,
        "messages": [],
    }
    for key, value in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = value

    # The cost tracker is stateful, so only build it once per session.
    if "cost_tracker" not in st.session_state:
        st.session_state.cost_tracker = CostTracker(
            daily_limit=100.0,
            monthly_limit=3000.0
        )


def sidebar_config():
    """Render the sidebar: LLM settings, init button, and cost metrics."""
    # Provider -> models offered for it.
    provider_models = {
        "openai": ["gpt-4-turbo-preview", "gpt-4", "gpt-3.5-turbo"],
        "anthropic": [
            "claude-3-opus-20240229",
            "claude-3-sonnet-20240229",
            "claude-3-haiku-20240307",
        ],
        "google": ["gemini-pro"],
    }

    with st.sidebar:
        st.title("⚙️ 配置")

        st.subheader("LLM设置")

        provider = st.selectbox(
            "提供商",
            ["openai", "anthropic", "google"],
            key="provider"
        )

        # Only show models belonging to the chosen provider.
        model = st.selectbox(
            "模型",
            provider_models.get(provider, ["gemini-pro"]),
            key="model"
        )

        temperature = st.slider(
            "Temperature",
            min_value=0.0,
            max_value=1.0,
            value=0.7,
            step=0.1,
            key="temperature"
        )

        max_tokens = st.number_input(
            "Max Tokens",
            min_value=100,
            max_value=4000,
            value=2000,
            step=100,
            key="max_tokens"
        )

        if st.button("初始化LLM", type="primary"):
            try:
                st.session_state.llm = LLMManager(
                    provider=provider,
                    model=model,
                    temperature=temperature,
                    max_tokens=max_tokens,
                    cost_tracker=st.session_state.cost_tracker
                )
            except Exception as e:
                st.error(f"❌ 初始化失败: {e}")
            else:
                st.success(f"✅ LLM初始化成功: {provider}/{model}")

        st.divider()
        st.subheader("💰 成本追踪")

        if st.session_state.cost_tracker:
            spent_today = st.session_state.cost_tracker.get_daily_cost()
            spent_month = st.session_state.cost_tracker.get_monthly_cost()

            today_col, month_col = st.columns(2)
            with today_col:
                st.metric("今日成本", f"${spent_today:.4f}")
            with month_col:
                st.metric("本月成本", f"${spent_month:.4f}")


def chat_page():
    """Chat tab: streaming conversation with the configured LLM."""
    st.title("💬 智能对话")

    # Guard: the user must configure an LLM in the sidebar first.
    if st.session_state.llm is None:
        st.warning("⚠️ 请先在侧边栏初始化LLM")
        return

    if st.session_state.conversation is None:
        st.session_state.conversation = ConversationManager(
            llm=st.session_state.llm,
            memory_type="buffer",
            max_history=10
        )

    # Replay the transcript kept in session state.
    for entry in st.session_state.messages:
        with st.chat_message(entry["role"]):
            st.markdown(entry["content"])

    user_text = st.chat_input("输入您的问题...")
    if user_text:
        st.session_state.messages.append({"role": "user", "content": user_text})
        with st.chat_message("user"):
            st.markdown(user_text)

        with st.chat_message("assistant"):
            placeholder = st.empty()
            reply = ""
            # Stream tokens, showing a cursor while generation runs.
            for piece in st.session_state.conversation.stream_chat(user_text):
                reply += piece
                placeholder.markdown(reply + "▌")
            placeholder.markdown(reply)

        st.session_state.messages.append({"role": "assistant", "content": reply})

    if st.button("🗑️ 清空对话"):
        st.session_state.messages = []
        st.session_state.conversation.clear_history()
        st.rerun()


def rag_page():
    """RAG page: manage knowledge-base documents and run retrieval QA."""
    st.title("📚 知识库问答 (RAG)")
    
    if st.session_state.llm is None:
        st.warning("⚠️ 请先在侧边栏初始化LLM")
        return
    
    # Lazily build the RAG pipeline the first time this page is shown.
    if st.session_state.rag is None:
        with st.spinner("初始化RAG系统..."):
            st.session_state.rag = RAGPipeline(
                llm=st.session_state.llm,
                vector_store="chromadb",
                top_k=5
            )
        st.success("✅ RAG系统初始化成功")
    
    # Document management: three ingestion paths (file / text / URL).
    with st.expander("📁 文档管理", expanded=True):
        tab1, tab2, tab3 = st.tabs(["上传文件", "添加文本", "从URL"])
        
        with tab1:
            uploaded_files = st.file_uploader(
                "选择文件",
                accept_multiple_files=True,
                type=["pdf", "txt", "md", "docx", "csv"]
            )
            
            if uploaded_files and st.button("上传文档"):
                for uploaded_file in uploaded_files:
                    # Persist the upload to a temp file so the loader can
                    # read it from a filesystem path.
                    # NOTE(review): uses the client-supplied filename
                    # directly — acceptable for a local app; sanitize if
                    # this is ever exposed publicly.
                    temp_path = Path("temp") / uploaded_file.name
                    temp_path.parent.mkdir(exist_ok=True)
                    
                    with open(temp_path, "wb") as f:
                        f.write(uploaded_file.getbuffer())
                    
                    # Ingest into the knowledge base.
                    try:
                        num_docs = st.session_state.rag.add_documents(str(temp_path))
                        st.success(f"✅ {uploaded_file.name}: 添加了 {num_docs} 个文档块")
                    except Exception as e:
                        st.error(f"❌ {uploaded_file.name}: {e}")
                    finally:
                        # Always remove the temp file, even on failure.
                        if temp_path.exists():
                            temp_path.unlink()
        
        with tab2:
            text_input = st.text_area("输入文本", height=200)
            if text_input and st.button("添加文本"):
                try:
                    num_docs = st.session_state.rag.add_texts([text_input])
                    st.success(f"✅ 添加了 {num_docs} 个文档块")
                except Exception as e:
                    st.error(f"❌ 添加失败: {e}")
        
        with tab3:
            url_input = st.text_input("输入URL")
            if url_input and st.button("从URL添加"):
                try:
                    with st.spinner("正在加载URL内容..."):
                        num_docs = st.session_state.rag.add_url(url_input)
                    st.success(f"✅ 添加了 {num_docs} 个文档块")
                except Exception as e:
                    st.error(f"❌ 添加失败: {e}")
    
    # Question answering over the ingested documents.
    st.divider()
    st.subheader("🔍 知识库问答")
    
    question = st.text_input("输入您的问题")
    
    col1, col2 = st.columns([1, 4])
    with col1:
        top_k = st.number_input("检索数量", min_value=1, max_value=10, value=5)
    with col2:
        use_hybrid = st.checkbox("使用混合检索", value=False)
    
    if question and st.button("🔍 查询", type="primary"):
        with st.spinner("正在检索和生成答案..."):
            try:
                response = st.session_state.rag.query(
                    question,
                    top_k=top_k,
                    use_hybrid=use_hybrid
                )
                
                # Render the generated answer.
                st.markdown("### 📝 答案")
                st.markdown(response.answer)
                
                # Render the retrieved source chunks, one expander each.
                if response.sources:
                    st.markdown("### 📚 参考来源")
                    for i, source in enumerate(response.sources, 1):
                        with st.expander(f"来源 {i}: {source.metadata.get('source', 'Unknown')}"):
                            st.markdown(source.page_content)
            
            except Exception as e:
                st.error(f"❌ 查询失败: {e}")


def prompt_page():
    """Prompt-template page: list, create, and run prompt templates.

    Bug fix: the original nested the "执行" button inside the "生成"
    button's if-block. Clicking any Streamlit button triggers a rerun
    in which the other button evaluates to False, so the nested button
    could never fire. The generated prompt is now persisted in session
    state and the execute button is rendered independently.
    """
    st.title("📝 Prompt模板管理")

    # Build the manager once per session, pre-loading builtin templates.
    if "prompt_manager" not in st.session_state:
        st.session_state.prompt_manager = PromptManager()
        st.session_state.prompt_manager.create_builtin_templates()

    pm = st.session_state.prompt_manager

    # --- Template list, grouped by category -------------------------
    st.subheader("📋 模板列表")

    templates = pm.list_templates()

    if templates:
        categories = pm.get_categories()

        for category in categories:
            with st.expander(f"📁 {category}", expanded=True):
                category_templates = [t for t in templates if t.category == category]

                for template in category_templates:
                    col1, col2, col3 = st.columns([3, 1, 1])

                    with col1:
                        st.markdown(f"**{template.name}**")
                        st.caption(template.description)

                    with col2:
                        if st.button("使用", key=f"use_{template.name}"):
                            st.session_state.selected_template = template

                    with col3:
                        if st.button("删除", key=f"del_{template.name}"):
                            pm.delete_template(template.name)
                            st.rerun()
    else:
        st.info("暂无模板")

    # --- Create a new template --------------------------------------
    st.divider()
    st.subheader("➕ 创建新模板")

    with st.form("create_template"):
        name = st.text_input("模板名称")
        category = st.text_input("分类", value="general")
        description = st.text_area("描述")
        template_text = st.text_area("模板内容", height=200)
        variables = st.text_input("变量(逗号分隔)", placeholder="例如: name, age, city")

        if st.form_submit_button("创建模板"):
            if name and template_text:
                try:
                    var_list = [v.strip() for v in variables.split(",") if v.strip()]
                    pm.create_template(
                        name=name,
                        template=template_text,
                        variables=var_list,
                        category=category,
                        description=description
                    )
                    st.success(f"✅ 模板 '{name}' 创建成功")
                    st.rerun()
                except Exception as e:
                    st.error(f"❌ 创建失败: {e}")
            else:
                st.warning("请填写模板名称和内容")

    # --- Fill in and run the selected template ----------------------
    if "selected_template" in st.session_state:
        st.divider()
        st.subheader("🎯 使用模板")

        template = st.session_state.selected_template
        st.markdown(f"**模板:** {template.name}")

        # One input widget per declared template variable.
        var_values = {}
        for var in template.variables:
            var_values[var] = st.text_input(f"{var}", key=f"var_{var}")

        if st.button("生成"):
            try:
                # Persist the result so it survives the rerun caused by
                # clicking "执行" below (bug fix, see docstring).
                st.session_state.generated_prompt = template.format(**var_values)
            except Exception as e:
                st.error(f"❌ 生成失败: {e}")

        generated = st.session_state.get("generated_prompt")
        if generated:
            st.markdown("### 生成结果")
            st.code(generated)

            # Rendered outside the "生成" branch so the click works.
            if st.session_state.llm and st.button("执行"):
                try:
                    with st.spinner("正在生成..."):
                        response = st.session_state.llm.chat(generated)
                    st.markdown("### LLM响应")
                    st.markdown(response)
                except Exception as e:
                    st.error(f"❌ 生成失败: {e}")


def analytics_page():
    """Analytics tab: cost overview, usage report, and suggestions."""
    st.title("📊 使用分析")

    if st.session_state.cost_tracker is None:
        st.warning("⚠️ 成本追踪器未初始化")
        return

    tracker = st.session_state.cost_tracker

    st.subheader("💰 成本概览")

    today_col, month_col, limit_col = st.columns(3)

    with today_col:
        spent_today = tracker.get_daily_cost()
        # Budget percentage is only meaningful when a limit is set.
        today_note = (
            f"{(spent_today / tracker.daily_limit * 100):.1f}% 预算"
            if tracker.daily_limit else None
        )
        st.metric("今日成本", f"${spent_today:.4f}", delta=today_note)

    with month_col:
        spent_month = tracker.get_monthly_cost()
        month_note = (
            f"{(spent_month / tracker.monthly_limit * 100):.1f}% 预算"
            if tracker.monthly_limit else None
        )
        st.metric("本月成本", f"${spent_month:.4f}", delta=month_note)

    with limit_col:
        limit_text = (
            f"${tracker.monthly_limit:.2f}/月" if tracker.monthly_limit else "未设置"
        )
        st.metric("预算限制", limit_text)

    st.divider()
    st.subheader("📈 使用统计")

    period = st.selectbox("时间周期", ["daily", "weekly", "monthly"])

    if st.button("生成报告"):
        st.text(tracker.generate_report(period=period, format="text"))

    st.divider()
    st.subheader("💡 优化建议")

    tips = tracker.get_optimization_suggestions()
    if not tips:
        st.success("✅ 当前使用已优化,暂无建议")
    else:
        for tip in tips:
            st.info(tip)


def main():
    """App entry point: init state, draw the sidebar, route to a page."""
    init_session_state()
    sidebar_config()

    # Page label -> renderer; the radio selection drives the routing.
    pages = {
        "💬 智能对话": chat_page,
        "📚 知识库问答": rag_page,
        "📝 Prompt管理": prompt_page,
        "📊 使用分析": analytics_page,
    }

    choice = st.sidebar.radio("导航", list(pages))
    pages[choice]()

    # Footer
    st.sidebar.divider()
    st.sidebar.markdown("---")
    st.sidebar.caption("LLM Toolkit v1.0.0")
    st.sidebar.caption("由 DREAMVFIA 用 ❤️ 打造")


if __name__ == "__main__":
    main()

LLM Toolkit - 第六部分:CLI工具、示例与文档


第六部分:CLI命令行工具

23. llm_toolkit/cli.py - 命令行接口

# -*- coding: utf-8 -*-
"""
CLI - 命令行接口
提供命令行交互功能
"""

import click
import os
from pathlib import Path
from loguru import logger

from llm_toolkit import (
    LLMManager,
    PromptManager,
    ConversationManager,
    CostTracker,
    RAGPipeline
)


@click.group()
@click.version_option(version="1.0.0")
def cli():
    """LLM Toolkit - 大语言模型工具包命令行工具"""
    # Group entry point only; subcommands register via @cli.command().
    # (Docstring doubles as the CLI help text, so it stays unchanged.)
    pass


@cli.command()
@click.option("--provider", default="openai", help="LLM提供商")
@click.option("--model", default="gpt-4-turbo-preview", help="模型名称")
@click.option("--temperature", default=0.7, help="温度参数")
def chat(provider, model, temperature):
    """启动交互式聊天"""
    click.echo(f"🤖 启动聊天模式: {provider}/{model}")

    try:
        llm = LLMManager(
            provider=provider,
            model=model,
            temperature=temperature
        )
        session = ConversationManager(llm=llm)

        click.echo("✅ 初始化成功!输入 'exit' 或 'quit' 退出\n")

        # REPL loop: prompt, stream the reply, repeat until quit.
        while True:
            user_input = click.prompt("👤 You", type=str)
            if user_input.lower() in ("exit", "quit", "q"):
                click.echo("👋 再见!")
                break

            click.echo("🤖 Assistant: ", nl=False)
            for piece in session.stream_chat(user_input):
                click.echo(piece, nl=False)
            click.echo("\n")

    except Exception as e:
        click.echo(f"❌ 错误: {e}", err=True)


@cli.command()
@click.argument("source", type=click.Path(exists=True))
@click.option("--provider", default="openai", help="LLM提供商")
@click.option("--model", default="gpt-4-turbo-preview", help="模型名称")
@click.option("--vector-store", default="chromadb", help="向量数据库类型")
@click.option("--top-k", default=5, help="检索结果数量")
def rag(source, provider, model, vector_store, top_k):
    """启动RAG问答系统"""
    click.echo(f"📚 启动RAG系统: {source}")

    try:
        # Build the LLM and the retrieval pipeline around it.
        llm = LLMManager(provider=provider, model=model)

        rag_pipeline = RAGPipeline(
            llm=llm,
            vector_store=vector_store,
            top_k=top_k
        )

        # Ingest the documents from the SOURCE path.
        click.echo("📄 正在加载文档...")
        num_docs = rag_pipeline.add_documents(source)
        click.echo(f"✅ 加载了 {num_docs} 个文档块\n")

        # Question-answer loop until the user quits.
        while True:
            question = click.prompt("❓ Question", type=str)

            if question.lower() in ["exit", "quit", "q"]:
                click.echo("👋 再见!")
                break

            click.echo("🔍 正在检索和生成答案...\n")
            response = rag_pipeline.query(question)

            click.echo("📝 Answer:")
            click.echo(response.answer)

            if response.sources:
                click.echo("\n📚 Sources:")
                # Renamed from `source`: the old loop variable shadowed
                # the `source` CLI argument above.
                for i, doc in enumerate(response.sources, 1):
                    doc_name = doc.metadata.get("source", "Unknown")
                    click.echo(f"  {i}. {doc_name}")

            click.echo()

    except Exception as e:
        click.echo(f"❌ 错误: {e}", err=True)


@cli.command()
@click.option("--name", prompt="模板名称", help="模板名称")
@click.option("--category", default="general", help="分类")
@click.option("--description", default="", help="描述")
def create_prompt(name, category, description):
    """创建Prompt模板"""
    click.echo(f"📝 创建Prompt模板: {name}")

    try:
        pm = PromptManager()

        # Read template lines from stdin until the first blank line.
        click.echo("请输入模板内容(输入空行结束):")
        body = "\n".join(iter(input, ""))

        # Comma-separated variable names, empty entries dropped.
        raw_vars = click.prompt("变量(逗号分隔)", default="", type=str)
        variables = [item.strip() for item in raw_vars.split(",") if item.strip()]

        pm.create_template(
            name=name,
            template=body,
            variables=variables,
            category=category,
            description=description
        )

        click.echo(f"✅ 模板 '{name}' 创建成功!")

    except Exception as e:
        click.echo(f"❌ 错误: {e}", err=True)


@cli.command()
def list_prompts():
    """列出所有Prompt模板"""
    try:
        pm = PromptManager()
        templates = pm.list_templates()

        if not templates:
            click.echo("暂无模板")
            return

        # Print templates grouped under their category headers.
        for category in pm.get_categories():
            click.echo(f"\n📁 {category}")
            click.echo("=" * 50)

            for tpl in (t for t in templates if t.category == category):
                click.echo(f"  📝 {tpl.name}")
                click.echo(f"     {tpl.description}")
                click.echo(f"     变量: {', '.join(tpl.variables)}")

    except Exception as e:
        click.echo(f"❌ 错误: {e}", err=True)


@cli.command()
@click.option("--period", type=click.Choice(["daily", "weekly", "monthly"]), default="monthly")
@click.option("--format", type=click.Choice(["text", "json"]), default="text")
def cost_report(period, format):
    """生成成本报告"""
    # NOTE: `format` shadows the builtin, but renaming the parameter
    # would change the click option mapping, so it is kept.
    try:
        tracker = CostTracker()
        click.echo(tracker.generate_report(period=period, format=format))
    except Exception as e:
        click.echo(f"❌ 错误: {e}", err=True)


@cli.command()
def init():
    """初始化项目"""
    click.echo("🚀 初始化 LLM Toolkit 项目...")

    # Working directories expected by the toolkit.
    directories = [
        "data",
        "data/chroma",
        "data/cache",
        "logs",
        "prompts",
        "models",
        "temp"
    ]

    for directory in directories:
        Path(directory).mkdir(parents=True, exist_ok=True)
        click.echo(f"✅ 创建目录: {directory}")

    # Seed .env from .env.example. The old code crashed with
    # FileNotFoundError when .env.example was missing; guard against
    # that and copy with an explicit encoding.
    if not Path(".env").exists():
        example = Path(".env.example")
        if example.exists():
            Path(".env").write_text(
                example.read_text(encoding="utf-8"), encoding="utf-8"
            )
            click.echo("✅ 创建配置文件: .env")
        else:
            click.echo("⚠️ 未找到 .env.example,跳过创建 .env")

    click.echo("\n🎉 初始化完成!")
    click.echo("请编辑 .env 文件配置您的API密钥")


@cli.command()
@click.argument("input_file", type=click.Path(exists=True))
@click.argument("output_file", type=click.Path())
@click.option("--provider", default="openai", help="LLM提供商")
@click.option("--model", default="gpt-4-turbo-preview", help="模型名称")
@click.option("--task", type=click.Choice(["summarize", "translate", "qa"]), required=True)
def batch_process(input_file, output_file, provider, model, task):
    """批量处理文件"""
    click.echo(f"⚙️ 批量处理: {task}")

    # Task name -> prompt builder ("qa" passes the line through as-is).
    prompt_builders = {
        "summarize": lambda text: f"请简要总结以下内容:\n\n{text}\n\n摘要:",
        "translate": lambda text: f"请将以下文本翻译成英文:\n\n{text}\n\n翻译:",
        "qa": lambda text: text,
    }

    try:
        llm = LLMManager(provider=provider, model=model)

        with open(input_file, "r", encoding="utf-8") as f:
            lines = f.readlines()

        results = []
        build_prompt = prompt_builders[task]

        with click.progressbar(lines, label="处理中") as bar:
            for raw in bar:
                stripped = raw.strip()
                if not stripped:
                    continue
                results.append(llm.chat(build_prompt(stripped)))

        # One result per input line, separated by a blank line.
        with open(output_file, "w", encoding="utf-8") as f:
            for result in results:
                f.write(result + "\n\n")

        click.echo(f"✅ 处理完成!结果保存到: {output_file}")

    except Exception as e:
        click.echo(f"❌ 错误: {e}", err=True)


def main():
    """CLI entry point: delegates to the click command group."""
    cli()


# Allow running the module directly as well as via the console script.
if __name__ == "__main__":
    main()

第七部分:示例代码

24. examples/basic_usage.py - 基础使用示例

# -*- coding: utf-8 -*-
"""
基础使用示例
演示LLM Toolkit的基本功能
"""

from llm_toolkit import LLMManager, PromptManager, ConversationManager


def example_basic_chat():
    """Example 1: a single-turn chat call."""
    print("=" * 60)
    print("示例1: 基础对话")
    print("=" * 60)

    client = LLMManager(
        provider="openai",
        model="gpt-3.5-turbo",
        temperature=0.7
    )

    answer = client.chat("你好,请介绍一下自己")
    print(f"回复: {answer}\n")


def example_streaming():
    """Example 2: token-by-token streaming output."""
    print("=" * 60)
    print("示例2: 流式响应")
    print("=" * 60)

    client = LLMManager(provider="openai", model="gpt-3.5-turbo")

    # Print each chunk as it arrives, without newlines in between.
    print("回复: ", end="", flush=True)
    for piece in client.stream("讲一个简短的笑话"):
        print(piece, end="", flush=True)
    print("\n")


def example_conversation():
    """Example 3: multi-turn chat with conversation memory."""
    print("=" * 60)
    print("示例3: 多轮对话")
    print("=" * 60)

    client = LLMManager(provider="openai", model="gpt-3.5-turbo")

    session = ConversationManager(
        llm=client,
        memory_type="buffer",
        max_history=5
    )

    # Turn one introduces a fact; turn two relies on the memory of it.
    first = session.chat("我的名字是Alice")
    print(f"回复1: {first}")

    second = session.chat("我刚才说我叫什么?")
    print(f"回复2: {second}\n")


def example_prompt_template():
    """Example 4: define a prompt template and run it through the LLM."""
    print("=" * 60)
    print("示例4: Prompt模板")
    print("=" * 60)

    manager = PromptManager()

    manager.create_template(
        name="product_review",
        template="请为以下产品写一篇评论:\n产品名称:{product_name}\n特点:{features}\n\n评论:",
        variables=["product_name", "features"],
        category="marketing"
    )

    # Fill the template with concrete values.
    review_prompt = manager.get_template("product_review").format(
        product_name="智能手表",
        features="心率监测、GPS定位、防水、长续航"
    )
    print(f"生成的Prompt:\n{review_prompt}\n")

    # Feed the rendered prompt into the model.
    client = LLMManager(provider="openai", model="gpt-3.5-turbo")
    answer = client.chat(review_prompt)
    print(f"生成的评论:\n{answer}\n")


def example_batch_processing():
    """Example 5: send several prompts in one batch call."""
    print("=" * 60)
    print("示例5: 批量处理")
    print("=" * 60)

    client = LLMManager(provider="openai", model="gpt-3.5-turbo")

    questions = [
        "什么是人工智能?",
        "什么是机器学习?",
        "什么是深度学习?"
    ]

    for question, answer in zip(questions, client.batch_chat(questions)):
        print(f"Q: {question}")
        print(f"A: {answer}\n")


def main():
    """Run every basic-usage example in order."""
    print("\n" + "=" * 60)
    print("LLM Toolkit 基础使用示例")
    print("=" * 60 + "\n")

    for example in (
        example_basic_chat,
        example_streaming,
        example_conversation,
        example_prompt_template,
        example_batch_processing,
    ):
        example()

    print("=" * 60)
    print("所有示例运行完成!")
    print("=" * 60)


if __name__ == "__main__":
    main()

25. examples/rag_example.py - RAG示例

# -*- coding: utf-8 -*-
"""
RAG使用示例
演示检索增强生成功能
"""

from llm_toolkit import LLMManager, RAGPipeline
from pathlib import Path


def example_rag_basic():
    """Example 1: answer a question over a few in-memory texts."""
    print("=" * 60)
    print("示例1: 基础RAG")
    print("=" * 60)

    client = LLMManager(provider="openai", model="gpt-3.5-turbo")

    pipeline = RAGPipeline(
        llm=client,
        vector_store="chromadb",
        top_k=3
    )

    # Seed the knowledge base with plain strings.
    knowledge = [
        "Python是一种高级编程语言,由Guido van Rossum于1991年创建。",
        "Python具有简洁的语法和强大的功能,广泛应用于Web开发、数据分析、人工智能等领域。",
        "Python的包管理器pip可以方便地安装第三方库。",
    ]
    pipeline.add_texts(knowledge)

    result = pipeline.query("Python是什么时候创建的?")

    print(f"问题: Python是什么时候创建的?")
    print(f"答案: {result.answer}")
    print(f"\n来源数量: {len(result.sources)}\n")


def example_rag_documents():
    """Example 2: build a RAG index from files on disk."""
    print("=" * 60)
    print("示例2: 从文档构建RAG")
    print("=" * 60)
    
    llm = LLMManager(provider="openai", model="gpt-3.5-turbo")
    rag = RAGPipeline(llm=llm)
    
    # Create a scratch directory holding one sample document.
    doc_dir = Path("temp/docs")
    doc_dir.mkdir(parents=True, exist_ok=True)
    
    # Write the sample file (its content is the data being indexed).
    with open(doc_dir / "python_intro.txt", "w", encoding="utf-8") as f:
        f.write("""
Python简介

Python是一种解释型、面向对象、动态数据类型的高级程序设计语言。

主要特点:
1. 简单易学:Python语法简洁清晰
2. 免费开源:Python是开源软件
3. 可移植性:Python可以在多种平台上运行
4. 丰富的库:Python有大量的标准库和第三方库

应用领域:
- Web开发
- 数据分析
- 人工智能
- 自动化脚本
- 科学计算
        """)
    
    # Index every document found in the directory.
    num_docs = rag.add_documents(str(doc_dir))
    print(f"加载了 {num_docs} 个文档块")
    
    # Ask questions that the indexed content can answer.
    questions = [
        "Python有哪些特点?",
        "Python可以用于哪些领域?"
    ]
    
    for question in questions:
        response = rag.query(question)
        print(f"\n问题: {question}")
        print(f"答案: {response.answer}")


def example_rag_url():
    """Example 3: placeholder for building a RAG index from a URL."""
    print("=" * 60)
    print("示例3: 从URL构建RAG")
    print("=" * 60)

    client = LLMManager(provider="openai", model="gpt-3.5-turbo")
    pipeline = RAGPipeline(llm=client)

    # Kept commented out: requires a real, reachable URL.
    # url = "https://example.com/article"
    # num_docs = pipeline.add_url(url)
    # print(f"从URL加载了 {num_docs} 个文档块")

    print("URL加载示例(需要实际URL)\n")


def example_rag_hybrid():
    """Example 4: query with hybrid retrieval enabled."""
    print("=" * 60)
    print("示例4: 混合检索")
    print("=" * 60)

    client = LLMManager(provider="openai", model="gpt-3.5-turbo")
    pipeline = RAGPipeline(llm=client, top_k=5)

    # Seed with a few related statements.
    notes = [
        "机器学习是人工智能的一个分支。",
        "深度学习是机器学习的一个子领域。",
        "神经网络是深度学习的基础。",
        "卷积神经网络(CNN)常用于图像识别。",
        "循环神经网络(RNN)常用于序列数据处理。",
    ]
    pipeline.add_texts(notes)

    result = pipeline.query(
        "深度学习和机器学习有什么关系?",
        use_hybrid=True
    )

    print(f"问题: 深度学习和机器学习有什么关系?")
    print(f"答案: {result.answer}\n")


def main():
    """Run every RAG example in order."""
    print("\n" + "=" * 60)
    print("RAG使用示例")
    print("=" * 60 + "\n")

    for example in (
        example_rag_basic,
        example_rag_documents,
        example_rag_url,
        example_rag_hybrid,
    ):
        example()

    print("=" * 60)
    print("所有示例运行完成!")
    print("=" * 60)


if __name__ == "__main__":
    main()

26. examples/cost_tracking_example.py - 成本追踪示例

# -*- coding: utf-8 -*-
"""
成本追踪示例
演示成本管理功能
"""

from llm_toolkit import LLMManager, CostTracker


def example_cost_tracking():
    """Example 1: track per-call spend through a CostTracker."""
    print("=" * 60)
    print("示例1: 基础成本追踪")
    print("=" * 60)

    tracker = CostTracker(
        daily_limit=10.0,
        monthly_limit=300.0,
        alert_threshold=0.8
    )

    # Attach the tracker so every chat call is billed against it.
    client = LLMManager(
        provider="openai",
        model="gpt-3.5-turbo",
        cost_tracker=tracker
    )

    for question in ("什么是人工智能?", "什么是机器学习?", "什么是深度学习?"):
        answer = client.chat(question)
        print(f"Q: {question}")
        print(f"A: {answer[:100]}...\n")

    # Report the accumulated spend.
    print(f"今日成本: ${tracker.get_daily_cost():.4f}")
    print(f"本月成本: ${tracker.get_monthly_cost():.4f}\n")


def example_cost_report():
    """Example 2: print a monthly cost report."""
    print("=" * 60)
    print("示例2: 成本报告")
    print("=" * 60)

    tracker = CostTracker()
    print(tracker.generate_report(period="monthly", format="text"))


def example_optimization_suggestions():
    """Example 3: list cost-optimization suggestions, if any."""
    print("=" * 60)
    print("示例3: 优化建议")
    print("=" * 60)

    tracker = CostTracker()
    tips = tracker.get_optimization_suggestions()

    if not tips:
        print("✅ 当前使用已优化\n")
    else:
        print("💡 成本优化建议:")
        for index, tip in enumerate(tips, 1):
            print(f"{index}. {tip}")


def main():
    """Run every cost-tracking example in order."""
    print("\n" + "=" * 60)
    print("成本追踪示例")
    print("=" * 60 + "\n")

    for example in (
        example_cost_tracking,
        example_cost_report,
        example_optimization_suggestions,
    ):
        example()

    print("=" * 60)
    print("所有示例运行完成!")
    print("=" * 60)


if __name__ == "__main__":
    main()

27. examples/advanced_example.py - 高级功能示例

# -*- coding: utf-8 -*-
"""
高级功能示例
演示LLM Toolkit的高级特性
"""

from llm_toolkit import (
    LLMManager,
    PromptManager,
    ConversationManager,
    RAGPipeline
)


def example_multi_provider():
    """Example 1: ask the same question through several providers."""
    banner = "=" * 60
    print(banner)
    print("示例1: 多提供商切换")
    print(banner)

    candidates = [
        ("openai", "gpt-3.5-turbo"),
        # ("anthropic", "claude-3-haiku-20240307"),
        # ("google", "gemini-pro")
    ]
    prompt = "用一句话解释什么是AI"

    for name, model_id in candidates:
        # Providers without configured credentials are skipped, not fatal.
        try:
            answer = LLMManager(provider=name, model=model_id).chat(prompt)
            print(f"\n{name}/{model_id}:")
            print(answer)
        except Exception as exc:
            print(f"\n{name}/{model_id}: 跳过 ({exc})")

    print()


def example_conversation_export():
    """Example 2: save a conversation to disk and load it back.

    Fix: create the ``temp/`` output directory before saving —
    on a fresh checkout the directory does not exist, so the
    ``save`` call would fail with a missing-path error.
    """
    print("=" * 60)
    print("示例2: 对话导出")
    print("=" * 60)

    llm = LLMManager(provider="openai", model="gpt-3.5-turbo")
    conversation = ConversationManager(llm=llm)

    # Build up a short history to export.
    conversation.chat("你好")
    conversation.chat("我想学习Python")
    conversation.chat("从哪里开始?")

    # Ensure the output directory exists (local import keeps the
    # example's top-level imports untouched).
    import os
    os.makedirs("temp", exist_ok=True)

    # Persist the conversation.
    conversation.save("temp/conversation.json")
    print("✅ 对话已保存到 temp/conversation.json")

    # Round-trip: load the saved history into a fresh manager.
    new_conversation = ConversationManager(llm=llm)
    new_conversation.load("temp/conversation.json")
    print(f"✅ 加载了 {len(new_conversation.messages)} 条消息\n")


def example_prompt_versioning():
    """Example 3: create, version, and roll back a prompt template."""
    banner = "=" * 60
    print(banner)
    print("示例3: Prompt版本管理")
    print(banner)

    manager = PromptManager()

    # v1: initial template.
    manager.create_template(
        name="email_template",
        template="写一封{tone}的邮件给{recipient},主题是{subject}。",
        variables=["tone", "recipient", "subject"],
        category="communication",
    )
    print("✅ 创建了模板 v1")

    # v2: reworded template with an extra {language} variable.
    manager.update_template(
        name="email_template",
        template="写一封{tone}的{language}邮件给{recipient},主题是{subject}。请保持专业和礼貌。",
        variables=["tone", "language", "recipient", "subject"],
        create_version=True,
    )
    print("✅ 创建了模板 v2")

    # Inspect the version history.
    print(f"✅ 共有 {len(manager.get_versions('email_template'))} 个版本")

    # Restore the original wording.
    manager.rollback("email_template", "v1")
    print("✅ 回滚到 v1\n")


def example_rag_with_metadata():
    """Example 4: RAG retrieval where each document carries metadata."""
    banner = "=" * 60
    print(banner)
    print("示例4: 带元数据的RAG")
    print(banner)

    pipeline = RAGPipeline(llm=LLMManager(provider="openai", model="gpt-3.5-turbo"))

    # Index three facts, each paired with version/year metadata.
    corpus = [
        ("Python 3.9发布于2020年10月", {"version": "3.9", "year": 2020}),
        ("Python 3.10发布于2021年10月", {"version": "3.10", "year": 2021}),
        ("Python 3.11发布于2022年10月", {"version": "3.11", "year": 2022}),
    ]
    pipeline.add_texts([text for text, _ in corpus],
                       metadatas=[meta for _, meta in corpus])

    # Query and show the answer with its cited sources.
    result = pipeline.query("Python 3.10什么时候发布的?")
    print("问题: Python 3.10什么时候发布的?")
    print(f"答案: {result.answer}")

    if result.sources:
        print("\n来源:")
        for doc in result.sources:
            print(f"  版本: {doc.metadata.get('version')}, 年份: {doc.metadata.get('year')}")

    print()


def example_streaming_with_callback():
    """Example 5: consume a streaming response through a callback."""
    banner = "=" * 60
    print(banner)
    print("示例5: 带回调的流式响应")
    print(banner)

    llm = LLMManager(provider="openai", model="gpt-3.5-turbo")

    received = []

    def on_chunk(piece):
        # Collect each chunk and echo it immediately, unbuffered.
        received.append(piece)
        print(piece, end="", flush=True)

    print("生成中: ", end="", flush=True)
    for piece in llm.stream("写一首关于AI的短诗"):
        on_chunk(piece)

    print(f"\n\n生成了 {len(received)} 个片段\n")


def main():
    """Run all advanced examples in sequence."""
    rule = "=" * 60
    print("\n" + rule)
    print("高级功能示例")
    print(rule + "\n")

    # Execute each demo in order.
    for demo in (
        example_multi_provider,
        example_conversation_export,
        example_prompt_versioning,
        example_rag_with_metadata,
        example_streaming_with_callback,
    ):
        demo()

    print(rule)
    print("所有示例运行完成!")
    print(rule)


# Script entry point: run all examples when executed directly.
if __name__ == "__main__":
    main()

LLM Toolkit - 第八部分:配置和环境设置

配置文件和环境设置


28. .env.example - 环境变量示例

# ═══════════════════════════════════════════════════════════════════
# LLM TOOLKIT 环境配置文件
# 请复制此文件为 .env 并填入您的实际配置
# ═══════════════════════════════════════════════════════════════════

# ───────────────────────────────────────────────────────────────────
# OpenAI 配置
# ───────────────────────────────────────────────────────────────────
OPENAI_API_KEY=your_openai_api_key_here
OPENAI_API_BASE=https://api.openai.com/v1
OPENAI_ORG_ID=
OPENAI_DEFAULT_MODEL=gpt-4-turbo-preview

# ───────────────────────────────────────────────────────────────────
# Anthropic (Claude) 配置
# ───────────────────────────────────────────────────────────────────
ANTHROPIC_API_KEY=your_anthropic_api_key_here
ANTHROPIC_DEFAULT_MODEL=claude-3-opus-20240229

# ───────────────────────────────────────────────────────────────────
# Google (Gemini) 配置
# ───────────────────────────────────────────────────────────────────
GOOGLE_API_KEY=your_google_api_key_here
GOOGLE_DEFAULT_MODEL=gemini-pro

# ───────────────────────────────────────────────────────────────────
# Cohere 配置
# ───────────────────────────────────────────────────────────────────
COHERE_API_KEY=your_cohere_api_key_here
COHERE_DEFAULT_MODEL=command

# ───────────────────────────────────────────────────────────────────
# Hugging Face 配置
# ───────────────────────────────────────────────────────────────────
HUGGINGFACE_API_KEY=your_huggingface_api_key_here
HUGGINGFACE_DEFAULT_MODEL=meta-llama/Llama-2-70b-chat-hf

# ───────────────────────────────────────────────────────────────────
# 本地模型配置 (Ollama)
# ───────────────────────────────────────────────────────────────────
OLLAMA_BASE_URL=http://localhost:11434
OLLAMA_DEFAULT_MODEL=llama2

# ───────────────────────────────────────────────────────────────────
# 向量数据库配置
# ───────────────────────────────────────────────────────────────────
# ChromaDB
CHROMA_PERSIST_DIRECTORY=./data/chroma
CHROMA_COLLECTION_NAME=llm_toolkit

# Pinecone
PINECONE_API_KEY=your_pinecone_api_key_here
PINECONE_ENVIRONMENT=us-west1-gcp
PINECONE_INDEX_NAME=llm-toolkit

# Weaviate
WEAVIATE_URL=http://localhost:8080
WEAVIATE_API_KEY=

# ───────────────────────────────────────────────────────────────────
# 嵌入模型配置
# ───────────────────────────────────────────────────────────────────
EMBEDDING_PROVIDER=openai
EMBEDDING_MODEL=text-embedding-3-small
EMBEDDING_DIMENSION=1536

# ───────────────────────────────────────────────────────────────────
# 缓存配置
# ───────────────────────────────────────────────────────────────────
CACHE_ENABLED=true
CACHE_TYPE=redis  # redis, memory, disk
CACHE_TTL=3600
CACHE_MAX_SIZE=1000

# Redis配置
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASSWORD=

# 磁盘缓存配置
DISK_CACHE_DIR=./data/cache

# ───────────────────────────────────────────────────────────────────
# 日志配置
# ───────────────────────────────────────────────────────────────────
LOG_LEVEL=INFO  # DEBUG, INFO, WARNING, ERROR, CRITICAL
LOG_FILE=./logs/llm_toolkit.log
LOG_ROTATION=1 day
LOG_RETENTION=30 days
LOG_FORMAT=<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>

# ───────────────────────────────────────────────────────────────────
# 成本追踪配置
# ───────────────────────────────────────────────────────────────────
COST_TRACKING_ENABLED=true
DAILY_COST_LIMIT=50.0
MONTHLY_COST_LIMIT=1000.0
COST_ALERT_THRESHOLD=0.8
COST_DB_PATH=./data/costs.db

# ───────────────────────────────────────────────────────────────────
# 性能配置
# ───────────────────────────────────────────────────────────────────
MAX_RETRIES=3
RETRY_DELAY=1.0
REQUEST_TIMEOUT=60
MAX_CONCURRENT_REQUESTS=10
RATE_LIMIT_PER_MINUTE=60

# ───────────────────────────────────────────────────────────────────
# RAG配置
# ───────────────────────────────────────────────────────────────────
CHUNK_SIZE=1000
CHUNK_OVERLAP=200
TOP_K=5
SIMILARITY_THRESHOLD=0.7
RERANK_ENABLED=false

# ───────────────────────────────────────────────────────────────────
# 对话配置
# ───────────────────────────────────────────────────────────────────
MAX_HISTORY_LENGTH=10
CONVERSATION_SAVE_DIR=./data/conversations

# ───────────────────────────────────────────────────────────────────
# Prompt模板配置
# ───────────────────────────────────────────────────────────────────
PROMPT_TEMPLATES_DIR=./prompts
ENABLE_PROMPT_VERSIONING=true

# ───────────────────────────────────────────────────────────────────
# 安全配置
# ───────────────────────────────────────────────────────────────────
ENABLE_CONTENT_FILTER=true
MAX_INPUT_LENGTH=10000
MAX_OUTPUT_LENGTH=4000
ALLOWED_DOMAINS=*

# ───────────────────────────────────────────────────────────────────
# 开发配置
# ───────────────────────────────────────────────────────────────────
DEBUG_MODE=false
VERBOSE=false
ENABLE_PROFILING=false

# ───────────────────────────────────────────────────────────────────
# 数据库配置
# ───────────────────────────────────────────────────────────────────
DATABASE_URL=sqlite:///./data/llm_toolkit.db
# 或使用PostgreSQL: postgresql://user:password@localhost/dbname

# ───────────────────────────────────────────────────────────────────
# 监控配置
# ───────────────────────────────────────────────────────────────────
ENABLE_METRICS=true
METRICS_PORT=9090
ENABLE_TRACING=false

# ───────────────────────────────────────────────────────────────────
# 代理配置
# ───────────────────────────────────────────────────────────────────
HTTP_PROXY=
HTTPS_PROXY=
NO_PROXY=localhost,127.0.0.1

# ───────────────────────────────────────────────────────────────────
# 其他配置
# ───────────────────────────────────────────────────────────────────
TEMP_DIR=./temp
ENABLE_AUTO_UPDATE=false
TELEMETRY_ENABLED=false

29. config/default.yaml - 默认配置文件

# ═══════════════════════════════════════════════════════════════════
# LLM TOOLKIT 默认配置
# ═══════════════════════════════════════════════════════════════════

# ───────────────────────────────────────────────────────────────────
# 应用配置
# ───────────────────────────────────────────────────────────────────
app:
  name: "LLM Toolkit"
  version: "1.0.0"
  description: "大语言模型工具包"
  author: "DREAMVFIA"
  license: "MIT"

# ───────────────────────────────────────────────────────────────────
# LLM提供商配置
# ───────────────────────────────────────────────────────────────────
llm:
  default_provider: "openai"
  default_temperature: 0.7
  default_max_tokens: 2000
  default_top_p: 1.0
  default_frequency_penalty: 0.0
  default_presence_penalty: 0.0
  
  providers:
    openai:
      models:
        - name: "gpt-4-turbo-preview"
          max_tokens: 128000
          cost_per_1k_input: 0.01
          cost_per_1k_output: 0.03
        - name: "gpt-4"
          max_tokens: 8192
          cost_per_1k_input: 0.03
          cost_per_1k_output: 0.06
        - name: "gpt-3.5-turbo"
          max_tokens: 16385
          cost_per_1k_input: 0.0005
          cost_per_1k_output: 0.0015
        - name: "gpt-3.5-turbo-16k"
          max_tokens: 16385
          cost_per_1k_input: 0.003
          cost_per_1k_output: 0.004
    
    anthropic:
      models:
        - name: "claude-3-opus-20240229"
          max_tokens: 200000
          cost_per_1k_input: 0.015
          cost_per_1k_output: 0.075
        - name: "claude-3-sonnet-20240229"
          max_tokens: 200000
          cost_per_1k_input: 0.003
          cost_per_1k_output: 0.015
        - name: "claude-3-haiku-20240307"
          max_tokens: 200000
          cost_per_1k_input: 0.00025
          cost_per_1k_output: 0.00125
    
    google:
      models:
        - name: "gemini-pro"
          max_tokens: 32768
          cost_per_1k_input: 0.00025
          cost_per_1k_output: 0.0005
        - name: "gemini-pro-vision"
          max_tokens: 16384
          cost_per_1k_input: 0.00025
          cost_per_1k_output: 0.0005

# ───────────────────────────────────────────────────────────────────
# 嵌入模型配置
# ───────────────────────────────────────────────────────────────────
embedding:
  default_provider: "openai"
  default_model: "text-embedding-3-small"
  
  models:
    openai:
      - name: "text-embedding-3-small"
        dimension: 1536
        cost_per_1k: 0.00002
      - name: "text-embedding-3-large"
        dimension: 3072
        cost_per_1k: 0.00013
      - name: "text-embedding-ada-002"
        dimension: 1536
        cost_per_1k: 0.0001
    
    huggingface:
      - name: "sentence-transformers/all-MiniLM-L6-v2"
        dimension: 384
        cost_per_1k: 0.0
      - name: "sentence-transformers/all-mpnet-base-v2"
        dimension: 768
        cost_per_1k: 0.0

# ───────────────────────────────────────────────────────────────────
# 向量数据库配置
# ───────────────────────────────────────────────────────────────────
vector_store:
  default_type: "chromadb"
  
  chromadb:
    persist_directory: "./data/chroma"
    collection_name: "llm_toolkit"
    distance_metric: "cosine"
  
  pinecone:
    environment: "us-west1-gcp"
    index_name: "llm-toolkit"
    dimension: 1536
    metric: "cosine"
  
  weaviate:
    url: "http://localhost:8080"
    class_name: "Document"

# ───────────────────────────────────────────────────────────────────
# RAG配置
# ───────────────────────────────────────────────────────────────────
rag:
  chunk_size: 1000
  chunk_overlap: 200
  top_k: 5
  similarity_threshold: 0.7
  
  text_splitter:
    type: "recursive"  # recursive, character, token
    separators: ["\n\n", "\n", " ", ""]
  
  retrieval:
    method: "similarity"  # similarity, mmr, similarity_score_threshold
    mmr_lambda: 0.5
    fetch_k: 20
  
  reranking:
    enabled: false
    model: "cross-encoder/ms-marco-MiniLM-L-6-v2"
    top_n: 3

# ───────────────────────────────────────────────────────────────────
# 缓存配置
# ───────────────────────────────────────────────────────────────────
cache:
  enabled: true
  type: "memory"  # memory, redis, disk
  ttl: 3600
  max_size: 1000
  
  redis:
    host: "localhost"
    port: 6379
    db: 0
    password: null
    prefix: "llm_toolkit:"
  
  disk:
    directory: "./data/cache"
    max_size_mb: 1000

# ───────────────────────────────────────────────────────────────────
# 日志配置
# ───────────────────────────────────────────────────────────────────
logging:
  level: "INFO"
  file: "./logs/llm_toolkit.log"
  rotation: "1 day"
  retention: "30 days"
  format: "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>"
  
  handlers:
    console:
      enabled: true
      level: "INFO"
    
    file:
      enabled: true
      level: "DEBUG"
    
    error_file:
      enabled: true
      level: "ERROR"
      file: "./logs/errors.log"

# ───────────────────────────────────────────────────────────────────
# 成本追踪配置
# ───────────────────────────────────────────────────────────────────
cost_tracking:
  enabled: true
  database: "./data/costs.db"
  
  limits:
    daily: 50.0
    weekly: 300.0
    monthly: 1000.0
  
  alerts:
    enabled: true
    threshold: 0.8
    email: null
    webhook: null

# ───────────────────────────────────────────────────────────────────
# 性能配置
# ───────────────────────────────────────────────────────────────────
performance:
  max_retries: 3
  retry_delay: 1.0
  request_timeout: 60
  max_concurrent_requests: 10
  rate_limit_per_minute: 60
  
  batch_processing:
    enabled: true
    batch_size: 10
    max_workers: 4

# ───────────────────────────────────────────────────────────────────
# 对话配置
# ───────────────────────────────────────────────────────────────────
conversation:
  max_history_length: 10
  save_directory: "./data/conversations"
  auto_save: true
  
  memory_types:
    - buffer
    - summary
    - token_buffer
    - conversation_kg

# ───────────────────────────────────────────────────────────────────
# Prompt模板配置
# ───────────────────────────────────────────────────────────────────
prompts:
  templates_directory: "./prompts"
  enable_versioning: true
  default_category: "general"
  
  categories:
    - general
    - coding
    - writing
    - analysis
    - translation
    - summarization
    - qa

# ───────────────────────────────────────────────────────────────────
# 安全配置
# ───────────────────────────────────────────────────────────────────
security:
  content_filter:
    enabled: true
    block_harmful: true
    block_pii: true
  
  input_validation:
    max_length: 10000
    allowed_chars: null
  
  output_validation:
    max_length: 4000
    sanitize: true
  
  rate_limiting:
    enabled: true
    requests_per_minute: 60
    requests_per_hour: 1000

# ───────────────────────────────────────────────────────────────────
# 监控配置
# ───────────────────────────────────────────────────────────────────
monitoring:
  metrics:
    enabled: true
    port: 9090
    path: "/metrics"
  
  tracing:
    enabled: false
    provider: "jaeger"
    endpoint: "http://localhost:14268/api/traces"
  
  health_check:
    enabled: true
    interval: 60
    endpoint: "/health"

# ───────────────────────────────────────────────────────────────────
# 开发配置
# ───────────────────────────────────────────────────────────────────
development:
  debug_mode: false
  verbose: false
  profiling: false
  hot_reload: false
  
  testing:
    mock_llm: false
    mock_embedding: false
    mock_vector_store: false

# ───────────────────────────────────────────────────────────────────
# 功能开关
# ───────────────────────────────────────────────────────────────────
features:
  streaming: true
  async_processing: true
  batch_processing: true
  rag: true
  conversation_memory: true
  cost_tracking: true
  caching: true
  auto_retry: true
  content_filtering: true
  telemetry: false

30. config/production.yaml - 生产环境配置

# ═══════════════════════════════════════════════════════════════════
# LLM TOOLKIT 生产环境配置
# ═══════════════════════════════════════════════════════════════════

# 继承默认配置
_extends: default.yaml

# ───────────────────────────────────────────────────────────────────
# 生产环境特定配置
# ───────────────────────────────────────────────────────────────────

# 日志配置
logging:
  level: "WARNING"
  handlers:
    console:
      enabled: false
    file:
      enabled: true
      level: "INFO"
    error_file:
      enabled: true
      level: "ERROR"

# 缓存配置
cache:
  type: "redis"
  ttl: 7200
  max_size: 10000

# 性能配置
performance:
  max_concurrent_requests: 50
  rate_limit_per_minute: 300
  batch_processing:
    batch_size: 20
    max_workers: 8

# 成本追踪
cost_tracking:
  limits:
    daily: 100.0
    weekly: 600.0
    monthly: 2000.0
  alerts:
    enabled: true
    threshold: 0.9

# 安全配置
security:
  content_filter:
    enabled: true
    block_harmful: true
    block_pii: true
  rate_limiting:
    enabled: true
    requests_per_minute: 300
    requests_per_hour: 10000

# 监控配置
monitoring:
  metrics:
    enabled: true
  tracing:
    enabled: true
  health_check:
    enabled: true
    interval: 30

# 开发配置
development:
  debug_mode: false
  verbose: false
  profiling: false

31. config/development.yaml - 开发环境配置

# ═══════════════════════════════════════════════════════════════════
# LLM TOOLKIT 开发环境配置
# ═══════════════════════════════════════════════════════════════════

# 继承默认配置
_extends: default.yaml

# ───────────────────────────────────────────────────────────────────
# 开发环境特定配置
# ───────────────────────────────────────────────────────────────────

# 日志配置
logging:
  level: "DEBUG"
  handlers:
    console:
      enabled: true
      level: "DEBUG"
    file:
      enabled: true
      level: "DEBUG"

# 缓存配置
cache:
  type: "memory"
  ttl: 300
  max_size: 100

# 成本追踪
cost_tracking:
  limits:
    daily: 10.0
    weekly: 50.0
    monthly: 200.0

# 安全配置
security:
  rate_limiting:
    enabled: false

# 监控配置
monitoring:
  metrics:
    enabled: false
  tracing:
    enabled: false

# 开发配置
development:
  debug_mode: true
  verbose: true
  profiling: true
  hot_reload: true
  testing:
    mock_llm: false
    mock_embedding: false
    mock_vector_store: false

32. config/config_loader.py - 配置加载器

# -*- coding: utf-8 -*-
"""
配置加载器
负责加载和合并配置文件
"""

import copy
import os
from pathlib import Path
from typing import Any, Dict, Optional

import yaml
from dotenv import load_dotenv
from loguru import logger


class ConfigLoader:
    """Loads, merges and exposes layered YAML configuration.

    Resolution order (later layers win):
      1. ``default.yaml`` in the config directory
      2. ``<env>.yaml`` for the active environment
      3. selected environment variables (see ``_override_from_env``)
    """

    def __init__(self, env: Optional[str] = None):
        """
        Initialize the loader and eagerly load all configuration.

        Args:
            env: Environment name (development, production, testing).
                 Falls back to the ``ENV`` variable, then "development".
        """
        self.env = env or os.getenv("ENV", "development")
        self.config_dir = Path(__file__).parent
        self.config: Dict[str, Any] = {}

        # YAML files first, then environment-variable overrides on top.
        self._load_config()
        self._load_env()

    def _load_config(self):
        """Load default.yaml, then overlay the environment-specific file."""
        # Base layer: default.yaml (missing file means start empty).
        default_config_path = self.config_dir / "default.yaml"
        if default_config_path.exists():
            with open(default_config_path, "r", encoding="utf-8") as f:
                self.config = yaml.safe_load(f) or {}

        # Overlay: <env>.yaml is deep-merged over the defaults.
        env_config_path = self.config_dir / f"{self.env}.yaml"
        if env_config_path.exists():
            with open(env_config_path, "r", encoding="utf-8") as f:
                env_config = yaml.safe_load(f) or {}
                self._merge_config(self.config, env_config)

        logger.info(f"配置加载完成: {self.env}")

    def _load_env(self):
        """Load a local .env file (if present) and apply env overrides."""
        env_file = Path(".env")
        if env_file.exists():
            load_dotenv(env_file)
            logger.info("环境变量加载完成")

        # Environment variables take precedence over file-based config.
        self._override_from_env()

    def _override_from_env(self):
        """Override selected config keys from environment variables."""
        # LLM provider credentials.
        if os.getenv("OPENAI_API_KEY"):
            self.set("llm.providers.openai.api_key", os.getenv("OPENAI_API_KEY"))

        if os.getenv("ANTHROPIC_API_KEY"):
            self.set("llm.providers.anthropic.api_key", os.getenv("ANTHROPIC_API_KEY"))

        if os.getenv("GOOGLE_API_KEY"):
            self.set("llm.providers.google.api_key", os.getenv("GOOGLE_API_KEY"))

        # Cache backend selection and Redis host.
        if os.getenv("CACHE_TYPE"):
            self.set("cache.type", os.getenv("CACHE_TYPE"))

        if os.getenv("REDIS_HOST"):
            self.set("cache.redis.host", os.getenv("REDIS_HOST"))

        # Logging verbosity.
        if os.getenv("LOG_LEVEL"):
            self.set("logging.level", os.getenv("LOG_LEVEL"))

        # Cost tracking daily limit (numeric).
        if os.getenv("DAILY_COST_LIMIT"):
            self.set("cost_tracking.limits.daily", float(os.getenv("DAILY_COST_LIMIT")))

    def _merge_config(self, base: Dict, override: Dict):
        """
        Recursively deep-merge ``override`` into ``base`` in place.

        Nested dicts are merged key by key; any other value type in
        ``override`` replaces the corresponding ``base`` entry.

        Args:
            base: Base configuration (mutated).
            override: Overriding configuration.
        """
        for key, value in override.items():
            if key in base and isinstance(base[key], dict) and isinstance(value, dict):
                self._merge_config(base[key], value)
            else:
                base[key] = value

    def get(self, key: str, default: Any = None) -> Any:
        """
        Get a configuration value.

        Args:
            key: Configuration key (dot-separated path, e.g. "cache.type").
            default: Value returned when any path segment is missing.

        Returns:
            The configured value, or ``default``.
        """
        keys = key.split(".")
        value = self.config

        for k in keys:
            if isinstance(value, dict) and k in value:
                value = value[k]
            else:
                return default

        return value

    def set(self, key: str, value: Any):
        """
        Set a configuration value, creating intermediate dicts as needed.

        Args:
            key: Configuration key (dot-separated path).
            value: Value to store.
        """
        keys = key.split(".")
        config = self.config

        for k in keys[:-1]:
            if k not in config:
                config[k] = {}
            config = config[k]

        config[keys[-1]] = value

    def get_all(self) -> Dict[str, Any]:
        """
        Return a deep copy of the full configuration.

        Fix: the previous shallow ``dict.copy`` shared nested dicts with
        the live config, so callers could mutate internal state through
        the returned snapshot.
        """
        return copy.deepcopy(self.config)

    def save(self, path: Optional[Path] = None):
        """
        Save the current configuration to a YAML file.

        Args:
            path: Destination path; defaults to
                  ``<config_dir>/<env>_runtime.yaml``.
        """
        if path is None:
            path = self.config_dir / f"{self.env}_runtime.yaml"

        with open(path, "w", encoding="utf-8") as f:
            yaml.dump(self.config, f, default_flow_style=False, allow_unicode=True)

        logger.info(f"配置已保存到: {path}")


# Process-wide singleton, managed by get_config()/reload_config().
_config_instance: Optional[ConfigLoader] = None


def get_config(env: str = None) -> ConfigLoader:
    """Return the process-wide ConfigLoader, creating it on first use.

    Args:
        env: Environment name, used only when the instance is first built.

    Returns:
        The shared ConfigLoader instance.
    """
    global _config_instance
    if _config_instance is None:
        _config_instance = ConfigLoader(env=env)
    return _config_instance


def reload_config(env: str = None):
    """Discard the cached configuration and rebuild it from disk.

    Args:
        env: Environment name for the rebuilt instance.
    """
    global _config_instance
    _config_instance = ConfigLoader(env=env)

33. setup.py - 安装配置

# -*- coding: utf-8 -*-
"""
LLM Toolkit setup configuration.

Reads the long description from README.md and install requirements from
requirements.txt (both optional) and hands the metadata to setuptools.
"""

from setuptools import setup, find_packages
from pathlib import Path

# Read README for the PyPI long description (empty string if missing).
readme_file = Path(__file__).parent / "README.md"
long_description = readme_file.read_text(encoding="utf-8") if readme_file.exists() else ""

# Read requirements, skipping blank lines and comment lines.
# Fix: strip BEFORE the "#" check — the previous filter tested
# line.startswith("#") on the raw line, so indented comment lines
# (e.g. "  # pinned for CVE-...") leaked into install_requires.
requirements_file = Path(__file__).parent / "requirements.txt"
requirements = []
if requirements_file.exists():
    requirements = [
        line
        for line in (raw.strip() for raw in requirements_file.read_text(encoding="utf-8").splitlines())
        if line and not line.startswith("#")
    ]

setup(
    name="llm-toolkit",
    version="1.0.0",
    author="DREAMVFIA",
    author_email="contact@dreamvfia.com",
    description="大语言模型工具包 - 统一的LLM开发框架",
    long_description=long_description,
    long_description_content_type="text/markdown",
    url="https://github.com/dreamvfia/llm-toolkit",
    packages=find_packages(exclude=["tests", "examples", "docs"]),
    classifiers=[
        "Development Status :: 4 - Beta",
        "Intended Audience :: Developers",
        "Topic :: Software Development :: Libraries :: Python Modules",
        "Topic :: Scientific/Engineering :: Artificial Intelligence",
        "License :: OSI Approved :: MIT License",
        "Programming Language :: Python :: 3",
        "Programming Language :: Python :: 3.8",
        "Programming Language :: Python :: 3.9",
        "Programming Language :: Python :: 3.10",
        "Programming Language :: Python :: 3.11",
        "Programming Language :: Python :: 3.12",
    ],
    python_requires=">=3.8",
    install_requires=requirements,
    # Optional dependency groups: pip install llm-toolkit[dev] / [docs] / [all]
    extras_require={
        "dev": [
            "pytest>=7.0.0",
            "pytest-cov>=4.0.0",
            "pytest-asyncio>=0.21.0",
            "black>=23.0.0",
            "flake8>=6.0.0",
            "mypy>=1.0.0",
            "isort>=5.12.0",
        ],
        "docs": [
            "sphinx>=6.0.0",
            "sphinx-rtd-theme>=1.2.0",
            "sphinx-autodoc-typehints>=1.22.0",
        ],
        "all": [
            "redis>=4.5.0",
            "pinecone-client>=2.2.0",
            "weaviate-client>=3.20.0",
            "cohere>=4.0.0",
            "together>=0.2.0",
        ],
    },
    # Console entry point: `llm-toolkit` -> llm_toolkit.cli:main
    entry_points={
        "console_scripts": [
            "llm-toolkit=llm_toolkit.cli:main",
        ],
    },
    include_package_data=True,
    package_data={
        "llm_toolkit": [
            "config/*.yaml",
            "prompts/*.yaml",
        ],
    },
    zip_safe=False,
    keywords=[
        "llm",
        "large language model",
        "openai",
        "gpt",
        "claude",
        "gemini",
        "rag",
        "retrieval augmented generation",
        "ai",
        "machine learning",
        "nlp",
    ],
    project_urls={
        "Bug Reports": "https://github.com/dreamvfia/llm-toolkit/issues",
        "Source": "https://github.com/dreamvfia/llm-toolkit",
        "Documentation": "https://llm-toolkit.readthedocs.io",
    },
)

34. Makefile - 构建脚本

# ═══════════════════════════════════════════════════════════════════
# LLM TOOLKIT MAKEFILE
# ═══════════════════════════════════════════════════════════════════

.PHONY: help install dev-install test lint format clean build docs

# Default goal when `make` is run with no target
.DEFAULT_GOAL := help

# ───────────────────────────────────────────────────────────────────
# Help
# ───────────────────────────────────────────────────────────────────
help:
	@echo "LLM Toolkit - 可用命令:"
	@echo ""
	@echo "  make install      - 安装项目依赖"
	@echo "  make dev-install  - 安装开发依赖"
	@echo "  make test         - 运行测试"
	@echo "  make lint         - 代码检查"
	@echo "  make format       - 代码格式化"
	@echo "  make clean        - 清理临时文件"
	@echo "  make build        - 构建项目"
	@echo "  make docs         - 生成文档"
	@echo "  make init         - 初始化项目"
	@echo "  make run-examples - 运行示例"
	@echo ""

# ───────────────────────────────────────────────────────────────────
# Installation
# ───────────────────────────────────────────────────────────────────
install:
	@echo "📦 安装项目依赖..."
	pip install -r requirements.txt
	@echo "✅ 安装完成"

dev-install: install
	@echo "📦 安装开发依赖..."
	pip install -r requirements-dev.txt
	pip install -e .
	@echo "✅ 开发环境安装完成"

# ───────────────────────────────────────────────────────────────────
# Testing
# ───────────────────────────────────────────────────────────────────
test:
	@echo "🧪 运行测试..."
	pytest tests/ -v --cov=llm_toolkit --cov-report=html --cov-report=term
	@echo "✅ 测试完成"

test-fast:
	@echo "🧪 运行快速测试..."
	pytest tests/ -v -m "not slow"
	@echo "✅ 快速测试完成"

# ───────────────────────────────────────────────────────────────────
# Code quality
# ───────────────────────────────────────────────────────────────────
lint:
	@echo "🔍 代码检查..."
	flake8 llm_toolkit/ tests/ examples/
	mypy llm_toolkit/
	@echo "✅ 代码检查完成"

format:
	@echo "✨ 代码格式化..."
	black llm_toolkit/ tests/ examples/
	isort llm_toolkit/ tests/ examples/
	@echo "✅ 格式化完成"

# ───────────────────────────────────────────────────────────────────
# Cleanup
# ───────────────────────────────────────────────────────────────────
clean:
	@echo "🧹 清理临时文件..."
	find . -type d -name "__pycache__" -exec rm -rf {} +
	find . -type f -name "*.pyc" -delete
	find . -type f -name "*.pyo" -delete
	find . -type d -name "*.egg-info" -exec rm -rf {} +
	find . -type d -name ".pytest_cache" -exec rm -rf {} +
	find . -type d -name ".mypy_cache" -exec rm -rf {} +
	rm -rf build/ dist/ htmlcov/ .coverage
	@echo "✅ 清理完成"

# ───────────────────────────────────────────────────────────────────
# Build
# NOTE(review): `python setup.py sdist bdist_wheel` is deprecated by
# setuptools; consider `python -m build` when the `build` package is
# added to dev requirements.
# ───────────────────────────────────────────────────────────────────
build: clean
	@echo "🏗️  构建项目..."
	python setup.py sdist bdist_wheel
	@echo "✅ 构建完成"

# ───────────────────────────────────────────────────────────────────
# Documentation
# ───────────────────────────────────────────────────────────────────
docs:
	@echo "📚 生成文档..."
	cd docs && make html
	@echo "✅ 文档生成完成"

# ───────────────────────────────────────────────────────────────────
# Project initialization
# ───────────────────────────────────────────────────────────────────
init:
	@echo "🚀 初始化项目..."
	mkdir -p data/chroma data/cache logs prompts models temp
	cp .env.example .env
	@echo "✅ 初始化完成"
	@echo "⚠️  请编辑 .env 文件配置您的API密钥"

# ───────────────────────────────────────────────────────────────────
# Run examples
# ───────────────────────────────────────────────────────────────────
run-examples:
	@echo "🎯 运行示例..."
	python examples/basic_usage.py
	@echo "✅ 示例运行完成"

# ───────────────────────────────────────────────────────────────────
# Publishing
# ───────────────────────────────────────────────────────────────────
publish-test: build
	@echo "📤 发布到 TestPyPI..."
	twine upload --repository testpypi dist/*
	@echo "✅ 发布到 TestPyPI 完成"

publish: build
	@echo "📤 发布到 PyPI..."
	twine upload dist/*
	@echo "✅ 发布到 PyPI 完成"

LLM Toolkit - 第九部分:Docker配置和容器化

Docker配置文件


35. Dockerfile - 主Dockerfile

# ═══════════════════════════════════════════════════════════════════
# LLM TOOLKIT DOCKERFILE
# Multi-stage build to keep the final image small
# ═══════════════════════════════════════════════════════════════════

# ───────────────────────────────────────────────────────────────────
# Stage 1: build stage
# ───────────────────────────────────────────────────────────────────
FROM python:3.11-slim as builder

# Working directory for the build
WORKDIR /build

# Build-time system dependencies (compilers for native wheels)
RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc \
    g++ \
    make \
    git \
    && rm -rf /var/lib/apt/lists/*

# Copy dependency manifests
# NOTE(review): requirements-dev.txt is copied but never installed in
# this stage — confirm whether it is intentional or leftover.
COPY requirements.txt .
COPY requirements-dev.txt .

# Create a virtualenv so the runtime stage can copy it wholesale
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

# Upgrade pip tooling
RUN pip install --no-cache-dir --upgrade pip setuptools wheel

# Install Python dependencies into the venv
RUN pip install --no-cache-dir -r requirements.txt

# ───────────────────────────────────────────────────────────────────
# Stage 2: runtime stage
# ───────────────────────────────────────────────────────────────────
FROM python:3.11-slim

# Image metadata
LABEL maintainer="DREAMVFIA <contact@dreamvfia.com>"
LABEL description="LLM Toolkit - 大语言模型工具包"
LABEL version="1.0.0"

# Runtime environment variables (venv on PATH, production env)
ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1 \
    PIP_NO_CACHE_DIR=1 \
    PIP_DISABLE_PIP_VERSION_CHECK=1 \
    PATH="/opt/venv/bin:$PATH" \
    ENV=production

# Create a non-root user to run the app
RUN groupadd -r llmtoolkit && \
    useradd -r -g llmtoolkit -s /bin/bash llmtoolkit

# Application working directory
WORKDIR /app

# Copy the pre-built virtualenv from the builder stage
COPY --from=builder /opt/venv /opt/venv

# Copy the application source, owned by the app user
COPY --chown=llmtoolkit:llmtoolkit . .

# Create runtime directories and hand them to the app user
RUN mkdir -p \
    /app/data/chroma \
    /app/data/cache \
    /app/logs \
    /app/prompts \
    /app/models \
    /app/temp && \
    chown -R llmtoolkit:llmtoolkit /app

# Install the package itself (editable install into the venv)
RUN pip install --no-cache-dir -e .

# Drop privileges
USER llmtoolkit

# Container health check: the package must be importable
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import llm_toolkit; print('OK')" || exit 1

# Expose application port (if a server is run)
EXPOSE 8000

# Default command
CMD ["python", "-m", "llm_toolkit.cli"]

36. Dockerfile.dev - 开发环境Dockerfile

# ═══════════════════════════════════════════════════════════════════
# LLM TOOLKIT DEVELOPMENT DOCKERFILE
# Development-only image, ships with dev tooling
# ═══════════════════════════════════════════════════════════════════

FROM python:3.11-slim

# Image metadata
LABEL maintainer="DREAMVFIA <contact@dreamvfia.com>"
LABEL description="LLM Toolkit - Development Environment"
LABEL version="1.0.0-dev"

# Development environment variables
ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1 \
    PIP_NO_CACHE_DIR=1 \
    ENV=development

# System dependencies (compilers + convenience tools for dev)
RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc \
    g++ \
    make \
    git \
    curl \
    vim \
    && rm -rf /var/lib/apt/lists/*

# Working directory
WORKDIR /app

# Copy dependency manifests
COPY requirements.txt requirements-dev.txt ./

# Upgrade pip tooling
RUN pip install --upgrade pip setuptools wheel

# Install runtime + development dependencies
RUN pip install -r requirements.txt && \
    pip install -r requirements-dev.txt

# Copy the application source
COPY . .

# Create runtime directories
RUN mkdir -p \
    /app/data/chroma \
    /app/data/cache \
    /app/logs \
    /app/prompts \
    /app/models \
    /app/temp

# Install the package in editable (development) mode
RUN pip install -e .

# Expose app + Jupyter ports
EXPOSE 8000 8888

# Default command: start Jupyter Lab
CMD ["jupyter", "lab", "--ip=0.0.0.0", "--port=8888", "--no-browser", "--allow-root"]

37. docker-compose.yml - Docker Compose配置

# ═══════════════════════════════════════════════════════════════════
# LLM TOOLKIT DOCKER COMPOSE
# Full development and production environment orchestration
# ═══════════════════════════════════════════════════════════════════

version: '3.8'

# ───────────────────────────────────────────────────────────────────
# Service definitions
# ───────────────────────────────────────────────────────────────────
services:
  # ─────────────────────────────────────────────────────────────────
  # LLM Toolkit main application
  # ─────────────────────────────────────────────────────────────────
  llm-toolkit:
    build:
      context: .
      dockerfile: Dockerfile
    image: llm-toolkit:latest
    container_name: llm-toolkit-app
    restart: unless-stopped
    env_file:
      - .env
    environment:
      - ENV=production
      - REDIS_HOST=redis
      - CHROMA_PERSIST_DIRECTORY=/app/data/chroma
    volumes:
      - ./data:/app/data
      - ./logs:/app/logs
      - ./prompts:/app/prompts
      - ./temp:/app/temp
    ports:
      - "8000:8000"
    depends_on:
      - redis
      - chromadb
    networks:
      - llm-toolkit-network
    healthcheck:
      test: ["CMD", "python", "-c", "import llm_toolkit; print('OK')"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s

  # ─────────────────────────────────────────────────────────────────
  # Redis cache
  # NOTE(review): with REDIS_PASSWORD unset, `--requirepass` receives
  # an empty argument — confirm redis-server tolerates this.
  # ─────────────────────────────────────────────────────────────────
  redis:
    image: redis:7-alpine
    container_name: llm-toolkit-redis
    restart: unless-stopped
    command: redis-server --appendonly yes --requirepass ${REDIS_PASSWORD:-}
    volumes:
      - redis-data:/data
    ports:
      - "6379:6379"
    networks:
      - llm-toolkit-network
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

  # ─────────────────────────────────────────────────────────────────
  # ChromaDB vector database
  # ─────────────────────────────────────────────────────────────────
  chromadb:
    image: chromadb/chroma:latest
    container_name: llm-toolkit-chromadb
    restart: unless-stopped
    environment:
      - IS_PERSISTENT=TRUE
      - PERSIST_DIRECTORY=/chroma/chroma
      - ANONYMIZED_TELEMETRY=FALSE
    volumes:
      - chroma-data:/chroma/chroma
    ports:
      - "8001:8000"
    networks:
      - llm-toolkit-network
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/api/v1/heartbeat"]
      interval: 30s
      timeout: 10s
      retries: 3

  # ─────────────────────────────────────────────────────────────────
  # PostgreSQL database (optional)
  # ─────────────────────────────────────────────────────────────────
  postgres:
    image: postgres:15-alpine
    container_name: llm-toolkit-postgres
    restart: unless-stopped
    environment:
      - POSTGRES_DB=${POSTGRES_DB:-llm_toolkit}
      - POSTGRES_USER=${POSTGRES_USER:-llmtoolkit}
      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-changeme}
    volumes:
      - postgres-data:/var/lib/postgresql/data
    ports:
      - "5432:5432"
    networks:
      - llm-toolkit-network
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-llmtoolkit}"]
      interval: 10s
      timeout: 5s
      retries: 5

  # ─────────────────────────────────────────────────────────────────
  # Prometheus monitoring (optional; enable with --profile monitoring)
  # ─────────────────────────────────────────────────────────────────
  prometheus:
    image: prom/prometheus:latest
    container_name: llm-toolkit-prometheus
    restart: unless-stopped
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
    volumes:
      - ./docker/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus-data:/prometheus
    ports:
      - "9090:9090"
    networks:
      - llm-toolkit-network
    profiles:
      - monitoring

  # ─────────────────────────────────────────────────────────────────
  # Grafana visualization (optional; enable with --profile monitoring)
  # ─────────────────────────────────────────────────────────────────
  grafana:
    image: grafana/grafana:latest
    container_name: llm-toolkit-grafana
    restart: unless-stopped
    environment:
      - GF_SECURITY_ADMIN_USER=${GRAFANA_USER:-admin}
      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD:-admin}
      - GF_INSTALL_PLUGINS=
    volumes:
      - grafana-data:/var/lib/grafana
      - ./docker/grafana/dashboards:/etc/grafana/provisioning/dashboards
      - ./docker/grafana/datasources:/etc/grafana/provisioning/datasources
    ports:
      - "3000:3000"
    networks:
      - llm-toolkit-network
    depends_on:
      - prometheus
    profiles:
      - monitoring

  # ─────────────────────────────────────────────────────────────────
  # Jupyter Lab (development; enable with --profile dev)
  # ─────────────────────────────────────────────────────────────────
  jupyter:
    build:
      context: .
      dockerfile: Dockerfile.dev
    image: llm-toolkit:dev
    container_name: llm-toolkit-jupyter
    restart: unless-stopped
    env_file:
      - .env
    environment:
      - ENV=development
      - JUPYTER_ENABLE_LAB=yes
    volumes:
      - .:/app
      - ./notebooks:/app/notebooks
    ports:
      - "8888:8888"
    networks:
      - llm-toolkit-network
    profiles:
      - dev

# ───────────────────────────────────────────────────────────────────
# Network definitions
# ───────────────────────────────────────────────────────────────────
networks:
  llm-toolkit-network:
    driver: bridge
    name: llm-toolkit-network

# ───────────────────────────────────────────────────────────────────
# Volume definitions
# ───────────────────────────────────────────────────────────────────
volumes:
  redis-data:
    name: llm-toolkit-redis-data
  chroma-data:
    name: llm-toolkit-chroma-data
  postgres-data:
    name: llm-toolkit-postgres-data
  prometheus-data:
    name: llm-toolkit-prometheus-data
  grafana-data:
    name: llm-toolkit-grafana-data

38. docker-compose.dev.yml - 开发环境Docker Compose

# ═══════════════════════════════════════════════════════════════════
# LLM TOOLKIT DOCKER COMPOSE - DEVELOPMENT
# Development-only orchestration
# ═══════════════════════════════════════════════════════════════════

version: '3.8'

services:
  # ─────────────────────────────────────────────────────────────────
  # LLM Toolkit development environment
  # (source tree bind-mounted; cache dirs masked by anonymous volumes)
  # ─────────────────────────────────────────────────────────────────
  llm-toolkit-dev:
    build:
      context: .
      dockerfile: Dockerfile.dev
    image: llm-toolkit:dev
    container_name: llm-toolkit-dev
    restart: unless-stopped
    env_file:
      - .env
    environment:
      - ENV=development
      - DEBUG_MODE=true
      - REDIS_HOST=redis
    volumes:
      - .:/app
      - /app/__pycache__
      - /app/.pytest_cache
    ports:
      - "8000:8000"
      - "8888:8888"
    depends_on:
      - redis
      - chromadb
    networks:
      - llm-toolkit-network
    command: python -m llm_toolkit.cli

  # ─────────────────────────────────────────────────────────────────
  # Redis
  # ─────────────────────────────────────────────────────────────────
  redis:
    image: redis:7-alpine
    container_name: llm-toolkit-redis-dev
    restart: unless-stopped
    ports:
      - "6379:6379"
    networks:
      - llm-toolkit-network

  # ─────────────────────────────────────────────────────────────────
  # ChromaDB
  # ─────────────────────────────────────────────────────────────────
  chromadb:
    image: chromadb/chroma:latest
    container_name: llm-toolkit-chromadb-dev
    restart: unless-stopped
    environment:
      - IS_PERSISTENT=TRUE
      - ANONYMIZED_TELEMETRY=FALSE
    volumes:
      - ./data/chroma:/chroma/chroma
    ports:
      - "8001:8000"
    networks:
      - llm-toolkit-network

networks:
  llm-toolkit-network:
    driver: bridge

39. .dockerignore - Docker忽略文件

# ═══════════════════════════════════════════════════════════════════
# DOCKER IGNORE FILE
# ═══════════════════════════════════════════════════════════════════

# ───────────────────────────────────────────────────────────────────
# Python
# ───────────────────────────────────────────────────────────────────
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg

# ───────────────────────────────────────────────────────────────────
# Virtual environments
# ───────────────────────────────────────────────────────────────────
venv/
env/
ENV/
.venv/

# ───────────────────────────────────────────────────────────────────
# IDE
# ───────────────────────────────────────────────────────────────────
.vscode/
.idea/
*.swp
*.swo
*~
.DS_Store

# ───────────────────────────────────────────────────────────────────
# Tests and coverage
# ───────────────────────────────────────────────────────────────────
.pytest_cache/
.coverage
htmlcov/
.tox/
.hypothesis/

# ───────────────────────────────────────────────────────────────────
# Logs and data
# ───────────────────────────────────────────────────────────────────
logs/
*.log
data/
temp/
*.db
*.sqlite

# ───────────────────────────────────────────────────────────────────
# Git
# ───────────────────────────────────────────────────────────────────
.git/
.gitignore
.gitattributes

# ───────────────────────────────────────────────────────────────────
# Docker
# ───────────────────────────────────────────────────────────────────
Dockerfile*
docker-compose*.yml
.dockerignore

# ───────────────────────────────────────────────────────────────────
# Documentation
# ───────────────────────────────────────────────────────────────────
docs/
*.md
LICENSE

# ───────────────────────────────────────────────────────────────────
# CI/CD
# ───────────────────────────────────────────────────────────────────
.github/
.gitlab-ci.yml
.travis.yml

# ───────────────────────────────────────────────────────────────────
# Environment variables (keep only the example out of the image)
# ───────────────────────────────────────────────────────────────────
.env
.env.*
!.env.example

# ───────────────────────────────────────────────────────────────────
# Misc
# ───────────────────────────────────────────────────────────────────
*.bak
*.tmp
node_modules/

40. docker/prometheus/prometheus.yml - Prometheus配置

# ═══════════════════════════════════════════════════════════════════
# PROMETHEUS CONFIGURATION
# ═══════════════════════════════════════════════════════════════════

global:
  scrape_interval: 15s
  evaluation_interval: 15s
  external_labels:
    monitor: 'llm-toolkit-monitor'

# ───────────────────────────────────────────────────────────────────
# Scrape configuration
# ───────────────────────────────────────────────────────────────────
scrape_configs:
  # LLM Toolkit application metrics
  - job_name: 'llm-toolkit'
    static_configs:
      - targets: ['llm-toolkit:9090']
        labels:
          service: 'llm-toolkit'
          env: 'production'

  # Redis metrics
  # NOTE(review): redis itself does not expose a Prometheus endpoint on
  # 6379 — presumably a redis_exporter sidecar is intended; confirm.
  - job_name: 'redis'
    static_configs:
      - targets: ['redis:6379']
        labels:
          service: 'redis'

  # ChromaDB metrics
  # NOTE(review): verify ChromaDB serves /metrics on this port.
  - job_name: 'chromadb'
    static_configs:
      - targets: ['chromadb:8000']
        labels:
          service: 'chromadb'

  # Prometheus self-monitoring
  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']

# ───────────────────────────────────────────────────────────────────
# Alerting rules
# ───────────────────────────────────────────────────────────────────
rule_files:
  - 'alerts.yml'

41. docker/prometheus/alerts.yml - Prometheus告警规则

# ═══════════════════════════════════════════════════════════════════
# PROMETHEUS ALERT RULES
# ═══════════════════════════════════════════════════════════════════

groups:
  - name: llm_toolkit_alerts
    interval: 30s
    rules:
      # ─────────────────────────────────────────────────────────────
      # High error-rate alert (error rate above 5% for 5 minutes)
      # ─────────────────────────────────────────────────────────────
      - alert: HighErrorRate
        expr: rate(llm_toolkit_errors_total[5m]) > 0.05
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "高错误率检测"
          description: "错误率超过5%,当前值: {{ $value }}"

      # ─────────────────────────────────────────────────────────────
      # High daily-cost alert (daily spend above $50)
      # ─────────────────────────────────────────────────────────────
      - alert: HighCost
        expr: llm_toolkit_daily_cost > 50
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "每日成本过高"
          description: "每日成本超过$50,当前值: ${{ $value }}"

      # ─────────────────────────────────────────────────────────────
      # Service-down alert (scrape target unreachable for 2 minutes)
      # ─────────────────────────────────────────────────────────────
      - alert: ServiceDown
        expr: up{job="llm-toolkit"} == 0
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "服务不可用"
          description: "LLM Toolkit服务已停止响应"

      # ─────────────────────────────────────────────────────────────
      # High-latency alert (p95 request latency above 10 seconds)
      # ─────────────────────────────────────────────────────────────
      - alert: HighLatency
        expr: histogram_quantile(0.95, rate(llm_toolkit_request_duration_seconds_bucket[5m])) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "请求延迟过高"
          description: "95分位延迟超过10秒,当前值: {{ $value }}秒"

      # ─────────────────────────────────────────────────────────────
      # Redis-down alert
      # ─────────────────────────────────────────────────────────────
      - alert: RedisDown
        expr: up{job="redis"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Redis服务不可用"
          description: "Redis连接失败"

42. docker/grafana/datasources/prometheus.yml - Grafana数据源配置

# ═══════════════════════════════════════════════════════════════════
# GRAFANA DATASOURCE CONFIGURATION
# Provisions Prometheus as the default data source
# ═══════════════════════════════════════════════════════════════════

apiVersion: 1

datasources:
  - name: Prometheus
    type: prometheus
    access: proxy
    url: http://prometheus:9090
    isDefault: true
    editable: true
    jsonData:
      timeInterval: "15s"
      queryTimeout: "60s"

43. docker/grafana/dashboards/dashboard.yml - Grafana仪表板配置

# ═══════════════════════════════════════════════════════════════════
# GRAFANA DASHBOARD PROVISIONING
# Loads dashboard JSON files from the provisioning directory
# ═══════════════════════════════════════════════════════════════════

apiVersion: 1

providers:
  - name: 'LLM Toolkit Dashboards'
    orgId: 1
    folder: ''
    type: file
    disableDeletion: false
    updateIntervalSeconds: 10
    allowUiUpdates: true
    options:
      path: /etc/grafana/provisioning/dashboards

44. scripts/docker-build.sh - Docker构建脚本

#!/bin/bash
# ═══════════════════════════════════════════════════════════════════
# DOCKER BUILD SCRIPT
# Builds the production and development Docker images; optionally
# tags and pushes them to $DOCKER_REGISTRY.
# Usage: docker-build.sh [VERSION]   (VERSION defaults to "latest")
# ═══════════════════════════════════════════════════════════════════

set -e

# Terminal color definitions
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color

# Configuration: image name, version tag (arg 1), optional registry
IMAGE_NAME="llm-toolkit"
VERSION=${1:-latest}
REGISTRY=${DOCKER_REGISTRY:-}

echo -e "${GREEN}════════════════════════════════════════════════════════${NC}"
echo -e "${GREEN}LLM Toolkit Docker 构建脚本${NC}"
echo -e "${GREEN}════════════════════════════════════════════════════════${NC}"

# Build the production image (tagged VERSION and latest)
echo -e "\n${YELLOW}📦 构建生产镜像...${NC}"
docker build \
    -t ${IMAGE_NAME}:${VERSION} \
    -t ${IMAGE_NAME}:latest \
    -f Dockerfile \
    .

echo -e "${GREEN}✅ 生产镜像构建完成${NC}"

# Build the development image (tagged VERSION-dev and dev)
echo -e "\n${YELLOW}📦 构建开发镜像...${NC}"
docker build \
    -t ${IMAGE_NAME}:${VERSION}-dev \
    -t ${IMAGE_NAME}:dev \
    -f Dockerfile.dev \
    .

echo -e "${GREEN}✅ 开发镜像构建完成${NC}"

# List the built images
# NOTE(review): under `set -e`, grep exits nonzero (aborting the script)
# if no image matches — should not happen right after a build, but worth
# confirming.
echo -e "\n${YELLOW}📊 镜像信息:${NC}"
docker images | grep ${IMAGE_NAME}

# Push to a registry (only when DOCKER_REGISTRY is set)
if [ ! -z "$REGISTRY" ]; then
    echo -e "\n${YELLOW}📤 推送镜像到仓库...${NC}"
    
    docker tag ${IMAGE_NAME}:${VERSION} ${REGISTRY}/${IMAGE_NAME}:${VERSION}
    docker tag ${IMAGE_NAME}:latest ${REGISTRY}/${IMAGE_NAME}:latest
    
    docker push ${REGISTRY}/${IMAGE_NAME}:${VERSION}
    docker push ${REGISTRY}/${IMAGE_NAME}:latest
    
    echo -e "${GREEN}✅ 镜像推送完成${NC}"
fi

echo -e "\n${GREEN}════════════════════════════════════════════════════════${NC}"
echo -e "${GREEN}构建完成!${NC}"
echo -e "${GREEN}════════════════════════════════════════════════════════${NC}"

45. scripts/docker-run.sh - Docker运行脚本

#!/bin/bash
# ═══════════════════════════════════════════════════════════════════
# DOCKER RUN SCRIPT
# Stops any previous container, then starts the app container in the
# selected environment and tails its logs.
# Usage: docker-run.sh [production|development]  (default: production)
# ═══════════════════════════════════════════════════════════════════

set -e

# Terminal color definitions
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color

# Configuration: container/image names, environment (arg 1)
CONTAINER_NAME="llm-toolkit-app"
IMAGE_NAME="llm-toolkit:latest"
ENV=${1:-production}

echo -e "${GREEN}════════════════════════════════════════════════════════${NC}"
echo -e "${GREEN}LLM Toolkit Docker 运行脚本${NC}"
echo -e "${GREEN}环境: ${ENV}${NC}"
echo -e "${GREEN}════════════════════════════════════════════════════════${NC}"

# Stop and remove any previous container with the same name
if [ "$(docker ps -aq -f name=${CONTAINER_NAME})" ]; then
    echo -e "\n${YELLOW}🛑 停止旧容器...${NC}"
    docker stop ${CONTAINER_NAME}
    docker rm ${CONTAINER_NAME}
fi

# Create the host-side directories mounted into the container
echo -e "\n${YELLOW}📁 创建目录...${NC}"
mkdir -p data/chroma data/cache logs prompts temp

# Start the container
echo -e "\n${YELLOW}🚀 启动容器...${NC}"

if [ "$ENV" == "development" ]; then
    # Development: bind-mount the whole tree, expose Jupyter too
    docker run -d \
        --name ${CONTAINER_NAME} \
        --env-file .env \
        -e ENV=development \
        -v $(pwd)/data:/app/data \
        -v $(pwd)/logs:/app/logs \
        -v $(pwd):/app \
        -p 8000:8000 \
        -p 8888:8888 \
        --network llm-toolkit-network \
        llm-toolkit:dev
else
    # Production: data/log mounts only, auto-restart policy
    docker run -d \
        --name ${CONTAINER_NAME} \
        --env-file .env \
        -e ENV=production \
        -v $(pwd)/data:/app/data \
        -v $(pwd)/logs:/app/logs \
        -p 8000:8000 \
        --network llm-toolkit-network \
        --restart unless-stopped \
        ${IMAGE_NAME}
fi

echo -e "${GREEN}✅ 容器启动成功${NC}"

# Show container status
echo -e "\n${YELLOW}📊 容器状态:${NC}"
docker ps -f name=${CONTAINER_NAME}

# Tail the container logs (NOTE: -f blocks until interrupted)
echo -e "\n${YELLOW}📝 容器日志:${NC}"
docker logs -f ${CONTAINER_NAME}

46. scripts/docker-compose-up.sh - Docker Compose启动脚本

#!/bin/bash
# ═══════════════════════════════════════════════════════════════════
# DOCKER COMPOSE UP SCRIPT
# Builds and starts the full Docker Compose stack, then prints the
# service endpoints.
# Usage: docker-compose-up.sh [production|development] [profile]
# ═══════════════════════════════════════════════════════════════════

set -e

# Terminal color definitions
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color

# Configuration: environment (arg 1), optional compose profile (arg 2)
ENV=${1:-production}
PROFILE=${2:-}

echo -e "${GREEN}════════════════════════════════════════════════════════${NC}"
echo -e "${GREEN}LLM Toolkit Docker Compose 启动脚本${NC}"
echo -e "${GREEN}环境: ${ENV}${NC}"
echo -e "${GREEN}════════════════════════════════════════════════════════${NC}"

# Ensure .env exists; bootstrap from the example and abort so the user
# can fill in API keys first
if [ ! -f .env ]; then
    echo -e "${YELLOW}⚠️  .env文件不存在,从示例创建...${NC}"
    cp .env.example .env
    echo -e "${RED}❗ 请编辑.env文件配置您的API密钥${NC}"
    exit 1
fi

# Create the host-side directories used by bind mounts
echo -e "\n${YELLOW}📁 创建目录...${NC}"
mkdir -p data/chroma data/cache logs prompts temp docker/prometheus docker/grafana/dashboards docker/grafana/datasources

# Pick the compose file for the selected environment
COMPOSE_FILE="docker-compose.yml"
if [ "$ENV" == "development" ]; then
    COMPOSE_FILE="docker-compose.dev.yml"
fi

# Build images
echo -e "\n${YELLOW}🏗️  构建镜像...${NC}"
docker-compose -f ${COMPOSE_FILE} build

# Start services (with the optional profile when given)
echo -e "\n${YELLOW}🚀 启动服务...${NC}"

if [ ! -z "$PROFILE" ]; then
    docker-compose -f ${COMPOSE_FILE} --profile ${PROFILE} up -d
else
    docker-compose -f ${COMPOSE_FILE} up -d
fi

echo -e "${GREEN}✅ 服务启动成功${NC}"

# Fixed grace period before reporting status (not a real readiness check)
echo -e "\n${YELLOW}⏳ 等待服务就绪...${NC}"
sleep 5

# Show service status
echo -e "\n${YELLOW}📊 服务状态:${NC}"
docker-compose -f ${COMPOSE_FILE} ps

# Print the endpoints for the started services
echo -e "\n${GREEN}════════════════════════════════════════════════════════${NC}"
echo -e "${GREEN}服务访问信息:${NC}"
echo -e "${GREEN}════════════════════════════════════════════════════════${NC}"
echo -e "LLM Toolkit:    http://localhost:8000"
echo -e "ChromaDB:       http://localhost:8001"
echo -e "Redis:          localhost:6379"

if [ "$PROFILE" == "monitoring" ]; then
    echo -e "Prometheus:     http://localhost:9090"
    echo -e "Grafana:        http://localhost:3000 (admin/admin)"
fi

if [ "$ENV" == "development" ] || [ "$PROFILE" == "dev" ]; then
    echo -e "Jupyter Lab:    http://localhost:8888"
fi

echo -e "${GREEN}════════════════════════════════════════════════════════${NC}"

# Hint for following logs
echo -e "\n${YELLOW}📝 查看日志:${NC}"
echo -e "docker-compose -f ${COMPOSE_FILE} logs -f"

LLM Toolkit - 第十部分:CI/CD配置文件

持续集成和持续部署配置


47. .github/workflows/ci.yml - GitHub Actions CI配置

# ═══════════════════════════════════════════════════════════════════
# GITHUB ACTIONS CI WORKFLOW
# 持续集成工作流
# ═══════════════════════════════════════════════════════════════════

name: CI

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main, develop ]
  schedule:
    # 每天UTC时间0点运行
    - cron: '0 0 * * *'

env:
  PYTHON_VERSION: '3.11'
  POETRY_VERSION: '1.7.0'

jobs:
  # ─────────────────────────────────────────────────────────────────
  # 代码质量检查
  # ─────────────────────────────────────────────────────────────────
  lint:
    name: 代码质量检查
    runs-on: ubuntu-latest
    
    steps:
      - name: 📥 检出代码
        uses: actions/checkout@v4
      
      - name: 🐍 设置Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ env.PYTHON_VERSION }}
          cache: 'pip'
      
      - name: 📦 安装依赖
        run: |
          python -m pip install --upgrade pip
          pip install flake8 black isort mypy
          pip install -r requirements.txt
      
      - name: 🔍 Flake8检查
        run: |
          flake8 llm_toolkit/ tests/ examples/ \
            --count \
            --select=E9,F63,F7,F82 \
            --show-source \
            --statistics
      
      - name: ✨ Black格式检查
        run: |
          black --check llm_toolkit/ tests/ examples/
      
      - name: 📐 isort导入排序检查
        run: |
          isort --check-only llm_toolkit/ tests/ examples/
      
      - name: 🔬 MyPy类型检查
        run: |
          mypy llm_toolkit/ --ignore-missing-imports
        continue-on-error: true

  # ─────────────────────────────────────────────────────────────────
  # Unit tests across the OS x Python-version matrix
  # ─────────────────────────────────────────────────────────────────
  test:
    name: 单元测试
    runs-on: ${{ matrix.os }}
    strategy:
      fail-fast: false
      matrix:
        os: [ubuntu-latest, windows-latest, macos-latest]
        python-version: ['3.8', '3.9', '3.10', '3.11', '3.12']
        exclude:
          # macos-latest now resolves to arm64 runners, where
          # actions/setup-python has no CPython 3.8/3.9 builds.
          - os: macos-latest
            python-version: '3.8'
          - os: macos-latest
            python-version: '3.9'
    
    steps:
      - name: 📥 检出代码
        uses: actions/checkout@v4
      
      - name: 🐍 设置Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
          cache: 'pip'
      
      - name: 📦 安装依赖
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install -r requirements-dev.txt
          pip install -e .
      
      - name: 🧪 运行测试
        run: |
          pytest tests/ \
            -v \
            --cov=llm_toolkit \
            --cov-report=xml \
            --cov-report=html \
            --cov-report=term
      
      # Upload once per run (single matrix cell) to avoid duplicates.
      # v3 relied on Codecov's deprecated uploader; v4 also renamed
      # the deprecated `file:` input to `files:`.
      - name: 📊 上传覆盖率报告
        uses: codecov/codecov-action@v4
        if: matrix.os == 'ubuntu-latest' && matrix.python-version == '3.11'
        with:
          files: ./coverage.xml
          flags: unittests
          name: codecov-umbrella
          fail_ci_if_error: false

  # ─────────────────────────────────────────────────────────────────
  # Security scans: Safety (dependencies) + Bandit (source code)
  # Both are advisory only (continue-on-error).
  # ─────────────────────────────────────────────────────────────────
  security:
    name: 安全扫描
    runs-on: ubuntu-latest
    
    steps:
      - name: 📥 检出代码
        uses: actions/checkout@v4
      
      - name: 🐍 设置Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ env.PYTHON_VERSION }}
      
      - name: 📦 安装依赖
        run: |
          python -m pip install --upgrade pip
          pip install safety bandit
          pip install -r requirements.txt
      
      - name: 🔒 Safety依赖安全检查
        run: |
          safety check --json
        continue-on-error: true
      
      - name: 🛡️ Bandit代码安全检查
        run: |
          bandit -r llm_toolkit/ -f json -o bandit-report.json
        continue-on-error: true
      
      # upload-artifact@v3 is deprecated and has been disabled by
      # GitHub; v4 is required for uploads to keep working.
      - name: 📤 上传安全报告
        uses: actions/upload-artifact@v4
        if: always()
        with:
          name: security-reports
          path: |
            bandit-report.json

  # ─────────────────────────────────────────────────────────────────
  # Docker build smoke test (image is built but never pushed)
  # ─────────────────────────────────────────────────────────────────
  docker:
    name: Docker构建测试
    runs-on: ubuntu-latest
    
    steps:
      - name: 📥 检出代码
        uses: actions/checkout@v4
      
      - name: 🐳 设置Docker Buildx
        uses: docker/setup-buildx-action@v3
      
      # push: false — build-only; the GitHub Actions layer cache
      # (type=gha) speeds up subsequent runs.
      - name: 🏗️ 构建Docker镜像
        uses: docker/build-push-action@v5
        with:
          context: .
          file: ./Dockerfile
          push: false
          tags: llm-toolkit:test
          cache-from: type=gha
          cache-to: type=gha,mode=max
      
      # Smoke test: the package must be importable inside the image.
      - name: 🧪 测试Docker镜像
        run: |
          docker run --rm llm-toolkit:test python -c "import llm_toolkit; print('OK')"

  # ─────────────────────────────────────────────────────────────────
  # Sphinx documentation build; HTML output kept as an artifact
  # ─────────────────────────────────────────────────────────────────
  docs:
    name: 文档构建
    runs-on: ubuntu-latest
    
    steps:
      - name: 📥 检出代码
        uses: actions/checkout@v4
      
      - name: 🐍 设置Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ env.PYTHON_VERSION }}
      
      - name: 📦 安装依赖
        run: |
          python -m pip install --upgrade pip
          pip install sphinx sphinx-rtd-theme sphinx-autodoc-typehints
          pip install -r requirements.txt
      
      - name: 📚 构建文档
        run: |
          cd docs
          make html
      
      # upload-artifact@v3 is deprecated and has been disabled by
      # GitHub; v4 is required for uploads to keep working.
      - name: 📤 上传文档
        uses: actions/upload-artifact@v4
        with:
          name: documentation
          path: docs/_build/html/

  # ─────────────────────────────────────────────────────────────────
  # Integration tests against live Redis and ChromaDB side containers
  # ─────────────────────────────────────────────────────────────────
  integration:
    name: 集成测试
    runs-on: ubuntu-latest
    
    services:
      redis:
        image: redis:7-alpine
        ports:
          - 6379:6379
        # Gate the job until the server answers PING.
        options: >-
          --health-cmd "redis-cli ping"
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
      
      chromadb:
        image: chromadb/chroma:latest
        ports:
          - 8000:8000
        env:
          IS_PERSISTENT: TRUE
          ANONYMIZED_TELEMETRY: FALSE
    
    steps:
      - name: 📥 检出代码
        uses: actions/checkout@v4
      
      - name: 🐍 设置Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ env.PYTHON_VERSION }}
      
      - name: 📦 安装依赖
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install -r requirements-dev.txt
          pip install -e .
      
      # Service containers are reachable on the localhost ports
      # published above; results are advisory (continue-on-error).
      - name: 🧪 运行集成测试
        env:
          REDIS_HOST: localhost
          REDIS_PORT: 6379
          CHROMA_HOST: localhost
          CHROMA_PORT: 8000
        run: |
          pytest tests/integration/ -v -m integration
        continue-on-error: true

  # ─────────────────────────────────────────────────────────────────
  # Performance tests via pytest-benchmark (advisory only)
  # ─────────────────────────────────────────────────────────────────
  performance:
    name: 性能测试
    runs-on: ubuntu-latest
    
    steps:
      - name: 📥 检出代码
        uses: actions/checkout@v4
      
      - name: 🐍 设置Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ env.PYTHON_VERSION }}
      
      - name: 📦 安装依赖
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install pytest-benchmark
          pip install -e .
      
      # --benchmark-only skips regular tests in that directory.
      - name: ⚡ 运行性能测试
        run: |
          pytest tests/performance/ -v --benchmark-only
        continue-on-error: true

48. .github/workflows/cd.yml - GitHub Actions CD配置

# ═══════════════════════════════════════════════════════════════════
# GITHUB ACTIONS CD WORKFLOW
# Continuous-deployment workflow: PyPI publishing, Docker image push,
# docs deployment and GitHub Release creation.
# ═══════════════════════════════════════════════════════════════════

name: CD

on:
  push:
    tags:
      - 'v*.*.*'
  release:
    types: [published]

env:
  PYTHON_VERSION: '3.11'
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}

jobs:
  # ─────────────────────────────────────────────────────────────────
  # Build the sdist/wheel. Tag pushes upload to TestPyPI; published
  # GitHub releases upload to the real PyPI.
  # ─────────────────────────────────────────────────────────────────
  build-and-publish:
    name: 构建并发布Python包
    runs-on: ubuntu-latest
    
    steps:
      - name: 📥 检出代码
        uses: actions/checkout@v4
      
      - name: 🐍 设置Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ env.PYTHON_VERSION }}
      
      - name: 📦 安装构建工具
        run: |
          python -m pip install --upgrade pip
          pip install build twine
      
      - name: 🏗️ 构建包
        run: |
          python -m build
      
      # Validates metadata / long description before any upload.
      - name: 🔍 检查包
        run: |
          twine check dist/*
      
      - name: 📤 发布到TestPyPI
        if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags')
        env:
          TWINE_USERNAME: __token__
          TWINE_PASSWORD: ${{ secrets.TEST_PYPI_API_TOKEN }}
        run: |
          twine upload --repository testpypi dist/*
      
      - name: 📤 发布到PyPI
        if: github.event_name == 'release'
        env:
          TWINE_USERNAME: __token__
          TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN }}
        run: |
          twine upload dist/*

  # ─────────────────────────────────────────────────────────────────
  # Build the multi-arch Docker image and push it to GHCR
  # ─────────────────────────────────────────────────────────────────
  build-and-push-docker:
    name: 构建并推送Docker镜像
    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write
    
    steps:
      - name: 📥 检出代码
        uses: actions/checkout@v4
      
      - name: 🐳 设置Docker Buildx
        uses: docker/setup-buildx-action@v3
      
      # GITHUB_TOKEN is sufficient for ghcr.io given packages: write.
      - name: 🔐 登录到GitHub Container Registry
        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
      
      # NOTE(review): this workflow only triggers on tags/releases, so
      # the branch/pr tag rules below never match — inert but harmless.
      - name: 📝 提取元数据
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
          tags: |
            type=ref,event=branch
            type=ref,event=pr
            type=semver,pattern={{version}}
            type=semver,pattern={{major}}.{{minor}}
            type=semver,pattern={{major}}
            type=sha
      
      - name: 🏗️ 构建并推送Docker镜像
        uses: docker/build-push-action@v5
        with:
          context: .
          file: ./Dockerfile
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max
          platforms: linux/amd64,linux/arm64

  # ─────────────────────────────────────────────────────────────────
  # Build the Sphinx docs and publish them to GitHub Pages
  # ─────────────────────────────────────────────────────────────────
  deploy-docs:
    name: 部署文档
    runs-on: ubuntu-latest
    
    steps:
      - name: 📥 检出代码
        uses: actions/checkout@v4
      
      - name: 🐍 设置Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ env.PYTHON_VERSION }}
      
      - name: 📦 安装依赖
        run: |
          python -m pip install --upgrade pip
          pip install sphinx sphinx-rtd-theme sphinx-autodoc-typehints
          pip install -r requirements.txt
      
      - name: 📚 构建文档
        run: |
          cd docs
          make html
      
      # Pushes the built HTML to the gh-pages branch.
      - name: 🚀 部署到GitHub Pages
        uses: peaceiris/actions-gh-pages@v3
        with:
          github_token: ${{ secrets.GITHUB_TOKEN }}
          publish_dir: ./docs/_build/html

  # ─────────────────────────────────────────────────────────────────
  # Create a GitHub Release with an auto-generated changelog
  # ─────────────────────────────────────────────────────────────────
  create-release:
    name: 创建GitHub Release
    runs-on: ubuntu-latest
    if: startsWith(github.ref, 'refs/tags/')
    needs: [build-and-publish, build-and-push-docker]
    
    steps:
      - name: 📥 检出代码
        uses: actions/checkout@v4
        with:
          # Full history and tags are required for `git describe` /
          # `git log` below; the default shallow clone (depth 1)
          # contains neither, making both commands fail.
          fetch-depth: 0
      
      - name: 📝 生成变更日志
        id: changelog
        run: |
          echo "## What's Changed" > CHANGELOG.md
          # Range starts at the previous tag; fall back to the root
          # commit when the current tag is the repository's first one.
          PREV=$(git describe --tags --abbrev=0 HEAD^ 2>/dev/null || git rev-list --max-parents=0 HEAD)
          git log --pretty=format:"- %s (%h)" ${PREV}..HEAD >> CHANGELOG.md
      
      - name: 🎉 创建Release
        uses: softprops/action-gh-release@v1
        with:
          body_path: CHANGELOG.md
          draft: false
          prerelease: false
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

49. .github/workflows/docker-publish.yml - Docker发布工作流

# ═══════════════════════════════════════════════════════════════════
# DOCKER PUBLISH WORKFLOW
# Builds production + dev images; pushes to ghcr.io except on PRs.
# ═══════════════════════════════════════════════════════════════════

name: Docker Publish

on:
  push:
    branches: [ main ]
    tags: [ 'v*.*.*' ]
  pull_request:
    branches: [ main ]

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}

jobs:
  build:
    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write
      id-token: write

    steps:
      - name: 📥 检出代码
        uses: actions/checkout@v4

      - name: 🐳 设置Docker Buildx
        uses: docker/setup-buildx-action@v3

      # PR builds are build-only, so no registry login is needed.
      - name: 🔐 登录到Container Registry
        if: github.event_name != 'pull_request'
        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      # `latest` is only applied on the default branch.
      - name: 📝 提取Docker元数据
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
          tags: |
            type=ref,event=branch
            type=ref,event=pr
            type=semver,pattern={{version}}
            type=semver,pattern={{major}}.{{minor}}
            type=semver,pattern={{major}}
            type=sha,prefix={{branch}}-
            type=raw,value=latest,enable={{is_default_branch}}

      - name: 🏗️ 构建并推送生产镜像
        id: build-and-push
        uses: docker/build-push-action@v5
        with:
          context: .
          file: ./Dockerfile
          push: ${{ github.event_name != 'pull_request' }}
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max
          platforms: linux/amd64,linux/arm64

      - name: 🏗️ 构建并推送开发镜像
        uses: docker/build-push-action@v5
        with:
          context: .
          file: ./Dockerfile.dev
          push: ${{ github.event_name != 'pull_request' }}
          tags: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:dev
          cache-from: type=gha
          cache-to: type=gha,mode=max

      # Writes the produced tag list to the run's summary page.
      - name: 📊 生成镜像报告
        run: |
          echo "## Docker镜像构建报告" >> $GITHUB_STEP_SUMMARY
          echo "" >> $GITHUB_STEP_SUMMARY
          echo "### 镜像标签" >> $GITHUB_STEP_SUMMARY
          echo "\`\`\`" >> $GITHUB_STEP_SUMMARY
          echo "${{ steps.meta.outputs.tags }}" >> $GITHUB_STEP_SUMMARY
          echo "\`\`\`" >> $GITHUB_STEP_SUMMARY

50. .gitlab-ci.yml - GitLab CI/CD配置

# ═══════════════════════════════════════════════════════════════════
# GITLAB CI/CD CONFIGURATION
# GitLab continuous-integration and deployment pipeline
# ═══════════════════════════════════════════════════════════════════

stages:
  - lint
  - test
  - security
  - build
  - deploy

variables:
  PYTHON_VERSION: "3.11"
  PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache/pip"
  DOCKER_DRIVER: overlay2
  DOCKER_TLS_CERTDIR: "/certs"

# ───────────────────────────────────────────────────────────────────
# Cache configuration
# Keyed per job name so the virtualenv built under one Python image
# (e.g. test:python3.11) is never reused by a job running a different
# Python version — an unkeyed shared cache hands every job the same
# venv/, breaking interpreters that don't match the one that built it.
# ───────────────────────────────────────────────────────────────────
cache:
  key: "$CI_JOB_NAME"
  paths:
    - .cache/pip
    - venv/

# ───────────────────────────────────────────────────────────────────
# Code-quality checks (flake8 and mypy are advisory; black and isort
# are blocking)
# ───────────────────────────────────────────────────────────────────
lint:flake8:
  stage: lint
  image: python:${PYTHON_VERSION}
  before_script:
    - pip install flake8
  script:
    - flake8 llm_toolkit/ tests/ examples/
  allow_failure: true

lint:black:
  stage: lint
  image: python:${PYTHON_VERSION}
  before_script:
    - pip install black
  script:
    - black --check llm_toolkit/ tests/ examples/

lint:isort:
  stage: lint
  image: python:${PYTHON_VERSION}
  before_script:
    - pip install isort
  script:
    - isort --check-only llm_toolkit/ tests/ examples/

lint:mypy:
  stage: lint
  image: python:${PYTHON_VERSION}
  before_script:
    - pip install mypy
    - pip install -r requirements.txt
  script:
    - mypy llm_toolkit/ --ignore-missing-imports
  allow_failure: true

# ───────────────────────────────────────────────────────────────────
# Unit tests
# The hidden `.test_template` job is merged via YAML anchor into one
# concrete job per supported Python version below; each job only
# overrides the image.
# ───────────────────────────────────────────────────────────────────
.test_template: &test_template
  stage: test
  image: python:${PYTHON_VERSION}
  before_script:
    - python -m venv venv
    - source venv/bin/activate
    - pip install --upgrade pip
    - pip install -r requirements.txt
    - pip install -r requirements-dev.txt
    - pip install -e .
  script:
    - pytest tests/ -v --cov=llm_toolkit --cov-report=xml --cov-report=html --cov-report=term
  # Extracts the percentage from pytest-cov's terminal "TOTAL" line.
  coverage: '/(?i)total.*? (100(?:\.0+)?\%|[1-9]?\d(?:\.\d+)?\%)$/'
  artifacts:
    reports:
      coverage_report:
        coverage_format: cobertura
        path: coverage.xml
    paths:
      - htmlcov/
    expire_in: 1 week

test:python3.8:
  <<: *test_template
  image: python:3.8

test:python3.9:
  <<: *test_template
  image: python:3.9

test:python3.10:
  <<: *test_template
  image: python:3.10

test:python3.11:
  <<: *test_template
  image: python:3.11

test:python3.12:
  <<: *test_template
  image: python:3.12

# ───────────────────────────────────────────────────────────────────
# Security scans (advisory only — allow_failure on both jobs)
# ───────────────────────────────────────────────────────────────────
security:safety:
  stage: security
  image: python:${PYTHON_VERSION}
  before_script:
    - pip install safety
    - pip install -r requirements.txt
  script:
    - safety check --json
  allow_failure: true

security:bandit:
  stage: security
  image: python:${PYTHON_VERSION}
  before_script:
    - pip install bandit
  script:
    - bandit -r llm_toolkit/ -f json -o bandit-report.json
  artifacts:
    paths:
      - bandit-report.json
    expire_in: 1 week
  allow_failure: true

# ───────────────────────────────────────────────────────────────────
# Docker build: production and dev images, pushed to the GitLab
# container registry (main/develop/tags only)
# ───────────────────────────────────────────────────────────────────
build:docker:
  stage: build
  image: docker:latest
  services:
    - docker:dind
  before_script:
    # --password-stdin keeps the registry password out of the process
    # list and shell history; `docker login -p` warns for this reason.
    - echo "$CI_REGISTRY_PASSWORD" | docker login -u "$CI_REGISTRY_USER" --password-stdin "$CI_REGISTRY"
  script:
    - docker build -t $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_SLUG .
    - docker build -t $CI_REGISTRY_IMAGE:dev -f Dockerfile.dev .
    - docker push $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_SLUG
    - docker push $CI_REGISTRY_IMAGE:dev
  only:
    - main
    - develop
    - tags

# ───────────────────────────────────────────────────────────────────
# Python package build (tags only; dist/ kept as an artifact)
# ───────────────────────────────────────────────────────────────────
build:package:
  stage: build
  image: python:${PYTHON_VERSION}
  before_script:
    - pip install build twine
  script:
    - python -m build
    - twine check dist/*
  artifacts:
    paths:
      - dist/
    expire_in: 1 week
  only:
    - tags

# ───────────────────────────────────────────────────────────────────
# Deploy to PyPI — manually triggered on tags; reuses the dist/
# artifacts produced by build:package
# ───────────────────────────────────────────────────────────────────
deploy:pypi:
  stage: deploy
  image: python:${PYTHON_VERSION}
  before_script:
    - pip install twine
  script:
    - twine upload dist/* -u __token__ -p $PYPI_TOKEN
  dependencies:
    - build:package
  only:
    - tags
  when: manual

# ───────────────────────────────────────────────────────────────────
# Documentation build — HTML output kept as an artifact
# ───────────────────────────────────────────────────────────────────
deploy:docs:
  stage: deploy
  image: python:${PYTHON_VERSION}
  before_script:
    - pip install sphinx sphinx-rtd-theme sphinx-autodoc-typehints
    - pip install -r requirements.txt
  script:
    - cd docs
    - make html
  artifacts:
    paths:
      - docs/_build/html/
    expire_in: 1 week
  only:
    - main
    - tags

51. .travis.yml - Travis CI配置

# ═══════════════════════════════════════════════════════════════════
# TRAVIS CI CONFIGURATION
# Travis CI continuous-integration configuration
# ═══════════════════════════════════════════════════════════════════

language: python

python:
  - "3.8"
  - "3.9"
  - "3.10"
  - "3.11"
  - "3.12"

# NOTE(review): Travis does not support `language: python` on the osx
# and windows images — those rows need `language: shell` plus a manual
# Python install, or should be dropped. Confirm before relying on them.
os:
  - linux
  - osx
  - windows

# ───────────────────────────────────────────────────────────────────
# Cache configuration
# ───────────────────────────────────────────────────────────────────
cache:
  pip: true
  directories:
    - $HOME/.cache/pip

# ───────────────────────────────────────────────────────────────────
# Services
# ───────────────────────────────────────────────────────────────────
services:
  - docker
  - redis-server

# ───────────────────────────────────────────────────────────────────
# Install dependencies
# ───────────────────────────────────────────────────────────────────
install:
  - pip install --upgrade pip setuptools wheel
  - pip install -r requirements.txt
  - pip install -r requirements-dev.txt
  - pip install -e .

# ───────────────────────────────────────────────────────────────────
# Build script
# ───────────────────────────────────────────────────────────────────
script:
  # Code-quality checks
  - flake8 llm_toolkit/ tests/ examples/
  - black --check llm_toolkit/ tests/ examples/
  - isort --check-only llm_toolkit/ tests/ examples/
  
  # Run the test suite
  - pytest tests/ -v --cov=llm_toolkit --cov-report=xml --cov-report=term
  
  # Docker build smoke test
  - docker build -t llm-toolkit:test .

# ───────────────────────────────────────────────────────────────────
# Post-test actions
# ───────────────────────────────────────────────────────────────────
after_success:
  - codecov

# ───────────────────────────────────────────────────────────────────
# Deployment
# Fix: `secure:` must contain a `travis encrypt`-ed blob; the original
# `secure: $PYPI_TOKEN` / `secure: $GITHUB_TOKEN` values could never
# be decrypted, so both deploys would always fail authentication.
# Reference the repository environment variables directly instead.
# ───────────────────────────────────────────────────────────────────
deploy:
  - provider: pypi
    username: __token__
    password: $PYPI_TOKEN
    distributions: "sdist bdist_wheel"
    skip_existing: true
    on:
      tags: true
      python: "3.11"
  
  - provider: releases
    api_key: $GITHUB_TOKEN
    file_glob: true
    file: dist/*
    skip_cleanup: true
    on:
      tags: true
      python: "3.11"

# ───────────────────────────────────────────────────────────────────
# Notifications
# ───────────────────────────────────────────────────────────────────
notifications:
  email:
    on_success: change
    on_failure: always

52. Jenkinsfile - Jenkins Pipeline配置

// ═══════════════════════════════════════════════════════════════════
// JENKINS PIPELINE CONFIGURATION
// Declarative CI/CD pipeline: checkout → setup → lint → test →
// security → docker build → (on version tags) package build + deploy.
// ═══════════════════════════════════════════════════════════════════

pipeline {
    agent any
    
    environment {
        PYTHON_VERSION = '3.11'
        DOCKER_REGISTRY = 'ghcr.io'
        IMAGE_NAME = 'dreamvfia/llm-toolkit'
    }
    
    options {
        // Keep only the last 10 builds; hard-stop runaway runs.
        buildDiscarder(logRotator(numToKeepStr: '10'))
        timeout(time: 1, unit: 'HOURS')
        timestamps()
    }
    
    stages {
        // ───────────────────────────────────────────────────────────
        // Checkout
        // ───────────────────────────────────────────────────────────
        stage('Checkout') {
            steps {
                echo '📥 检出代码...'
                checkout scm
            }
        }
        
        // ───────────────────────────────────────────────────────────
        // Environment setup: venv + all dependencies
        // (Shell — not Groovy — expands ${PYTHON_VERSION} here, since
        // environment{} exports it to the shell environment.)
        // ───────────────────────────────────────────────────────────
        stage('Setup') {
            steps {
                echo '🔧 设置环境...'
                sh '''
                    python${PYTHON_VERSION} -m venv venv
                    . venv/bin/activate
                    pip install --upgrade pip setuptools wheel
                    pip install -r requirements.txt
                    pip install -r requirements-dev.txt
                    pip install -e .
                '''
            }
        }
        
        // ───────────────────────────────────────────────────────────
        // Code-quality checks (run in parallel; MyPy is advisory)
        // ───────────────────────────────────────────────────────────
        stage('Lint') {
            parallel {
                stage('Flake8') {
                    steps {
                        echo '🔍 Flake8检查...'
                        sh '''
                            . venv/bin/activate
                            flake8 llm_toolkit/ tests/ examples/
                        '''
                    }
                }
                
                stage('Black') {
                    steps {
                        echo '✨ Black格式检查...'
                        sh '''
                            . venv/bin/activate
                            black --check llm_toolkit/ tests/ examples/
                        '''
                    }
                }
                
                stage('isort') {
                    steps {
                        echo '📐 isort检查...'
                        sh '''
                            . venv/bin/activate
                            isort --check-only llm_toolkit/ tests/ examples/
                        '''
                    }
                }
                
                stage('MyPy') {
                    steps {
                        echo '🔬 MyPy类型检查...'
                        // `|| true` makes type errors non-blocking.
                        sh '''
                            . venv/bin/activate
                            mypy llm_toolkit/ --ignore-missing-imports || true
                        '''
                    }
                }
            }
        }
        
        // ───────────────────────────────────────────────────────────
        // Unit tests with coverage + JUnit reporting
        // ───────────────────────────────────────────────────────────
        stage('Test') {
            steps {
                echo '🧪 运行测试...'
                sh '''
                    . venv/bin/activate
                    pytest tests/ \
                        -v \
                        --cov=llm_toolkit \
                        --cov-report=xml \
                        --cov-report=html \
                        --cov-report=term \
                        --junitxml=test-results.xml
                '''
            }
            post {
                always {
                    // Publish results even when tests fail.
                    junit 'test-results.xml'
                    publishHTML([
                        allowMissing: false,
                        alwaysLinkToLastBuild: true,
                        keepAll: true,
                        reportDir: 'htmlcov',
                        reportFiles: 'index.html',
                        reportName: 'Coverage Report'
                    ])
                }
            }
        }
        
        // ───────────────────────────────────────────────────────────
        // Security scans (advisory — `|| true` on both)
        // ───────────────────────────────────────────────────────────
        stage('Security') {
            parallel {
                stage('Safety') {
                    steps {
                        echo '🔒 Safety依赖检查...'
                        sh '''
                            . venv/bin/activate
                            safety check --json || true
                        '''
                    }
                }
                
                stage('Bandit') {
                    steps {
                        echo '🛡️ Bandit代码检查...'
                        sh '''
                            . venv/bin/activate
                            bandit -r llm_toolkit/ -f json -o bandit-report.json || true
                        '''
                    }
                }
            }
        }
        
        // ───────────────────────────────────────────────────────────
        // Docker build (production + dev images)
        // ───────────────────────────────────────────────────────────
        stage('Docker Build') {
            steps {
                echo '🐳 构建Docker镜像...'
                script {
                    docker.build("${IMAGE_NAME}:${BUILD_NUMBER}")
                    docker.build("${IMAGE_NAME}:dev", "-f Dockerfile.dev .")
                }
            }
        }
        
        // ───────────────────────────────────────────────────────────
        // Build the Python package (version tags only)
        // ───────────────────────────────────────────────────────────
        stage('Build Package') {
            when {
                tag pattern: "v\\d+\\.\\d+\\.\\d+", comparator: "REGEXP"
            }
            steps {
                echo '📦 构建Python包...'
                sh '''
                    . venv/bin/activate
                    python -m build
                    twine check dist/*
                '''
            }
        }
        
        // ───────────────────────────────────────────────────────────
        // Deploy (version tags only): PyPI upload + registry push
        // ───────────────────────────────────────────────────────────
        stage('Deploy') {
            when {
                tag pattern: "v\\d+\\.\\d+\\.\\d+", comparator: "REGEXP"
            }
            parallel {
                stage('Deploy to PyPI') {
                    steps {
                        echo '📤 部署到PyPI...'
                        // Token comes from the Jenkins credentials store.
                        withCredentials([string(credentialsId: 'pypi-token', variable: 'PYPI_TOKEN')]) {
                            sh '''
                                . venv/bin/activate
                                twine upload dist/* -u __token__ -p ${PYPI_TOKEN}
                            '''
                        }
                    }
                }
                
                stage('Push Docker Image') {
                    steps {
                        echo '🐳 推送Docker镜像...'
                        script {
                            docker.withRegistry("https://${DOCKER_REGISTRY}", 'docker-credentials') {
                                docker.image("${IMAGE_NAME}:${BUILD_NUMBER}").push()
                                docker.image("${IMAGE_NAME}:${BUILD_NUMBER}").push('latest')
                            }
                        }
                    }
                }
            }
        }
    }
    
    // ───────────────────────────────────────────────────────────────
    // Post-build actions
    // NOTE(review): env.CHANGE_AUTHOR_EMAIL is only set for
    // change-request (PR) builds — branch builds will have an empty
    // recipient; verify the notification target.
    // ───────────────────────────────────────────────────────────────
    post {
        always {
            echo '🧹 清理工作空间...'
            cleanWs()
        }
        
        success {
            echo '✅ 流水线执行成功!'
            emailext(
                subject: "✅ Jenkins构建成功: ${env.JOB_NAME} #${env.BUILD_NUMBER}",
                body: "构建成功!\n\n查看详情: ${env.BUILD_URL}",
                to: "${env.CHANGE_AUTHOR_EMAIL}"
            )
        }
        
        failure {
            echo '❌ 流水线执行失败!'
            emailext(
                subject: "❌ Jenkins构建失败: ${env.JOB_NAME} #${env.BUILD_NUMBER}",
                body: "构建失败!\n\n查看详情: ${env.BUILD_URL}",
                to: "${env.CHANGE_AUTHOR_EMAIL}"
            )
        }
    }
}

53. azure-pipelines.yml - Azure Pipelines配置

# ═══════════════════════════════════════════════════════════════════
# AZURE PIPELINES CONFIGURATION
# Azure DevOps CI/CD: test matrix → Docker build → PyPI deploy on tags
# ═══════════════════════════════════════════════════════════════════

trigger:
  branches:
    include:
      - main
      - develop
  tags:
    include:
      - v*

pr:
  branches:
    include:
      - main
      - develop

variables:
  pythonVersion: '3.11'
  vmImageName: 'ubuntu-latest'

stages:
  # ─────────────────────────────────────────────────────────────────
  # Build stage: lint + unit tests across Python versions
  # ─────────────────────────────────────────────────────────────────
  - stage: Build
    displayName: '构建和测试'
    jobs:
      - job: Test
        displayName: '运行测试'
        pool:
          vmImage: $(vmImageName)
        strategy:
          matrix:
            Python38:
              python.version: '3.8'
            Python39:
              python.version: '3.9'
            Python310:
              python.version: '3.10'
            Python311:
              python.version: '3.11'
            Python312:
              python.version: '3.12'
        
        steps:
          - task: UsePythonVersion@0
            inputs:
              versionSpec: '$(python.version)'
            displayName: '使用Python $(python.version)'
          
          - script: |
              python -m pip install --upgrade pip
              pip install -r requirements.txt
              pip install -r requirements-dev.txt
              pip install -e .
            displayName: '安装依赖'
          
          - script: |
              flake8 llm_toolkit/ tests/ examples/
              black --check llm_toolkit/ tests/ examples/
              isort --check-only llm_toolkit/ tests/ examples/
            displayName: '代码质量检查'
          
          - script: |
              pytest tests/ \
                -v \
                --cov=llm_toolkit \
                --cov-report=xml \
                --cov-report=html \
                --junitxml=junit/test-results.xml
            displayName: '运行测试'
          
          # Publish JUnit results even when the test step failed.
          - task: PublishTestResults@2
            condition: succeededOrFailed()
            inputs:
              testResultsFiles: '**/test-*.xml'
              testRunTitle: 'Python $(python.version)'
          
          # NOTE(review): PublishCodeCoverageResults@1 is superseded by
          # @2 — confirm the org still allows v1 before relying on it.
          - task: PublishCodeCoverageResults@1
            inputs:
              codeCoverageTool: Cobertura
              summaryFileLocation: '$(System.DefaultWorkingDirectory)/**/coverage.xml'

  # ─────────────────────────────────────────────────────────────────
  # Docker build stage (build only; no push)
  # ─────────────────────────────────────────────────────────────────
  - stage: Docker
    displayName: 'Docker构建'
    dependsOn: Build
    condition: succeeded()
    jobs:
      - job: BuildDocker
        displayName: '构建Docker镜像'
        pool:
          vmImage: $(vmImageName)
        
        steps:
          - task: Docker@2
            displayName: '构建Docker镜像'
            inputs:
              command: build
              dockerfile: '$(Build.SourcesDirectory)/Dockerfile'
              tags: |
                $(Build.BuildId)
                latest

  # ─────────────────────────────────────────────────────────────────
  # Deploy stage: PyPI upload, only for tag builds
  # ─────────────────────────────────────────────────────────────────
  - stage: Deploy
    displayName: '部署'
    dependsOn: Docker
    condition: and(succeeded(), startsWith(variables['Build.SourceBranch'], 'refs/tags/'))
    jobs:
      - deployment: DeployPyPI
        displayName: '部署到PyPI'
        pool:
          vmImage: $(vmImageName)
        # Targets the 'production' environment (approvals can be
        # attached to it in Azure DevOps).
        environment: 'production'
        strategy:
          runOnce:
            deploy:
              steps:
                - task: UsePythonVersion@0
                  inputs:
                    versionSpec: '$(pythonVersion)'
                
                - script: |
                    pip install build twine
                    python -m build
                    twine upload dist/* -u __token__ -p $(PYPI_TOKEN)
                  displayName: '发布到PyPI'

LLM Toolkit - 第十一部分:测试文件

完整的测试套件


54. tests/conftest.py - Pytest配置和Fixtures

# -*- coding: utf-8 -*-
"""
Pytest配置和通用Fixtures
"""

import os
import pytest
import tempfile
from pathlib import Path
from unittest.mock import Mock, MagicMock

# Force a hermetic test environment: flag the run as "testing", inject a
# dummy OpenAI key so client construction never needs real credentials,
# and disable caching so tests cannot interact through shared cache state.
os.environ["ENV"] = "testing"
os.environ["OPENAI_API_KEY"] = "test-key"
os.environ["CACHE_ENABLED"] = "false"


@pytest.fixture(scope="session")
def test_data_dir():
    """Path to the static ``test_data`` directory next to this file."""
    here = Path(__file__).parent
    return here / "test_data"


@pytest.fixture(scope="session")
def temp_dir():
    """Session-wide scratch directory, removed automatically on teardown."""
    with tempfile.TemporaryDirectory() as dirname:
        yield Path(dirname)


@pytest.fixture
def mock_openai_response():
    """Canned OpenAI chat-completion payload for offline tests."""
    message = {"role": "assistant", "content": "这是一个测试响应"}
    choice = {"index": 0, "message": message, "finish_reason": "stop"}
    usage = {"prompt_tokens": 10, "completion_tokens": 20, "total_tokens": 30}
    return {
        "id": "chatcmpl-test123",
        "object": "chat.completion",
        "created": 1234567890,
        "model": "gpt-3.5-turbo",
        "choices": [choice],
        "usage": usage,
    }


@pytest.fixture
def mock_openai_stream_response():
    """Iterator of delta chunks mimicking an OpenAI streaming reply."""
    pieces = ["这", "是", "测试"]
    chunks = [{"choices": [{"delta": {"content": piece}}]} for piece in pieces]
    # Final chunk carries an empty delta, as the real stream does at the end.
    chunks.append({"choices": [{"delta": {}}]})
    return iter(chunks)


@pytest.fixture
def mock_llm():
    """Mock LLM exposing canned chat / stream / count_tokens behavior."""
    fake = Mock()
    fake.chat.return_value = "测试响应"
    fake.stream.return_value = iter(["测", "试", "响", "应"])
    fake.count_tokens.return_value = 10
    return fake


@pytest.fixture
def sample_texts():
    """Three short Chinese sentences about Python, as a fresh list."""
    sentences = (
        "Python是一种高级编程语言。",
        "它具有简洁的语法和强大的功能。",
        "Python广泛应用于Web开发、数据分析等领域。",
    )
    return list(sentences)


@pytest.fixture
def sample_documents(temp_dir):
    """Create two small text documents and return their directory.

    Bug fix: ``temp_dir`` is session-scoped while this fixture is
    function-scoped, so the second test using it in one session used to hit
    ``FileExistsError`` from a bare ``mkdir()``. ``exist_ok=True`` makes the
    fixture idempotent; ``write_text`` simply overwrites the files.
    """
    doc_dir = temp_dir / "docs"
    doc_dir.mkdir(parents=True, exist_ok=True)

    # Create the test documents (overwritten on every use).
    (doc_dir / "doc1.txt").write_text("这是第一个文档。", encoding="utf-8")
    (doc_dir / "doc2.txt").write_text("这是第二个文档。", encoding="utf-8")

    return doc_dir


@pytest.fixture
def mock_vector_store():
    """Mock vector store with canned add/search results."""
    hits = [
        Mock(page_content="相关文本1", metadata={"source": "doc1"}),
        Mock(page_content="相关文本2", metadata={"source": "doc2"}),
    ]
    fake = Mock()
    fake.add_texts.return_value = ["id1", "id2", "id3"]
    fake.similarity_search.return_value = hits
    return fake


@pytest.fixture
def mock_embedding():
    """Mock embedding model producing fixed 1536-dimensional vectors."""
    dim = 1536
    fake = Mock()
    fake.embed_query.return_value = [0.1] * dim
    fake.embed_documents.return_value = [[0.1] * dim, [0.2] * dim]
    return fake


@pytest.fixture
def mock_redis():
    """MagicMock standing in for a Redis client with an empty cache."""
    client = MagicMock()
    client.get.return_value = None
    client.set.return_value = True
    client.exists.return_value = False
    return client


@pytest.fixture(autouse=True)
def cleanup_test_files(temp_dir):
    """Reset the shared temp directory after every test.

    Bug fix: the old version ``shutil.rmtree``-ed ``temp_dir`` itself, but
    ``temp_dir`` is a *session-scoped* ``TemporaryDirectory`` — deleting it
    after the first test broke every later test that used it (and its own
    teardown). Instead, remove only the contents created during the test and
    leave the directory in place; the session fixture deletes it at the end.
    """
    yield
    # Post-test cleanup: empty the directory without removing it.
    import shutil
    if temp_dir.exists():
        for entry in temp_dir.iterdir():
            if entry.is_dir():
                shutil.rmtree(entry, ignore_errors=True)
            else:
                entry.unlink(missing_ok=True)


# Marker definitions
def pytest_configure(config):
    """Register the custom pytest markers used by this suite."""
    marker_lines = (
        "slow: 标记慢速测试",
        "integration: 标记集成测试",
        "unit: 标记单元测试",
        "requires_api_key: 需要真实API密钥的测试",
    )
    for line in marker_lines:
        config.addinivalue_line("markers", line)

55. tests/test_llm_manager.py - LLM管理器测试

# -*- coding: utf-8 -*-
"""
LLM管理器测试
"""

import pytest
from unittest.mock import Mock, patch, MagicMock
from llm_toolkit import LLMManager
from llm_toolkit.exceptions import LLMError, RateLimitError


class TestLLMManager:
    """Unit tests for LLMManager with the OpenAI client patched out.

    All tests patch ``llm_toolkit.providers.openai_provider.OpenAI`` so no
    network traffic occurs; mocks pin the exact call sequence expected.
    """

    def test_init_openai(self):
        """Initialization stores the provider and model names."""
        with patch('llm_toolkit.providers.openai_provider.OpenAI'):
            llm = LLMManager(provider="openai", model="gpt-3.5-turbo")
            assert llm.provider == "openai"
            assert llm.model == "gpt-3.5-turbo"

    def test_init_invalid_provider(self):
        """An unknown provider name raises ValueError."""
        with pytest.raises(ValueError):
            LLMManager(provider="invalid_provider")

    @patch('llm_toolkit.providers.openai_provider.OpenAI')
    def test_chat(self, mock_openai, mock_openai_response):
        """chat() returns the message content and calls the API exactly once."""
        # Configure the mock client
        mock_client = Mock()
        mock_client.chat.completions.create.return_value = Mock(
            choices=[Mock(message=Mock(content="测试响应"))]
        )
        mock_openai.return_value = mock_client

        # Exercise
        llm = LLMManager(provider="openai")
        response = llm.chat("你好")

        assert response == "测试响应"
        mock_client.chat.completions.create.assert_called_once()

    @patch('llm_toolkit.providers.openai_provider.OpenAI')
    def test_stream(self, mock_openai):
        """stream() yields chunk contents and drops the empty terminator."""
        # Configure the mock client: two content chunks, then an empty delta
        mock_client = Mock()
        mock_stream = [
            Mock(choices=[Mock(delta=Mock(content="测"))]),
            Mock(choices=[Mock(delta=Mock(content="试"))]),
            Mock(choices=[Mock(delta=Mock(content=""))])
        ]
        mock_client.chat.completions.create.return_value = iter(mock_stream)
        mock_openai.return_value = mock_client

        # Exercise
        llm = LLMManager(provider="openai")
        chunks = list(llm.stream("你好"))

        assert chunks == ["测", "试"]

    @patch('llm_toolkit.providers.openai_provider.OpenAI')
    def test_batch_chat(self, mock_openai):
        """batch_chat() returns one response per input message."""
        # Configure the mock client
        mock_client = Mock()
        mock_client.chat.completions.create.return_value = Mock(
            choices=[Mock(message=Mock(content="响应"))]
        )
        mock_openai.return_value = mock_client

        # Exercise
        llm = LLMManager(provider="openai")
        messages = ["消息1", "消息2", "消息3"]
        responses = llm.batch_chat(messages)

        assert len(responses) == 3
        assert all(r == "响应" for r in responses)

    @patch('llm_toolkit.providers.openai_provider.OpenAI')
    def test_count_tokens(self, mock_openai):
        """count_tokens() returns a positive int for non-empty text."""
        llm = LLMManager(provider="openai")
        count = llm.count_tokens("Hello, world!")

        assert isinstance(count, int)
        assert count > 0

    @patch('llm_toolkit.providers.openai_provider.OpenAI')
    def test_retry_on_error(self, mock_openai):
        """chat() retries on failure and returns the first success."""
        mock_client = Mock()
        # First two calls fail, the third succeeds
        mock_client.chat.completions.create.side_effect = [
            Exception("Error 1"),
            Exception("Error 2"),
            Mock(choices=[Mock(message=Mock(content="成功"))])
        ]
        mock_openai.return_value = mock_client

        llm = LLMManager(provider="openai", max_retries=3)
        response = llm.chat("测试")

        assert response == "成功"
        assert mock_client.chat.completions.create.call_count == 3

    @patch('llm_toolkit.providers.openai_provider.OpenAI')
    def test_max_retries_exceeded(self, mock_openai):
        """Exhausting max_retries surfaces an LLMError to the caller."""
        mock_client = Mock()
        mock_client.chat.completions.create.side_effect = Exception("Error")
        mock_openai.return_value = mock_client

        llm = LLMManager(provider="openai", max_retries=2)

        with pytest.raises(LLMError):
            llm.chat("测试")


class TestLLMManagerIntegration:
    """Integration tests hitting the real OpenAI API (skipped without a key)."""

    @pytest.mark.integration
    @pytest.mark.requires_api_key
    def test_real_openai_chat(self):
        """Round-trip a chat call against the live API."""
        import os
        if not os.getenv("OPENAI_API_KEY"):
            pytest.skip("需要OPENAI_API_KEY环境变量")

        llm = LLMManager(provider="openai", model="gpt-3.5-turbo")
        reply = llm.chat("说'测试成功'")

        assert isinstance(reply, str)
        assert len(reply) > 0

    @pytest.mark.integration
    @pytest.mark.requires_api_key
    def test_real_openai_stream(self):
        """Collect streaming chunks from the live API."""
        import os
        if not os.getenv("OPENAI_API_KEY"):
            pytest.skip("需要OPENAI_API_KEY环境变量")

        llm = LLMManager(provider="openai", model="gpt-3.5-turbo")
        pieces = [piece for piece in llm.stream("数到3")]

        assert len(pieces) > 0
        assert all(isinstance(piece, str) for piece in pieces)

56. tests/test_prompt_manager.py - Prompt管理器测试

# -*- coding: utf-8 -*-
"""
Prompt管理器测试
"""

import pytest
from pathlib import Path
from llm_toolkit import PromptManager
from llm_toolkit.exceptions import PromptNotFoundError, PromptValidationError


class TestPromptManager:
    """Tests for PromptManager CRUD, formatting, search, and versioning."""

    @pytest.fixture
    def pm(self, temp_dir):
        """Create a PromptManager rooted in a per-test templates directory."""
        return PromptManager(templates_dir=str(temp_dir / "prompts"))

    def test_create_template(self, pm):
        """A created template is retrievable with all its fields intact."""
        pm.create_template(
            name="test_template",
            template="Hello, {name}!",
            variables=["name"],
            category="test"
        )

        template = pm.get_template("test_template")
        assert template.name == "test_template"
        assert template.template == "Hello, {name}!"
        assert template.variables == ["name"]

    def test_create_duplicate_template(self, pm):
        """Creating a second template under an existing name raises ValueError."""
        pm.create_template(
            name="test",
            template="Test",
            variables=[]
        )

        with pytest.raises(ValueError):
            pm.create_template(
                name="test",
                template="Test2",
                variables=[]
            )

    def test_get_template(self, pm):
        """get_template() returns the template stored under that name."""
        pm.create_template(
            name="greeting",
            template="你好,{name}!",
            variables=["name"]
        )

        template = pm.get_template("greeting")
        assert template.name == "greeting"

    def test_get_nonexistent_template(self, pm):
        """Fetching an unknown name raises PromptNotFoundError."""
        with pytest.raises(PromptNotFoundError):
            pm.get_template("nonexistent")

    def test_format_template(self, pm):
        """format() substitutes all declared variables into the template."""
        pm.create_template(
            name="greeting",
            template="你好,{name}!今天{weather}。",
            variables=["name", "weather"]
        )

        template = pm.get_template("greeting")
        result = template.format(name="张三", weather="天气很好")

        assert result == "你好,张三!今天天气很好。"

    def test_format_missing_variable(self, pm):
        """Formatting without a required variable raises PromptValidationError."""
        pm.create_template(
            name="test",
            template="Hello, {name}!",
            variables=["name"]
        )

        template = pm.get_template("test")

        with pytest.raises(PromptValidationError):
            template.format()

    def test_update_template(self, pm):
        """update_template() replaces both the body and the variable list."""
        pm.create_template(
            name="test",
            template="Original",
            variables=[]
        )

        pm.update_template(
            name="test",
            template="Updated",
            variables=["var"]
        )

        template = pm.get_template("test")
        assert template.template == "Updated"
        assert template.variables == ["var"]

    def test_delete_template(self, pm):
        """A deleted template can no longer be fetched."""
        pm.create_template(
            name="test",
            template="Test",
            variables=[]
        )

        pm.delete_template("test")

        with pytest.raises(PromptNotFoundError):
            pm.get_template("test")

    def test_list_templates(self, pm):
        """list_templates() returns every registered template."""
        pm.create_template("test1", "T1", [])
        pm.create_template("test2", "T2", [])
        pm.create_template("test3", "T3", [])

        templates = pm.list_templates()
        assert len(templates) == 3
        assert all(t.name in ["test1", "test2", "test3"] for t in templates)

    def test_get_categories(self, pm):
        """get_categories() returns the distinct category names."""
        pm.create_template("t1", "T1", [], category="cat1")
        pm.create_template("t2", "T2", [], category="cat2")
        pm.create_template("t3", "T3", [], category="cat1")

        categories = pm.get_categories()
        assert set(categories) == {"cat1", "cat2"}

    def test_search_templates(self, pm):
        """search_templates() matches against template text/description."""
        pm.create_template("greeting", "Hello", [], description="Say hello")
        pm.create_template("farewell", "Goodbye", [], description="Say goodbye")

        results = pm.search_templates("hello")
        assert len(results) == 1
        assert results[0].name == "greeting"

    def test_template_versioning(self, pm):
        """Versioned updates are recorded and rollback restores old bodies."""
        pm.create_template("test", "v1", [])
        pm.update_template("test", "v2", [], create_version=True)
        pm.update_template("test", "v3", [], create_version=True)

        versions = pm.get_versions("test")
        assert len(versions) >= 2

        # Roll back to v1
        pm.rollback("test", "v1")
        template = pm.get_template("test")
        assert template.template == "v1"

    def test_save_and_load(self, pm, temp_dir):
        """Templates saved to JSON load into a fresh manager instance."""
        pm.create_template("test1", "T1", [])
        pm.create_template("test2", "T2", [])

        # Save
        save_path = temp_dir / "templates.json"
        pm.save(save_path)

        # Load into a new instance
        pm2 = PromptManager(templates_dir=str(temp_dir / "prompts2"))
        pm2.load(save_path)

        assert len(pm2.list_templates()) == 2

57. tests/test_conversation_manager.py - 对话管理器测试

# -*- coding: utf-8 -*-
"""
对话管理器测试
"""

import pytest
from unittest.mock import Mock
from llm_toolkit import ConversationManager


class TestConversationManager:
    """Tests for ConversationManager history, memory, and persistence."""

    @pytest.fixture
    def mock_llm(self):
        """Mock LLM with canned chat/stream replies.

        NOTE(review): shadows the session-level ``mock_llm`` fixture defined
        in conftest.py (this one lacks ``count_tokens``) — confirm intended.
        """
        llm = Mock()
        llm.chat.return_value = "测试响应"
        llm.stream.return_value = iter(["测", "试"])
        return llm

    def test_init(self, mock_llm):
        """A new manager starts with an empty message list."""
        conv = ConversationManager(llm=mock_llm)
        assert conv.llm == mock_llm
        assert len(conv.messages) == 0

    def test_init_with_system_message(self, mock_llm):
        """A system message is stored as the first message."""
        conv = ConversationManager(
            llm=mock_llm,
            system_message="你是一个助手"
        )
        assert len(conv.messages) == 1
        assert conv.messages[0]["role"] == "system"

    def test_chat(self, mock_llm):
        """chat() appends a user/assistant pair and returns the reply."""
        conv = ConversationManager(llm=mock_llm)
        response = conv.chat("你好")

        assert response == "测试响应"
        assert len(conv.messages) == 2
        assert conv.messages[0]["role"] == "user"
        assert conv.messages[1]["role"] == "assistant"

    def test_stream_chat(self, mock_llm):
        """stream_chat() yields chunks and records the full exchange."""
        conv = ConversationManager(llm=mock_llm)
        chunks = list(conv.stream_chat("你好"))

        assert chunks == ["测", "试"]
        assert len(conv.messages) == 2

    def test_max_history(self, mock_llm):
        """History is truncated to max_history rounds."""
        conv = ConversationManager(llm=mock_llm, max_history=2)

        conv.chat("消息1")
        conv.chat("消息2")
        conv.chat("消息3")

        # Only the last 2 rounds (4 messages) should remain
        assert len(conv.messages) == 4

    def test_clear_history(self, mock_llm):
        """clear_history() empties the message list."""
        conv = ConversationManager(llm=mock_llm)
        conv.chat("消息1")
        conv.chat("消息2")

        conv.clear_history()
        assert len(conv.messages) == 0

    def test_get_history(self, mock_llm):
        """get_history() returns the recorded user/assistant messages."""
        conv = ConversationManager(llm=mock_llm)
        conv.chat("你好")

        history = conv.get_history()
        assert len(history) == 2
        assert history[0]["role"] == "user"
        assert history[1]["role"] == "assistant"

    def test_save_and_load(self, mock_llm, temp_dir):
        """A saved conversation reloads with all messages intact."""
        conv = ConversationManager(llm=mock_llm)
        conv.chat("消息1")
        conv.chat("消息2")

        # Save
        save_path = temp_dir / "conversation.json"
        conv.save(str(save_path))

        # Load into a fresh instance
        conv2 = ConversationManager(llm=mock_llm)
        conv2.load(str(save_path))

        assert len(conv2.messages) == 4

    def test_buffer_memory(self, mock_llm):
        """Buffer memory keeps only the most recent rounds."""
        conv = ConversationManager(
            llm=mock_llm,
            memory_type="buffer",
            max_history=3
        )

        for i in range(5):
            conv.chat(f"消息{i}")

        # Only the last 3 rounds should remain
        assert len(conv.messages) == 6

    def test_summary_memory(self, mock_llm):
        """Summary memory condenses old rounds into a system message."""
        mock_llm.chat.side_effect = [
            "响应1", "响应2", "响应3",
            "这是对话摘要"  # the summary reply
        ]

        conv = ConversationManager(
            llm=mock_llm,
            memory_type="summary",
            max_history=2
        )

        conv.chat("消息1")
        conv.chat("消息2")
        conv.chat("消息3")  # triggers summarization

        # Verify a summary (system) message was produced
        assert any(msg.get("role") == "system" for msg in conv.messages)

58. tests/test_rag_pipeline.py - RAG管道测试

# -*- coding: utf-8 -*-
"""
RAG管道测试
"""

import pytest
from unittest.mock import Mock, patch
from llm_toolkit import RAGPipeline


class TestRAGPipeline:
    """Tests for RAGPipeline with the Chroma vector store patched out."""

    @pytest.fixture
    def mock_llm(self):
        """Mock LLM that always answers with a fixed string."""
        llm = Mock()
        llm.chat.return_value = "基于上下文的答案"
        return llm

    @pytest.fixture
    def mock_vector_store(self):
        """Mock vector store with canned add/search results.

        NOTE(review): shadows the conftest.py fixture of the same name.
        """
        store = Mock()
        store.add_texts.return_value = ["id1", "id2"]
        store.similarity_search.return_value = [
            Mock(page_content="相关文本1", metadata={"source": "doc1"}),
            Mock(page_content="相关文本2", metadata={"source": "doc2"})
        ]
        return store

    @patch('llm_toolkit.rag.vector_stores.ChromaVectorStore')
    def test_init(self, mock_chroma, mock_llm):
        """The pipeline keeps a reference to the supplied LLM."""
        rag = RAGPipeline(llm=mock_llm)
        assert rag.llm == mock_llm

    @patch('llm_toolkit.rag.vector_stores.ChromaVectorStore')
    def test_add_texts(self, mock_chroma, mock_llm, sample_texts):
        """add_texts() forwards to the store and reports the count."""
        mock_store = Mock()
        mock_store.add_texts.return_value = ["id1", "id2", "id3"]
        mock_chroma.return_value = mock_store

        rag = RAGPipeline(llm=mock_llm)
        num_added = rag.add_texts(sample_texts)

        assert num_added == 3
        mock_store.add_texts.assert_called_once()

    @patch('llm_toolkit.rag.vector_stores.ChromaVectorStore')
    def test_add_documents(self, mock_chroma, mock_llm, sample_documents):
        """add_documents() loads files from a directory path."""
        mock_store = Mock()
        mock_store.add_texts.return_value = ["id1", "id2"]
        mock_chroma.return_value = mock_store

        rag = RAGPipeline(llm=mock_llm)
        num_added = rag.add_documents(str(sample_documents))

        assert num_added > 0

    @patch('llm_toolkit.rag.vector_stores.ChromaVectorStore')
    def test_query(self, mock_chroma, mock_llm):
        """query() retrieves context, asks the LLM once, and returns sources."""
        mock_store = Mock()
        mock_store.similarity_search.return_value = [
            Mock(page_content="Python是编程语言", metadata={"source": "doc1"})
        ]
        mock_chroma.return_value = mock_store

        rag = RAGPipeline(llm=mock_llm)
        response = rag.query("什么是Python?")

        assert response.answer == "基于上下文的答案"
        assert len(response.sources) > 0
        mock_llm.chat.assert_called_once()

    @patch('llm_toolkit.rag.vector_stores.ChromaVectorStore')
    def test_query_with_filter(self, mock_chroma, mock_llm):
        """A metadata filter is passed through to similarity_search."""
        mock_store = Mock()
        mock_store.similarity_search.return_value = []
        mock_chroma.return_value = mock_store

        rag = RAGPipeline(llm=mock_llm)
        response = rag.query(
            "测试问题",
            filter={"source": "doc1"}
        )

        mock_store.similarity_search.assert_called_once()
        call_kwargs = mock_store.similarity_search.call_args[1]
        assert "filter" in call_kwargs

    @patch('llm_toolkit.rag.vector_stores.ChromaVectorStore')
    def test_hybrid_search(self, mock_chroma, mock_llm):
        """Hybrid mode still produces an answer."""
        mock_store = Mock()
        mock_store.similarity_search.return_value = [
            Mock(page_content="文本1", metadata={})
        ]
        mock_chroma.return_value = mock_store

        rag = RAGPipeline(llm=mock_llm)
        response = rag.query("测试", use_hybrid=True)

        assert response.answer is not None

    @patch('llm_toolkit.rag.vector_stores.ChromaVectorStore')
    def test_reranking(self, mock_chroma, mock_llm):
        """Reranking is applied on top of the raw retrieval results."""
        mock_store = Mock()
        mock_store.similarity_search.return_value = [
            Mock(page_content=f"文本{i}", metadata={}) for i in range(10)
        ]
        mock_chroma.return_value = mock_store

        rag = RAGPipeline(llm=mock_llm, top_k=10)

        with patch('llm_toolkit.rag.reranker.Reranker') as mock_reranker:
            mock_reranker_instance = Mock()
            mock_reranker_instance.rerank.return_value = [
                Mock(page_content="重排文本", metadata={})
            ]
            mock_reranker.return_value = mock_reranker_instance

            response = rag.query("测试", use_reranking=True)

            assert response.answer is not None

59. tests/test_cost_tracker.py - 成本追踪测试

# -*- coding: utf-8 -*-
"""
成本追踪测试
"""

import pytest
from datetime import datetime, timedelta
from llm_toolkit import CostTracker
from llm_toolkit.exceptions import CostLimitExceededError


class TestCostTracker:
    """Tests for CostTracker accounting, limits, reports, and export."""

    @pytest.fixture
    def tracker(self, temp_dir):
        """Build a tracker with a fresh DB and small daily/monthly limits."""
        db_path = temp_dir / "costs.db"
        return CostTracker(
            db_path=str(db_path),
            daily_limit=10.0,
            monthly_limit=100.0
        )

    def test_init(self, tracker):
        """The configured limits are stored on the instance."""
        assert tracker.daily_limit == 10.0
        assert tracker.monthly_limit == 100.0

    def test_track_usage(self, tracker):
        """A tracked call shows up in the daily cost."""
        tracker.track_usage(
            provider="openai",
            model="gpt-3.5-turbo",
            input_tokens=100,
            output_tokens=50,
            cost=0.001
        )

        daily_cost = tracker.get_daily_cost()
        assert daily_cost == 0.001

    def test_get_daily_cost(self, tracker):
        """Daily cost is the sum of all tracked costs for the day."""
        tracker.track_usage("openai", "gpt-3.5-turbo", 100, 50, 0.5)
        tracker.track_usage("openai", "gpt-3.5-turbo", 100, 50, 0.3)

        # NOTE(review): exact float equality — relies on 0.5 + 0.3 rounding
        # to exactly 0.8; consider pytest.approx if the sum path changes.
        daily_cost = tracker.get_daily_cost()
        assert daily_cost == 0.8

    def test_get_monthly_cost(self, tracker):
        """Monthly cost aggregates tracked usage for the month."""
        tracker.track_usage("openai", "gpt-3.5-turbo", 100, 50, 1.0)

        monthly_cost = tracker.get_monthly_cost()
        assert monthly_cost == 1.0

    def test_daily_limit_exceeded(self, tracker):
        """A single call beyond the daily limit raises CostLimitExceededError."""
        with pytest.raises(CostLimitExceededError):
            tracker.track_usage("openai", "gpt-3.5-turbo", 10000, 5000, 15.0)

    def test_monthly_limit_exceeded(self, tracker):
        """A call beyond the monthly limit raises CostLimitExceededError."""
        with pytest.raises(CostLimitExceededError):
            tracker.track_usage("openai", "gpt-4", 100000, 50000, 150.0)

    def test_get_usage_stats(self, tracker):
        """Stats expose totals and a per-model breakdown."""
        tracker.track_usage("openai", "gpt-3.5-turbo", 100, 50, 0.5)
        tracker.track_usage("openai", "gpt-4", 200, 100, 1.5)

        stats = tracker.get_usage_stats(period="daily")

        assert "total_cost" in stats
        assert "total_tokens" in stats
        assert "by_model" in stats

    def test_generate_report(self, tracker):
        """A text report is produced containing the report header."""
        tracker.track_usage("openai", "gpt-3.5-turbo", 100, 50, 0.5)

        report = tracker.generate_report(period="daily", format="text")

        assert isinstance(report, str)
        assert "成本报告" in report

    def test_get_optimization_suggestions(self, tracker):
        """Suggestions come back as a list after some recorded usage."""
        # Add some usage records
        for _ in range(10):
            tracker.track_usage("openai", "gpt-4", 1000, 500, 0.5)

        suggestions = tracker.get_optimization_suggestions()

        assert isinstance(suggestions, list)

    def test_reset_daily_cost(self, tracker):
        """reset_daily_cost() zeroes the accumulated daily total."""
        tracker.track_usage("openai", "gpt-3.5-turbo", 100, 50, 1.0)

        tracker.reset_daily_cost()

        daily_cost = tracker.get_daily_cost()
        assert daily_cost == 0.0

    def test_export_data(self, tracker, temp_dir):
        """export_data() writes the usage data to the given JSON path."""
        tracker.track_usage("openai", "gpt-3.5-turbo", 100, 50, 0.5)

        export_path = temp_dir / "export.json"
        tracker.export_data(str(export_path))

        assert export_path.exists()

60. tests/integration/test_full_workflow.py - 完整工作流集成测试

# -*- coding: utf-8 -*-
"""
完整工作流集成测试
"""

import pytest
from pathlib import Path
from llm_toolkit import (
    LLMManager,
    PromptManager,
    ConversationManager,
    RAGPipeline,
    CostTracker
)


@pytest.mark.integration
class TestFullWorkflow:
    """End-to-end workflows chaining several toolkit components together."""

    @pytest.fixture
    def setup_environment(self, temp_dir):
        """Create a docs directory with one sample file inside temp_dir."""
        # Create a test document
        doc_dir = temp_dir / "docs"
        doc_dir.mkdir()

        (doc_dir / "python.txt").write_text(
            "Python是一种高级编程语言,由Guido van Rossum创建。",
            encoding="utf-8"
        )

        return {
            "doc_dir": doc_dir,
            "temp_dir": temp_dir
        }

    def test_rag_workflow(self, setup_environment, mock_llm):
        """Index documents, then query them through the RAG pipeline."""
        env = setup_environment

        # 1. Initialize RAG
        rag = RAGPipeline(llm=mock_llm)

        # 2. Add documents
        num_docs = rag.add_documents(str(env["doc_dir"]))
        assert num_docs > 0

        # 3. Query
        response = rag.query("谁创建了Python?")
        assert response.answer is not None

    def test_conversation_with_prompts(self, mock_llm, temp_dir):
        """Use a prompt template as the system message of a conversation."""
        # 1. Create the prompt template
        pm = PromptManager(templates_dir=str(temp_dir / "prompts"))
        pm.create_template(
            name="assistant",
            template="你是一个{role},请{task}。",
            variables=["role", "task"]
        )

        # 2. Render the template into a system message
        template = pm.get_template("assistant")
        system_message = template.format(
            role="编程助手",
            task="帮助用户解决编程问题"
        )

        # 3. Start a conversation
        conv = ConversationManager(
            llm=mock_llm,
            system_message=system_message
        )

        # 4. Chat
        response = conv.chat("如何学习Python?")
        assert response is not None

    def test_cost_tracking_workflow(self, temp_dir):
        """LLM calls through a tracked manager accrue measurable cost."""
        from unittest.mock import Mock, patch

        # 1. Create the cost tracker
        tracker = CostTracker(
            db_path=str(temp_dir / "costs.db"),
            daily_limit=5.0
        )

        # 2. Create an LLM wired to the tracker
        with patch('llm_toolkit.providers.openai_provider.OpenAI'):
            llm = LLMManager(
                provider="openai",
                cost_tracker=tracker
            )

            # Stub the underlying client with a usage-bearing response
            mock_client = Mock()
            mock_client.chat.completions.create.return_value = Mock(
                choices=[Mock(message=Mock(content="响应"))],
                usage=Mock(prompt_tokens=10, completion_tokens=20)
            )
            llm.client = mock_client

            # 3. Chat
            llm.chat("测试")

            # 4. Check the accrued cost
            daily_cost = tracker.get_daily_cost()
            assert daily_cost > 0

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐