大家好,我是南木。
时间序列预测是AI落地最广泛的场景之一,从股价波动、气象预警,到工业设备故障预测、电商销量预估,几乎覆盖所有行业。但它比图像、NLP更依赖“数据特性适配”和“模型选型匹配”:比如股价的“高噪声、非线性”适合LSTM+Transformer融合,而气象数据的“多尺度周期性”更适合TCN。

这篇文章结合“新能源电站功率预测”和“消费指数预测”两个工业项目经验,从“数据预处理→模型实现→实战调优→融合落地”全流程拆解时间序列预测。包含LSTM/TCN/Transformer的PyTorch实现、股价/天气双案例实战、数据清洗核心技巧、模型融合方法论四大模块,每个环节都附“可复用代码+避坑指南”。

同时 这里也给大家整理了一份适合零基础学习的深度学习资料包 需要的同学扫码自取
在这里插入图片描述
在这里插入图片描述

一、先搞懂:时间序列预测的核心逻辑(和其他任务有什么不同?)

在动手写代码前,必须先明确时间序列的本质特性——它和图像、文本的“静态数据”不同,核心是“时间依赖性”,盲目套用分类/回归的经验只会事倍功半。

1. 时间序列的4个核心特性(决定模型选型)

时间序列的预测难度,本质上由“趋势、季节性、周期性、噪声”四大特性决定,必须先分析清楚再选模型:

  • 趋势(Trend):数据长期的上升/下降趋势(如全球气温逐年上升);
  • 季节性(Seasonality):固定周期的重复波动(如每月10号电商销量高峰、夏季用电量激增);
  • 周期性(Cyclicity):非固定周期的波动(如经济周期3-5年一次,无严格规律);
  • 噪声(Noise):随机干扰(如股价因突发新闻的短期波动、传感器的测量误差)。

  • 天气数据(温度、降水):强趋势+强季节性+弱噪声 → 适合TCN(捕捉多尺度周期);
  • 股价数据:弱趋势+弱周期性+强噪声 → 适合LSTM+Transformer(兼顾记忆与非线性);
  • 工业设备振动数据:无趋势+强周期性+中噪声 → 适合LSTM(捕捉时序依赖)。

2. 时间序列预测的核心任务:“序列到序列”的映射

所有时间序列预测都可归结为“用过去N个时刻的数据,预测未来M个时刻的值”,按预测长度分为两类:

  1. 单步预测:预测未来1个时刻(如预测明天的气温);
  2. 多步预测:预测未来连续多个时刻(如预测未来7天的股价、未来24小时的电力负荷)。

关键概念

  • 输入序列长度(look_back):用过去多少个时刻的数据做输入(如用过去30天的股价预测明天,look_back=30);
  • 输出序列长度(predict_steps):预测未来多少个时刻(如预测未来7天,predict_steps=7);
  • 特征维度(feature_dim):每个时刻的特征数量(如股价预测可能包含“开盘价、收盘价、成交量”3个特征,feature_dim=3)。

3. 评估指标:不能只用MSE!(按场景选对指标)

时间序列的评估指标需结合“业务场景”选择,避免用单一指标误导判断:

指标名称 公式 特点 适用场景
平均绝对误差(MAE) $MAE = \frac{1}{n}\sum y_i - \hat{y}_i $
均方误差(MSE) MSE=1n∑(yi−y^i)2MSE = \frac{1}{n}\sum(y_i - \hat{y}_i)^2MSE=n1(yiy^i)2 放大异常值影响,适合优化模型 模型训练目标函数
均方根误差(RMSE) RMSE=MSERMSE = \sqrt{MSE}RMSE=MSE 单位与目标一致,放大异常值影响 大多数通用场景(如气温预测)
平均绝对百分比误差(MAPE) $MAPE = \frac{1}{n}\sum \frac{y_i - \hat{y}_i}{y_i} ×100%$

避坑:股价、销量等可能为0或接近0的数据,禁用MAPE(会出现除以0的情况);异常值多的场景(如设备故障预测),优先用MAE。

二、数据层:时间序列数据预处理全攻略(预测准的基础)

“数据决定预测上限”——时间序列的预处理比其他任务更关键,因为“缺失值、异常值、非平稳性”会直接破坏时序依赖关系。以下是经过工业项目验证的实战流程。

1. 数据来源与加载(附实战数据集)

首先获取高质量的时序数据,推荐几个常用数据源,附加载代码:

(1)股价数据:Tushare(免费、接口友好)
# 安装:pip install tushare
import tushare as ts
import pandas as pd
import numpy as np

# 初始化Tushare(需注册获取token:https://tushare.pro/register)
ts.set_token("你的Tushare token")
pro = ts.pro_api()

# 获取贵州茅台(600519.SH)2018-2023年的日度数据
df_stock = pro.daily(
    ts_code="600519.SH",
    start_date="20180101",
    end_date="20231231"
)

# 处理时间列(转为datetime,按时间排序)
df_stock["trade_date"] = pd.to_datetime(df_stock["trade_date"], format="%Y%m%d")
df_stock = df_stock.sort_values("trade_date").reset_index(drop=True)

# 选择核心特征(开盘价、最高价、最低价、收盘价、成交量)
df_stock = df_stock[["trade_date", "open", "high", "low", "close", "vol"]]
print(f"股价数据形状:{df_stock.shape}")
print(df_stock.head())
(2)天气数据:NOAA(全球气象数据,免费)
# 从NOAA获取北京2018-2023年的日度气温数据(也可直接用本地CSV)
df_weather = pd.read_csv("beijing_weather_2018_2023.csv")
# 处理时间列
df_weather["date"] = pd.to_datetime(df_weather["date"])
# 选择特征(日期、平均气温、最高气温、最低气温、降水量)
df_weather = df_weather[["date", "temp_avg", "temp_max", "temp_min", "precip"]]
df_weather = df_weather.sort_values("date").reset_index(drop=True)
print(f"天气数据形状:{df_weather.shape}")
print(df_weather.head())

2. 数据清洗:3大核心问题的实战解法

