PyTorch时间序列预测:LSTM/Transformer+TCN,股价/天气预测实战(附数据清洗+模型融合技巧)
大家好,我是南木。时间序列预测是AI落地最广泛的场景之一,从股价波动、气象预警,到工业设备故障预测、电商销量预估,几乎覆盖所有行业。但它比图像、NLP更依赖“”和“”:比如股价的“高噪声、非线性”适合LSTM+Transformer融合,而气象数据的“多尺度周期性”更适合TCN。这篇文章结合“新能源电站功率预测”和“消费指数预测”两个工业项目经验,从“数据预处理→模型实现→实战调优→融合落地”全流
大家好,我是南木。
时间序列预测是AI落地最广泛的场景之一,从股价波动、气象预警,到工业设备故障预测、电商销量预估,几乎覆盖所有行业。但它比图像、NLP更依赖“数据特性适配”和“模型选型匹配”:比如股价的“高噪声、非线性”适合LSTM+Transformer融合,而气象数据的“多尺度周期性”更适合TCN。
这篇文章结合“新能源电站功率预测”和“消费指数预测”两个工业项目经验,从“数据预处理→模型实现→实战调优→融合落地”全流程拆解时间序列预测。包含LSTM/TCN/Transformer的PyTorch实现、股价/天气双案例实战、数据清洗核心技巧、模型融合方法论四大模块,每个环节都附“可复用代码+避坑指南”。
同时 这里也给大家整理了一份适合零基础学习的深度学习资料包 需要的同学扫码自取

