作为AI技术专家兼学习规划博主,我每天都会收到读者的类似提问:
“南木,RNN明明能处理时序数据,为啥还要费劲学LSTM?”
“GRU和LSTM看起来差不多,实际项目里该怎么选?”
“用PyTorch写时序预测,跑出来的结果总是飘,问题出在哪?”

其实时序数据处理的核心矛盾,从始至终都是“如何记住关键信息,同时忘掉噪音”——RNN的出现解决了“时序依赖”的初步需求,却栽在“长序列记不住”的坑里;LSTM用“门控机制”补上了这个漏洞,却带来了“参数太多、训练慢”的新问题;GRU则在两者之间做了平衡,成为“效率优先”场景的首选。

这篇文章会用“进化逻辑+原理拆解+实战对比”的方式,把RNN/LSTM/GRU讲透:从三者的核心区别,到各自的适用场景,最后用PyTorch实现文本情感分析(GRU)、股价预测(LSTM)、关键词识别(LSTM) 三个实战案例。全程无冗余理论,每个知识点都配“通俗解释+代码验证”,看完就能落地。

同时需要学习规划、就业指导、技术答疑和系统课程学习的同学 欢迎扫码交流

在这里插入图片描述

一、先搞懂:为什么时序数据需要“特殊对待”?

在讲RNN之前,我们得先明确一个前提:为什么CNN、MLP(多层感知机)处理不了时序数据?

因为时序数据的核心是“前后依赖关系”——比如:

  • 文本中,“他今天没带伞,所以______”,横线处的词(淋湿/回家)依赖前面的“没带伞”;
  • 股价中,“过去5天连续上涨”的趋势,会影响第6天的涨跌判断;
  • 语音中,“你好,我是______”,后面的名字依赖“我是”这个上下文。

而传统模型(如CNN、MLP)的输入是“独立同分布”的——每个样本之间没有关联,比如识别图片中的猫,每张图的像素都是独立的,不需要考虑上一张图的内容。这种“无记忆”的特性,注定了它们处理不了时序数据。

RNN的革命性在于:它给网络加了“记忆”——让当前的输出不仅依赖当前输入,还依赖上一时刻的“隐藏状态”(可以理解为“过去的记忆”)。

二、时序处理的“进化三阶段”:RNN→LSTM→GRU

我们按“问题-解决方案”的进化逻辑,逐个拆解三个模型的核心原理,重点讲清“为什么需要下一个模型”。

2.1 第一阶段:RNN(循环神经网络)——时序处理的“启蒙者”

(1)核心结构:一个“循环”解决依赖问题

RNN的结构非常简单,核心是“隐藏层的循环连接”——可以理解为“把同一个神经网络模块(RNN Cell)重复使用,每个时刻的模块都接收当前输入和上一时刻的记忆,输出当前结果和更新后的记忆”。

用公式和通俗解释结合说明(假设第t时刻):

  • 输入:当前时刻的输入向量 ( x_t )(比如文本中的第t个词的嵌入向量);
  • 隐藏状态:上一时刻的记忆 ( h_{t-1} )(比如第t-1个词的语义信息);
  • 输出:当前时刻的隐藏状态 ( h_t )(更新后的记忆,会传给下一时刻)和预测输出 ( y_t )(比如对第t个词的分类结果)。

核心公式(简化版):
( h_t = \tanh(W_{xh}x_t + W_{hh}h_{t-1} + b_h) )
( y_t = W_{hy}h_t + b_y )

  • ( W_{xh} ):输入到隐藏层的权重矩阵;
  • ( W_{hh} ):隐藏层到自身(循环)的权重矩阵;
  • ( \tanh ):激活函数(把输出压缩到[-1,1],避免数值爆炸);
  • ( b_h, b_y ):偏置项。

通俗比喻:RNN就像一个“记笔记的学生”——听课时(处理输入 ( x_t )),会结合之前的笔记(( h_{t-1} )),更新笔记内容(( h_t )),最后根据笔记回答问题(( y_t ))。

(2)致命缺陷:长序列的“梯度消失/爆炸”

RNN的问题出在“循环连接”的梯度传播上——训练时,梯度需要从最后一个时刻反向传播到第一个时刻,而每次传播都会乘以 ( W_{hh} )。

  • 如果 ( W_{hh} ) 的特征值小于1:梯度会像“多米诺骨牌”一样越传越小,最后趋近于0(梯度消失)——前面时刻的信息传不到后面,比如读100个词的句子,RNN记不住第1个词的内容;
  • 如果 ( W_{hh} ) 的特征值大于1:梯度会越传越大,最终超出计算范围(梯度爆炸)——模型参数被破坏,无法训练。

实际影响:RNN最多只能处理“几十步”的短序列(比如20个词的句子),对于长文本(如新闻)、长语音(如10秒以上)完全无能为力。

2.2 第二阶段:LSTM(长短期记忆网络)——解决“长记忆”的“工程师”

为了解决RNN的“健忘症”,1997年Hochreiter & Schmidhuber提出了LSTM——核心思路是“设计一套‘门控机制’,让网络自主决定‘记住什么、忘记什么’”,就像给“学生”加了一个“文件夹”:重要的笔记(长期信息)放进文件夹保存,不重要的临时笔记(短期噪音)直接丢掉。

(1)核心改进:3个门控+1个细胞状态