时间序列的清洗核心是“保留时序依赖,最小化信息损失”,不能像静态数据那样直接删除或填充。

(1)问题1:缺失值处理(最常见)

时序数据的缺失通常是“连续缺失”(如传感器故障)或“单点缺失”(如数据传输错误),推荐按缺失长度选择方法:

  • 单点缺失:线性插值(interpolate(method="linear"));
  • 连续缺失(<5个时刻):样条插值(interpolate(method="spline", order=3));
  • 连续缺失(≥5个时刻):前向填充+滑动平均修正(避免填充值过于生硬)。

代码实现

def fill_missing_values(df, feature_cols):
    """处理缺失值"""
    df_clean = df.copy()
    for col in feature_cols:
        # 1. 先处理单点缺失(线性插值)
        df_clean[col] = df_clean[col].interpolate(method="linear", limit=1)
        # 2. 处理连续缺失(样条插值,最多处理4个连续缺失)
        df_clean[col] = df_clean[col].interpolate(method="spline", order=3, limit=4)
        # 3. 剩余的长连续缺失(前向填充后用3步滑动平均修正)
        df_clean[col] = df_clean[col].fillna(method="ffill")
        # 滑动平均修正
        df_clean[col] = df_clean[col].rolling(window=3, min_periods=1).mean()
    return df_clean

# 清洗股价数据(特征列为open, high, low, close, vol)
feature_cols_stock = ["open", "high", "low", "close", "vol"]
df_stock_clean = fill_missing_values(df_stock, feature_cols_stock)

# 清洗天气数据(特征列为temp_avg, temp_max, temp_min, precip)
feature_cols_weather = ["temp_avg", "temp_max", "temp_min", "precip"]
df_weather_clean = fill_missing_values(df_weather, feature_cols_weather)

# 检查是否还有缺失值
print(f"股价数据缺失值:{df_stock_clean[feature_cols_stock].isnull().sum().sum()}")
print(f"天气数据缺失值:{df_weather_clean[feature_cols_weather].isnull().sum().sum()}")
(2)问题2:异常值处理(最影响精度)

时序异常值(如股价暴跌、气温骤升)通常是“突发噪声”,直接删除会破坏时序连续性,推荐“检测+修正”两步法:

  1. 检测:用“3σ原则”(适用于正态分布)或“IQR法”(适用于非正态分布);
  2. 修正:用“滑动中位数”替换异常值(比均值更鲁棒)。

代码实现

def detect_and_correct_outliers(df, feature_cols, window=5):
    """检测并修正异常值"""
    df_clean = df.copy()
    for col in feature_cols:
        # 1. 用IQR法检测异常值
        Q1 = df_clean[col].quantile(0.25)
        Q3 = df_clean[col].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        
        # 2. 标记异常值
        outliers = (df_clean[col] < lower_bound) | (df_clean[col] > upper_bound)
        print(f"{col}异常值数量:{outliers.sum()}")
        
        # 3. 用滑动中位数修正异常值
        if outliers.sum() > 0:
            # 计算滑动中位数
            rolling_median = df_clean[col].rolling(window=window, min_periods=1).median()
            # 用中位数替换异常值
            df_clean.loc[outliers, col] = rolling_median[outliers]
    return df_clean

# 修正股价和天气数据的异常值
df_stock_clean = detect_and_correct_outliers(df_stock_clean, feature_cols_stock)
df_weather_clean = detect_and_correct_outliers(df_weather_clean, feature_cols_weather)
(3)问题3:数据归一化(模型收敛的关键)

时间序列的特征通常具有不同量级(如股价1000+,成交量1e8+),必须归一化到同一区间。推荐两种方法:

  • Min-Max Scaling(缩放到[0,1]):适合有明确上下限的数据(如气温-20~40℃);
  • Standard Scaling(均值0,标准差1):适合近似正态分布的数据(如股价、成交量)。

代码实现

from sklearn.preprocessing import MinMaxScaler, StandardScaler

def normalize_data(df, feature_cols, method="standard"):
    """数据归一化"""
    df_norm = df.copy()
    # 选择缩放器
    if method == "standard":
        scaler = StandardScaler()
    elif method == "minmax":
        scaler = MinMaxScaler(feature_range=(0, 1))
    # 拟合并转换
    df_norm[feature_cols] = scaler.fit_transform(df_norm[feature_cols])
    return df_norm, scaler

# 股价数据用Standard Scaling(近似正态分布)
df_stock_norm, scaler_stock = normalize_data(df_stock_clean, feature_cols_stock, method="standard")

# 天气数据用Min-Max Scaling(气温、降水量有明确范围)
df_weather_norm, scaler_weather = normalize_data(df_weather_clean, feature_cols_weather, method="minmax")

# 查看归一化后的数据
print("股价数据归一化后:")
print(df_stock_norm[feature_cols_stock].describe())

3. 特征工程:提升预测精度的“关键一步”

时间序列的特征工程核心是“挖掘时序依赖”,常用4类特征:

(1)时间特征(捕捉季节性)

提取“小时、星期、月份、季度”等特征,帮助模型学习周期性:

def add_time_features(df, time_col):
    """添加时间特征"""
    df_time = df.copy()
    df_time["hour"] = df_time[time_col].dt.hour  # 小时(日度数据为0)
    df_time["dayofweek"] = df_time[time_col].dt.dayofweek  # 星期(0=周一,6=周日)
    df_time["month"] = df_time[time_col].dt.month  # 月份
    df_time["quarter"] = df_time[time_col].dt.quarter  # 季度
    df_time["is_weekend"] = df_time["dayofweek"].apply(lambda x: 1 if x >=5 else 0)  # 是否周末
    return df_time

# 给股价数据添加时间特征(time_col="trade_date")
df_stock_norm = add_time_features(df_stock_norm, "trade_date")

# 给天气数据添加时间特征(time_col="date")
df_weather_norm = add_time_features(df_weather_norm, "date")

