After pretraining a large language model, making the LLM genuinely useful in real-world applications still requires fine-tuning it for the task at hand. The two main approaches to fine-tuning an LLM are classification fine-tuning and instruction fine-tuning.

In classification fine-tuning, the model is trained to recognize a specific set of class labels, for example filtering spam from non-spam messages, identifying plant species in images, or categorizing news articles. However, a classification-fine-tuned model can only predict the classes it encountered during training.

Instruction fine-tuning enables a model to perform a much broader range of tasks and is the right choice when a single model must handle many different ones; it improves the model's ability to understand a user's instruction and generate an appropriate response. The trade-off is that developing a model proficient at many tasks requires larger datasets and more compute.

Here we take the GPT model we built and pretrained earlier, modify it, and perform classification fine-tuning so that it can filter text messages into spam and non-spam.

1. Building the dataset

Download the dataset, save it as a .tsv file, and inspect the label distribution.

# Download the dataset
import urllib.request
import zipfile
import os
from pathlib import Path

url = "https://archive.ics.uci.edu/static/public/228/sms+spam+collection.zip"
zip_path = "sms+spam+collection.zip"
extracted_path = "sms_spam_collection"
data_file_path = Path(extracted_path) / "SMSSpamCollection.tsv"

def download_and_unzip_spam_data(url, zip_path, extracted_path, data_file_path):
    if data_file_path.exists():
        print("{data_file_path} already exists. Skipping download.")
        return

    # Download the file
    with urllib.request.urlopen(url) as response:
        with open(zip_path, "wb") as out_file:
            out_file.write(response.read())

    # Unzip the file
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        zip_ref.extractall(extracted_path)

    original_file_path = Path(extracted_path) / "SMSSpamCollection"
    # Add a .tsv extension
    os.rename(original_file_path, data_file_path)
    print(f"File downloaded and saved as {data_file_path}")

download_and_unzip_spam_data(url, zip_path, extracted_path, data_file_path)
# Load the file and inspect the class distribution
import pandas as pd
df = pd.read_csv(data_file_path, sep="\t", header=None, names=["Label", "Text"])
print(df["Label"].value_counts())
"""
ham     4825
spam     747
Name: Label, dtype: int64
"""

As we can see, there are far more non-spam (ham) messages than spam messages. To keep the demonstration small and to balance the class distribution, we build a dataset that contains 747 instances of each class.

# Build a class-balanced dataset
def create_balanced_dataset(df):
    # Number of spam messages
    num_spam = df[df["Label"] == "spam"].shape[0]
    # Randomly sample ham messages so their count matches the spam count
    ham_subset = df[df["Label"] == "ham"].sample(
        num_spam, random_state=123
    )
    # Combine the ham subset with all spam messages into a new dataset
    balanced_df = pd.concat([
        ham_subset, df[df["Label"] == "spam"]
    ])

    return balanced_df

balanced_df = create_balanced_dataset(df)
print(balanced_df["Label"].value_counts())
"""
ham     747
spam    747
Name: Label, dtype: int64
"""

Next, we map the labels to 0 and 1 (0 for ham, 1 for spam) and split the data into training, validation, and test sets with a 7:1:2 ratio.

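The split below relies on a random_split helper that was not defined earlier. A minimal sketch is shown here: it shuffles the DataFrame and slices it by the given fractions (70% train, 10% validation, and the remaining 20% test).

def random_split(df, train_frac, validation_frac):
    # Shuffle the entire DataFrame
    df = df.sample(frac=1, random_state=123).reset_index(drop=True)
    # Compute the split boundaries
    train_end = int(len(df) * train_frac)
    validation_end = train_end + int(len(df) * validation_frac)
    # Slice into train / validation / test DataFrames
    return df[:train_end], df[train_end:validation_end], df[validation_end:]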

# Map the labels to 0 (ham) and 1 (spam)
balanced_df["Label"] = balanced_df["Label"].map({"ham": 0, "spam": 1})
# Split the dataset 7:1:2 into train/validation/test
train_df, val_df, test_df = random_split(balanced_df, 0.7, 0.1)
# Save as CSV files
train_df.to_csv("train.csv", index=None)
val_df.to_csv("val.csv", index=None)
test_df.to_csv("test.csv", index=None)

The next step is to build the data loaders used for PyTorch training. Previously we used a sliding window to produce text chunks of uniform size and grouped them into batches. When working with text messages of varying lengths, we can either truncate every message to the length of the shortest message (in the dataset or in the batch), or pad every message to the length of the longest message (in the dataset or in the batch). Truncation reduces the computational cost but discards information and can hurt model performance, so here we choose padding.

将"<|endoftext|>"的词元ID50256填充到编码的文本消息中。

import torch
import tiktoken
from torch.utils.data import Dataset, DataLoader

class SpamDataset(Dataset):
    def __init__(self, csv_file, tokenizer, max_length=None, pad_token_id=50256):
        self.data = pd.read_csv(csv_file)
        self.encoded_texts = [tokenizer.encode(text) for text in self.data["Text"]]
        if max_length is None:
            self.max_length = self._longest_encoded_length()
        else:
            self.max_length = max_length
            # Truncate sequences that exceed the maximum length
            self.encoded_texts = [text[:self.max_length] for text in self.encoded_texts]

        # Pad every sequence to the longest sequence length
        self.encoded_texts = [text + [pad_token_id] * (self.max_length - len(text))
                              for text in self.encoded_texts]
    
    def __getitem__(self, item):
        encoded = self.encoded_texts[item]
        label = self.data.iloc[item]["Label"]
        return (torch.tensor(encoded, dtype=torch.long),
                torch.tensor(label, dtype=torch.long))
    
    def __len__(self):
        return len(self.data)
    
    def _longest_encoded_length(self):
        max_length = 0
        for text in self.encoded_texts:
            text_length = len(text)
            if text_length > max_length:
                max_length = text_length
        return max_length

Construct the data loaders.

tokenizer = tiktoken.get_encoding("gpt2")
# The max length of the validation and test sets should not exceed the training set's
train_dataset = SpamDataset("train.csv", max_length=None, tokenizer=tokenizer)
val_dataset = SpamDataset("val.csv", max_length=train_dataset.max_length, tokenizer=tokenizer)
test_dataset = SpamDataset("test.csv", max_length=train_dataset.max_length, tokenizer=tokenizer)

num_workers = 0
batch_size = 8
torch.manual_seed(123)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers, drop_last=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, num_workers=num_workers, drop_last=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, num_workers=num_workers, drop_last=False)
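
As an optional sanity check, we can iterate over the training loader once and print the batch dimensions; with batch_size=8, each input batch should have shape (8, max_length) and each label batch shape (8,).

for input_batch, target_batch in train_loader:
    pass  # only grab the last batch to inspect its shapes
print("Input batch dimensions:", input_batch.shape)
print("Label batch dimensions:", target_batch.shape)
print(f"{len(train_loader)} training batches, {len(val_loader)} validation batches, {len(test_loader)} test batches")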

2. Initializing the model with pretrained weights

As in the previous lecture, we load pretrained weights into the model: after downloading the weights, use the load_weight_into_gpt function to load them into a GPTModel instance.

# Model configuration
CHOOSE_MODEL = "gpt2-small (124M)"
INPUT_PROMPT = "Every effort moves"
BASE_CONFIG = {
    "vocab_size": 50257,
    "context_length": 1024,
    "drop_rate": 0.0,
    "qkv_bias": True
}
model_configs = {
    "gpt2-small (124M)": {"emb_dim": 768, "num_layers": 12, "num_heads": 12},
    "gpt2-medium (355M)": {"emb_dim": 1024, "num_layers": 24, "num_heads": 16},
    "gpt2-large (774M)": {"emb_dim": 1280, "num_layers": 36, "num_heads": 20},
    "gpt2-xl (1558M)": {"emb_dim": 1600, "num_layers": 48, "num_heads": 25}
}
BASE_CONFIG.update(model_configs[CHOOSE_MODEL])

# Pretrained-weight loading utilities from Lecture 5
from gpt_download import download_and_load_gpt2
from PreTrain import GPTModel, load_weight_into_gpt

model_size = CHOOSE_MODEL.split(" ")[-1].lstrip("(").rstrip(")")
settings, params = download_and_load_gpt2(model_size=model_size, models_dir="gpt2")
model = GPTModel(BASE_CONFIG)
load_weight_into_gpt(model, params)
model.eval()
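
Before modifying anything, it is worth verifying that the weights loaded correctly by generating a short continuation of INPUT_PROMPT. The sketch below assumes the text-generation helpers from the previous lecture (generate_text_simple, text_to_token_ids, token_ids_to_text) are importable from the same PreTrain module; adjust the import to wherever they live in your code.

# Assumed helpers from the previous lecture; adjust the module path if needed
from PreTrain import generate_text_simple, text_to_token_ids, token_ids_to_text

token_ids = generate_text_simple(
    model=model,
    idx=text_to_token_ids(INPUT_PROMPT, tokenizer),
    max_new_tokens=15,
    context_size=BASE_CONFIG["context_length"]
)
print(token_ids_to_text(token_ids, tokenizer))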

3. Adding a classification head and evaluation functions

The GPT model we built earlier predicts the next token, which does not fit the classification task here, so the original output layer (which maps to the 50,257-token vocabulary) must be replaced with a smaller output layer covering only the two classes, 0 and 1.

In addition, we have just loaded pretrained weights into the model, and fine-tuning does not require training every layer. In a neural network, the lower layers capture general language structure that transfers across many tasks and datasets, while the last few layers focus more on subtle linguistic patterns and task-specific features, so fine-tuning only the last few layers is sufficient. Here we fine-tune the last transformer block, the final layer normalization, and the output layer.

# Modifications for classification fine-tuning
def fine_tuning_config(model, num_class, BASE_CONFIG):
    # Freeze all model parameters
    for param in model.parameters():
        param.requires_grad = False
    # Replace the output layer with one that maps to num_class outputs (trainable by default)
    model.out_head = torch.nn.Linear(in_features=BASE_CONFIG["emb_dim"], out_features=num_class)
    # Make the last transformer block and the final layer norm trainable
    for param in model.transformer_blocks[-1].parameters():
        param.requires_grad = True
    for param in model.final_norm.parameters():
        param.requires_grad = True

Let's demonstrate with an example. The input has shape (1, 4): the batch size is 1, and 4 is the number of input tokens. The output shape is (1, 4, 2) because the new output layer has two output dimensions. Before converting this into a class prediction, we only need the last row of the output tensor (outputs[:, -1, :]). Because the GPT model uses the causal attention mask discussed earlier, each token can only attend to itself and the tokens before it, so the last token in the sequence attends to the most tokens and therefore carries the most information. For the classification task, this last position is the only one we need to look at during fine-tuning.

num_class = 2
fine_tuning_config(model, num_class, BASE_CONFIG)
# Demonstration
inputs = tokenizer.encode("Do you have time")
inputs = torch.tensor(inputs).unsqueeze(0)  # add a batch dimension
print("inputs shape:", inputs.shape)       #torch.Size([1, 4])
with torch.no_grad():
    outputs = model(inputs)
print("outputs shape:", outputs.shape)    #torch.Size([1, 4, 2])

Finally, we add functions to compute classification accuracy and loss. They are essentially the same as in the previous lecture; the only differences are that we take the last row of the output logits and use argmax to pick the index with the highest probability.

# Compute classification accuracy over a data loader
def calc_accuracy_loader(data_loader, model, device, num_batches=None):
    model.eval()
    correct_preds, num_examples = 0, 0
    if num_batches is None:
        num_batches = len(data_loader)
    else:
        num_batches = min(num_batches, len(data_loader))

    for i, (input_batch, target_batch) in enumerate(data_loader):
        if i >= num_batches:
            break
        input_batch = input_batch.to(device)
        target_batch = target_batch.to(device)
        with torch.no_grad():
            logits = model(input_batch)[:, -1, :]  # keep only the last token's logits
        predict_labels = torch.argmax(logits, dim=-1)  # argmax picks the most likely class

        num_examples += predict_labels.shape[0]  # batch size
        correct_preds += (predict_labels == target_batch).sum().item()

    return correct_preds / num_examples

# Compute the loss for a single batch
def calc_loss_batch(input_batch, target_batch, model, device):
    input_batch = input_batch.to(device)
    target_batch = target_batch.to(device)
    logits = model(input_batch)[:, -1, :]
    loss = torch.nn.functional.cross_entropy(logits, target_batch)
    return loss

# Compute the average loss over a data loader
def calc_loss_loader(data_loader, model, device, num_batches=None):
    total_loss = 0
    if len(data_loader) == 0:
        return float("nan")
    elif num_batches is None:
        num_batches = len(data_loader)
    else:
        num_batches = min(num_batches, len(data_loader))

    for i, (input_batch, target_batch) in enumerate(data_loader):
        if i >= num_batches:
            break
        loss = calc_loss_batch(input_batch, target_batch, model, device)
        total_loss += loss.item()  # item() extracts the value without gradient information

    return total_loss / num_batches
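
Before fine-tuning, we can use these functions to check the model's baseline accuracy on a handful of batches (10 here, an arbitrary choice). Since the new classification head is randomly initialized, the result should be close to chance, around 50%.

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
torch.manual_seed(123)

train_accuracy = calc_accuracy_loader(train_loader, model, device, num_batches=10)
val_accuracy = calc_accuracy_loader(val_loader, model, device, num_batches=10)
print(f"Training accuracy: {train_accuracy*100:.2f}%")
print(f"Validation accuracy: {val_accuracy*100:.2f}%")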

4. Fine-tuning the model

Now we train the model. The procedure is largely the same as before; the only difference is that we compute classification accuracy instead of generating text samples to evaluate the model.

def evaluate_model(model, train_loader, val_loader, device, eval_iter):
    model.eval()
    with torch.no_grad():
        train_loss = calc_loss_loader(train_loader, model, device, num_batches=eval_iter)
        val_loss = calc_loss_loader(val_loader, model, device, num_batches=eval_iter)

    model.train()
    return train_loss, val_loss

# Training loop
def train_classifier_simple(model, train_loader, val_loader, optimizer, device, epochs, eval_freq, eval_iter):
    train_losses, val_losses, train_accs, val_accs = [], [], [], []
    examples_seen, global_step = 0, -1

    for epoch in range(epochs):
        model.train()
        for input_batch, target_batch in train_loader:
            optimizer.zero_grad()
            loss = calc_loss_batch(input_batch, target_batch, model, device)
            loss.backward()
            optimizer.step()
            examples_seen += input_batch.shape[0]  # count training examples seen
            global_step += 1

            # Periodically evaluate the model
            if global_step % eval_freq == 0:
                train_loss, val_loss = evaluate_model(model, train_loader, val_loader, device, eval_iter)
                train_losses.append(train_loss)
                val_losses.append(val_loss)
                print(f"Ep {epoch + 1} (Step {global_step:06d}): Train loss {train_loss:.3f}, Val loss {val_loss:.3f}")

        # Compute accuracy after each epoch
        train_accuracy = calc_accuracy_loader(train_loader, model, device, num_batches=eval_iter)
        val_accuracy = calc_accuracy_loader(val_loader, model, device, num_batches=eval_iter)
        train_accs.append(train_accuracy)
        val_accs.append(val_accuracy)
        print(f"Training accuracy: {train_accuracy}, Validation accuracy: {val_accuracy}")

    return train_losses, val_losses, train_accs, val_accs, examples_seen

# Start training
def train():
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    torch.manual_seed(123)
    optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5, weight_decay=0.1)
    epochs = 10
    train_losses, val_losses, train_accs, val_accs, examples_seen = train_classifier_simple(
        model, train_loader, val_loader, optimizer, device, epochs, eval_freq=50, eval_iter=5
    )
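
Optionally, the collected losses and accuracies can be plotted to inspect the training curves. A minimal matplotlib sketch (matplotlib is an assumed extra dependency; call it inside train() after train_classifier_simple returns):

import matplotlib.pyplot as plt  # assumed to be installed

def plot_values(train_vals, val_vals, label):
    # Plot training and validation curves over the evaluation steps
    plt.plot(train_vals, label=f"Train {label}")
    plt.plot(val_vals, linestyle="--", label=f"Val {label}")
    plt.xlabel("Evaluation step")
    plt.ylabel(label)
    plt.legend()
    plt.show()

# Example usage inside train(), after training finishes:
# plot_values(train_losses, val_losses, "loss")
# plot_values(train_accs, val_accs, "accuracy")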

Feed in a new piece of text and check how the model classifies it.

# Classify new text
def classify_review(text, model, tokenizer, device, max_length=None, pad_token_id=50256):
    model.eval()
    # Prepare the input
    input_ids = tokenizer.encode(text)
    supported_context_length = model.pos_emb.weight.shape[1]
    if max_length is None:
        max_length = supported_context_length
    input_ids = input_ids[:min(max_length, supported_context_length)]  # truncate overly long text
    input_ids += [pad_token_id] * (max_length - len(input_ids))  # pad to the maximum length
    input_tensor = torch.tensor(input_ids, device=device).unsqueeze(0)
    # Inference
    with torch.no_grad():
        logits = model(input_tensor)[:, -1, :]
    predict = torch.argmax(logits, dim=-1).item()

    return "spam" if predict == 1 else "not spam"


text = "You are a winner you have been specially selected to receive $1000 cash"
print(classify_review(text, model, tokenizer, device, max_length=train_dataset.max_length))
#spam
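
For comparison, a non-spam example (the exact wording is arbitrary; a reasonably fine-tuned model should label ordinary messages as not spam):

text_2 = "Hey, just wanted to check if we're still on for dinner tonight? Let me know!"
print(classify_review(text_2, model, tokenizer, device, max_length=train_dataset.max_length))
# expected: not spam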

Save and load the model weights.

# Save the weights
torch.save(model.state_dict(), "review_classifier.pth")
# Load the weights
model_state_dict = torch.load("review_classifier.pth", map_location=device)
model.load_state_dict(model_state_dict)