LSTM在RNN的基础上,增加了两个关键组件:

  • 细胞状态(Cell State):相当于“长期记忆传送带”——信息在上面平稳传递,只有少量修改(通过门控),避免梯度快速衰减;
  • 门控(Gates):相当于“传送带的开关”——通过sigmoid激活函数(输出0~1,0表示关闭,1表示打开)控制信息的流入和流出,共3个门:遗忘门、输入门、输出门。

我们按“信息处理流程”拆解每个组件的作用(第t时刻):

① 遗忘门(Forget Gate):决定“该忘记什么”

作用:判断上一时刻的细胞状态 ( C_{t-1} ) 中,哪些信息需要保留,哪些需要丢弃。

公式:( f_t = \sigma(W_f [h_{t-1}, x_t] + b_f) )

  • ( [h_{t-1}, x_t] ):上一时刻隐藏状态和当前输入的拼接;
  • ( \sigma ):sigmoid函数,输出0~1(0=完全遗忘,1=完全保留);
  • ( W_f, b_f ):遗忘门的权重和偏置。

通俗比喻:读句子“小明昨天去了上海,今天他______”,遗忘门会决定“保留‘小明’‘今天’,忘记‘昨天’‘上海’(因为当前主语还是小明,时间是今天)”。

② 输入门(Input Gate):决定“该记住什么新信息”

作用:有两个步骤,先筛选当前输入的重要信息,再更新到细胞状态中。

步骤1:筛选新信息(用sigmoid选“要记的”,用tanh生成“候选信息”)
( i_t = \sigma(W_i [h_{t-1}, x_t] + b_i) )(输入门开关,0=不记,1=记)
( \tilde{C}t = \tanh(W_C [h{t-1}, x_t] + b_C) )(当前输入的候选信息,值在[-1,1])

步骤2:更新细胞状态(遗忘旧信息+加入新信息)
( C_t = f_t \odot C_{t-1} + i_t \odot \tilde{C}_t )

  • ( \odot ):元素-wise乘法(对应位置相乘)。

通俗比喻:输入“今天他去了北京”,输入门会决定“记住‘去了北京’,并把细胞状态中的‘上海’替换成‘北京’”。

③ 输出门(Output Gate):决定“该输出什么”

作用:从更新后的细胞状态 ( C_t ) 中,筛选出需要传给下一时刻的“短期记忆”(隐藏状态 ( h_t ))。

公式:
( o_t = \sigma(W_o [h_{t-1}, x_t] + b_o) )(输出门开关)
( h_t = o_t \odot \tanh(C_t) )(当前隐藏状态,传给下一时刻)

通俗比喻:细胞状态中保存了“小明今天去了北京”,输出门会决定“输出‘小明去了北京’这个信息,用于预测下一个词(比如‘明天会去天津’)”。

(2)LSTM的优势与缺点
  • 优势:通过细胞状态和门控机制,大幅缓解了梯度消失问题,能处理“几百步”的长序列(比如100个词的文本、30天的股价);
  • 缺点:参数数量是RNN的4倍(3个门控+1个细胞状态,每个都有独立权重),训练速度慢、占用内存多——比如同样的隐藏层维度,LSTM的训练时间是RNN的3~4倍。

2.3 第三阶段:GRU(门控循环单元)——平衡“性能与效率”的“优化者”

2014年,Cho等人提出了GRU——核心是“简化LSTM的结构,在保证性能的前提下减少参数,提升训练效率”。

GRU做了两个关键简化:

  1. 合并“细胞状态”和“隐藏状态”:去掉LSTM的细胞状态 ( C_t ),只用隐藏状态 ( h_t ) 同时存储长期和短期信息;
  2. 减少门控数量:把LSTM的“遗忘门+输入门”合并成“更新门”,去掉“输出门”,只保留“更新门”和“重置门”两个门控。
(1)核心结构:2个门控+1个隐藏状态

我们同样按流程拆解(第t时刻):

① 重置门(Reset Gate):决定“是否忽略过去的记忆”

作用:判断是否需要“清空”上一时刻的隐藏状态 ( h_{t-1} ),专注于当前输入 ( x_t )。

公式:( r_t = \sigma(W_r [h_{t-1}, x_t] + b_r) )

  • ( r_t ) 输出0~1:0=完全忽略过去记忆,1=完全保留过去记忆。

通俗比喻:读句子“张三昨天买了苹果,李四今天买了______”,重置门会决定“忽略‘张三’‘苹果’的记忆,专注于‘李四’这个新主语”。

② 更新门(Update Gate):决定“该保留多少过去/现在的信息”

作用:同时实现LSTM中“遗忘门”和“输入门”的功能——既决定忘记多少过去的记忆,也决定记住多少当前的新信息。

步骤1:计算更新门开关
( z_t = \sigma(W_z [h_{t-1}, x_t] + b_z) )(0=不保留过去,1=不保留现在)

步骤2:生成候选隐藏状态(结合重置门筛选的过去记忆和当前输入)
( \tilde{h}t = \tanh(W_h [r_t \odot h{t-1}, x_t] + b_h) )

步骤3:更新隐藏状态(过去记忆×保留比例 + 新信息×保留比例)
( h_t = (1 - z_t) \odot h_{t-1} + z_t \odot \tilde{h}_t )

  • ( (1 - z_t) \odot h_{t-1} ):保留的过去记忆;
  • ( z_t \odot \tilde{h}_t ):保留的当前新信息。

通俗比喻:输入“李四今天买了香蕉”,更新门会决定“保留‘李四’的记忆,忘记‘张三’,并加入‘香蕉’的新信息”。