# 查看添加后的特征
print("股价数据新增时间特征:")
print(df_stock_norm[["trade_date", "dayofweek", "month", "is_weekend"]].head())
(2)滞后特征(捕捉时序依赖)

提取“过去1天、3天、7天”的历史值,直接喂给模型学习依赖关系:

def add_lag_features(df, feature_cols, lag_steps=[1, 3, 7]):
    """添加滞后特征"""
    df_lag = df.copy()
    for col in feature_cols:
        for lag in lag_steps:
            # 滞后lag步的特征(如close_lag1=昨天的收盘价)
            df_lag[f"{col}_lag{lag}"] = df_lag[col].shift(lag)
    # 删除因滞后产生的缺失值
    df_lag = df_lag.dropna()
    return df_lag

# 给股价数据添加滞后特征(滞后1、3、7天)
df_stock_norm = add_lag_features(df_stock_norm, feature_cols_stock, lag_steps=[1,3,7])

# 给天气数据添加滞后特征(滞后1、2、3天)
df_weather_norm = add_lag_features(df_weather_norm, feature_cols_weather, lag_steps=[1,2,3])

print(f"添加滞后特征后股价数据形状:{df_stock_norm.shape}")
(3)滚动统计特征(捕捉趋势)

计算“滚动均值、滚动标准差”等,帮助模型学习短期趋势:

def add_rolling_features(df, feature_cols, windows=[3, 7, 14]):
    """添加滚动统计特征"""
    df_rolling = df.copy()
    for col in feature_cols:
        for window in windows:
            # 滚动均值(如close_roll7=过去7天收盘价均值)
            df_rolling[f"{col}_roll{window}_mean"] = df_rolling[col].rolling(window=window, min_periods=1).mean()
            # 滚动标准差(捕捉波动)
            df_rolling[f"{col}_roll{window}_std"] = df_rolling[col].rolling(window=window, min_periods=1).std()
    return df_rolling

# 给股价数据添加滚动特征(窗口3、7、14天)
df_stock_norm = add_rolling_features(df_stock_norm, feature_cols_stock, windows=[3,7,14])

print(f"添加滚动特征后股价数据形状:{df_stock_norm.shape}")

4. 数据集构建:时序数据的“序列划分”

时序数据不能随机划分训练/测试集(会破坏时序依赖),必须按时间顺序划分,且要构建“输入序列→输出序列”的样本格式。

(1)构建序列样本(核心函数)
import torch
from torch.utils.data import Dataset, DataLoader

def create_sequences(data, look_back, predict_steps, input_cols, target_cols):
    """
    构建时序序列样本
    :param data: 预处理后的DataFrame
    :param look_back: 输入序列长度(用过去多少个时刻)
    :param predict_steps: 输出序列长度(预测未来多少个时刻)
    :param input_cols: 输入特征列
    :param target_cols: 目标列(要预测的特征)
    :return: X(输入序列), y(输出序列)
    """
    X, y = [], []
    # 遍历数据,构建样本
    for i in range(len(data) - look_back - predict_steps + 1):
        # 输入序列:过去look_back个时刻的输入特征
        input_seq = data.iloc[i:i+look_back][input_cols].values
        # 输出序列:未来predict_steps个时刻的目标特征
        target_seq = data.iloc[i+look_back:i+look_back+predict_steps][target_cols].values
        X.append(input_seq)
        y.append(target_seq)
    # 转为PyTorch张量
    X = torch.tensor(X, dtype=torch.float32)
    y = torch.tensor(y, dtype=torch.float32)
    return X, y

# 定义股价预测的参数
look_back_stock = 30  # 用过去30天预测
predict_steps_stock = 1  # 单步预测(预测明天的收盘价)
input_cols_stock = feature_cols_stock + [col for col in df_stock_norm.columns if any(suffix in col for suffix in ["lag", "roll", "dayofweek", "month", "is_weekend"])]
target_cols_stock = ["close"]  # 预测收盘价

# 构建股价序列样本
X_stock, y_stock = create_sequences(
    df_stock_norm,
    look_back=look_back_stock,
    predict_steps=predict_steps_stock,
    input_cols=input_cols_stock,
    target_cols=target_cols_stock
)

print(f"股价样本形状:X={X_stock.shape}, y={y_stock.shape}")  # X: [样本数, look_back, 输入特征数], y: [样本数, predict_steps, 目标特征数]
(2)按时间划分训练/测试集
def split_train_test(X, y, test_size=0.2):
    """按时间顺序划分训练/测试集"""
    split_idx = int(len(X) * (1 - test_size))
    X_train, X_test = X[:split_idx], X[split_idx:]
    y_train, y_test = y[:split_idx], y[split_idx:]
    return X_train, X_test, y_train, y_test

# 划分股价数据(测试集占20%)
X_stock_train, X_stock_test, y_stock_train, y_stock_test = split_train_test(X_stock, y_stock, test_size=0.2)

print(f"股价训练集:X={X_stock_train.shape}, y={y_stock_train.shape}")
print(f"股价测试集:X={X_stock_test.shape}, y={y_stock_test.shape}")
(3)封装为PyTorch Dataset
class TimeSeriesDataset(Dataset):
    """时间序列数据集"""
    def __init__(self, X, y):
        self.X = X
        self.y = y
    
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# 实例化股价数据集
train_dataset_stock = TimeSeriesDataset(X_stock_train, y_stock_train)
test_dataset_stock = TimeSeriesDataset(X_stock_test, y_stock_test)

# 加载数据(batch_size根据GPU显存调整)
batch_size = 32
train_loader_stock = DataLoader(train_dataset_stock, batch_size=batch_size, shuffle=False)  # 时序数据不shuffle!
test_loader_stock = DataLoader(test_dataset_stock, batch_size=batch_size, shuffle=False)

# 测试数据集
sample_X, sample_y = train_dataset_stock[0]
print(f"样本输入形状:{sample_X.shape}")  # [look_back, 输入特征数]
print(f"样本输出形状:{sample_y.shape}")  # [predict_steps, 目标特征数]

三、模型层:LSTM/TCN/Transformer的PyTorch实现与选型

