从RNN到LSTM/GRU:时序数据处理的“进化史”——3分钟搞懂区别,附3大实战场景代码(文本/股价/语音)
1. 词嵌入层(将词索引转为向量)# 2. GRU层(n_layers=2层,bidirectional=False=单向)embedding_dim, # 输入维度(词嵌入维度)hidden_dim, # 隐藏层维度num_layers=n_layers, # 层数bidirectional=False, # 单向GRU(情感分析无需双向)dropout=dropout if n_layers
作为AI技术专家兼学习规划博主,我每天都会收到读者的类似提问:
“南木,RNN明明能处理时序数据,为啥还要费劲学LSTM?”
“GRU和LSTM看起来差不多,实际项目里该怎么选?”
“用PyTorch写时序预测,跑出来的结果总是飘,问题出在哪?”
其实时序数据处理的核心矛盾,从始至终都是“如何记住关键信息,同时忘掉噪音”——RNN的出现解决了“时序依赖”的初步需求,却栽在“长序列记不住”的坑里;LSTM用“门控机制”补上了这个漏洞,却带来了“参数太多、训练慢”的新问题;GRU则在两者之间做了平衡,成为“效率优先”场景的首选。
这篇文章会用“进化逻辑+原理拆解+实战对比”的方式,把RNN/LSTM/GRU讲透:从三者的核心区别,到各自的适用场景,最后用PyTorch实现文本情感分析(GRU)、股价预测(LSTM)、关键词识别(LSTM) 三个实战案例。全程无冗余理论,每个知识点都配“通俗解释+代码验证”,看完就能落地。
同时需要学习规划、就业指导、技术答疑和系统课程学习的同学 欢迎扫码交流