(2)GRU的优势与缺点
  • 优势:参数数量比LSTM少30%~40%,训练速度快、内存占用少;在多数场景(如文本分类、短时序预测)中,性能接近LSTM;
  • 缺点:对于超长长序列(如500步以上)或复杂依赖(如语音识别),长期记忆能力略逊于LSTM——因为没有独立的细胞状态,长期信息容易被短期信息干扰。

三、核心区别对比:RNN/LSTM/GRU该怎么选?

很多人纠结“选哪个模型”,本质是没搞懂三者的“trade-off(权衡)”——下表从5个关键维度做对比,帮你快速决策:

维度 RNN LSTM GRU
核心结构 1个隐藏状态,无门控 1个细胞状态+1个隐藏状态,3个门控 1个隐藏状态,2个门控
参数规模 最小(基准) 最大(RNN的4倍) 中等(LSTM的70%)
训练效率 最快 最慢 较快(LSTM的1.5~2倍)
长序列能力 弱(最多几十步) 强(几百~上千步) 中(几百步)
适用场景 超短序列(如关键词)、简单任务 长序列(如语音、长文本)、高精度需求 中短序列(如情感分析、股价预测)、效率优先
常见误区 现在很少用(仅教学) 不是“万能”,超长长序列仍会梯度消失 不是“LSTM简化版=性能差”,多数场景足够

一句话总结选择逻辑

  • 若序列短(<50步)、任务简单、资源有限 → 优先GRU;
  • 若序列长(>100步)、任务复杂(如语音、机器翻译)、追求高精度 → 优先LSTM;
  • 若只是入门学习 → 先学RNN理解原理,再学LSTM/GRU落地。

四、实战环节:3个场景用PyTorch实现(附完整代码)

我们选择3个最典型的时序任务,分别用GRU和LSTM实现,对比效果差异。所有代码都可复现,依赖库用PyTorch+常用数据处理库(pandas、numpy、matplotlib)。

4.1 场景1:文本情感分析(GRU实现)——短序列任务

任务目标:对IMDB电影评论进行二分类(正面/负面情感),评论长度约100~200词(中短序列),适合用GRU(效率高)。

(1)环境准备
# 安装依赖(已安装可跳过)
pip install torch torchvision pandas numpy matplotlib scikit-learn
pip install torchtext  # 处理文本数据(IMDB数据集)
(2)数据加载与预处理

用torchtext加载IMDB数据集,做分词、词嵌入(将词转为向量):

import torch
from torchtext.datasets import IMDB
from torchtext.data import Field, LabelField, BucketIterator
import random

# 1. 定义文本和标签的处理方式
TEXT = Field(
    tokenize='spacy',  # 用spaCy分词(需提前安装:pip install spacy en_core_web_sm)
    tokenizer_language='en_core_web_sm',
    lower=True,  # 字母小写
    include_lengths=True  # 保留句子长度(用于后续padding处理)
)
LABEL = LabelField(dtype=torch.float)  # 标签:0=负面,1=正面

# 2. 加载IMDB数据集(训练集25000条,测试集25000条)
train_data, test_data = IMDB.splits(TEXT, LABEL)

# 3. 划分验证集(从训练集中分20%作为验证集)
train_data, val_data = train_data.split(random_state=random.seed(42), split_ratio=0.8)

# 4. 构建词汇表(用训练集的词,限制最大词汇量为10000)
TEXT.build_vocab(train_data, max_size=10000, vectors='glove.6B.100d')  # 用GloVe预训练词嵌入(100维)
LABEL.build_vocab(train_data)

# 5. 构建迭代器(按句子长度分组,减少padding,加速训练)
BATCH_SIZE = 64
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  # 优先用GPU
train_iterator, val_iterator, test_iterator = BucketIterator.splits(
    (train_data, val_data, test_data),
    batch_size=BATCH_SIZE,
    sort_within_batch=True,  # 每个batch内按句子长度排序
    device=device
)

# 查看数据信息
print(f"训练集样本数:{len(train_data)}")
print(f"验证集样本数:{len(val_data)}")
print(f"测试集样本数:{len(test_data)}")
print(f"词汇表大小:{len(TEXT.vocab)}")
print(f"标签分布:正面{LABEL.vocab.freqs['pos']}条,负面{LABEL.vocab.freqs['neg']}条")
(3)定义GRU模型
import torch.nn as nn

class GRU_Sentiment(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, dropout):
        super().__init__()
        # 1. 词嵌入层(将词索引转为向量)
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # 2. GRU层(n_layers=2层,bidirectional=False=单向)
        self.gru = nn.GRU(
            embedding_dim,  # 输入维度(词嵌入维度)
            hidden_dim,     # 隐藏层维度
            num_layers=n_layers,  # 层数
            bidirectional=False,  # 单向GRU(情感分析无需双向)
            dropout=dropout if n_layers > 1 else 0  # 多层时才用dropout
        )
        # 3. 全连接层(将GRU输出转为2分类)
        self.fc = nn.Linear(hidden_dim, output_dim)
        # 4. Dropout层(防止过拟合)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text, text_lengths):
        # text: (句子长度, batch_size) → 注意torchtext的输入格式是[seq_len, batch]
        # text_lengths: (batch_size) → 每个句子的实际长度(用于pack_padded_sequence)
        
        # 步骤1:词嵌入 → (seq_len, batch_size, embedding_dim)
        embedded = self.dropout(self.embedding(text))
        
        # 步骤2:处理变长序列(用pack_padded_sequence,避免padding部分参与计算)
        packed_embedded = nn.utils.rnn.pack_padded_sequence(embedded, text_lengths)
        packed_output, hidden = self.gru(packed_embedded)  # hidden: (n_layers, batch_size, hidden_dim)
        
        # 步骤3:GRU输出取最后一层的隐藏状态(因为情感分析关注整个句子的语义)
        hidden = self.dropout(hidden[-1, :, :])  # (batch_size, hidden_dim)
        
        # 步骤4:全连接层输出 → (batch_size, output_dim)
        output = self.fc(hidden)
        return output