时间序列模型的选型核心是“匹配数据特性”——没有“万能模型”,但有“最优适配”。以下是3种主流模型的实战实现。

1. LSTM:捕捉长时序依赖(经典之选)

LSTM(Long Short-Term Memory)通过“门控机制”解决了RNN的“梯度消失”问题,能有效捕捉长时序依赖,是时序预测的“入门首选”。

(1)核心原理

LSTM有3个关键门控:

  • 遗忘门:决定丢弃哪些历史信息;
  • 输入门:决定哪些新信息存入细胞状态;
  • 输出门:根据细胞状态输出当前时刻的结果。

适合场景:有明确时序依赖、中等噪声的数据(如工业设备振动、电力负荷)。

(2)PyTorch实现
class LSTMTimeSeries(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers, output_dim, predict_steps, dropout=0.2):
        """
        :param input_dim: 输入特征维度
        :param hidden_dim: LSTM隐藏层维度
        :param num_layers: LSTM层数
        :param output_dim: 输出特征维度(目标特征数)
        :param predict_steps: 预测步数
        :param dropout:  dropout率
        """
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.predict_steps = predict_steps
        
        # LSTM层
        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,  # 输入形状:[batch_size, seq_len, input_dim]
            dropout=dropout if num_layers > 1 else 0  # 多层时才用dropout
        )
        
        # 全连接层(将LSTM输出映射到预测步数)
        self.fc = nn.Linear(hidden_dim, output_dim * predict_steps)
    
    def forward(self, x):
        # x: [batch_size, look_back, input_dim]
        batch_size = x.size(0)
        
        # 初始化LSTM隐藏状态和细胞状态
        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_dim).to(x.device)
        c0 = torch.zeros(self.num_layers, batch_size, self.hidden_dim).to(x.device)
        
        # LSTM前向传播:输出shape=[batch_size, look_back, hidden_dim]
        lstm_out, _ = self.lstm(x, (h0, c0))
        
        # 取最后一个时刻的输出(包含所有历史信息)
        last_hidden = lstm_out[:, -1, :]  # [batch_size, hidden_dim]
        
        # 全连接层预测:输出shape=[batch_size, output_dim*predict_steps]
        output = self.fc(last_hidden)
        
        # 重塑为[batch_size, predict_steps, output_dim]
        output = output.view(batch_size, self.predict_steps, -1)
        
        return output

# 实例化LSTM模型(股价预测)
input_dim_stock = X_stock_train.shape[2]  # 输入特征维度
output_dim_stock = len(target_cols_stock)  # 输出特征维度(1:仅收盘价)

lstm_model = LSTMTimeSeries(
    input_dim=input_dim_stock,
    hidden_dim=128,
    num_layers=2,
    output_dim=output_dim_stock,
    predict_steps=predict_steps_stock,
    dropout=0.2
)

# 迁移到GPU(如果可用)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
lstm_model.to(device)

print(f"LSTM模型结构:")
print(lstm_model)
print(f"模型参数量:{sum(p.numel() for p in lstm_model.parameters()):,}")

2. TCN:捕捉多尺度周期性(气象/电力首选)

TCN(Temporal Convolutional Network)通过“扩张卷积”和“残差连接”,能同时捕捉短期和长期周期性,比LSTM更适合有强季节性、多尺度依赖的数据(如天气、电力负荷)。

(1)核心原理
  • 扩张卷积:通过扩大卷积核的感受野,捕捉不同尺度的时序模式(如天气的日周期、周周期、月周期);
  • 残差连接:解决深层网络的梯度消失问题,允许构建更深的网络。

适合场景:强季节性、多尺度依赖的数据(如气温、降水量、电力负荷)。

(2)PyTorch实现
class TemporalBlock(nn.Module):
    """TCN的残差块"""
    def __init__(self, n_inputs, n_outputs, kernel_size, stride, dilation, padding, dropout=0.2):
        super().__init__()
        # 第一层卷积
        self.conv1 = nn.Conv1d(
            n_inputs, n_outputs, kernel_size,
            stride=stride, padding=padding, dilation=dilation
        )
        self.bn1 = nn.BatchNorm1d(n_outputs)
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(dropout)
        
        # 第二层卷积
        self.conv2 = nn.Conv1d(
            n_outputs, n_outputs, kernel_size,
            stride=stride, padding=padding, dilation=dilation
        )
        self.bn2 = nn.BatchNorm1d(n_outputs)
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(dropout)
        
        # 残差连接(若输入输出维度不同,用1x1卷积调整)
        self.downsample = nn.Conv1d(n_inputs, n_outputs, 1) if n_inputs != n_outputs else None
        self.relu = nn.ReLU()
        
    def forward(self, x):
        # x: [batch_size, input_dim, seq_len]
        residual = x
        
        # 第一层
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu1(out)
        out = self.dropout1(out)
        
        # 第二层
        out = self.conv2(out)
        out = self.bn2(out)
        out = self.dropout2(out)
        
        # 残差连接
        if self.downsample is not None:
            residual = self.downsample(residual)
        
        out += residual
        out = self.relu(out)
        
        return out