一、先搞懂:为什么时序数据需要“特殊对待”?
在讲RNN之前,我们得先明确一个前提:为什么CNN、MLP(多层感知机)处理不了时序数据?
因为时序数据的核心是“前后依赖关系”——比如:
- 文本中,“他今天没带伞,所以______”,横线处的词(淋湿/回家)依赖前面的“没带伞”;
- 股价中,“过去5天连续上涨”的趋势,会影响第6天的涨跌判断;
- 语音中,“你好,我是______”,后面的名字依赖“我是”这个上下文。
而传统模型(如CNN、MLP)的输入是“独立同分布”的——每个样本之间没有关联,比如识别图片中的猫,每张图的像素都是独立的,不需要考虑上一张图的内容。这种“无记忆”的特性,注定了它们处理不了时序数据。
RNN的革命性在于:它给网络加了“记忆”——让当前的输出不仅依赖当前输入,还依赖上一时刻的“隐藏状态”(可以理解为“过去的记忆”)。
二、时序处理的“进化三阶段”:RNN→LSTM→GRU
我们按“问题-解决方案”的进化逻辑,逐个拆解三个模型的核心原理,重点讲清“为什么需要下一个模型”。
2.1 第一阶段:RNN(循环神经网络)——时序处理的“启蒙者”
(1)核心结构:一个“循环”解决依赖问题
RNN的结构非常简单,核心是“隐藏层的循环连接”——可以理解为“把同一个神经网络模块(RNN Cell)重复使用,每个时刻的模块都接收当前输入和上一时刻的记忆,输出当前结果和更新后的记忆”。
用公式和通俗解释结合说明(假设第t时刻):
- 输入:当前时刻的输入向量 ( x_t )(比如文本中的第t个词的嵌入向量);
- 隐藏状态:上一时刻的记忆 ( h_{t-1} )(比如第t-1个词的语义信息);
- 输出:当前时刻的隐藏状态 ( h_t )(更新后的记忆,会传给下一时刻)和预测输出 ( y_t )(比如对第t个词的分类结果)。
核心公式(简化版):
( h_t = \tanh(W_{xh}x_t + W_{hh}h_{t-1} + b_h) )
( y_t = W_{hy}h_t + b_y )
- ( W_{xh} ):输入到隐藏层的权重矩阵;
- ( W_{hh} ):隐藏层到自身(循环)的权重矩阵;
- ( \tanh ):激活函数(把输出压缩到[-1,1],避免数值爆炸);
- ( b_h, b_y ):偏置项。
通俗比喻:RNN就像一个“记笔记的学生”——听课时(处理输入 ( x_t )),会结合之前的笔记(( h_{t-1} )),更新笔记内容(( h_t )),最后根据笔记回答问题(( y_t ))。
(2)致命缺陷:长序列的“梯度消失/爆炸”
RNN的问题出在“循环连接”的梯度传播上——训练时,梯度需要从最后一个时刻反向传播到第一个时刻,而每次传播都会乘以 ( W_{hh} )。
- 如果 ( W_{hh} ) 的特征值小于1:梯度会像“多米诺骨牌”一样越传越小,最后趋近于0(梯度消失)——前面时刻的信息传不到后面,比如读100个词的句子,RNN记不住第1个词的内容;
- 如果 ( W_{hh} ) 的特征值大于1:梯度会越传越大,最终超出计算范围(梯度爆炸)——模型参数被破坏,无法训练。
实际影响:RNN最多只能处理“几十步”的短序列(比如20个词的句子),对于长文本(如新闻)、长语音(如10秒以上)完全无能为力。
2.2 第二阶段:LSTM(长短期记忆网络)——解决“长记忆”的“工程师”
为了解决RNN的“健忘症”,1997年Hochreiter & Schmidhuber提出了LSTM——核心思路是“设计一套‘门控机制’,让网络自主决定‘记住什么、忘记什么’”,就像给“学生”加了一个“文件夹”:重要的笔记(长期信息)放进文件夹保存,不重要的临时笔记(短期噪音)直接丢掉。
(1)核心改进:3个门控+1个细胞状态
LSTM在RNN的基础上,增加了两个关键组件:
- 细胞状态(Cell State):相当于“长期记忆传送带”——信息在上面平稳传递,只有少量修改(通过门控),避免梯度快速衰减;
- 门控(Gates):相当于“传送带的开关”——通过sigmoid激活函数(输出0~1,0表示关闭,1表示打开)控制信息的流入和流出,共3个门:遗忘门、输入门、输出门。
我们按“信息处理流程”拆解每个组件的作用(第t时刻):
① 遗忘门(Forget Gate):决定“该忘记什么”
作用:判断上一时刻的细胞状态 ( C_{t-1} ) 中,哪些信息需要保留,哪些需要丢弃。
公式:( f_t = \sigma(W_f [h_{t-1}, x_t] + b_f) )
- ( [h_{t-1}, x_t] ):上一时刻隐藏状态和当前输入的拼接;
- ( \sigma ):sigmoid函数,输出0~1(0=完全遗忘,1=完全保留);
- ( W_f, b_f ):遗忘门的权重和偏置。
通俗比喻:读句子“小明昨天去了上海,今天他______”,遗忘门会决定“保留‘小明’‘今天’,忘记‘昨天’‘上海’(因为当前主语还是小明,时间是今天)”。
② 输入门(Input Gate):决定“该记住什么新信息”
作用:有两个步骤,先筛选当前输入的重要信息,再更新到细胞状态中。
步骤1:筛选新信息(用sigmoid选“要记的”,用tanh生成“候选信息”)
( i_t = \sigma(W_i [h_{t-1}, x_t] + b_i) )(输入门开关,0=不记,1=记)
( \tilde{C}t = \tanh(W_C [h{t-1}, x_t] + b_C) )(当前输入的候选信息,值在[-1,1])
步骤2:更新细胞状态(遗忘旧信息+加入新信息)
( C_t = f_t \odot C_{t-1} + i_t \odot \tilde{C}_t )
- ( \odot ):元素-wise乘法(对应位置相乘)。
通俗比喻:输入“今天他去了北京”,输入门会决定“记住‘去了北京’,并把细胞状态中的‘上海’替换成‘北京’”。
③ 输出门(Output Gate):决定“该输出什么”
作用:从更新后的细胞状态 ( C_t ) 中,筛选出需要传给下一时刻的“短期记忆”(隐藏状态 ( h_t ))。
公式:
( o_t = \sigma(W_o [h_{t-1}, x_t] + b_o) )(输出门开关)
( h_t = o_t \odot \tanh(C_t) )(当前隐藏状态,传给下一时刻)
通俗比喻:细胞状态中保存了“小明今天去了北京”,输出门会决定“输出‘小明去了北京’这个信息,用于预测下一个词(比如‘明天会去天津’)”。
(2)LSTM的优势与缺点
- 优势:通过细胞状态和门控机制,大幅缓解了梯度消失问题,能处理“几百步”的长序列(比如100个词的文本、30天的股价);
- 缺点:参数数量是RNN的4倍(3个门控+1个细胞状态,每个都有独立权重),训练速度慢、占用内存多——比如同样的隐藏层维度,LSTM的训练时间是RNN的3~4倍。
2.3 第三阶段:GRU(门控循环单元)——平衡“性能与效率”的“优化者”
2014年,Cho等人提出了GRU——核心是“简化LSTM的结构,在保证性能的前提下减少参数,提升训练效率”。
GRU做了两个关键简化:
- 合并“细胞状态”和“隐藏状态”:去掉LSTM的细胞状态 ( C_t ),只用隐藏状态 ( h_t ) 同时存储长期和短期信息;
- 减少门控数量:把LSTM的“遗忘门+输入门”合并成“更新门”,去掉“输出门”,只保留“更新门”和“重置门”两个门控。
(1)核心结构:2个门控+1个隐藏状态
我们同样按流程拆解(第t时刻):
① 重置门(Reset Gate):决定“是否忽略过去的记忆”
作用:判断是否需要“清空”上一时刻的隐藏状态 ( h_{t-1} ),专注于当前输入 ( x_t )。
公式:( r_t = \sigma(W_r [h_{t-1}, x_t] + b_r) )
- ( r_t ) 输出0~1:0=完全忽略过去记忆,1=完全保留过去记忆。
通俗比喻:读句子“张三昨天买了苹果,李四今天买了______”,重置门会决定“忽略‘张三’‘苹果’的记忆,专注于‘李四’这个新主语”。
② 更新门(Update Gate):决定“该保留多少过去/现在的信息”
作用:同时实现LSTM中“遗忘门”和“输入门”的功能——既决定忘记多少过去的记忆,也决定记住多少当前的新信息。
步骤1:计算更新门开关
( z_t = \sigma(W_z [h_{t-1}, x_t] + b_z) )(0=不保留过去,1=不保留现在)
步骤2:生成候选隐藏状态(结合重置门筛选的过去记忆和当前输入)
( \tilde{h}t = \tanh(W_h [r_t \odot h{t-1}, x_t] + b_h) )
步骤3:更新隐藏状态(过去记忆×保留比例 + 新信息×保留比例)
( h_t = (1 - z_t) \odot h_{t-1} + z_t \odot \tilde{h}_t )
- ( (1 - z_t) \odot h_{t-1} ):保留的过去记忆;
- ( z_t \odot \tilde{h}_t ):保留的当前新信息。
通俗比喻:输入“李四今天买了香蕉”,更新门会决定“保留‘李四’的记忆,忘记‘张三’,并加入‘香蕉’的新信息”。
(2)GRU的优势与缺点
- 优势:参数数量比LSTM少30%~40%,训练速度快、内存占用少;在多数场景(如文本分类、短时序预测)中,性能接近LSTM;
- 缺点:对于超长长序列(如500步以上)或复杂依赖(如语音识别),长期记忆能力略逊于LSTM——因为没有独立的细胞状态,长期信息容易被短期信息干扰。
三、核心区别对比:RNN/LSTM/GRU该怎么选?
很多人纠结“选哪个模型”,本质是没搞懂三者的“trade-off(权衡)”——下表从5个关键维度做对比,帮你快速决策:
| 维度 | RNN | LSTM | GRU |
|---|---|---|---|
| 核心结构 | 1个隐藏状态,无门控 | 1个细胞状态+1个隐藏状态,3个门控 | 1个隐藏状态,2个门控 |
| 参数规模 | 最小(基准) | 最大(RNN的4倍) | 中等(LSTM的70%) |
| 训练效率 | 最快 | 最慢 | 较快(LSTM的1.5~2倍) |
| 长序列能力 | 弱(最多几十步) | 强(几百~上千步) | 中(几百步) |
| 适用场景 | 超短序列(如关键词)、简单任务 | 长序列(如语音、长文本)、高精度需求 | 中短序列(如情感分析、股价预测)、效率优先 |
| 常见误区 | 现在很少用(仅教学) | 不是“万能”,超长长序列仍会梯度消失 | 不是“LSTM简化版=性能差”,多数场景足够 |
一句话总结选择逻辑:
- 若序列短(<50步)、任务简单、资源有限 → 优先GRU;
- 若序列长(>100步)、任务复杂(如语音、机器翻译)、追求高精度 → 优先LSTM;
- 若只是入门学习 → 先学RNN理解原理,再学LSTM/GRU落地。
四、实战环节:3个场景用PyTorch实现(附完整代码)
我们选择3个最典型的时序任务,分别用GRU和LSTM实现,对比效果差异。所有代码都可复现,依赖库用PyTorch+常用数据处理库(pandas、numpy、matplotlib)。
4.1 场景1:文本情感分析(GRU实现)——短序列任务
任务目标:对IMDB电影评论进行二分类(正面/负面情感),评论长度约100~200词(中短序列),适合用GRU(效率高)。
(1)环境准备
# 安装依赖(已安装可跳过)
pip install torch torchvision pandas numpy matplotlib scikit-learn
pip install torchtext # 处理文本数据(IMDB数据集)
(2)数据加载与预处理
用torchtext加载IMDB数据集,做分词、词嵌入(将词转为向量):
import torch
from torchtext.datasets import IMDB
from torchtext.data import Field, LabelField, BucketIterator
import random
# 1. 定义文本和标签的处理方式
TEXT = Field(
tokenize='spacy', # 用spaCy分词(需提前安装:pip install spacy en_core_web_sm)
tokenizer_language='en_core_web_sm',
lower=True, # 字母小写
include_lengths=True # 保留句子长度(用于后续padding处理)
)
LABEL = LabelField(dtype=torch.float) # 标签:0=负面,1=正面
# 2. 加载IMDB数据集(训练集25000条,测试集25000条)
train_data, test_data = IMDB.splits(TEXT, LABEL)
# 3. 划分验证集(从训练集中分20%作为验证集)
train_data, val_data = train_data.split(random_state=random.seed(42), split_ratio=0.8)
# 4. 构建词汇表(用训练集的词,限制最大词汇量为10000)
TEXT.build_vocab(train_data, max_size=10000, vectors='glove.6B.100d') # 用GloVe预训练词嵌入(100维)
LABEL.build_vocab(train_data)
# 5. 构建迭代器(按句子长度分组,减少padding,加速训练)
BATCH_SIZE = 64
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # 优先用GPU
train_iterator, val_iterator, test_iterator = BucketIterator.splits(
(train_data, val_data, test_data),
batch_size=BATCH_SIZE,
sort_within_batch=True, # 每个batch内按句子长度排序
device=device
)
# 查看数据信息
print(f"训练集样本数:{len(train_data)}")
print(f"验证集样本数:{len(val_data)}")
print(f"测试集样本数:{len(test_data)}")
print(f"词汇表大小:{len(TEXT.vocab)}")
print(f"标签分布:正面{LABEL.vocab.freqs['pos']}条,负面{LABEL.vocab.freqs['neg']}条")
(3)定义GRU模型
import torch.nn as nn
class GRU_Sentiment(nn.Module):
def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, dropout):
super().__init__()
# 1. 词嵌入层(将词索引转为向量)
self.embedding = nn.Embedding(vocab_size, embedding_dim)
# 2. GRU层(n_layers=2层,bidirectional=False=单向)
self.gru = nn.GRU(
embedding_dim, # 输入维度(词嵌入维度)
hidden_dim, # 隐藏层维度
num_layers=n_layers, # 层数
bidirectional=False, # 单向GRU(情感分析无需双向)
dropout=dropout if n_layers > 1 else 0 # 多层时才用dropout
)
# 3. 全连接层(将GRU输出转为2分类)
self.fc = nn.Linear(hidden_dim, output_dim)
# 4. Dropout层(防止过拟合)
self.dropout = nn.Dropout(dropout)
def forward(self, text, text_lengths):
# text: (句子长度, batch_size) → 注意torchtext的输入格式是[seq_len, batch]
# text_lengths: (batch_size) → 每个句子的实际长度(用于pack_padded_sequence)
# 步骤1:词嵌入 → (seq_len, batch_size, embedding_dim)
embedded = self.dropout(self.embedding(text))
# 步骤2:处理变长序列(用pack_padded_sequence,避免padding部分参与计算)
packed_embedded = nn.utils.rnn.pack_padded_sequence(embedded, text_lengths)
packed_output, hidden = self.gru(packed_embedded) # hidden: (n_layers, batch_size, hidden_dim)
# 步骤3:GRU输出取最后一层的隐藏状态(因为情感分析关注整个句子的语义)
hidden = self.dropout(hidden[-1, :, :]) # (batch_size, hidden_dim)
# 步骤4:全连接层输出 → (batch_size, output_dim)
output = self.fc(hidden)
return output
# 初始化模型参数
VOCAB_SIZE = len(TEXT.vocab)
EMBEDDING_DIM = 100 # 与GloVe词嵌入维度一致
HIDDEN_DIM = 256 # 隐藏层维度
OUTPUT_DIM = 1 # 输出维度(二分类,用sigmoid)
N_LAYERS = 2 # 2层GRU
DROPOUT = 0.5 # Dropout概率
model = GRU_Sentiment(
vocab_size=VOCAB_SIZE,
embedding_dim=EMBEDDING_DIM,
hidden_dim=HIDDEN_DIM,
output_dim=OUTPUT_DIM,
n_layers=N_LAYERS,
dropout=DROPOUT
).to(device)
# 加载预训练词嵌入(GloVe)
model.embedding.weight.data.copy_(TEXT.vocab.vectors)
# 初始化未在GloVe中的词(<unk>和<pad>)
UNK_IDX = TEXT.vocab.stoi[TEXT.unk_token]
PAD_IDX = TEXT.vocab.stoi[TEXT.pad_token]
model.embedding.weight.data[UNK_IDX] = torch.zeros(EMBEDDING_DIM)
model.embedding.weight.data[PAD_IDX] = torch.zeros(EMBEDDING_DIM)
print(model)
(4)模型训练与评估
import torch.optim as optim
import torch.nn.functional as F
import matplotlib.pyplot as plt
# 1. 定义优化器和损失函数
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = F.binary_cross_entropy_with_logits # 二分类交叉熵(带sigmoid)
# 2. 定义准确率计算函数
def binary_accuracy(preds, y):
# preds: (batch_size, 1) → 模型输出(未经过sigmoid)
# y: (batch_size, 1) → 真实标签
rounded_preds = torch.round(torch.sigmoid(preds)) # 四舍五入(0/1)
correct = (rounded_preds == y).float() # 正确预测的样本数
acc = correct.sum() / len(correct)
return acc
# 3. 训练函数
def train(model, iterator, optimizer, criterion):
model.train() # 训练模式(启用dropout)
epoch_loss = 0
epoch_acc = 0
for batch in iterator:
text, text_lengths = batch.text # 取文本和长度
labels = batch.label.unsqueeze(1) # 标签转为(batch_size, 1)
optimizer.zero_grad() # 清空梯度
preds = model(text, text_lengths) # 前向传播
loss = criterion(preds, labels) # 计算损失
acc = binary_accuracy(preds, labels) # 计算准确率
loss.backward() # 反向传播
optimizer.step() # 更新参数
epoch_loss += loss.item()
epoch_acc += acc.item()
# 平均损失和准确率(按batch数平均)
return epoch_loss / len(iterator), epoch_acc / len(iterator)
# 4. 评估函数
def evaluate(model, iterator, criterion):
model.eval() # 评估模式(关闭dropout)
epoch_loss = 0
epoch_acc = 0
with torch.no_grad(): # 禁用梯度计算
for batch in iterator:
text, text_lengths = batch.text
labels = batch.label.unsqueeze(1)
preds = model(text, text_lengths)
loss = criterion(preds, labels)
acc = binary_accuracy(preds, labels)
epoch_loss += loss.item()
epoch_acc += acc.item()
return epoch_loss / len(iterator), epoch_acc / len(iterator)
# 5. 开始训练(10个epoch)
N_EPOCHS = 10
best_val_loss = float('inf') # 记录最佳验证损失(用于保存模型)
train_losses = []
val_losses = []
train_accs = []
val_accs = []
for epoch in range(N_EPOCHS):
train_loss, train_acc = train(model, train_iterator, optimizer, criterion)
val_loss, val_acc = evaluate(model, val_iterator, criterion)
# 保存最佳模型(验证损失最小)
if val_loss < best_val_loss:
best_val_loss = val_loss
torch.save(model.state_dict(), 'gru_sentiment.pt')
# 记录损失和准确率(用于绘图)
train_losses.append(train_loss)
val_losses.append(val_loss)
train_accs.append(train_acc)
val_accs.append(val_acc)
# 打印结果
print(f'Epoch {epoch+1}/{N_EPOCHS}')
print(f'训练损失:{train_loss:.4f} | 训练准确率:{train_acc:.4f}')
print(f'验证损失:{val_loss:.4f} | 验证准确率:{val_acc:.4f}')
print('-' * 50)
# 6. 测试集评估(加载最佳模型)
model.load_state_dict(torch.load('gru_sentiment.pt'))
test_loss, test_acc = evaluate(model, test_iterator, criterion)
print(f'测试损失:{test_loss:.4f} | 测试准确率:{test_acc:.4f}')
# 7. 绘制损失和准确率曲线
plt.figure(figsize=(12, 4))
# 损失曲线
plt.subplot(1, 2, 1)
plt.plot(train_losses, label='训练损失')
plt.plot(val_losses, label='验证损失')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
# 准确率曲线
plt.subplot(1, 2, 2)
plt.plot(train_accs, label='训练准确率')
plt.plot(val_accs, label='验证准确率')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.show()
(5)结果分析
- 预期效果:测试准确率约88%~90%,GRU在情感分析任务上表现优秀;
- 为什么不用LSTM?因为评论长度在200词以内,GRU足够处理,且训练速度比LSTM快30%左右;
- 优化方向:用双向GRU(
bidirectional=True),可进一步提升准确率到92%左右(考虑句子前后文)。
4.2 场景2:股价预测(LSTM实现)——长序列任务
任务目标:根据苹果公司(AAPL)过去60天的股价数据,预测第61天的收盘价(长序列依赖,适合用LSTM)。
(1)数据加载与预处理
用yfinance获取AAPL的历史股价数据,做归一化(时序预测必须归一化,否则数值范围差异会影响模型):
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
# 1. 获取AAPL股价数据(2010-2024年)
ticker = 'AAPL'
start_date = '2010-01-01'
end_date = '2024-01-01'
data = yf.download(ticker, start=start_date, end=end_date)
# 只保留收盘价(预测目标)
df = data[['Close']].copy()
print(f"数据形状:{df.shape}")
print(df.head())
# 2. 数据归一化(将价格缩放到[0,1])
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(df)
# 3. 构建时序数据集(用过去60天预测未来1天)
def create_dataset(dataset, time_step=60):
X, y = [], []
for i in range(time_step, len(dataset)):
X.append(dataset[i-time_step:i, 0]) # 前60天数据(输入)
y.append(dataset[i, 0]) # 第61天数据(输出)
return np.array(X), np.array(y)
TIME_STEP = 60 # 时间步长(用过去60天预测)
X, y = create_dataset(scaled_data, TIME_STEP)
# 4. 数据格式转换(LSTM输入要求:[样本数, 时间步长, 特征数])
X = np.reshape(X, (X.shape[0], X.shape[1], 1)) # 特征数=1(仅收盘价)
# 5. 划分训练集和测试集(8:2)
train_size = int(0.8 * len(X))
test_size = len(X) - train_size
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[test_size:]
# 6. 转为PyTorch张量
X_train = torch.tensor(X_train, dtype=torch.float32).to(device)
X_test = torch.tensor(X_test, dtype=torch.float32).to(device)
y_train = torch.tensor(y_train, dtype=torch.float32).to(device)
y_test = torch.tensor(y_test, dtype=torch.float32).to(device)
# 查看数据信息
print(f"训练集输入形状:{X_train.shape}") # (样本数, 60, 1)
print(f"测试集输入形状:{X_test.shape}") # (样本数, 60, 1)
print(f"训练集输出形状:{y_train.shape}") # (样本数,)
print(f"测试集输出形状:{y_test.shape}") # (样本数,)
# 绘制收盘价曲线
plt.figure(figsize=(12, 6))
plt.plot(df['Close'], label='AAPL Close Price')
plt.title('AAPL Historical Close Price (2010-2024)')
plt.xlabel('Date')
plt.ylabel('Close Price (USD)')
plt.legend()
plt.show()
(2)定义LSTM模型
import torch.nn as nn
class LSTM_Stock(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, output_size, dropout):
super().__init__()
self.hidden_size = hidden_size # 隐藏层维度
self.num_layers = num_layers # LSTM层数
# LSTM层(input_size=1,仅收盘价一个特征)
self.lstm = nn.LSTM(
input_size,
hidden_size,
num_layers,
batch_first=True, # 输入格式:[batch_size, time_step, input_size]
dropout=dropout if num_layers > 1 else 0
)
# 全连接层(LSTM输出→预测值)
self.fc = nn.Linear(hidden_size, output_size)
# Dropout层
self.dropout = nn.Dropout(dropout)
def forward(self, x):
# x: (batch_size, time_step, input_size)
# 初始化隐藏状态和细胞状态(初始为0)
h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
# LSTM前向传播(output: (batch_size, time_step, hidden_size))
# hn, cn: 最后一个时刻的隐藏状态和细胞状态
output, (hn, cn) = self.lstm(x, (h0, c0))
# 取最后一个时刻的输出(因为预测第61天,只需要最后一步的结果)
output = self.dropout(hn[-1, :, :]) # (batch_size, hidden_size)
# 全连接层输出(预测值)
output = self.fc(output) # (batch_size, output_size)
return output
# 初始化模型参数
INPUT_SIZE = 1 # 输入特征数(仅收盘价)
HIDDEN_SIZE = 128 # 隐藏层维度
NUM_LAYERS = 2 # 2层LSTM
OUTPUT_SIZE = 1 # 输出维度(预测收盘价)
DROPOUT = 0.2 # Dropout概率(避免过拟合)
model = LSTM_Stock(
input_size=INPUT_SIZE,
hidden_size=HIDDEN_SIZE,
num_layers=NUM_LAYERS,
output_size=OUTPUT_SIZE,
dropout=DROPOUT
).to(device)
print(model)
(3)模型训练与评估
import torch.optim as optim
import math
# 1. 定义优化器和损失函数(回归任务用MSE)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.MSELoss()
# 2. 构建数据加载器(批量训练)
BATCH_SIZE = 32
# 训练集
train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
# 测试集
test_dataset = torch.utils.data.TensorDataset(X_test, y_test)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)
# 3. 训练函数
def train(model, loader, optimizer, criterion):
model.train()
epoch_loss = 0
for X_batch, y_batch in loader:
optimizer.zero_grad()
preds = model(X_batch) # (batch_size, 1)
loss = criterion(preds.squeeze(), y_batch) # 挤压维度,与y_batch匹配
loss.backward()
optimizer.step()
epoch_loss += loss.item() * X_batch.size(0) # 按样本数加权
# 平均损失(总损失/总样本数)
return epoch_loss / len(loader.dataset)
# 4. 评估函数
def evaluate(model, loader, criterion):
model.eval()
epoch_loss = 0
with torch.no_grad():
for X_batch, y_batch in loader:
preds = model(X_batch)
loss = criterion(preds.squeeze(), y_batch)
epoch_loss += loss.item() * X_batch.size(0)
return epoch_loss / len(loader.dataset)
# 5. 开始训练(50个epoch)
N_EPOCHS = 50
best_val_loss = float('inf')
train_losses = []
test_losses = []
for epoch in range(N_EPOCHS):
train_loss = train(model, train_loader, optimizer, criterion)
test_loss = evaluate(model, test_loader, criterion)
# 保存最佳模型
if test_loss < best_val_loss:
best_val_loss = test_loss
torch.save(model.state_dict(), 'lstm_stock.pt')
# 记录损失
train_losses.append(train_loss)
test_losses.append(test_loss)
# 打印结果(同时输出RMSE,更直观)
train_rmse = math.sqrt(train_loss)
test_rmse = math.sqrt(test_loss)
print(f'Epoch {epoch+1}/{N_EPOCHS}')
print(f'训练损失(MSE):{train_loss:.6f} | 训练RMSE:{train_rmse:.4f}')
print(f'测试损失(MSE):{test_loss:.6f} | 测试RMSE:{test_rmse:.4f}')
print('-' * 50)
# 6. 预测与可视化(加载最佳模型)
model.load_state_dict(torch.load('lstm_stock.pt'))
model.eval()
# 训练集预测
with torch.no_grad():
train_preds = model(X_train).cpu().numpy()
test_preds = model(X_test).cpu().numpy()
# 反归一化(将预测值转回原始价格)
train_preds = scaler.inverse_transform(train_preds)
test_preds = scaler.inverse_transform(test_preds)
y_train_original = scaler.inverse_transform(y_train.cpu().numpy().reshape(-1, 1))
y_test_original = scaler.inverse_transform(y_test.cpu().numpy().reshape(-1, 1))
# 构建完整的预测曲线(用于绘图)
# 训练集部分
train_plot = np.empty_like(scaled_data)
train_plot[:, :] = np.nan
train_plot[TIME_STEP:TIME_STEP+len(train_preds), :] = train_preds
# 测试集部分
test_plot = np.empty_like(scaled_data)
test_plot[:, :] = np.nan
test_plot[TIME_STEP+len(train_preds):, :] = test_preds
# 绘制预测结果
plt.figure(figsize=(14, 8))
plt.plot(scaler.inverse_transform(scaled_data), label='真实收盘价', color='blue')
plt.plot(train_plot, label='训练集预测', color='green', alpha=0.7)
plt.plot(test_plot, label='测试集预测', color='red', alpha=0.7)
plt.title('AAPL股价预测(LSTM)')
plt.xlabel('时间(天数)')
plt.ylabel('收盘价(USD)')
plt.legend()
plt.show()
(4)结果分析
- 预期效果:测试RMSE约5~8(取决于市场波动),LSTM能较好拟合股价的趋势(上升/下降),但无法预测突发波动(如黑天鹅事件);
- 为什么用LSTM?因为股价依赖长期趋势(60天),LSTM的长期记忆能力比GRU强,预测精度更高;
- 优化方向:加入更多特征(如开盘价、成交量、MACD指标),可进一步降低RMSE。
4.3 场景3:关键词识别(LSTM实现)——复杂时序任务
任务目标:识别语音中的关键词“yes”和“no”(语音序列长、依赖复杂,适合用LSTM)。我们用Google的SpeechCommands数据集(简化版,仅保留“yes”“no”两类)。
(1)数据加载与预处理
语音数据需要转为MFCC特征(语音信号的常用特征,将时域信号转为频域特征):
import os
import torch
import torchaudio
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
# 1. 下载SpeechCommands数据集(简化版,仅yes/no)
# 先创建数据目录
data_dir = './speech_commands'
os.makedirs(data_dir, exist_ok=True)
# 下载yes和no类别的语音文件(共约4000个wav文件)
!wget -q -P $data_dir https://storage.googleapis.com/download.tensorflow.org/data/speech_commands_v0.02.tar.gz
!tar -xf $data_dir/speech_commands_v0.02.tar.gz -C $data_dir
# 只保留yes和no文件夹
import shutil
for folder in os.listdir(data_dir):
if folder not in ['yes', 'no', 'README.md']:
path = os.path.join(data_dir, folder)
if os.path.isdir(path):
shutil.rmtree(path)
# 2. 定义语音数据集类(加载wav文件,提取MFCC特征)
class SpeechDataset(Dataset):
def __init__(self, data_dir, sample_rate=16000, n_mfcc=13):
self.data_dir = data_dir
self.sample_rate = sample_rate
self.n_mfcc = n_mfcc # MFCC特征数
self.classes = ['no', 'yes'] # 标签:0=no,1=yes
self.file_paths = []
self.labels = []
# 收集所有wav文件路径和标签
for cls in self.classes:
cls_dir = os.path.join(data_dir, cls)
for filename in os.listdir(cls_dir):
if filename.endswith('.wav'):
self.file_paths.append(os.path.join(cls_dir, filename))
self.labels.append(self.classes.index(cls))
# 语音预处理:统一长度(1秒=16000个采样点)
self.max_len = sample_rate # 1秒
def __len__(self):
return len(self.file_paths)
def __getitem__(self, idx):
file_path = self.file_paths[idx]
label = self.labels[idx]
# 加载wav文件(torchaudio.load返回:波形张量,采样率)
waveform, sr = torchaudio.load(file_path)
# 重采样(确保采样率一致)
if sr != self.sample_rate:
resampler = torchaudio.transforms.Resample(sr, self.sample_rate)
waveform = resampler(waveform)
# 统一长度(截断或补零到1秒)
waveform = waveform.squeeze() # 转为1D张量
if len(waveform) < self.max_len:
waveform = torch.nn.functional.pad(waveform, (0, self.max_len - len(waveform)))
else:
waveform = waveform[:self.max_len]
# 提取MFCC特征(输入:[sample_rate] → 输出:[n_mfcc, time_steps])
mfcc_transform = torchaudio.transforms.MFCC(
sample_rate=self.sample_rate,
n_mfcc=self.n_mfcc
)
mfcc = mfcc_transform(waveform) # (n_mfcc, time_steps)
# 转置为[time_steps, n_mfcc](符合LSTM输入格式:[time_step, input_size])
mfcc = mfcc.transpose(0, 1)
return mfcc, label
# 3. 创建数据集和数据加载器
dataset = SpeechDataset(data_dir)
# 划分训练集和测试集(8:2)
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(
dataset, [train_size, test_size],
generator=torch.Generator().manual_seed(42)
)
# 数据加载器
BATCH_SIZE = 32
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)
# 查看数据信息
mfcc_sample, label_sample = dataset[0]
print(f"MFCC特征形状:{mfcc_sample.shape}") # (time_steps, n_mfcc) → 约94个时间步,13个特征
print(f"训练集样本数:{len(train_dataset)}")
print(f"测试集样本数:{len(test_dataset)}")
print(f"标签:{dataset.classes[label_sample]}")
# 绘制MFCC特征图
plt.figure(figsize=(10, 4))
plt.imshow(mfcc_sample.transpose(0, 1), cmap='viridis', aspect='auto')
plt.title('MFCC Feature of Keyword "{}"'.format(dataset.classes[label_sample]))
plt.xlabel('Time Steps')
plt.ylabel('MFCC Coefficients')
plt.colorbar()
plt.show()
(2)定义LSTM模型
import torch.nn as nn
class LSTM_Keyword(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, num_classes, dropout):
super().__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
# LSTM层(input_size=13,MFCC特征数)
self.lstm = nn.LSTM(
input_size,
hidden_size,
num_layers,
batch_first=True, # 输入格式:[batch_size, time_step, input_size]
bidirectional=True, # 双向LSTM(语音需要前后文信息)
dropout=dropout if num_layers > 1 else 0
)
# 全连接层(双向LSTM输出维度=2×hidden_size)
self.fc = nn.Linear(hidden_size * 2, num_classes)
self.dropout = nn.Dropout(dropout)
def forward(self, x):
# x: (batch_size, time_step, input_size)
# 初始化隐藏状态和细胞状态
h0 = torch.zeros(self.num_layers * 2, x.size(0), self.hidden_size).to(device) # 2=双向
c0 = torch.zeros(self.num_layers * 2, x.size(0), self.hidden_size).to(device)
# LSTM前向传播
output, (hn, cn) = self.lstm(x, (h0, c0)) # output: (batch_size, time_step, 2×hidden_size)
# 取最后一个时间步的输出(关键词识别关注整个语音序列的特征)
output = self.dropout(output[:, -1, :]) # (batch_size, 2×hidden_size)
# 全连接层输出(二分类)
output = self.fc(output) # (batch_size, num_classes)
return output
# 初始化模型参数
INPUT_SIZE = 13 # 输入特征数(MFCC系数数)
HIDDEN_SIZE = 64 # 隐藏层维度
NUM_LAYERS = 2 # 2层LSTM
NUM_CLASSES = 2 # 输出类别数(yes/no)
DROPOUT = 0.3 # Dropout概率
model = LSTM_Keyword(
input_size=INPUT_SIZE,
hidden_size=HIDDEN_SIZE,
num_layers=NUM_LAYERS,
num_classes=NUM_CLASSES,
dropout=DROPOUT
).to(device)
print(model)
(3)模型训练与评估
import torch.optim as optim
import torch.nn.functional as F
# 1. 定义优化器和损失函数
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss() # 多分类交叉熵
# 2. 准确率计算函数
def accuracy(preds, labels):
_, predicted = torch.max(preds, 1) # 取概率最大的类别
correct = (predicted == labels).sum().item()
return correct / len(labels)
# 3. 训练函数
def train(model, loader, optimizer, criterion):
model.train()
epoch_loss = 0
epoch_acc = 0
for batch in loader:
mfcc, labels = batch
mfcc = mfcc.to(device)
labels = labels.to(device)
optimizer.zero_grad()
preds = model(mfcc) # (batch_size, num_classes)
loss = criterion(preds, labels)
acc = accuracy(preds, labels)
loss.backward()
optimizer.step()
epoch_loss += loss.item() * mfcc.size(0)
epoch_acc += acc * mfcc.size(0)
return epoch_loss / len(loader.dataset), epoch_acc / len(loader.dataset)
# 4. 评估函数
def evaluate(model, loader, criterion):
model.eval()
epoch_loss = 0
epoch_acc = 0
with torch.no_grad():
for batch in loader:
mfcc, labels = batch
mfcc = mfcc.to(device)
labels = labels.to(device)
preds = model(mfcc)
loss = criterion(preds, labels)
acc = accuracy(preds, labels)
epoch_loss += loss.item() * mfcc.size(0)
epoch_acc += acc * mfcc.size(0)
return epoch_loss / len(loader.dataset), epoch_acc / len(loader.dataset)
# 5. 开始训练(20个epoch)
N_EPOCHS = 20
best_val_acc = 0.0
train_losses = []
train_accs = []
test_losses = []
test_accs = []
for epoch in range(N_EPOCHS):
train_loss, train_acc = train(model, train_loader, optimizer, criterion)
test_loss, test_acc = evaluate(model, test_loader, criterion)
# 保存最佳模型
if test_acc > best_val_acc:
best_val_acc = test_acc
torch.save(model.state_dict(), 'lstm_keyword.pt')
# 记录结果
train_losses.append(train_loss)
train_accs.append(train_acc)
test_losses.append(test_loss)
test_accs.append(test_acc)
# 打印结果
print(f'Epoch {epoch+1}/{N_EPOCHS}')
print(f'训练损失:{train_loss:.4f} | 训练准确率:{train_acc:.4f}')
print(f'测试损失:{test_loss:.4f} | 测试准确率:{test_acc:.4f}')
print('-' * 50)
# 6. 绘制训练曲线
plt.figure(figsize=(12, 4))
# 损失曲线
plt.subplot(1, 2, 1)
plt.plot(train_losses, label='训练损失')
plt.plot(test_losses, label='测试损失')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
# 准确率曲线
plt.subplot(1, 2, 2)
plt.plot(train_accs, label='训练准确率')
plt.plot(test_accs, label='测试准确率')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.show()
# 最终测试准确率
print(f'最佳测试准确率:{best_val_acc:.4f}')
(4)结果分析
- 预期效果:测试准确率约95%~97%,LSTM能有效识别“yes”和“no”;
- 为什么用双向LSTM?因为语音中的关键词依赖前后的静音或其他声音(比如“yes”可能前面有“say”),双向能同时考虑过去和未来的信息;
- 为什么不用GRU?因为语音序列较长(1秒=94个时间步),且特征复杂(MFCC有13个维度),LSTM的长期记忆能力更优。
五、南木的学习路径建议:从入门到实战
很多人学RNN/LSTM/GRU会陷入“公式劝退”或“代码跑不通”的困境,我结合自己的经验,整理了一条“循序渐进”的学习路径:
阶段1:基础铺垫(1~2周)
- 数学基础:回顾微积分(梯度下降)、线性代数(矩阵乘法)、概率论(交叉熵)——不用深钻,够用就行;
- 工具基础:掌握PyTorch的核心操作(张量、模型定义、训练流程),推荐看PyTorch官方教程《60分钟快速入门》;
- 时序概念:理解“时间步”“序列长度”“特征数”的定义,搞懂LSTM输入格式
[batch_size, time_step, input_size]的含义。
阶段2:核心原理(2~3周)
- 先学RNN:手动计算1个简单案例(比如用RNN处理3个词的句子),理解隐藏状态的传递过程,搞懂梯度消失的原因;
- 再学LSTM:重点理解3个门控的作用(不用死记公式,用“文件夹”比喻辅助记忆),推荐看Colah的博客《Understanding LSTM Networks》(图文并茂,最经典);
- 最后学GRU:对比LSTM的结构差异,理解“简化”的逻辑(合并门控、去掉细胞状态),搞懂“什么时候用GRU更合适”。
阶段3:实战落地(3~4周)
- 入门项目:从文本分类(如本文场景1)开始,熟悉torchtext或Hugging Face的使用,掌握变长序列处理(
pack_padded_sequence); - 进阶项目:尝试时序预测(如股价、天气),重点掌握数据归一化、时间步长选择、预测结果反归一化;
- 复杂项目:挑战语音识别或机器翻译,学习特征提取(如MFCC)、双向RNN、多层RNN的使用。
阶段4:进阶扩展(长期)
- 模型优化:学习RNN的正则化方法(Dropout、权重衰减)、学习率调度(ReduceLROnPlateau);
- 对比学习:了解Transformer(如BERT、GPT)在时序任务上的应用,搞懂“为什么Transformer能替代RNN”;
- 行业应用:结合具体领域(如金融时序、医疗信号、工业传感器),学习领域特定的预处理方法。
RNN/LSTM/GRU的进化史,本质是“不断平衡‘记忆能力’和‘效率’”的过程:
- RNN证明了“循环结构能处理时序依赖”,但败给了长序列;
- LSTM用门控和细胞状态突破了长序列限制,但牺牲了效率;
- GRU在两者之间找到了平衡点,成为多数场景的“性价比之选”。

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)