# 初始化模型参数
VOCAB_SIZE = len(TEXT.vocab)
EMBEDDING_DIM = 100  # 与GloVe词嵌入维度一致
HIDDEN_DIM = 256     # 隐藏层维度
OUTPUT_DIM = 1       # 输出维度(二分类,用sigmoid)
N_LAYERS = 2         # 2层GRU
DROPOUT = 0.5        # Dropout概率

model = GRU_Sentiment(
    vocab_size=VOCAB_SIZE,
    embedding_dim=EMBEDDING_DIM,
    hidden_dim=HIDDEN_DIM,
    output_dim=OUTPUT_DIM,
    n_layers=N_LAYERS,
    dropout=DROPOUT
).to(device)

# 加载预训练词嵌入(GloVe)
model.embedding.weight.data.copy_(TEXT.vocab.vectors)
# 初始化未在GloVe中的词(<unk>和<pad>)
UNK_IDX = TEXT.vocab.stoi[TEXT.unk_token]
PAD_IDX = TEXT.vocab.stoi[TEXT.pad_token]
model.embedding.weight.data[UNK_IDX] = torch.zeros(EMBEDDING_DIM)
model.embedding.weight.data[PAD_IDX] = torch.zeros(EMBEDDING_DIM)

print(model)
(4)模型训练与评估
import torch.optim as optim
import torch.nn.functional as F
import matplotlib.pyplot as plt

# 1. 定义优化器和损失函数
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = F.binary_cross_entropy_with_logits  # 二分类交叉熵(带sigmoid)

# 2. 定义准确率计算函数
def binary_accuracy(preds, y):
    # preds: (batch_size, 1) → 模型输出(未经过sigmoid)
    # y: (batch_size, 1) → 真实标签
    rounded_preds = torch.round(torch.sigmoid(preds))  # 四舍五入(0/1)
    correct = (rounded_preds == y).float()  # 正确预测的样本数
    acc = correct.sum() / len(correct)
    return acc

# 3. 训练函数
def train(model, iterator, optimizer, criterion):
    model.train()  # 训练模式(启用dropout)
    epoch_loss = 0
    epoch_acc = 0
    for batch in iterator:
        text, text_lengths = batch.text  # 取文本和长度
        labels = batch.label.unsqueeze(1)  # 标签转为(batch_size, 1)
        
        optimizer.zero_grad()  # 清空梯度
        preds = model(text, text_lengths)  # 前向传播
        loss = criterion(preds, labels)    # 计算损失
        acc = binary_accuracy(preds, labels)  # 计算准确率
        
        loss.backward()  # 反向传播
        optimizer.step()  # 更新参数
        
        epoch_loss += loss.item()
        epoch_acc += acc.item()
    
    # 平均损失和准确率(按batch数平均)
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

# 4. 评估函数
def evaluate(model, iterator, criterion):
    model.eval()  # 评估模式(关闭dropout)
    epoch_loss = 0
    epoch_acc = 0
    with torch.no_grad():  # 禁用梯度计算
        for batch in iterator:
            text, text_lengths = batch.text
            labels = batch.label.unsqueeze(1)
            
            preds = model(text, text_lengths)
            loss = criterion(preds, labels)
            acc = binary_accuracy(preds, labels)
            
            epoch_loss += loss.item()
            epoch_acc += acc.item()
    
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

# 5. 开始训练(10个epoch)
N_EPOCHS = 10
best_val_loss = float('inf')  # 记录最佳验证损失(用于保存模型)
train_losses = []
val_losses = []
train_accs = []
val_accs = []

for epoch in range(N_EPOCHS):
    train_loss, train_acc = train(model, train_iterator, optimizer, criterion)
    val_loss, val_acc = evaluate(model, val_iterator, criterion)
    
    # 保存最佳模型(验证损失最小)
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'gru_sentiment.pt')
    
    # 记录损失和准确率(用于绘图)
    train_losses.append(train_loss)
    val_losses.append(val_loss)
    train_accs.append(train_acc)
    val_accs.append(val_acc)
    
    # 打印结果
    print(f'Epoch {epoch+1}/{N_EPOCHS}')
    print(f'训练损失:{train_loss:.4f} | 训练准确率:{train_acc:.4f}')
    print(f'验证损失:{val_loss:.4f} | 验证准确率:{val_acc:.4f}')
    print('-' * 50)

# 6. 测试集评估(加载最佳模型)
model.load_state_dict(torch.load('gru_sentiment.pt'))
test_loss, test_acc = evaluate(model, test_iterator, criterion)
print(f'测试损失:{test_loss:.4f} | 测试准确率:{test_acc:.4f}')