class TCNTimeSeries(nn.Module):
    def __init__(self, input_dim, output_dim, predict_steps, num_channels, kernel_size=3, dropout=0.2):
        """
        :param input_dim: 输入特征维度
        :param output_dim: 输出特征维度
        :param predict_steps: 预测步数
        :param num_channels: 各层输出通道数(如[64, 64, 128])
        :param kernel_size: 卷积核大小
        :param dropout: dropout率
        """
        super().__init__()
        self.predict_steps = predict_steps
        layers = []
        num_levels = len(num_channels)
        
        for i in range(num_levels):
            dilation_size = 2 ** i  # 扩张系数(2^i,指数增长)
            in_channels = input_dim if i == 0 else num_channels[i-1]
            out_channels = num_channels[i]
            # 计算padding,保证输出序列长度与输入一致
            padding = (kernel_size - 1) * dilation_size
            # 添加TCN残差块
            layers += [TemporalBlock(
                in_channels, out_channels, kernel_size,
                stride=1, dilation=dilation_size, padding=padding, dropout=dropout
            )]
        
        # TCN卷积层
        self.tcn = nn.Sequential(*layers)
        # 全连接层(预测输出)
        self.fc = nn.Linear(num_channels[-1], output_dim * predict_steps)
    
    def forward(self, x):
        # x: [batch_size, look_back, input_dim] → TCN需要[batch_size, input_dim, look_back]
        x = x.permute(0, 2, 1)  # 维度转换
        
        # TCN前向传播:输出[batch_size, num_channels[-1], look_back]
        tcn_out = self.tcn(x)
        
        # 取最后一个时刻的输出
        last_out = tcn_out[:, :, -1]  # [batch_size, num_channels[-1]]
        
        # 全连接层预测
        output = self.fc(last_out)
        # 重塑为[batch_size, predict_steps, output_dim]
        output = output.view(output.size(0), self.predict_steps, -1)
        
        return output

# 实例化TCN模型(天气预测,假设已构建天气数据的X_weather_train)
# input_dim_weather = X_weather_train.shape[2]
# output_dim_weather = len(target_cols_weather)  # 如预测气温、降水量,output_dim=4

# tcn_model = TCNTimeSeries(
#     input_dim=input_dim_weather,
#     output_dim=output_dim_weather,
#     predict_steps=7,  # 预测未来7天天气
#     num_channels=[64, 64, 128],  # 三层TCN,通道数64→64→128
#     kernel_size=3,
#     dropout=0.2
# )

# tcn_model.to(device)

3. Transformer:捕捉非线性依赖(股价/金融首选)

Transformer通过“自注意力机制”能同时关注序列中的所有时刻,比LSTM更擅长捕捉非线性、长距离依赖,适合股价、金融等复杂时序数据。

(1)核心原理
  • 自注意力机制:计算每个时刻与其他所有时刻的关联权重,捕捉全局依赖;
  • 位置编码:给序列添加位置信息(Transformer本身无时序感知);
  • 多头注意力:并行计算多个注意力头,捕捉不同类型的依赖。

适合场景:高噪声、非线性、长距离依赖的数据(如股价、汇率、金融指数)。

(2)PyTorch实现
class PositionalEncoding(nn.Module):
    """位置编码"""
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        # 初始化位置编码矩阵
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        # 位置编码公式:PE(pos, 2i) = sin(pos / 10000^(2i/d_model))
        # PE(pos, 2i+1) = cos(pos / 10000^(2i/d_model))
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        # 添加batch维度
        pe = pe.unsqueeze(0)
        # 不更新位置编码
        self.register_buffer('pe', pe)
    
    def forward(self, x):
        # x: [batch_size, seq_len, d_model]
        x = x + self.pe[:, :x.size(1), :]
        return x

class TransformerTimeSeries(nn.Module):
    def __init__(self, input_dim, d_model, n_heads, n_layers, output_dim, predict_steps, dropout=0.2):
        """
        :param input_dim: 输入特征维度
        :param d_model: Transformer的模型维度(需为n_heads的倍数)
        :param n_heads: 多头注意力头数
        :param n_layers: Transformer层数
        :param output_dim: 输出特征维度
        :param predict_steps: 预测步数
        :param dropout: dropout率
        """
        super().__init__()
        self.predict_steps = predict_steps
        self.d_model = d_model
        
        # 输入投影(将input_dim映射到d_model)
        self.input_proj = nn.Linear(input_dim, d_model)
        # 位置编码
        self.pos_encoder = PositionalEncoding(d_model)
        # Transformer编码器层
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=n_heads,
            dim_feedforward=d_model * 4,  # 前馈网络维度通常为d_model*4
            dropout=dropout,
            batch_first=True  # 输入shape=[batch_size, seq_len, d_model]
        )
        # Transformer编码器
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=n_layers)
        # 全连接层(预测输出)
        self.fc = nn.Linear(d_model, output_dim * predict_steps)
    
    def forward(self, x):
        # x: [batch_size, look_back, input_dim]
        batch_size = x.size(0)
        
        # 输入投影:[batch_size, look_back, input_dim] → [batch_size, look_back, d_model]
        x = self.input_proj(x) * np.sqrt(self.d_model)  # 缩放,稳定训练
        # 位置编码
        x = self.pos_encoder(x)
        # Transformer编码:输出[batch_size, look_back, d_model]
        transformer_out = self.transformer_encoder(x)
        # 取最后一个时刻的输出
        last_out = transformer_out[:, -1, :]  # [batch_size, d_model]
        # 全连接层预测
        output = self.fc(last_out)
        # 重塑为[batch_size, predict_steps, output_dim]
        output = output.view(batch_size, self.predict_steps, -1)
        
        return output

# 实例化Transformer模型(股价预测)
transformer_model = TransformerTimeSeries(
    input_dim=input_dim_stock,
    d_model=128,  # 需为n_heads的倍数(如128=8*16)
    n_heads=8,
    n_layers=2,
    output_dim=output_dim_stock,
    predict_steps=predict_steps_stock,
    dropout=0.2
)

transformer_model.to(device)
print(f"Transformer模型参数量:{sum(p.numel() for p in transformer_model.parameters()):,}")

4. 模型选型决策树(实战首选)

数据特性 推荐模型 理由 示例场景
强时序依赖、中等噪声 LSTM 门控机制捕捉长依赖,实现简单 设备振动预测、电力负荷预测
强季节性、多尺度依赖 TCN 扩张卷积捕捉多周期,精度高于LSTM 气温预测、降水量预测
高噪声、非线性、长依赖 Transformer 自注意力捕捉全局非线性依赖 股价预测、汇率预测
追求精度与效率平衡 LSTM+Transformer 融合LSTM的时序记忆与Transformer的全局依赖 金融指数预测、销量预估

四、实战案例:股价+天气预测全流程(从训练到预测)

以“股价单步预测”和“天气多步预测”为例,完整展示时序预测的实战流程。

案例1:股价单步预测(LSTM+Transformer融合)