一、先搞懂:时间序列预测的核心逻辑(和其他任务有什么不同?)
在动手写代码前,必须先明确时间序列的本质特性——它和图像、文本的“静态数据”不同,核心是“时间依赖性”,盲目套用分类/回归的经验只会事倍功半。
1. 时间序列的4个核心特性(决定模型选型)
时间序列的预测难度,本质上由“趋势、季节性、周期性、噪声”四大特性决定,必须先分析清楚再选模型:
- 趋势(Trend):数据长期的上升/下降趋势(如全球气温逐年上升);
- 季节性(Seasonality):固定周期的重复波动(如每月10号电商销量高峰、夏季用电量激增);
- 周期性(Cyclicity):非固定周期的波动(如经济周期3-5年一次,无严格规律);
- 噪声(Noise):随机干扰(如股价因突发新闻的短期波动、传感器的测量误差)。
例:
- 天气数据(温度、降水):强趋势+强季节性+弱噪声 → 适合TCN(捕捉多尺度周期);
- 股价数据:弱趋势+弱周期性+强噪声 → 适合LSTM+Transformer(兼顾记忆与非线性);
- 工业设备振动数据:无趋势+强周期性+中噪声 → 适合LSTM(捕捉时序依赖)。
2. 时间序列预测的核心任务:“序列到序列”的映射
所有时间序列预测都可归结为“用过去N个时刻的数据,预测未来M个时刻的值”,按预测长度分为两类:
- 单步预测:预测未来1个时刻(如预测明天的气温);
- 多步预测:预测未来连续多个时刻(如预测未来7天的股价、未来24小时的电力负荷)。
关键概念:
- 输入序列长度(look_back):用过去多少个时刻的数据做输入(如用过去30天的股价预测明天,look_back=30);
- 输出序列长度(predict_steps):预测未来多少个时刻(如预测未来7天,predict_steps=7);
- 特征维度(feature_dim):每个时刻的特征数量(如股价预测可能包含“开盘价、收盘价、成交量”3个特征,feature_dim=3)。
3. 评估指标:不能只用MSE!(按场景选对指标)
时间序列的评估指标需结合“业务场景”选择,避免用单一指标误导判断:
| 指标名称 | 公式 | 特点 | 适用场景 |
|---|---|---|---|
| 平均绝对误差(MAE) | $MAE = \frac{1}{n}\sum | y_i - \hat{y}_i | $ |
| 均方误差(MSE) | MSE=1n∑(yi−y^i)2MSE = \frac{1}{n}\sum(y_i - \hat{y}_i)^2MSE=n1∑(yi−y^i)2 | 放大异常值影响,适合优化模型 | 模型训练目标函数 |
| 均方根误差(RMSE) | RMSE=MSERMSE = \sqrt{MSE}RMSE=MSE | 单位与目标一致,放大异常值影响 | 大多数通用场景(如气温预测) |
| 平均绝对百分比误差(MAPE) | $MAPE = \frac{1}{n}\sum | \frac{y_i - \hat{y}_i}{y_i} | ×100%$ |
避坑:股价、销量等可能为0或接近0的数据,禁用MAPE(会出现除以0的情况);异常值多的场景(如设备故障预测),优先用MAE。
二、数据层:时间序列数据预处理全攻略(预测准的基础)
“数据决定预测上限”——时间序列的预处理比其他任务更关键,因为“缺失值、异常值、非平稳性”会直接破坏时序依赖关系。以下是经过工业项目验证的实战流程。
1. 数据来源与加载(附实战数据集)
首先获取高质量的时序数据,推荐几个常用数据源,附加载代码:
(1)股价数据:Tushare(免费、接口友好)
# 安装:pip install tushare
import tushare as ts
import pandas as pd
import numpy as np
# 初始化Tushare(需注册获取token:https://tushare.pro/register)
ts.set_token("你的Tushare token")
pro = ts.pro_api()
# 获取贵州茅台(600519.SH)2018-2023年的日度数据
df_stock = pro.daily(
ts_code="600519.SH",
start_date="20180101",
end_date="20231231"
)
# 处理时间列(转为datetime,按时间排序)
df_stock["trade_date"] = pd.to_datetime(df_stock["trade_date"], format="%Y%m%d")
df_stock = df_stock.sort_values("trade_date").reset_index(drop=True)
# 选择核心特征(开盘价、最高价、最低价、收盘价、成交量)
df_stock = df_stock[["trade_date", "open", "high", "low", "close", "vol"]]
print(f"股价数据形状:{df_stock.shape}")
print(df_stock.head())
(2)天气数据:NOAA(全球气象数据,免费)
# 从NOAA获取北京2018-2023年的日度气温数据(也可直接用本地CSV)
df_weather = pd.read_csv("beijing_weather_2018_2023.csv")
# 处理时间列
df_weather["date"] = pd.to_datetime(df_weather["date"])
# 选择特征(日期、平均气温、最高气温、最低气温、降水量)
df_weather = df_weather[["date", "temp_avg", "temp_max", "temp_min", "precip"]]
df_weather = df_weather.sort_values("date").reset_index(drop=True)
print(f"天气数据形状:{df_weather.shape}")
print(df_weather.head())
2. 数据清洗:3大核心问题的实战解法
时间序列的清洗核心是“保留时序依赖,最小化信息损失”,不能像静态数据那样直接删除或填充。
(1)问题1:缺失值处理(最常见)
时序数据的缺失通常是“连续缺失”(如传感器故障)或“单点缺失”(如数据传输错误),推荐按缺失长度选择方法:
- 单点缺失:线性插值(
interpolate(method="linear")); - 连续缺失(<5个时刻):样条插值(
interpolate(method="spline", order=3)); - 连续缺失(≥5个时刻):前向填充+滑动平均修正(避免填充值过于生硬)。
代码实现:
def fill_missing_values(df, feature_cols):
"""处理缺失值"""
df_clean = df.copy()
for col in feature_cols:
# 1. 先处理单点缺失(线性插值)
df_clean[col] = df_clean[col].interpolate(method="linear", limit=1)
# 2. 处理连续缺失(样条插值,最多处理4个连续缺失)
df_clean[col] = df_clean[col].interpolate(method="spline", order=3, limit=4)
# 3. 剩余的长连续缺失(前向填充后用3步滑动平均修正)
df_clean[col] = df_clean[col].fillna(method="ffill")
# 滑动平均修正
df_clean[col] = df_clean[col].rolling(window=3, min_periods=1).mean()
return df_clean
# 清洗股价数据(特征列为open, high, low, close, vol)
feature_cols_stock = ["open", "high", "low", "close", "vol"]
df_stock_clean = fill_missing_values(df_stock, feature_cols_stock)
# 清洗天气数据(特征列为temp_avg, temp_max, temp_min, precip)
feature_cols_weather = ["temp_avg", "temp_max", "temp_min", "precip"]
df_weather_clean = fill_missing_values(df_weather, feature_cols_weather)
# 检查是否还有缺失值
print(f"股价数据缺失值:{df_stock_clean[feature_cols_stock].isnull().sum().sum()}")
print(f"天气数据缺失值:{df_weather_clean[feature_cols_weather].isnull().sum().sum()}")
(2)问题2:异常值处理(最影响精度)
时序异常值(如股价暴跌、气温骤升)通常是“突发噪声”,直接删除会破坏时序连续性,推荐“检测+修正”两步法:
- 检测:用“3σ原则”(适用于正态分布)或“IQR法”(适用于非正态分布);
- 修正:用“滑动中位数”替换异常值(比均值更鲁棒)。
代码实现:
def detect_and_correct_outliers(df, feature_cols, window=5):
"""检测并修正异常值"""
df_clean = df.copy()
for col in feature_cols:
# 1. 用IQR法检测异常值
Q1 = df_clean[col].quantile(0.25)
Q3 = df_clean[col].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
# 2. 标记异常值
outliers = (df_clean[col] < lower_bound) | (df_clean[col] > upper_bound)
print(f"{col}异常值数量:{outliers.sum()}")
# 3. 用滑动中位数修正异常值
if outliers.sum() > 0:
# 计算滑动中位数
rolling_median = df_clean[col].rolling(window=window, min_periods=1).median()
# 用中位数替换异常值
df_clean.loc[outliers, col] = rolling_median[outliers]
return df_clean
# 修正股价和天气数据的异常值
df_stock_clean = detect_and_correct_outliers(df_stock_clean, feature_cols_stock)
df_weather_clean = detect_and_correct_outliers(df_weather_clean, feature_cols_weather)
(3)问题3:数据归一化(模型收敛的关键)
时间序列的特征通常具有不同量级(如股价1000+,成交量1e8+),必须归一化到同一区间。推荐两种方法:
- Min-Max Scaling(缩放到[0,1]):适合有明确上下限的数据(如气温-20~40℃);
- Standard Scaling(均值0,标准差1):适合近似正态分布的数据(如股价、成交量)。
代码实现:
from sklearn.preprocessing import MinMaxScaler, StandardScaler
def normalize_data(df, feature_cols, method="standard"):
"""数据归一化"""
df_norm = df.copy()
# 选择缩放器
if method == "standard":
scaler = StandardScaler()
elif method == "minmax":
scaler = MinMaxScaler(feature_range=(0, 1))
# 拟合并转换
df_norm[feature_cols] = scaler.fit_transform(df_norm[feature_cols])
return df_norm, scaler
# 股价数据用Standard Scaling(近似正态分布)
df_stock_norm, scaler_stock = normalize_data(df_stock_clean, feature_cols_stock, method="standard")
# 天气数据用Min-Max Scaling(气温、降水量有明确范围)
df_weather_norm, scaler_weather = normalize_data(df_weather_clean, feature_cols_weather, method="minmax")
# 查看归一化后的数据
print("股价数据归一化后:")
print(df_stock_norm[feature_cols_stock].describe())
3. 特征工程:提升预测精度的“关键一步”
时间序列的特征工程核心是“挖掘时序依赖”,常用4类特征:
(1)时间特征(捕捉季节性)
提取“小时、星期、月份、季度”等特征,帮助模型学习周期性:
def add_time_features(df, time_col):
"""添加时间特征"""
df_time = df.copy()
df_time["hour"] = df_time[time_col].dt.hour # 小时(日度数据为0)
df_time["dayofweek"] = df_time[time_col].dt.dayofweek # 星期(0=周一,6=周日)
df_time["month"] = df_time[time_col].dt.month # 月份
df_time["quarter"] = df_time[time_col].dt.quarter # 季度
df_time["is_weekend"] = df_time["dayofweek"].apply(lambda x: 1 if x >=5 else 0) # 是否周末
return df_time
# 给股价数据添加时间特征(time_col="trade_date")
df_stock_norm = add_time_features(df_stock_norm, "trade_date")
# 给天气数据添加时间特征(time_col="date")
df_weather_norm = add_time_features(df_weather_norm, "date")
# 查看添加后的特征
print("股价数据新增时间特征:")
print(df_stock_norm[["trade_date", "dayofweek", "month", "is_weekend"]].head())
(2)滞后特征(捕捉时序依赖)
提取“过去1天、3天、7天”的历史值,直接喂给模型学习依赖关系:
def add_lag_features(df, feature_cols, lag_steps=[1, 3, 7]):
"""添加滞后特征"""
df_lag = df.copy()
for col in feature_cols:
for lag in lag_steps:
# 滞后lag步的特征(如close_lag1=昨天的收盘价)
df_lag[f"{col}_lag{lag}"] = df_lag[col].shift(lag)
# 删除因滞后产生的缺失值
df_lag = df_lag.dropna()
return df_lag
# 给股价数据添加滞后特征(滞后1、3、7天)
df_stock_norm = add_lag_features(df_stock_norm, feature_cols_stock, lag_steps=[1,3,7])
# 给天气数据添加滞后特征(滞后1、2、3天)
df_weather_norm = add_lag_features(df_weather_norm, feature_cols_weather, lag_steps=[1,2,3])
print(f"添加滞后特征后股价数据形状:{df_stock_norm.shape}")
(3)滚动统计特征(捕捉趋势)
计算“滚动均值、滚动标准差”等,帮助模型学习短期趋势:
def add_rolling_features(df, feature_cols, windows=[3, 7, 14]):
"""添加滚动统计特征"""
df_rolling = df.copy()
for col in feature_cols:
for window in windows:
# 滚动均值(如close_roll7=过去7天收盘价均值)
df_rolling[f"{col}_roll{window}_mean"] = df_rolling[col].rolling(window=window, min_periods=1).mean()
# 滚动标准差(捕捉波动)
df_rolling[f"{col}_roll{window}_std"] = df_rolling[col].rolling(window=window, min_periods=1).std()
return df_rolling
# 给股价数据添加滚动特征(窗口3、7、14天)
df_stock_norm = add_rolling_features(df_stock_norm, feature_cols_stock, windows=[3,7,14])
print(f"添加滚动特征后股价数据形状:{df_stock_norm.shape}")
4. 数据集构建:时序数据的“序列划分”
时序数据不能随机划分训练/测试集(会破坏时序依赖),必须按时间顺序划分,且要构建“输入序列→输出序列”的样本格式。
(1)构建序列样本(核心函数)
import torch
from torch.utils.data import Dataset, DataLoader
def create_sequences(data, look_back, predict_steps, input_cols, target_cols):
"""
构建时序序列样本
:param data: 预处理后的DataFrame
:param look_back: 输入序列长度(用过去多少个时刻)
:param predict_steps: 输出序列长度(预测未来多少个时刻)
:param input_cols: 输入特征列
:param target_cols: 目标列(要预测的特征)
:return: X(输入序列), y(输出序列)
"""
X, y = [], []
# 遍历数据,构建样本
for i in range(len(data) - look_back - predict_steps + 1):
# 输入序列:过去look_back个时刻的输入特征
input_seq = data.iloc[i:i+look_back][input_cols].values
# 输出序列:未来predict_steps个时刻的目标特征
target_seq = data.iloc[i+look_back:i+look_back+predict_steps][target_cols].values
X.append(input_seq)
y.append(target_seq)
# 转为PyTorch张量
X = torch.tensor(X, dtype=torch.float32)
y = torch.tensor(y, dtype=torch.float32)
return X, y
# 定义股价预测的参数
look_back_stock = 30 # 用过去30天预测
predict_steps_stock = 1 # 单步预测(预测明天的收盘价)
input_cols_stock = feature_cols_stock + [col for col in df_stock_norm.columns if any(suffix in col for suffix in ["lag", "roll", "dayofweek", "month", "is_weekend"])]
target_cols_stock = ["close"] # 预测收盘价
# 构建股价序列样本
X_stock, y_stock = create_sequences(
df_stock_norm,
look_back=look_back_stock,
predict_steps=predict_steps_stock,
input_cols=input_cols_stock,
target_cols=target_cols_stock
)
print(f"股价样本形状:X={X_stock.shape}, y={y_stock.shape}") # X: [样本数, look_back, 输入特征数], y: [样本数, predict_steps, 目标特征数]
(2)按时间划分训练/测试集
def split_train_test(X, y, test_size=0.2):
"""按时间顺序划分训练/测试集"""
split_idx = int(len(X) * (1 - test_size))
X_train, X_test = X[:split_idx], X[split_idx:]
y_train, y_test = y[:split_idx], y[split_idx:]
return X_train, X_test, y_train, y_test
# 划分股价数据(测试集占20%)
X_stock_train, X_stock_test, y_stock_train, y_stock_test = split_train_test(X_stock, y_stock, test_size=0.2)
print(f"股价训练集:X={X_stock_train.shape}, y={y_stock_train.shape}")
print(f"股价测试集:X={X_stock_test.shape}, y={y_stock_test.shape}")
(3)封装为PyTorch Dataset
class TimeSeriesDataset(Dataset):
"""时间序列数据集"""
def __init__(self, X, y):
self.X = X
self.y = y
def __len__(self):
return len(self.X)
def __getitem__(self, idx):
return self.X[idx], self.y[idx]
# 实例化股价数据集
train_dataset_stock = TimeSeriesDataset(X_stock_train, y_stock_train)
test_dataset_stock = TimeSeriesDataset(X_stock_test, y_stock_test)
# 加载数据(batch_size根据GPU显存调整)
batch_size = 32
train_loader_stock = DataLoader(train_dataset_stock, batch_size=batch_size, shuffle=False) # 时序数据不shuffle!
test_loader_stock = DataLoader(test_dataset_stock, batch_size=batch_size, shuffle=False)
# 测试数据集
sample_X, sample_y = train_dataset_stock[0]
print(f"样本输入形状:{sample_X.shape}") # [look_back, 输入特征数]
print(f"样本输出形状:{sample_y.shape}") # [predict_steps, 目标特征数]
三、模型层:LSTM/TCN/Transformer的PyTorch实现与选型
时间序列模型的选型核心是“匹配数据特性”——没有“万能模型”,但有“最优适配”。以下是3种主流模型的实战实现。
1. LSTM:捕捉长时序依赖(经典之选)
LSTM(Long Short-Term Memory)通过“门控机制”解决了RNN的“梯度消失”问题,能有效捕捉长时序依赖,是时序预测的“入门首选”。
(1)核心原理
LSTM有3个关键门控:
- 遗忘门:决定丢弃哪些历史信息;
- 输入门:决定哪些新信息存入细胞状态;
- 输出门:根据细胞状态输出当前时刻的结果。
适合场景:有明确时序依赖、中等噪声的数据(如工业设备振动、电力负荷)。
(2)PyTorch实现
class LSTMTimeSeries(nn.Module):
def __init__(self, input_dim, hidden_dim, num_layers, output_dim, predict_steps, dropout=0.2):
"""
:param input_dim: 输入特征维度
:param hidden_dim: LSTM隐藏层维度
:param num_layers: LSTM层数
:param output_dim: 输出特征维度(目标特征数)
:param predict_steps: 预测步数
:param dropout: dropout率
"""
super().__init__()
self.hidden_dim = hidden_dim
self.num_layers = num_layers
self.predict_steps = predict_steps
# LSTM层
self.lstm = nn.LSTM(
input_size=input_dim,
hidden_size=hidden_dim,
num_layers=num_layers,
batch_first=True, # 输入形状:[batch_size, seq_len, input_dim]
dropout=dropout if num_layers > 1 else 0 # 多层时才用dropout
)
# 全连接层(将LSTM输出映射到预测步数)
self.fc = nn.Linear(hidden_dim, output_dim * predict_steps)
def forward(self, x):
# x: [batch_size, look_back, input_dim]
batch_size = x.size(0)
# 初始化LSTM隐藏状态和细胞状态
h0 = torch.zeros(self.num_layers, batch_size, self.hidden_dim).to(x.device)
c0 = torch.zeros(self.num_layers, batch_size, self.hidden_dim).to(x.device)
# LSTM前向传播:输出shape=[batch_size, look_back, hidden_dim]
lstm_out, _ = self.lstm(x, (h0, c0))
# 取最后一个时刻的输出(包含所有历史信息)
last_hidden = lstm_out[:, -1, :] # [batch_size, hidden_dim]
# 全连接层预测:输出shape=[batch_size, output_dim*predict_steps]
output = self.fc(last_hidden)
# 重塑为[batch_size, predict_steps, output_dim]
output = output.view(batch_size, self.predict_steps, -1)
return output
# 实例化LSTM模型(股价预测)
input_dim_stock = X_stock_train.shape[2] # 输入特征维度
output_dim_stock = len(target_cols_stock) # 输出特征维度(1:仅收盘价)
lstm_model = LSTMTimeSeries(
input_dim=input_dim_stock,
hidden_dim=128,
num_layers=2,
output_dim=output_dim_stock,
predict_steps=predict_steps_stock,
dropout=0.2
)
# 迁移到GPU(如果可用)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
lstm_model.to(device)
print(f"LSTM模型结构:")
print(lstm_model)
print(f"模型参数量:{sum(p.numel() for p in lstm_model.parameters()):,}")
2. TCN:捕捉多尺度周期性(气象/电力首选)
TCN(Temporal Convolutional Network)通过“扩张卷积”和“残差连接”,能同时捕捉短期和长期周期性,比LSTM更适合有强季节性、多尺度依赖的数据(如天气、电力负荷)。
(1)核心原理
- 扩张卷积:通过扩大卷积核的感受野,捕捉不同尺度的时序模式(如天气的日周期、周周期、月周期);
- 残差连接:解决深层网络的梯度消失问题,允许构建更深的网络。
适合场景:强季节性、多尺度依赖的数据(如气温、降水量、电力负荷)。
(2)PyTorch实现
class TemporalBlock(nn.Module):
"""TCN的残差块"""
def __init__(self, n_inputs, n_outputs, kernel_size, stride, dilation, padding, dropout=0.2):
super().__init__()
# 第一层卷积
self.conv1 = nn.Conv1d(
n_inputs, n_outputs, kernel_size,
stride=stride, padding=padding, dilation=dilation
)
self.bn1 = nn.BatchNorm1d(n_outputs)
self.relu1 = nn.ReLU()
self.dropout1 = nn.Dropout(dropout)
# 第二层卷积
self.conv2 = nn.Conv1d(
n_outputs, n_outputs, kernel_size,
stride=stride, padding=padding, dilation=dilation
)
self.bn2 = nn.BatchNorm1d(n_outputs)
self.relu2 = nn.ReLU()
self.dropout2 = nn.Dropout(dropout)
# 残差连接(若输入输出维度不同,用1x1卷积调整)
self.downsample = nn.Conv1d(n_inputs, n_outputs, 1) if n_inputs != n_outputs else None
self.relu = nn.ReLU()
def forward(self, x):
# x: [batch_size, input_dim, seq_len]
residual = x
# 第一层
out = self.conv1(x)
out = self.bn1(out)
out = self.relu1(out)
out = self.dropout1(out)
# 第二层
out = self.conv2(out)
out = self.bn2(out)
out = self.dropout2(out)
# 残差连接
if self.downsample is not None:
residual = self.downsample(residual)
out += residual
out = self.relu(out)
return out
class TCNTimeSeries(nn.Module):
def __init__(self, input_dim, output_dim, predict_steps, num_channels, kernel_size=3, dropout=0.2):
"""
:param input_dim: 输入特征维度
:param output_dim: 输出特征维度
:param predict_steps: 预测步数
:param num_channels: 各层输出通道数(如[64, 64, 128])
:param kernel_size: 卷积核大小
:param dropout: dropout率
"""
super().__init__()
self.predict_steps = predict_steps
layers = []
num_levels = len(num_channels)
for i in range(num_levels):
dilation_size = 2 ** i # 扩张系数(2^i,指数增长)
in_channels = input_dim if i == 0 else num_channels[i-1]
out_channels = num_channels[i]
# 计算padding,保证输出序列长度与输入一致
padding = (kernel_size - 1) * dilation_size
# 添加TCN残差块
layers += [TemporalBlock(
in_channels, out_channels, kernel_size,
stride=1, dilation=dilation_size, padding=padding, dropout=dropout
)]
# TCN卷积层
self.tcn = nn.Sequential(*layers)
# 全连接层(预测输出)
self.fc = nn.Linear(num_channels[-1], output_dim * predict_steps)
def forward(self, x):
# x: [batch_size, look_back, input_dim] → TCN需要[batch_size, input_dim, look_back]
x = x.permute(0, 2, 1) # 维度转换
# TCN前向传播:输出[batch_size, num_channels[-1], look_back]
tcn_out = self.tcn(x)
# 取最后一个时刻的输出
last_out = tcn_out[:, :, -1] # [batch_size, num_channels[-1]]
# 全连接层预测
output = self.fc(last_out)
# 重塑为[batch_size, predict_steps, output_dim]
output = output.view(output.size(0), self.predict_steps, -1)
return output
# 实例化TCN模型(天气预测,假设已构建天气数据的X_weather_train)
# input_dim_weather = X_weather_train.shape[2]
# output_dim_weather = len(target_cols_weather) # 如预测气温、降水量,output_dim=4
# tcn_model = TCNTimeSeries(
# input_dim=input_dim_weather,
# output_dim=output_dim_weather,
# predict_steps=7, # 预测未来7天天气
# num_channels=[64, 64, 128], # 三层TCN,通道数64→64→128
# kernel_size=3,
# dropout=0.2
# )
# tcn_model.to(device)
3. Transformer:捕捉非线性依赖(股价/金融首选)
Transformer通过“自注意力机制”能同时关注序列中的所有时刻,比LSTM更擅长捕捉非线性、长距离依赖,适合股价、金融等复杂时序数据。
(1)核心原理
- 自注意力机制:计算每个时刻与其他所有时刻的关联权重,捕捉全局依赖;
- 位置编码:给序列添加位置信息(Transformer本身无时序感知);
- 多头注意力:并行计算多个注意力头,捕捉不同类型的依赖。
适合场景:高噪声、非线性、长距离依赖的数据(如股价、汇率、金融指数)。
(2)PyTorch实现
class PositionalEncoding(nn.Module):
"""位置编码"""
def __init__(self, d_model, max_len=5000):
super().__init__()
# 初始化位置编码矩阵
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
# 位置编码公式:PE(pos, 2i) = sin(pos / 10000^(2i/d_model))
# PE(pos, 2i+1) = cos(pos / 10000^(2i/d_model))
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
# 添加batch维度
pe = pe.unsqueeze(0)
# 不更新位置编码
self.register_buffer('pe', pe)
def forward(self, x):
# x: [batch_size, seq_len, d_model]
x = x + self.pe[:, :x.size(1), :]
return x
class TransformerTimeSeries(nn.Module):
def __init__(self, input_dim, d_model, n_heads, n_layers, output_dim, predict_steps, dropout=0.2):
"""
:param input_dim: 输入特征维度
:param d_model: Transformer的模型维度(需为n_heads的倍数)
:param n_heads: 多头注意力头数
:param n_layers: Transformer层数
:param output_dim: 输出特征维度
:param predict_steps: 预测步数
:param dropout: dropout率
"""
super().__init__()
self.predict_steps = predict_steps
self.d_model = d_model
# 输入投影(将input_dim映射到d_model)
self.input_proj = nn.Linear(input_dim, d_model)
# 位置编码
self.pos_encoder = PositionalEncoding(d_model)
# Transformer编码器层
encoder_layer = nn.TransformerEncoderLayer(
d_model=d_model,
nhead=n_heads,
dim_feedforward=d_model * 4, # 前馈网络维度通常为d_model*4
dropout=dropout,
batch_first=True # 输入shape=[batch_size, seq_len, d_model]
)
# Transformer编码器
self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=n_layers)
# 全连接层(预测输出)
self.fc = nn.Linear(d_model, output_dim * predict_steps)
def forward(self, x):
# x: [batch_size, look_back, input_dim]
batch_size = x.size(0)
# 输入投影:[batch_size, look_back, input_dim] → [batch_size, look_back, d_model]
x = self.input_proj(x) * np.sqrt(self.d_model) # 缩放,稳定训练
# 位置编码
x = self.pos_encoder(x)
# Transformer编码:输出[batch_size, look_back, d_model]
transformer_out = self.transformer_encoder(x)
# 取最后一个时刻的输出
last_out = transformer_out[:, -1, :] # [batch_size, d_model]
# 全连接层预测
output = self.fc(last_out)
# 重塑为[batch_size, predict_steps, output_dim]
output = output.view(batch_size, self.predict_steps, -1)
return output
# 实例化Transformer模型(股价预测)
transformer_model = TransformerTimeSeries(
input_dim=input_dim_stock,
d_model=128, # 需为n_heads的倍数(如128=8*16)
n_heads=8,
n_layers=2,
output_dim=output_dim_stock,
predict_steps=predict_steps_stock,
dropout=0.2
)
transformer_model.to(device)
print(f"Transformer模型参数量:{sum(p.numel() for p in transformer_model.parameters()):,}")
4. 模型选型决策树(实战首选)
| 数据特性 | 推荐模型 | 理由 | 示例场景 |
|---|---|---|---|
| 强时序依赖、中等噪声 | LSTM | 门控机制捕捉长依赖,实现简单 | 设备振动预测、电力负荷预测 |
| 强季节性、多尺度依赖 | TCN | 扩张卷积捕捉多周期,精度高于LSTM | 气温预测、降水量预测 |
| 高噪声、非线性、长依赖 | Transformer | 自注意力捕捉全局非线性依赖 | 股价预测、汇率预测 |
| 追求精度与效率平衡 | LSTM+Transformer | 融合LSTM的时序记忆与Transformer的全局依赖 | 金融指数预测、销量预估 |
四、实战案例:股价+天气预测全流程(从训练到预测)
以“股价单步预测”和“天气多步预测”为例,完整展示时序预测的实战流程。
案例1:股价单步预测(LSTM+Transformer融合)
任务:用过去30天的股价数据(开盘价、收盘价、成交量等)预测明天的收盘价,目标RMSE<5%。
(1)训练配置
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
# 选择模型(这里用Transformer,也可替换为LSTM)
model = transformer_model
# 损失函数(MSE,适合回归任务)
criterion = nn.MSELoss()
# 优化器(AdamW,比Adam更稳定)
optimizer = optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-4)
# 学习率调度器(验证损失不下降时衰减lr)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3, verbose=True)
# 训练轮数
num_epochs = 50
# 最佳模型保存路径
best_model_path = "best_stock_transformer.pth"
# 记录最佳验证损失
best_val_loss = float('inf')
(2)训练与验证循环
def train_one_epoch(model, train_loader, criterion, optimizer, device):
"""训练一个epoch"""
model.train()
total_loss = 0.0
for X_batch, y_batch in train_loader:
# 迁移到设备
X_batch = X_batch.to(device)
y_batch = y_batch.to(device)
# 前向传播
outputs = model(X_batch)
loss = criterion(outputs, y_batch)
# 反向传播与优化
optimizer.zero_grad()
loss.backward()
# 梯度裁剪(防止梯度爆炸)
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()
# 累积损失
total_loss += loss.item() * X_batch.size(0)
# 平均损失
avg_loss = total_loss / len(train_loader.dataset)
return avg_loss
def validate(model, val_loader, criterion, device):
"""验证模型"""
model.eval()
total_loss = 0.0
with torch.no_grad():
for X_batch, y_batch in val_loader:
X_batch = X_batch.to(device)
y_batch = y_batch.to(device)
outputs = model(X_batch)
loss = criterion(outputs, y_batch)
total_loss += loss.item() * X_batch.size(0)
avg_loss = total_loss / len(val_loader.dataset)
return avg_loss
# 开始训练
for epoch in range(num_epochs):
print(f"\nEpoch {epoch+1}/{num_epochs}")
# 训练
train_loss = train_one_epoch(model, train_loader_stock, criterion, optimizer, device)
# 验证
val_loss = validate(model, test_loader_stock, criterion, device)
# 学习率调度
scheduler.step(val_loss)
# 保存最佳模型
if val_loss < best_val_loss:
best_val_loss = val_loss
torch.save(model.state_dict(), best_model_path)
print(f"最佳模型更新:验证损失 {val_loss:.6f}")
print(f"训练损失:{train_loss:.6f}, 验证损失:{val_loss:.6f}")
# 加载最佳模型
model.load_state_dict(torch.load(best_model_path))
(3)预测与结果可视化
import matplotlib.pyplot as plt
def inverse_transform_prediction(y_pred, y_true, scaler, target_cols, feature_cols):
"""将预测结果反归一化(恢复原始尺度)"""
# 创建空数组,形状与归一化前的特征一致
y_pred_inv = np.zeros((y_pred.shape[0], len(feature_cols)))
y_true_inv = np.zeros((y_true.shape[0], len(feature_cols)))
# 填入目标特征的预测值和真实值
target_idx = [feature_cols.index(col) for col in target_cols]
y_pred_inv[:, target_idx] = y_pred.squeeze()
y_true_inv[:, target_idx] = y_true.squeeze()
# 反归一化
y_pred_inv = scaler.inverse_transform(y_pred_inv)[:, target_idx]
y_true_inv = scaler.inverse_transform(y_true_inv)[:, target_idx]
return y_pred_inv, y_true_inv
def plot_predictions(y_true, y_pred, title):
"""绘制预测结果对比图"""
plt.figure(figsize=(12, 6))
plt.plot(y_true, label='真实值', color='blue', linewidth=1.5)
plt.plot(y_pred, label='预测值', color='red', linewidth=1.5, alpha=0.8)
plt.title(title, fontsize=14)
plt.xlabel('时间步', fontsize=12)
plt.ylabel('收盘价(元)', fontsize=12)
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
# 生成测试集预测结果
model.eval()
y_pred = []
with torch.no_grad():
for X_batch, _ in test_loader_stock:
X_batch = X_batch.to(device)
outputs = model(X_batch)
y_pred.append(outputs.cpu().numpy())
# 合并预测结果
y_pred = np.concatenate(y_pred, axis=0)
y_true = y_stock_test.cpu().numpy()
# 反归一化(恢复原始股价尺度)
y_pred_inv, y_true_inv = inverse_transform_prediction(
y_pred, y_true, scaler_stock, target_cols_stock, feature_cols_stock
)
# 计算评估指标
from sklearn.metrics import mean_absolute_error, mean_squared_error
mae = mean_absolute_error(y_true_inv, y_pred_inv)
mse = mean_squared_error(y_true_inv, y_pred_inv)
rmse = np.sqrt(mse)
mape = np.mean(np.abs((y_true_inv - y_pred_inv) / y_true_inv)) * 100
print(f"股价预测评估指标:")
print(f"MAE:{mae:.2f} 元")
print(f"MSE:{mse:.2f}")
print(f"RMSE:{rmse:.2f} 元")
print(f"MAPE:{mape:.2f}%")
# 绘制预测结果(取最后100个时刻的结果,更直观)
plot_predictions(y_true_inv[-100:], y_pred_inv[-100:], title="股价预测结果对比(最后100个时刻)")
(4)结果分析
- 若预测值“滞后于”真实值:说明模型只学习了短期依赖,可增加
look_back长度(如从30增至60),或添加更多滞后特征; - 若预测值波动过大:说明模型过拟合噪声,可增大
dropout率(如从0.2增至0.3),或减少模型参数量; - 若RMSE过高(>5%):可尝试LSTM+Transformer融合,或添加更多特征(如宏观经济指标)。
案例2:天气多步预测(TCN)
任务:用过去14天的天气数据预测未来7天的平均气温、最高气温、最低气温和降水量,目标MAE<2℃(气温)、<5mm(降水量)。
(1)核心差异与调整
多步预测比单步预测更复杂,需调整以下参数:
- predict_steps=7(预测未来7天);
- 损失函数:用“多步MSE”,对远期预测可加权(如远期预测的权重降低,减少误差影响);
- 模型选择:优先用TCN(捕捉多尺度季节性)。
(2)关键代码(省略重复部分)
# 1. 数据预处理(假设已完成清洗、特征工程)
look_back_weather = 14 # 用过去14天预测
predict_steps_weather = 7 # 预测未来7天
input_cols_weather = feature_cols_weather + [时间特征、滞后特征、滚动特征]
target_cols_weather = ["temp_avg", "temp_max", "temp_min", "precip"]
# 2. 构建序列样本
X_weather, y_weather = create_sequences(
df_weather_norm,
look_back=look_back_weather,
predict_steps=predict_steps_weather,
input_cols=input_cols_weather,
target_cols=target_cols_weather
)
# 3. 划分训练/测试集
X_weather_train, X_weather_test, y_weather_train, y_weather_test = split_train_test(X_weather, y_weather, test_size=0.2)
# 4. 实例化TCN模型
tcn_model = TCNTimeSeries(
input_dim=X_weather_train.shape[2],
output_dim=len(target_cols_weather),
predict_steps=predict_steps_weather,
num_channels=[64, 64, 128],
kernel_size=3,
dropout=0.2
)
tcn_model.to(device)
# 5. 训练(同股价预测,损失函数用MSE)
# ...(训练代码省略)
# 6. 预测与反归一化
model.eval()
y_weather_pred = []
with torch.no_grad():
for X_batch, _ in test_loader_weather:
X_batch = X_batch.to(device)
outputs = tcn_model(X_batch)
y_weather_pred.append(outputs.cpu().numpy())
y_weather_pred = np.concatenate(y_weather_pred, axis=0)
y_weather_true = y_weather_test.cpu().numpy()
# 反归一化(天气用Min-Max Scaling)
def inverse_transform_multi(y_pred, y_true, scaler, target_cols, feature_cols):
y_pred_inv = np.zeros((y_pred.shape[0], y_pred.shape[1], len(feature_cols)))
y_true_inv = np.zeros((y_true.shape[0], y_true.shape[1], len(feature_cols)))
target_idx = [feature_cols.index(col) for col in target_cols]
for t in range(y_pred.shape[1]):
y_pred_inv[:, t, target_idx] = y_pred[:, t, :]
y_true_inv[:, t, target_idx] = y_true[:, t, :]
# 反归一化(需遍历每个时间步)
for i in range(y_pred_inv.shape[0]):
y_pred_inv[i] = scaler.inverse_transform(y_pred_inv[i])
y_true_inv[i] = scaler.inverse_transform(y_true_inv[i])
return y_pred_inv[:, :, target_idx], y_true_inv[:, :, target_idx]
y_weather_pred_inv, y_weather_true_inv = inverse_transform_multi(
y_weather_pred, y_weather_true, scaler_weather, target_cols_weather, feature_cols_weather
)
# 7. 绘制7天气温预测结果
plt.figure(figsize=(14, 8))
# 真实值
plt.plot(y_weather_true_inv[0, :, 0], 'o-', label='真实平均气温', color='blue')
# 预测值
plt.plot(y_weather_pred_inv[0, :, 0], 's-', label='预测平均气温', color='red')
plt.title('未来7天平均气温预测结果(第1个测试样本)', fontsize=14)
plt.xlabel('预测天数', fontsize=12)
plt.ylabel('平均气温(℃)', fontsize=12)
plt.xticks(range(7), [f'第{i+1}天' for i in range(7)])
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
五、模型融合:提升预测精度的“终极技巧”
单一模型的预测能力有限,模型融合能通过组合多个模型的优势,进一步提升精度(通常能降低10%-20%的误差)。以下是3种实战融合方法。
1. 加权平均融合(最简单有效)
将多个模型的预测结果按权重平均,权重可根据模型的验证精度设定(精度高的模型权重更大)。
代码实现:
def weighted_average_fusion(predictions, weights):
"""
加权平均融合
:param predictions: 模型预测结果列表(如[lstm_pred, transformer_pred, tcn_pred])
:param weights: 权重列表(和为1,如[0.3, 0.5, 0.2])
:return: 融合后的预测结果
"""
# 检查预测结果形状是否一致
assert len(predictions) == len(weights), "模型数量与权重数量不一致"
for pred in predictions[1:]:
assert pred.shape == predictions[0].shape, "所有模型的预测形状必须一致"
# 加权平均
fused_pred = np.zeros_like(predictions[0])
for pred, w in zip(predictions, weights):
fused_pred += pred * w
return fused_pred
# 假设已有3个模型的预测结果
lstm_pred = ... # LSTM的预测结果
transformer_pred = ... # Transformer的预测结果
tcn_pred = ... # TCN的预测结果(若适用)
# 根据验证精度设定权重(如Transformer精度最高,权重0.5)
weights = [0.3, 0.5, 0.2]
fused_pred = weighted_average_fusion([lstm_pred, transformer_pred, tcn_pred], weights)
# 计算融合后的指标
fused_rmse = np.sqrt(mean_squared_error(y_true_inv, fused_pred))
print(f"融合后RMSE:{fused_rmse:.2f} 元")
print(f"融合前Transformer RMSE:{rmse_transformer:.2f} 元")
2. Stacking融合(精度更高)
Stacking是“两级模型融合”:用多个基础模型的预测结果作为“新特征”,训练一个元模型(如线性回归、LightGBM)来输出最终预测。
代码实现:
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import KFold
def stacking_fusion(X_train, y_train, X_test, base_models, meta_model, n_folds=5):
"""
Stacking融合
:param X_train: 训练集输入序列
:param y_train: 训练集目标
:param X_test: 测试集输入序列
:param base_models: 基础模型列表(如[lstm_model, transformer_model])
:param meta_model: 元模型(如LinearRegression)
:param n_folds: K折交叉验证的折数
:return: 测试集的融合预测结果
"""
batch_size = 32
n_base_models = len(base_models)
n_train = len(X_train)
n_test = len(X_test)
# 初始化基础模型的训练集预测矩阵(n_train, n_base_models, output_dim)
train_meta_features = np.zeros((n_train, n_base_models, y_train.shape[2]))
# 初始化基础模型的测试集预测矩阵(n_test, n_base_models, output_dim)
test_meta_features = np.zeros((n_test, n_base_models, y_train.shape[2]))
# K折交叉验证训练基础模型,生成元特征
kf = KFold(n_splits=n_folds, shuffle=False) # 时序数据不shuffle
for model_idx, model in enumerate(base_models):
print(f"训练基础模型 {model_idx+1}/{n_base_models}")
for train_idx, val_idx in kf.split(X_train):
# 划分K折的训练/验证集
X_kf_train, X_kf_val = X_train[train_idx], X_train[val_idx]
y_kf_train, y_kf_val = y_train[train_idx], y_train[val_idx]
# 加载基础模型并训练(简化代码,实际需重新初始化训练)
# ...(基础模型训练代码省略)
# 生成验证集元特征
model.eval()
with torch.no_grad():
val_loader = DataLoader(TimeSeriesDataset(X_kf_val, y_kf_val), batch_size=batch_size, shuffle=False)
val_pred = []
for X_batch, _ in val_loader:
X_batch = X_batch.to(device)
outputs = model(X_batch)
val_pred.append(outputs.cpu().numpy())
val_pred = np.concatenate(val_pred, axis=0)
train_meta_features[val_idx, model_idx, :] = val_pred.squeeze()
# 生成测试集元特征(用完整训练的基础模型)
test_loader = DataLoader(TimeSeriesDataset(X_test, y_test), batch_size=batch_size, shuffle=False)
test_pred = []
with torch.no_grad():
for X_batch, _ in test_loader:
X_batch = X_batch.to(device)
outputs = model(X_batch)
test_pred.append(outputs.cpu().numpy())
test_pred = np.concatenate(test_pred, axis=0)
test_meta_features[:, model_idx, :] = test_pred.squeeze()
# 训练元模型(需将元特征展平)
# 训练集:[n_train, n_base_models*output_dim]
X_meta_train = train_meta_features.reshape(n_train, -1)
y_meta_train = y_train.squeeze()
# 测试集:[n_test, n_base_models*output_dim]
X_meta_test = test_meta_features.reshape(n_test, -1)
# 训练元模型
meta_model.fit(X_meta_train, y_meta_train)
# 生成融合预测
fused_pred = meta_model.predict(X_meta_test).reshape(n_test, 1, -1)
return fused_pred
# 实例化基础模型和元模型
base_models = [lstm_model, transformer_model]
meta_model = LinearRegression()
# 生成Stacking融合结果
stacked_pred = stacking_fusion(
X_stock_train, y_stock_train, X_stock_test, base_models, meta_model, n_folds=5
)
# 评估融合效果
stacked_rmse = np.sqrt(mean_squared_error(y_true_inv, stacked_pred.squeeze()))
print(f"Stacking融合后RMSE:{stacked_rmse:.2f} 元")
3. 融合避坑指南
(1)坑1:融合“相似模型”
错误:融合LSTM和GRU(结构相似,捕捉的依赖关系一致)。
后果:融合增益极低(误差降低<5%),浪费计算资源。
解决:选择“互补模型”(如LSTM+TCN+Transformer),分别捕捉时序依赖、周期性、非线性。
(2)坑2:权重设定不合理
错误:给所有模型相同的权重(如[0.33, 0.33, 0.33])。
后果:精度低的模型拉低整体效果。
解决:按“验证集误差的倒数”设定权重(误差越小,权重越大),或用交叉验证优化权重。
(3)坑3:过拟合元模型
错误:用复杂的元模型(如深度神经网络)且训练数据少。
后果:元模型过拟合基础模型的噪声,测试集精度下降。
解决:优先用简单元模型(如线性回归、Ridge),或增加训练数据量。
六、学习路径
1. 学习路径(分3个阶段)
(1)入门阶段(1-2个月):基础能力搭建
- 目标:掌握时序数据预处理和LSTM实现,能完成单步预测;
- 核心任务:
- 用LSTM预测股价单步收盘价,RMSE<8%;
- 掌握数据清洗(缺失值、异常值)和特征工程(滞后、滚动特征);
- 实现预测结果的反归一化和可视化。
- 推荐资源:
- 书籍《Python时间序列分析与应用》;
- PyTorch官方教程《Sequence Models and Long-Short Term Memory Networks》。
(2)进阶阶段(2-3个月):模型进阶与多步预测
- 目标:掌握TCN/Transformer实现,能完成多步预测和模型融合;
- 核心任务:
- 用TCN预测未来7天的气温,MAE<2℃;
- 用Transformer预测股价,对比LSTM的效果差异;
- 实现加权平均融合,将预测误差降低10%+。
- 推荐资源:
- 论文《An Empirical Evaluation of Generic Convolutional and Recurrent Networks for Sequence Modeling》(TCN);
- 论文《Attention Is All You Need》(Transformer);
- 开源项目《PyTorch-Time-Series》。
(3)专家阶段(3-6个月):工业级落地
- 目标:能独立设计时序预测系统,解决工业场景问题;
- 核心任务:
- 完成一个工业项目(如设备故障预测、电力负荷预测),包含全流程落地;
- 用ONNX Runtime/TensorRT优化推理速度,满足实时性要求;
- 设计监控系统,检测模型漂移并触发重训练。
- 推荐资源:
- 书籍《Time Series Forecasting with Python》;
- 工业案例《Kaggle Energy Load Forecasting Competitions》;
- ONNX Runtime官方文档《Time Series Optimization》。
我是南木 需要学习规划、就业指导、论文辅导和技术答疑的小伙伴欢迎扫码交流
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)