# 7. 绘制损失和准确率曲线
plt.figure(figsize=(12, 4))
# 损失曲线
plt.subplot(1, 2, 1)
plt.plot(train_losses, label='训练损失')
plt.plot(val_losses, label='验证损失')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
# 准确率曲线
plt.subplot(1, 2, 2)
plt.plot(train_accs, label='训练准确率')
plt.plot(val_accs, label='验证准确率')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.show()
(5)结果分析
  • 预期效果:测试准确率约88%~90%,GRU在情感分析任务上表现优秀;
  • 为什么不用LSTM?因为评论长度在200词以内,GRU足够处理,且训练速度比LSTM快30%左右;
  • 优化方向:用双向GRU(bidirectional=True),可进一步提升准确率到92%左右(考虑句子前后文)。

4.2 场景2:股价预测(LSTM实现)——长序列任务

任务目标:根据苹果公司(AAPL)过去60天的股价数据,预测第61天的收盘价(长序列依赖,适合用LSTM)。

(1)数据加载与预处理

用yfinance获取AAPL的历史股价数据,做归一化(时序预测必须归一化,否则数值范围差异会影响模型):

import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler

# 1. 获取AAPL股价数据(2010-2024年)
ticker = 'AAPL'
start_date = '2010-01-01'
end_date = '2024-01-01'
data = yf.download(ticker, start=start_date, end=end_date)
# 只保留收盘价(预测目标)
df = data[['Close']].copy()
print(f"数据形状:{df.shape}")
print(df.head())

# 2. 数据归一化(将价格缩放到[0,1])
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(df)

# 3. 构建时序数据集(用过去60天预测未来1天)
def create_dataset(dataset, time_step=60):
    X, y = [], []
    for i in range(time_step, len(dataset)):
        X.append(dataset[i-time_step:i, 0])  # 前60天数据(输入)
        y.append(dataset[i, 0])              # 第61天数据(输出)
    return np.array(X), np.array(y)

TIME_STEP = 60  # 时间步长(用过去60天预测)
X, y = create_dataset(scaled_data, TIME_STEP)

# 4. 数据格式转换(LSTM输入要求:[样本数, 时间步长, 特征数])
X = np.reshape(X, (X.shape[0], X.shape[1], 1))  # 特征数=1(仅收盘价)

# 5. 划分训练集和测试集(8:2)
train_size = int(0.8 * len(X))
test_size = len(X) - train_size
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[test_size:]

# 6. 转为PyTorch张量
X_train = torch.tensor(X_train, dtype=torch.float32).to(device)
X_test = torch.tensor(X_test, dtype=torch.float32).to(device)
y_train = torch.tensor(y_train, dtype=torch.float32).to(device)
y_test = torch.tensor(y_test, dtype=torch.float32).to(device)

# 查看数据信息
print(f"训练集输入形状:{X_train.shape}")  # (样本数, 60, 1)
print(f"测试集输入形状:{X_test.shape}")    # (样本数, 60, 1)
print(f"训练集输出形状:{y_train.shape}")  # (样本数,)
print(f"测试集输出形状:{y_test.shape}")    # (样本数,)

# 绘制收盘价曲线
plt.figure(figsize=(12, 6))
plt.plot(df['Close'], label='AAPL Close Price')
plt.title('AAPL Historical Close Price (2010-2024)')
plt.xlabel('Date')
plt.ylabel('Close Price (USD)')
plt.legend()
plt.show()
(2)定义LSTM模型
import torch.nn as nn

class LSTM_Stock(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size, dropout):
        super().__init__()
        self.hidden_size = hidden_size  # 隐藏层维度
        self.num_layers = num_layers    # LSTM层数
        
        # LSTM层(input_size=1,仅收盘价一个特征)
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,  # 输入格式:[batch_size, time_step, input_size]
            dropout=dropout if num_layers > 1 else 0
        )
        # 全连接层(LSTM输出→预测值)
        self.fc = nn.Linear(hidden_size, output_size)
        # Dropout层
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # x: (batch_size, time_step, input_size)
        
        # 初始化隐藏状态和细胞状态(初始为0)
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        
        # LSTM前向传播(output: (batch_size, time_step, hidden_size))
        # hn, cn: 最后一个时刻的隐藏状态和细胞状态
        output, (hn, cn) = self.lstm(x, (h0, c0))
        
        # 取最后一个时刻的输出(因为预测第61天,只需要最后一步的结果)
        output = self.dropout(hn[-1, :, :])  # (batch_size, hidden_size)
        # 全连接层输出(预测值)
        output = self.fc(output)  # (batch_size, output_size)
        return output

# 初始化模型参数
INPUT_SIZE = 1       # 输入特征数(仅收盘价)
HIDDEN_SIZE = 128    # 隐藏层维度
NUM_LAYERS = 2       # 2层LSTM
OUTPUT_SIZE = 1      # 输出维度(预测收盘价)
DROPOUT = 0.2        # Dropout概率(避免过拟合)

model = LSTM_Stock(
    input_size=INPUT_SIZE,
    hidden_size=HIDDEN_SIZE,
    num_layers=NUM_LAYERS,
    output_size=OUTPUT_SIZE,
    dropout=DROPOUT
).to(device)

print(model)
(3)模型训练与评估
import torch.optim as optim
import math

# 1. 定义优化器和损失函数(回归任务用MSE)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.MSELoss()