任务:用过去30天的股价数据(开盘价、收盘价、成交量等)预测明天的收盘价,目标RMSE<5%。

(1)训练配置
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau

# 选择模型(这里用Transformer,也可替换为LSTM)
model = transformer_model
# 损失函数(MSE,适合回归任务)
criterion = nn.MSELoss()
# 优化器(AdamW,比Adam更稳定)
optimizer = optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-4)
# 学习率调度器(验证损失不下降时衰减lr)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3, verbose=True)
# 训练轮数
num_epochs = 50
# 最佳模型保存路径
best_model_path = "best_stock_transformer.pth"
# 记录最佳验证损失
best_val_loss = float('inf')
(2)训练与验证循环
def train_one_epoch(model, train_loader, criterion, optimizer, device):
    """训练一个epoch"""
    model.train()
    total_loss = 0.0
    for X_batch, y_batch in train_loader:
        # 迁移到设备
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)
        
        # 前向传播
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        
        # 反向传播与优化
        optimizer.zero_grad()
        loss.backward()
        # 梯度裁剪(防止梯度爆炸)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        
        # 累积损失
        total_loss += loss.item() * X_batch.size(0)
    
    # 平均损失
    avg_loss = total_loss / len(train_loader.dataset)
    return avg_loss

def validate(model, val_loader, criterion, device):
    """验证模型"""
    model.eval()
    total_loss = 0.0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)
            
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            
            total_loss += loss.item() * X_batch.size(0)
    
    avg_loss = total_loss / len(val_loader.dataset)
    return avg_loss

# 开始训练
for epoch in range(num_epochs):
    print(f"\nEpoch {epoch+1}/{num_epochs}")
    # 训练
    train_loss = train_one_epoch(model, train_loader_stock, criterion, optimizer, device)
    # 验证
    val_loss = validate(model, test_loader_stock, criterion, device)
    # 学习率调度
    scheduler.step(val_loss)
    # 保存最佳模型
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), best_model_path)
        print(f"最佳模型更新:验证损失 {val_loss:.6f}")
    
    print(f"训练损失:{train_loss:.6f}, 验证损失:{val_loss:.6f}")

# 加载最佳模型
model.load_state_dict(torch.load(best_model_path))
(3)预测与结果可视化
import matplotlib.pyplot as plt

def inverse_transform_prediction(y_pred, y_true, scaler, target_cols, feature_cols):
    """将预测结果反归一化(恢复原始尺度)"""
    # 创建空数组,形状与归一化前的特征一致
    y_pred_inv = np.zeros((y_pred.shape[0], len(feature_cols)))
    y_true_inv = np.zeros((y_true.shape[0], len(feature_cols)))
    # 填入目标特征的预测值和真实值
    target_idx = [feature_cols.index(col) for col in target_cols]
    y_pred_inv[:, target_idx] = y_pred.squeeze()
    y_true_inv[:, target_idx] = y_true.squeeze()
    # 反归一化
    y_pred_inv = scaler.inverse_transform(y_pred_inv)[:, target_idx]
    y_true_inv = scaler.inverse_transform(y_true_inv)[:, target_idx]
    return y_pred_inv, y_true_inv

def plot_predictions(y_true, y_pred, title):
    """绘制预测结果对比图"""
    plt.figure(figsize=(12, 6))
    plt.plot(y_true, label='真实值', color='blue', linewidth=1.5)
    plt.plot(y_pred, label='预测值', color='red', linewidth=1.5, alpha=0.8)
    plt.title(title, fontsize=14)
    plt.xlabel('时间步', fontsize=12)
    plt.ylabel('收盘价(元)', fontsize=12)
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()

# 生成测试集预测结果
model.eval()
y_pred = []
with torch.no_grad():
    for X_batch, _ in test_loader_stock:
        X_batch = X_batch.to(device)
        outputs = model(X_batch)
        y_pred.append(outputs.cpu().numpy())

# 合并预测结果
y_pred = np.concatenate(y_pred, axis=0)
y_true = y_stock_test.cpu().numpy()

# 反归一化(恢复原始股价尺度)
y_pred_inv, y_true_inv = inverse_transform_prediction(
    y_pred, y_true, scaler_stock, target_cols_stock, feature_cols_stock
)

# 计算评估指标
from sklearn.metrics import mean_absolute_error, mean_squared_error

mae = mean_absolute_error(y_true_inv, y_pred_inv)
mse = mean_squared_error(y_true_inv, y_pred_inv)
rmse = np.sqrt(mse)
mape = np.mean(np.abs((y_true_inv - y_pred_inv) / y_true_inv)) * 100

print(f"股价预测评估指标:")
print(f"MAE:{mae:.2f} 元")
print(f"MSE:{mse:.2f}")
print(f"RMSE:{rmse:.2f} 元")
print(f"MAPE:{mape:.2f}%")

# 绘制预测结果(取最后100个时刻的结果,更直观)
plot_predictions(y_true_inv[-100:], y_pred_inv[-100:], title="股价预测结果对比(最后100个时刻)")
(4)结果分析
  • 若预测值“滞后于”真实值:说明模型只学习了短期依赖,可增加look_back长度(如从30增至60),或添加更多滞后特征;
  • 若预测值波动过大:说明模型过拟合噪声,可增大dropout率(如从0.2增至0.3),或减少模型参数量;
  • 若RMSE过高(>5%):可尝试LSTM+Transformer融合,或添加更多特征(如宏观经济指标)。

案例2:天气多步预测(TCN)

任务:用过去14天的天气数据预测未来7天的平均气温、最高气温、最低气温和降水量,目标MAE<2℃(气温)、<5mm(降水量)。

(1)核心差异与调整

多步预测比单步预测更复杂,需调整以下参数:

  • predict_steps=7(预测未来7天);
  • 损失函数:用“多步MSE”,对远期预测可加权(如远期预测的权重降低,减少误差影响);
  • 模型选择:优先用TCN(捕捉多尺度季节性)。
(2)关键代码(省略重复部分)
# 1. 数据预处理(假设已完成清洗、特征工程)
look_back_weather = 14  # 用过去14天预测
predict_steps_weather = 7  # 预测未来7天
input_cols_weather = feature_cols_weather + [时间特征、滞后特征、滚动特征]
target_cols_weather = ["temp_avg", "temp_max", "temp_min", "precip"]

# 2. 构建序列样本
X_weather, y_weather = create_sequences(
    df_weather_norm,
    look_back=look_back_weather,
    predict_steps=predict_steps_weather,
    input_cols=input_cols_weather,
    target_cols=target_cols_weather
)

# 3. 划分训练/测试集
X_weather_train, X_weather_test, y_weather_train, y_weather_test = split_train_test(X_weather, y_weather, test_size=0.2)

# 4. 实例化TCN模型
tcn_model = TCNTimeSeries(
    input_dim=X_weather_train.shape[2],
    output_dim=len(target_cols_weather),
    predict_steps=predict_steps_weather,
    num_channels=[64, 64, 128],
    kernel_size=3,
    dropout=0.2
)
tcn_model.to(device)

# 5. 训练(同股价预测,损失函数用MSE)
# ...(训练代码省略)

# 6. 预测与反归一化
model.eval()
y_weather_pred = []
with torch.no_grad():
    for X_batch, _ in test_loader_weather:
        X_batch = X_batch.to(device)
        outputs = tcn_model(X_batch)
        y_weather_pred.append(outputs.cpu().numpy())

y_weather_pred = np.concatenate(y_weather_pred, axis=0)
y_weather_true = y_weather_test.cpu().numpy()

# 反归一化(天气用Min-Max Scaling)
def inverse_transform_multi(y_pred, y_true, scaler, target_cols, feature_cols):
    y_pred_inv = np.zeros((y_pred.shape[0], y_pred.shape[1], len(feature_cols)))
    y_true_inv = np.zeros((y_true.shape[0], y_true.shape[1], len(feature_cols)))
    target_idx = [feature_cols.index(col) for col in target_cols]
    for t in range(y_pred.shape[1]):
        y_pred_inv[:, t, target_idx] = y_pred[:, t, :]
        y_true_inv[:, t, target_idx] = y_true[:, t, :]
    # 反归一化(需遍历每个时间步)
    for i in range(y_pred_inv.shape[0]):
        y_pred_inv[i] = scaler.inverse_transform(y_pred_inv[i])
        y_true_inv[i] = scaler.inverse_transform(y_true_inv[i])
    return y_pred_inv[:, :, target_idx], y_true_inv[:, :, target_idx]

y_weather_pred_inv, y_weather_true_inv = inverse_transform_multi(
    y_weather_pred, y_weather_true, scaler_weather, target_cols_weather, feature_cols_weather
)

# 7. 绘制7天气温预测结果
plt.figure(figsize=(14, 8))
# 真实值
plt.plot(y_weather_true_inv[0, :, 0], 'o-', label='真实平均气温', color='blue')
# 预测值
plt.plot(y_weather_pred_inv[0, :, 0], 's-', label='预测平均气温', color='red')
plt.title('未来7天平均气温预测结果(第1个测试样本)', fontsize=14)
plt.xlabel('预测天数', fontsize=12)
plt.ylabel('平均气温(℃)', fontsize=12)
plt.xticks(range(7), [f'第{i+1}天' for i in range(7)])
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

五、模型融合:提升预测精度的“终极技巧”

单一模型的预测能力有限,模型融合能通过组合多个模型的优势,进一步提升精度(通常能降低10%-20%的误差)。以下是3种实战融合方法。

1. 加权平均融合(最简单有效)

将多个模型的预测结果按权重平均,权重可根据模型的验证精度设定(精度高的模型权重更大)。

代码实现

def weighted_average_fusion(predictions, weights):
    """
    加权平均融合
    :param predictions: 模型预测结果列表(如[lstm_pred, transformer_pred, tcn_pred])
    :param weights: 权重列表(和为1,如[0.3, 0.5, 0.2])
    :return: 融合后的预测结果
    """
    # 检查预测结果形状是否一致
    assert len(predictions) == len(weights), "模型数量与权重数量不一致"
    for pred in predictions[1:]:
        assert pred.shape == predictions[0].shape, "所有模型的预测形状必须一致"
    
    # 加权平均
    fused_pred = np.zeros_like(predictions[0])
    for pred, w in zip(predictions, weights):
        fused_pred += pred * w
    return fused_pred

# 假设已有3个模型的预测结果
lstm_pred = ...  # LSTM的预测结果
transformer_pred = ...  # Transformer的预测结果
tcn_pred = ...  # TCN的预测结果(若适用)

# 根据验证精度设定权重(如Transformer精度最高,权重0.5)
weights = [0.3, 0.5, 0.2]
fused_pred = weighted_average_fusion([lstm_pred, transformer_pred, tcn_pred], weights)

# 计算融合后的指标
fused_rmse = np.sqrt(mean_squared_error(y_true_inv, fused_pred))
print(f"融合后RMSE:{fused_rmse:.2f} 元")
print(f"融合前Transformer RMSE:{rmse_transformer:.2f} 元")

2. Stacking融合(精度更高)

Stacking是“两级模型融合”:用多个基础模型的预测结果作为“新特征”,训练一个元模型(如线性回归、LightGBM)来输出最终预测。

代码实现

from sklearn.linear_model import LinearRegression
from sklearn.model_selection import KFold