# 2. 构建数据加载器(批量训练)
BATCH_SIZE = 32
# 训练集
train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
# 测试集
test_dataset = torch.utils.data.TensorDataset(X_test, y_test)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# 3. 训练函数
def train(model, loader, optimizer, criterion):
    model.train()
    epoch_loss = 0
    for X_batch, y_batch in loader:
        optimizer.zero_grad()
        preds = model(X_batch)  # (batch_size, 1)
        loss = criterion(preds.squeeze(), y_batch)  # 挤压维度,与y_batch匹配
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item() * X_batch.size(0)  # 按样本数加权
    # 平均损失(总损失/总样本数)
    return epoch_loss / len(loader.dataset)

# 4. 评估函数
def evaluate(model, loader, criterion):
    model.eval()
    epoch_loss = 0
    with torch.no_grad():
        for X_batch, y_batch in loader:
            preds = model(X_batch)
            loss = criterion(preds.squeeze(), y_batch)
            epoch_loss += loss.item() * X_batch.size(0)
    return epoch_loss / len(loader.dataset)

# 5. 开始训练(50个epoch)
N_EPOCHS = 50
best_val_loss = float('inf')
train_losses = []
test_losses = []

for epoch in range(N_EPOCHS):
    train_loss = train(model, train_loader, optimizer, criterion)
    test_loss = evaluate(model, test_loader, criterion)
    
    # 保存最佳模型
    if test_loss < best_val_loss:
        best_val_loss = test_loss
        torch.save(model.state_dict(), 'lstm_stock.pt')
    
    # 记录损失
    train_losses.append(train_loss)
    test_losses.append(test_loss)
    
    # 打印结果(同时输出RMSE,更直观)
    train_rmse = math.sqrt(train_loss)
    test_rmse = math.sqrt(test_loss)
    print(f'Epoch {epoch+1}/{N_EPOCHS}')
    print(f'训练损失(MSE):{train_loss:.6f} | 训练RMSE:{train_rmse:.4f}')
    print(f'测试损失(MSE):{test_loss:.6f} | 测试RMSE:{test_rmse:.4f}')
    print('-' * 50)

# 6. 预测与可视化(加载最佳模型)
model.load_state_dict(torch.load('lstm_stock.pt'))
model.eval()

# 训练集预测
with torch.no_grad():
    train_preds = model(X_train).cpu().numpy()
    test_preds = model(X_test).cpu().numpy()

# 反归一化(将预测值转回原始价格)
train_preds = scaler.inverse_transform(train_preds)
test_preds = scaler.inverse_transform(test_preds)
y_train_original = scaler.inverse_transform(y_train.cpu().numpy().reshape(-1, 1))
y_test_original = scaler.inverse_transform(y_test.cpu().numpy().reshape(-1, 1))

# 构建完整的预测曲线(用于绘图)
# 训练集部分
train_plot = np.empty_like(scaled_data)
train_plot[:, :] = np.nan
train_plot[TIME_STEP:TIME_STEP+len(train_preds), :] = train_preds
# 测试集部分
test_plot = np.empty_like(scaled_data)
test_plot[:, :] = np.nan
test_plot[TIME_STEP+len(train_preds):, :] = test_preds

# 绘制预测结果
plt.figure(figsize=(14, 8))
plt.plot(scaler.inverse_transform(scaled_data), label='真实收盘价', color='blue')
plt.plot(train_plot, label='训练集预测', color='green', alpha=0.7)
plt.plot(test_plot, label='测试集预测', color='red', alpha=0.7)
plt.title('AAPL股价预测(LSTM)')
plt.xlabel('时间(天数)')
plt.ylabel('收盘价(USD)')
plt.legend()
plt.show()
(4)结果分析
  • 预期效果:测试RMSE约5~8(取决于市场波动),LSTM能较好拟合股价的趋势(上升/下降),但无法预测突发波动(如黑天鹅事件);
  • 为什么用LSTM?因为股价依赖长期趋势(60天),LSTM的长期记忆能力比GRU强,预测精度更高;
  • 优化方向:加入更多特征(如开盘价、成交量、MACD指标),可进一步降低RMSE。

4.3 场景3:关键词识别(LSTM实现)——复杂时序任务

任务目标:识别语音中的关键词“yes”和“no”(语音序列长、依赖复杂,适合用LSTM)。我们用Google的SpeechCommands数据集(简化版,仅保留“yes”“no”两类)。

(1)数据加载与预处理

语音数据需要转为MFCC特征(语音信号的常用特征,将时域信号转为频域特征):

import os
import torch
import torchaudio
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader

# 1. 下载SpeechCommands数据集(简化版,仅yes/no)
# 先创建数据目录
data_dir = './speech_commands'
os.makedirs(data_dir, exist_ok=True)
# 下载yes和no类别的语音文件(共约4000个wav文件)
!wget -q -P $data_dir https://storage.googleapis.com/download.tensorflow.org/data/speech_commands_v0.02.tar.gz
!tar -xf $data_dir/speech_commands_v0.02.tar.gz -C $data_dir
# 只保留yes和no文件夹
import shutil
for folder in os.listdir(data_dir):
    if folder not in ['yes', 'no', 'README.md']:
        path = os.path.join(data_dir, folder)
        if os.path.isdir(path):
            shutil.rmtree(path)