def stacking_fusion(X_train, y_train, X_test, base_models, meta_model, n_folds=5):
    """
    Stacking融合
    :param X_train: 训练集输入序列
    :param y_train: 训练集目标
    :param X_test: 测试集输入序列
    :param base_models: 基础模型列表(如[lstm_model, transformer_model])
    :param meta_model: 元模型(如LinearRegression)
    :param n_folds: K折交叉验证的折数
    :return: 测试集的融合预测结果
    """
    batch_size = 32
    n_base_models = len(base_models)
    n_train = len(X_train)
    n_test = len(X_test)
    
    # 初始化基础模型的训练集预测矩阵(n_train, n_base_models, output_dim)
    train_meta_features = np.zeros((n_train, n_base_models, y_train.shape[2]))
    # 初始化基础模型的测试集预测矩阵(n_test, n_base_models, output_dim)
    test_meta_features = np.zeros((n_test, n_base_models, y_train.shape[2]))
    
    # K折交叉验证训练基础模型,生成元特征
    kf = KFold(n_splits=n_folds, shuffle=False)  # 时序数据不shuffle
    for model_idx, model in enumerate(base_models):
        print(f"训练基础模型 {model_idx+1}/{n_base_models}")
        for train_idx, val_idx in kf.split(X_train):
            # 划分K折的训练/验证集
            X_kf_train, X_kf_val = X_train[train_idx], X_train[val_idx]
            y_kf_train, y_kf_val = y_train[train_idx], y_train[val_idx]
            
            # 加载基础模型并训练(简化代码,实际需重新初始化训练)
            # ...(基础模型训练代码省略)
            
            # 生成验证集元特征
            model.eval()
            with torch.no_grad():
                val_loader = DataLoader(TimeSeriesDataset(X_kf_val, y_kf_val), batch_size=batch_size, shuffle=False)
                val_pred = []
                for X_batch, _ in val_loader:
                    X_batch = X_batch.to(device)
                    outputs = model(X_batch)
                    val_pred.append(outputs.cpu().numpy())
                val_pred = np.concatenate(val_pred, axis=0)
                train_meta_features[val_idx, model_idx, :] = val_pred.squeeze()
        
        # 生成测试集元特征(用完整训练的基础模型)
        test_loader = DataLoader(TimeSeriesDataset(X_test, y_test), batch_size=batch_size, shuffle=False)
        test_pred = []
        with torch.no_grad():
            for X_batch, _ in test_loader:
                X_batch = X_batch.to(device)
                outputs = model(X_batch)
                test_pred.append(outputs.cpu().numpy())
        test_pred = np.concatenate(test_pred, axis=0)
        test_meta_features[:, model_idx, :] = test_pred.squeeze()
    
    # 训练元模型(需将元特征展平)
    # 训练集:[n_train, n_base_models*output_dim]
    X_meta_train = train_meta_features.reshape(n_train, -1)
    y_meta_train = y_train.squeeze()
    # 测试集:[n_test, n_base_models*output_dim]
    X_meta_test = test_meta_features.reshape(n_test, -1)
    
    # 训练元模型
    meta_model.fit(X_meta_train, y_meta_train)
    # 生成融合预测
    fused_pred = meta_model.predict(X_meta_test).reshape(n_test, 1, -1)
    
    return fused_pred

# 实例化基础模型和元模型
base_models = [lstm_model, transformer_model]
meta_model = LinearRegression()

# 生成Stacking融合结果
stacked_pred = stacking_fusion(
    X_stock_train, y_stock_train, X_stock_test, base_models, meta_model, n_folds=5
)

# 评估融合效果
stacked_rmse = np.sqrt(mean_squared_error(y_true_inv, stacked_pred.squeeze()))
print(f"Stacking融合后RMSE:{stacked_rmse:.2f} 元")

3. 融合避坑指南

(1)坑1:融合“相似模型”

错误:融合LSTM和GRU(结构相似,捕捉的依赖关系一致)。
后果:融合增益极低(误差降低<5%),浪费计算资源。
解决:选择“互补模型”(如LSTM+TCN+Transformer),分别捕捉时序依赖、周期性、非线性。

(2)坑2:权重设定不合理

错误:给所有模型相同的权重(如[0.33, 0.33, 0.33])。
后果:精度低的模型拉低整体效果。
解决:按“验证集误差的倒数”设定权重(误差越小,权重越大),或用交叉验证优化权重。

(3)坑3:过拟合元模型

错误:用复杂的元模型(如深度神经网络)且训练数据少。
后果:元模型过拟合基础模型的噪声,测试集精度下降。
解决:优先用简单元模型(如线性回归、Ridge),或增加训练数据量。

六、学习路径

1. 学习路径(分3个阶段)

(1)入门阶段(1-2个月):基础能力搭建
  • 目标:掌握时序数据预处理和LSTM实现,能完成单步预测;
  • 核心任务
    1. 用LSTM预测股价单步收盘价,RMSE<8%;
    2. 掌握数据清洗(缺失值、异常值)和特征工程(滞后、滚动特征);
    3. 实现预测结果的反归一化和可视化。
  • 推荐资源
    • 书籍《Python时间序列分析与应用》;
    • PyTorch官方教程《Sequence Models and Long-Short Term Memory Networks》。
(2)进阶阶段(2-3个月):模型进阶与多步预测
  • 目标:掌握TCN/Transformer实现,能完成多步预测和模型融合;
  • 核心任务
    1. 用TCN预测未来7天的气温,MAE<2℃;
    2. 用Transformer预测股价,对比LSTM的效果差异;
    3. 实现加权平均融合,将预测误差降低10%+。
  • 推荐资源
    • 论文《An Empirical Evaluation of Generic Convolutional and Recurrent Networks for Sequence Modeling》(TCN);
    • 论文《Attention Is All You Need》(Transformer);
    • 开源项目《PyTorch-Time-Series》。
(3)专家阶段(3-6个月):工业级落地
  • 目标:能独立设计时序预测系统,解决工业场景问题;
  • 核心任务
    1. 完成一个工业项目(如设备故障预测、电力负荷预测),包含全流程落地;
    2. 用ONNX Runtime/TensorRT优化推理速度,满足实时性要求;
    3. 设计监控系统,检测模型漂移并触发重训练。
  • 推荐资源
    • 书籍《Time Series Forecasting with Python》;
    • 工业案例《Kaggle Energy Load Forecasting Competitions》;
    • ONNX Runtime官方文档《Time Series Optimization》。

我是南木 需要学习规划、就业指导、论文辅导和技术答疑的小伙伴欢迎扫码交流
在这里插入图片描述

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