# 2. 定义语音数据集类(加载wav文件,提取MFCC特征)
class SpeechDataset(Dataset):
    def __init__(self, data_dir, sample_rate=16000, n_mfcc=13):
        self.data_dir = data_dir
        self.sample_rate = sample_rate
        self.n_mfcc = n_mfcc  # MFCC特征数
        self.classes = ['no', 'yes']  # 标签:0=no,1=yes
        self.file_paths = []
        self.labels = []
        
        # 收集所有wav文件路径和标签
        for cls in self.classes:
            cls_dir = os.path.join(data_dir, cls)
            for filename in os.listdir(cls_dir):
                if filename.endswith('.wav'):
                    self.file_paths.append(os.path.join(cls_dir, filename))
                    self.labels.append(self.classes.index(cls))
        
        # 语音预处理:统一长度(1秒=16000个采样点)
        self.max_len = sample_rate  # 1秒
        
    def __len__(self):
        return len(self.file_paths)
    
    def __getitem__(self, idx):
        file_path = self.file_paths[idx]
        label = self.labels[idx]
        
        # 加载wav文件(torchaudio.load返回:波形张量,采样率)
        waveform, sr = torchaudio.load(file_path)
        # 重采样(确保采样率一致)
        if sr != self.sample_rate:
            resampler = torchaudio.transforms.Resample(sr, self.sample_rate)
            waveform = resampler(waveform)
        # 统一长度(截断或补零到1秒)
        waveform = waveform.squeeze()  # 转为1D张量
        if len(waveform) < self.max_len:
            waveform = torch.nn.functional.pad(waveform, (0, self.max_len - len(waveform)))
        else:
            waveform = waveform[:self.max_len]
        
        # 提取MFCC特征(输入:[sample_rate] → 输出:[n_mfcc, time_steps])
        mfcc_transform = torchaudio.transforms.MFCC(
            sample_rate=self.sample_rate,
            n_mfcc=self.n_mfcc
        )
        mfcc = mfcc_transform(waveform)  # (n_mfcc, time_steps)
        # 转置为[time_steps, n_mfcc](符合LSTM输入格式:[time_step, input_size])
        mfcc = mfcc.transpose(0, 1)
        
        return mfcc, label

# 3. 创建数据集和数据加载器
dataset = SpeechDataset(data_dir)
# 划分训练集和测试集(8:2)
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(
    dataset, [train_size, test_size],
    generator=torch.Generator().manual_seed(42)
)

# 数据加载器
BATCH_SIZE = 32
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

# 查看数据信息
mfcc_sample, label_sample = dataset[0]
print(f"MFCC特征形状:{mfcc_sample.shape}")  # (time_steps, n_mfcc) → 约94个时间步,13个特征
print(f"训练集样本数:{len(train_dataset)}")
print(f"测试集样本数:{len(test_dataset)}")
print(f"标签:{dataset.classes[label_sample]}")

# 绘制MFCC特征图
plt.figure(figsize=(10, 4))
plt.imshow(mfcc_sample.transpose(0, 1), cmap='viridis', aspect='auto')
plt.title('MFCC Feature of Keyword "{}"'.format(dataset.classes[label_sample]))
plt.xlabel('Time Steps')
plt.ylabel('MFCC Coefficients')
plt.colorbar()
plt.show()
(2)定义LSTM模型
import torch.nn as nn

class LSTM_Keyword(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes, dropout):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # LSTM层(input_size=13,MFCC特征数)
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,  # 输入格式:[batch_size, time_step, input_size]
            bidirectional=True,  # 双向LSTM(语音需要前后文信息)
            dropout=dropout if num_layers > 1 else 0
        )
        # 全连接层(双向LSTM输出维度=2×hidden_size)
        self.fc = nn.Linear(hidden_size * 2, num_classes)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # x: (batch_size, time_step, input_size)
        
        # 初始化隐藏状态和细胞状态
        h0 = torch.zeros(self.num_layers * 2, x.size(0), self.hidden_size).to(device)  # 2=双向
        c0 = torch.zeros(self.num_layers * 2, x.size(0), self.hidden_size).to(device)
        
        # LSTM前向传播
        output, (hn, cn) = self.lstm(x, (h0, c0))  # output: (batch_size, time_step, 2×hidden_size)
        
        # 取最后一个时间步的输出(关键词识别关注整个语音序列的特征)
        output = self.dropout(output[:, -1, :])  # (batch_size, 2×hidden_size)
        # 全连接层输出(二分类)
        output = self.fc(output)  # (batch_size, num_classes)
        return output

# 初始化模型参数
INPUT_SIZE = 13      # 输入特征数(MFCC系数数)
HIDDEN_SIZE = 64     # 隐藏层维度
NUM_LAYERS = 2       # 2层LSTM
NUM_CLASSES = 2      # 输出类别数(yes/no)
DROPOUT = 0.3        # Dropout概率

model = LSTM_Keyword(
    input_size=INPUT_SIZE,
    hidden_size=HIDDEN_SIZE,
    num_layers=NUM_LAYERS,
    num_classes=NUM_CLASSES,
    dropout=DROPOUT
).to(device)

print(model)
(3)模型训练与评估
import torch.optim as optim
import torch.nn.functional as F

# 1. 定义优化器和损失函数
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()  # 多分类交叉熵

# 2. 准确率计算函数
def accuracy(preds, labels):
    _, predicted = torch.max(preds, 1)  # 取概率最大的类别
    correct = (predicted == labels).sum().item()
    return correct / len(labels)

# 3. 训练函数
def train(model, loader, optimizer, criterion):
    model.train()
    epoch_loss = 0
    epoch_acc = 0
    for batch in loader:
        mfcc, labels = batch
        mfcc = mfcc.to(device)
        labels = labels.to(device)
        
        optimizer.zero_grad()
        preds = model(mfcc)  # (batch_size, num_classes)
        loss = criterion(preds, labels)
        acc = accuracy(preds, labels)
        
        loss.backward()
        optimizer.step()
        
        epoch_loss += loss.item() * mfcc.size(0)
        epoch_acc += acc * mfcc.size(0)
    
    return epoch_loss / len(loader.dataset), epoch_acc / len(loader.dataset)

# 4. 评估函数
def evaluate(model, loader, criterion):
    model.eval()
    epoch_loss = 0
    epoch_acc = 0
    with torch.no_grad():
        for batch in loader:
            mfcc, labels = batch
            mfcc = mfcc.to(device)
            labels = labels.to(device)
            
            preds = model(mfcc)
            loss = criterion(preds, labels)
            acc = accuracy(preds, labels)
            
            epoch_loss += loss.item() * mfcc.size(0)
            epoch_acc += acc * mfcc.size(0)
    
    return epoch_loss / len(loader.dataset), epoch_acc / len(loader.dataset)

# 5. 开始训练(20个epoch)
N_EPOCHS = 20
best_val_acc = 0.0
train_losses = []
train_accs = []
test_losses = []
test_accs = []

for epoch in range(N_EPOCHS):
    train_loss, train_acc = train(model, train_loader, optimizer, criterion)
    test_loss, test_acc = evaluate(model, test_loader, criterion)
    
    # 保存最佳模型
    if test_acc > best_val_acc:
        best_val_acc = test_acc
        torch.save(model.state_dict(), 'lstm_keyword.pt')
    
    # 记录结果
    train_losses.append(train_loss)
    train_accs.append(train_acc)
    test_losses.append(test_loss)
    test_accs.append(test_acc)
    
    # 打印结果
    print(f'Epoch {epoch+1}/{N_EPOCHS}')
    print(f'训练损失:{train_loss:.4f} | 训练准确率:{train_acc:.4f}')
    print(f'测试损失:{test_loss:.4f} | 测试准确率:{test_acc:.4f}')
    print('-' * 50)

# 6. 绘制训练曲线
plt.figure(figsize=(12, 4))
# 损失曲线
plt.subplot(1, 2, 1)
plt.plot(train_losses, label='训练损失')
plt.plot(test_losses, label='测试损失')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
# 准确率曲线
plt.subplot(1, 2, 2)
plt.plot(train_accs, label='训练准确率')
plt.plot(test_accs, label='测试准确率')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

# 最终测试准确率
print(f'最佳测试准确率:{best_val_acc:.4f}')
(4)结果分析
  • 预期效果:测试准确率约95%~97%,LSTM能有效识别“yes”和“no”;
  • 为什么用双向LSTM?因为语音中的关键词依赖前后的静音或其他声音(比如“yes”可能前面有“say”),双向能同时考虑过去和未来的信息;
  • 为什么不用GRU?因为语音序列较长(1秒=94个时间步),且特征复杂(MFCC有13个维度),LSTM的长期记忆能力更优。

五、南木的学习路径建议:从入门到实战

很多人学RNN/LSTM/GRU会陷入“公式劝退”或“代码跑不通”的困境,我结合自己的经验,整理了一条“循序渐进”的学习路径:

阶段1:基础铺垫(1~2周)

  • 数学基础:回顾微积分(梯度下降)、线性代数(矩阵乘法)、概率论(交叉熵)——不用深钻,够用就行;
  • 工具基础:掌握PyTorch的核心操作(张量、模型定义、训练流程),推荐看PyTorch官方教程《60分钟快速入门》;
  • 时序概念:理解“时间步”“序列长度”“特征数”的定义,搞懂LSTM输入格式[batch_size, time_step, input_size]的含义。

阶段2:核心原理(2~3周)

  • 先学RNN:手动计算1个简单案例(比如用RNN处理3个词的句子),理解隐藏状态的传递过程,搞懂梯度消失的原因;
  • 再学LSTM:重点理解3个门控的作用(不用死记公式,用“文件夹”比喻辅助记忆),推荐看Colah的博客《Understanding LSTM Networks》(图文并茂,最经典);
  • 最后学GRU:对比LSTM的结构差异,理解“简化”的逻辑(合并门控、去掉细胞状态),搞懂“什么时候用GRU更合适”。

阶段3:实战落地(3~4周)

  • 入门项目:从文本分类(如本文场景1)开始,熟悉torchtext或Hugging Face的使用,掌握变长序列处理(pack_padded_sequence);
  • 进阶项目:尝试时序预测(如股价、天气),重点掌握数据归一化、时间步长选择、预测结果反归一化;
  • 复杂项目:挑战语音识别或机器翻译,学习特征提取(如MFCC)、双向RNN、多层RNN的使用。

阶段4:进阶扩展(长期)

  • 模型优化:学习RNN的正则化方法(Dropout、权重衰减)、学习率调度(ReduceLROnPlateau);
  • 对比学习:了解Transformer(如BERT、GPT)在时序任务上的应用,搞懂“为什么Transformer能替代RNN”;
  • 行业应用:结合具体领域(如金融时序、医疗信号、工业传感器),学习领域特定的预处理方法。

RNN/LSTM/GRU的进化史,本质是“不断平衡‘记忆能力’和‘效率’”的过程:

  • RNN证明了“循环结构能处理时序依赖”,但败给了长序列;
  • LSTM用门控和细胞状态突破了长序列限制,但牺牲了效率;
  • GRU在两者之间找到了平衡点,成为多数场景的“性价比之选”。

在这里插入图片描述

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