如何对任意LLM在GLUE Benchmark上进行LoRA微调
本文探讨了如何修改run_glue.py代码以支持Qwen2.5-14B模型在GLUE基准测试上进行LoRA微调。关键修改包括:1)在模型加载后通过peft工具包注入LoRA配置,指定目标模块为"q_proj"和"v_proj";2)提供完整的运行脚本示例,包括8GPU并行训练设置和关键超参数配置;3)展示完整的代码实现,涵盖数据处理、模型训练和评估流程。该
·
原LoRA代码中的NLU任务在GLUE benchmark上进行了微调,仅支持Robert和Debert两种模型,为了扩展对比在其他不同模型上的性能,本文主要探索如何修改run_glue.py代码实现对Qwen2.5-14B 在GLUE上LoRA所需的修改:
核心代码,在对模型加载完毕后使用peft对模型进行包裹:
# === 1. 导入 ===
from peft import LoraConfig, get_peft_model
# === 2. 加载模型 ===
model = AutoModelForSequenceClassification.from_pretrained(
model_args.model_name_or_path,
...
)
# === 3. 注入 LoRA ===
if model_args.apply_lora:
peft_config = LoraConfig(
r=model_args.lora_r,
lora_alpha=model_args.lora_alpha,
target_modules=["q_proj", "v_proj"],
lora_dropout=0.1,
bias="none",
task_type="SEQ_CLS",
)
model = get_peft_model(model, peft_config)
model.print_trainable_parameters()
运行脚本:
export num_gpus=8
export CUBLAS_WORKSPACE_CONFIG=":16:8" # https://docs.nvidia.com/cuda/cublas/index.html#cublasApi_reproducibility
export PYTHONHASHSEED=0
export output_dir="./mnli"
export HF_ENDPOINT=https://hf-mirror.com
torchrun --nproc_per_node=$num_gpus \
examples/text-classification/run_glue.py \
--model_name_or_path Qwen/Qwen2.5-14B \
--task_name mnli \
--do_train \
--do_eval \
--max_seq_length 128 \
--per_device_train_batch_size 1 \
--learning_rate 5e-4 \
--num_train_epochs 5 \
--output_dir $output_dir/model \
--logging_steps 1 \
--logging_dir $output_dir/log \
--eval_strategy epoch \
--save_strategy epoch \
--warmup_ratio 0.06 \
--apply_lora \
--lora_r 8 \
--lora_alpha 16 \
--seed 0 \
--weight_decay 0.1
完整的run_glue.py文件代码:
from peft import LoraConfig, get_peft_model
import logging
import os
import random
import sys
import torch
from dataclasses import dataclass, field
from typing import Optional
import numpy as np
from datasets import load_dataset
import evaluate
import transformers
from transformers import (
AutoConfig,
AutoModelForSequenceClassification,
AutoTokenizer,
DataCollatorWithPadding,
EvalPrediction,
HfArgumentParser,
PretrainedConfig,
Trainer,
TrainingArguments,
default_data_collator,
set_seed,
)
from transformers.trainer_utils import get_last_checkpoint, is_main_process
from transformers.utils import check_min_version
# Will error if the minimal version of Transformers is not installed. Remove at your own risks.
check_min_version("4.4.0")
task_to_keys = {
"cola": ("sentence", None),
"mnli": ("premise", "hypothesis"),
"mrpc": ("sentence1", "sentence2"),
"qnli": ("question", "sentence"),
"qqp": ("question1", "question2"),
"rte": ("sentence1", "sentence2"),
"sst2": ("sentence", None),
"stsb": ("sentence1", "sentence2"),
"wnli": ("sentence1", "sentence2"),
}
logger = logging.getLogger(__name__)
@dataclass
class DataTrainingArguments:
"""
Arguments pertaining to what data we are going to input our model for training and eval.
Using `HfArgumentParser` we can turn this class
into argparse arguments to be able to specify them on
the command line.
"""
task_name: Optional[str] = field(
default=None,
metadata={"help": "The name of the task to train on: " + ", ".join(task_to_keys.keys())},
)
max_seq_length: int = field(
default=128,
metadata={
"help": "The maximum total input sequence length after tokenization. Sequences longer "
"than this will be truncated, sequences shorter will be padded."
},
)
overwrite_cache: bool = field(
default=False, metadata={"help": "Overwrite the cached preprocessed datasets or not."}
)
pad_to_max_length: bool = field(
default=True,
metadata={
"help": "Whether to pad all samples to `max_seq_length`. "
"If False, will pad the samples dynamically when batching to the maximum length in the batch."
},
)
max_train_samples: Optional[int] = field(
default=None,
metadata={
"help": "For debugging purposes or quicker training, truncate the number of training examples to this "
"value if set."
},
)
max_val_samples: Optional[int] = field(
default=None,
metadata={
"help": "For debugging purposes or quicker training, truncate the number of validation examples to this "
"value if set."
},
)
max_test_samples: Optional[int] = field(
default=None,
metadata={
"help": "For debugging purposes or quicker training, truncate the number of test examples to this "
"value if set."
},
)
train_file: Optional[str] = field(
default=None, metadata={"help": "A csv or a json file containing the training data."}
)
validation_file: Optional[str] = field(
default=None, metadata={"help": "A csv or a json file containing the validation data."}
)
test_file: Optional[str] = field(default=None, metadata={"help": "A csv or a json file containing the test data."})
def __post_init__(self):
if self.task_name is not None:
self.task_name = self.task_name.lower()
if self.task_name not in task_to_keys.keys():
raise ValueError("Unknown task, you should pick one in " + ",".join(task_to_keys.keys()))
elif self.train_file is None or self.validation_file is None:
raise ValueError("Need either a GLUE task or a training/validation file.")
else:
train_extension = self.train_file.split(".")[-1]
assert train_extension in ["csv", "json"], "`train_file` should be a csv or a json file."
validation_extension = self.validation_file.split(".")[-1]
assert (
validation_extension == train_extension
), "`validation_file` should have the same extension (csv or json) as `train_file`."
@dataclass
class ModelArguments:
"""
Arguments pertaining to which model/config/tokenizer we are going to fine-tune from.
"""
model_name_or_path: str = field(
metadata={"help": "Path to pretrained model or model identifier from huggingface.co/models"}
)
config_name: Optional[str] = field(
default=None, metadata={"help": "Pretrained config name or path if not the same as model_name"}
)
tokenizer_name: Optional[str] = field(
default=None, metadata={"help": "Pretrained tokenizer name or path if not the same as model_name"}
)
cache_dir: Optional[str] = field(
default=None,
metadata={"help": "Where do you want to store the pretrained models downloaded from huggingface.co"},
)
use_fast_tokenizer: bool = field(
default=True,
metadata={"help": "Whether to use one of the fast tokenizer (backed by the tokenizers library) or not."},
)
model_revision: str = field(
default="main",
metadata={"help": "The specific model version to use (can be a branch name, tag name or commit id)."},
)
use_auth_token: bool = field(
default=False,
metadata={
"help": "Will use the token generated when running `transformers-cli login` (necessary to use this script "
"with private models)."
},
)
apply_lora: Optional[bool] = field(
default=False,
metadata={"help": "Whether to apply LoRA or not."},
)
lora_alpha: Optional[int] = field(
default=None,
metadata={"help": "LoRA alpha"},
)
lora_r: Optional[int] = field(
default=None,
metadata={"help": "LoRA r"},
)
lora_path: Optional[str] = field(
default=None,
metadata={"help": "The file path of LoRA parameters."},
)
apply_adapter: Optional[bool] = field(
default=False,
metadata={"help": "Whether to apply adapter or not."},
)
adapter_path: Optional[str] = field(
default=None,
metadata={"help": "The file path of adapter parameters."},
)
adapter_type: Optional[str] = field(
default='houlsby',
metadata={"help": "houlsby or pfeiffer"},
)
adapter_size: Optional[int] = field(
default=64,
metadata={"help": "8, 16, 32, 64"},
)
apply_bitfit: Optional[bool] = field(
default=False,
metadata={"help": "Whether to apply bitfit or not."},
)
reg_loss_wgt: Optional[float] = field(
default=0.0,
metadata={"help": "Regularization Loss Weight"},
)
masking_prob: Optional[float] = field(
default=0.0,
metadata={"help": "Token Masking Probability"},
)
def main():
# See all possible arguments in src/transformers/training_args.py
# or by passing the --help flag to this script.
# We now keep distinct sets of args, for a cleaner separation of concerns.
parser = HfArgumentParser((ModelArguments, DataTrainingArguments, TrainingArguments))
if len(sys.argv) == 2 and sys.argv[1].endswith(".json"):
# If we pass only one argument to the script and it's the path to a json file,
# let's parse it to get our arguments.
model_args, data_args, training_args = parser.parse_json_file(json_file=os.path.abspath(sys.argv[1]))
else:
model_args, data_args, training_args = parser.parse_args_into_dataclasses()
# torch.use_deterministic_algorithms(training_args.use_deterministic_algorithms)
# logger.info("use_deterministic_algorithms: " + str(torch.are_deterministic_algorithms_enabled()))
# Detecting last checkpoint.
last_checkpoint = None
if os.path.isdir(training_args.output_dir) and training_args.do_train and not training_args.overwrite_output_dir:
last_checkpoint = get_last_checkpoint(training_args.output_dir)
if last_checkpoint is None and len(os.listdir(training_args.output_dir)) > 0:
raise ValueError(
f"Output directory ({training_args.output_dir}) already exists and is not empty. "
"Use --overwrite_output_dir to overcome."
)
elif last_checkpoint is not None:
logger.info(
f"Checkpoint detected, resuming training at {last_checkpoint}. To avoid this behavior, change "
"the `--output_dir` or add `--overwrite_output_dir` to train from scratch."
)
# Setup logging
logging.basicConfig(
format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
datefmt="%m/%d/%Y %H:%M:%S",
handlers=[logging.StreamHandler(sys.stdout)],
)
logger.setLevel(logging.INFO if is_main_process(training_args.local_rank) else logging.WARN)
# Log on each process the small summary:
logger.warning(
f"Process rank: {training_args.local_rank}, device: {training_args.device}, n_gpu: {training_args.n_gpu}"
+ f"distributed training: {bool(training_args.local_rank != -1)}, 16-bits training: {training_args.fp16}"
)
# Set the verbosity to info of the Transformers logger (on main process only):
if is_main_process(training_args.local_rank):
transformers.utils.logging.set_verbosity_info()
transformers.utils.logging.enable_default_handler()
transformers.utils.logging.enable_explicit_format()
logger.info(f"Training/evaluation parameters {training_args}")
# Set seed before initializing model.
set_seed(training_args.seed)
if data_args.task_name is not None:
# Downloading and loading a dataset from the hub.
datasets = load_dataset("glue", data_args.task_name)
else:
# Loading a dataset from your local files.
# CSV/JSON training and evaluation files are needed.
data_files = {"train": data_args.train_file, "validation": data_args.validation_file}
# Get the test dataset: you can provide your own CSV/JSON test file (see below)
# when you use `do_predict` without specifying a GLUE benchmark task.
if training_args.do_predict:
if data_args.test_file is not None:
train_extension = data_args.train_file.split(".")[-1]
test_extension = data_args.test_file.split(".")[-1]
assert (
test_extension == train_extension
), "`test_file` should have the same extension (csv or json) as `train_file`."
data_files["test"] = data_args.test_file
else:
raise ValueError("Need either a GLUE task or a test file for `do_predict`.")
for key in data_files.keys():
logger.info(f"load a local file for {key}: {data_files[key]}")
if data_args.train_file.endswith(".csv"):
# Loading a dataset from local csv files
datasets = load_dataset("csv", data_files=data_files)
else:
# Loading a dataset from local json files
datasets = load_dataset("json", data_files=data_files)
# See more about loading any type of standard or custom dataset at
# https://huggingface.co/docs/datasets/loading_datasets.html.
# Labels
if data_args.task_name is not None:
is_regression = data_args.task_name == "stsb"
if not is_regression:
label_list = datasets["train"].features["label"].names
num_labels = len(label_list)
else:
num_labels = 1
else:
# Trying to have good defaults here, don't hesitate to tweak to your needs.
is_regression = datasets["train"].features["label"].dtype in ["float32", "float64"]
if is_regression:
num_labels = 1
else:
# A useful fast method:
# https://huggingface.co/docs/datasets/package_reference/main_classes.html#datasets.Dataset.unique
label_list = datasets["train"].unique("label")
label_list.sort() # Let's sort it for determinism
num_labels = len(label_list)
# Load pretrained model and tokenizer
#
# In distributed training, the .from_pretrained methods guarantee that only one local process can concurrently
# download model & vocab.
config = AutoConfig.from_pretrained(
model_args.config_name if model_args.config_name else model_args.model_name_or_path,
num_labels=num_labels,
finetuning_task=data_args.task_name,
cache_dir=model_args.cache_dir,
revision=model_args.model_revision,
use_auth_token=True if model_args.use_auth_token else None,
#cls_dropout=training_args.cls_dropout,
apply_lora=model_args.apply_lora,
lora_alpha=model_args.lora_alpha,
lora_r=model_args.lora_r,
apply_adapter=model_args.apply_adapter,
adapter_type=model_args.adapter_type,
adapter_size=model_args.adapter_size,
reg_loss_wgt=model_args.reg_loss_wgt,
masking_prob=model_args.masking_prob,
)
tokenizer = AutoTokenizer.from_pretrained(
model_args.tokenizer_name if model_args.tokenizer_name else model_args.model_name_or_path,
cache_dir=model_args.cache_dir,
use_fast=model_args.use_fast_tokenizer,
revision=model_args.model_revision,
use_auth_token=True if model_args.use_auth_token else None,
)
model = AutoModelForSequenceClassification.from_pretrained(
model_args.model_name_or_path,
from_tf=bool(".ckpt" in model_args.model_name_or_path),
config=config,
cache_dir=model_args.cache_dir,
revision=model_args.model_revision,
use_auth_token=True if model_args.use_auth_token else None,
)
if model_args.apply_lora:
# === 定义 LoRA 配置 ===
peft_config = LoraConfig(
r=model_args.lora_r,
lora_alpha=model_args.lora_alpha,
target_modules=["q_proj", "v_proj"], # Qwen结构里推荐这两个,或者加上 "o_proj"
lora_dropout=0.1,
bias="none",
task_type="SEQ_CLS", # 序列分类
)
# === 用 peft 包装你的模型 ===
model = get_peft_model(model, peft_config)
# === 选配:打印可训练参数,确认成功 ===
model.print_trainable_parameters()
trainable_params = []
if model_args.apply_lora:
if model_args.lora_path is not None:
lora_state_dict = torch.load(model_args.lora_path)
logger.info(f"Apply LoRA state dict from {model_args.lora_path}.")
logger.info(lora_state_dict.keys())
model.load_state_dict(lora_state_dict, strict=False)
trainable_params.append('lora')
if model_args.apply_adapter:
if model_args.adapter_path is not None:
adapter_state_dict = torch.load(os.path.join(model_args.adapter_path, 'pytorch_adapter.bin'))
head_state_dict = torch.load(os.path.join(model_args.adapter_path, 'pytorch_model_head.bin'))
added_state_dict = {}
for k, v in adapter_state_dict.items():
new_k = k.replace(data_args.task_name + '.', '').replace('adapter_down.0.', 'adapter_A.').replace('adapter_up.', 'adapter_B.').replace('.adapters.', '.adapter.')
added_state_dict[new_k] = v
for k, v in head_state_dict.items():
new_k = k.replace('heads.' + data_args.task_name + '.1', 'classifier.dense').replace('heads.' + data_args.task_name + '.4', 'classifier.out_proj')
added_state_dict[new_k] = v
logger.info(f"Apply adapter state dict from {model_args.adapter_path}.")
logger.info(added_state_dict.keys())
missing_keys, unexpected_keys = model.load_state_dict(added_state_dict, strict=False)
for missing_key in missing_keys:
assert 'adapter' not in missing_key, missing_key + ' is missed in the model'
assert len(unexpected_keys) == 0, 'Unexpected keys ' + str(unexpected_keys)
trainable_params.append('adapter')
if model_args.apply_bitfit:
trainable_params.append('bias')
# Preprocessing the datasets
if data_args.task_name is not None:
sentence1_key, sentence2_key = task_to_keys[data_args.task_name]
else:
# Again, we try to have some nice defaults but don't hesitate to tweak to your use case.
non_label_column_names = [name for name in datasets["train"].column_names if name != "label"]
if "sentence1" in non_label_column_names and "sentence2" in non_label_column_names:
sentence1_key, sentence2_key = "sentence1", "sentence2"
else:
if len(non_label_column_names) >= 2:
sentence1_key, sentence2_key = non_label_column_names[:2]
else:
sentence1_key, sentence2_key = non_label_column_names[0], None
# Padding strategy
if data_args.pad_to_max_length:
padding = "max_length"
else:
# We will pad later, dynamically at batch creation, to the max sequence length in each batch
padding = False
# Some models have set the order of the labels to use, so let's make sure we do use it.
label_to_id = None
if (
model.config.label2id != PretrainedConfig(num_labels=num_labels).label2id
and data_args.task_name is not None
and not is_regression
):
# Some have all caps in their config, some don't.
label_name_to_id = {k.lower(): v for k, v in model.config.label2id.items()}
if list(sorted(label_name_to_id.keys())) == list(sorted(label_list)):
label_to_id = {i: int(label_name_to_id[label_list[i]]) for i in range(num_labels)}
else:
logger.warn(
"Your model seems to have been trained with labels, but they don't match the dataset: ",
f"model labels: {list(sorted(label_name_to_id.keys()))}, dataset labels: {list(sorted(label_list))}."
"\nIgnoring the model labels as a result.",
)
elif data_args.task_name is None and not is_regression:
label_to_id = {v: i for i, v in enumerate(label_list)}
if data_args.max_seq_length > tokenizer.model_max_length:
logger.warn(
f"The max_seq_length passed ({data_args.max_seq_length}) is larger than the maximum length for the"
f"model ({tokenizer.model_max_length}). Using max_seq_length={tokenizer.model_max_length}."
)
max_seq_length = min(data_args.max_seq_length, tokenizer.model_max_length)
def preprocess_function(examples):
# Tokenize the texts
args = (
(examples[sentence1_key],) if sentence2_key is None else (examples[sentence1_key], examples[sentence2_key])
)
result = tokenizer(*args, padding=padding, max_length=max_seq_length, truncation=True)
# Map labels to IDs (not necessary for GLUE tasks)
if label_to_id is not None and "label" in examples:
result["label"] = [(label_to_id[l] if l != -1 else -1) for l in examples["label"]]
return result
datasets = datasets.map(preprocess_function, batched=True, load_from_cache_file=not data_args.overwrite_cache)
if training_args.do_train:
if "train" not in datasets:
raise ValueError("--do_train requires a train dataset")
train_dataset = datasets["train"]
if data_args.max_train_samples is not None:
train_dataset = train_dataset.select(range(data_args.max_train_samples))
if training_args.do_eval:
if "validation" not in datasets and "validation_matched" not in datasets:
raise ValueError("--do_eval requires a validation dataset")
eval_dataset = datasets["validation_matched" if data_args.task_name == "mnli" else "validation"]
if data_args.max_val_samples is not None:
eval_dataset = eval_dataset.select(range(data_args.max_val_samples))
if training_args.do_predict or data_args.task_name is not None or data_args.test_file is not None:
if "test" not in datasets and "test_matched" not in datasets:
raise ValueError("--do_predict requires a test dataset")
test_dataset = datasets["test_matched" if data_args.task_name == "mnli" else "test"]
if data_args.max_test_samples is not None:
test_dataset = test_dataset.select(range(data_args.max_test_samples))
# Log a few random samples from the training set:
if training_args.do_train:
for index in random.sample(range(len(train_dataset)), 3):
logger.info(f"Sample {index} of the training set: {train_dataset[index]}.")
# Get the metric function
if data_args.task_name is not None:
#metric = load_metric("glue", data_args.task_name)
metric = evaluate.load("glue", data_args.task_name,
cache_dir=model_args.cache_dir)
# TODO: When datasets metrics include regular accuracy, make an else here and remove special branch from
# compute_metrics
# You can define your custom compute_metrics function. It takes an `EvalPrediction` object (a namedtuple with a
# predictions and label_ids field) and has to return a dictionary string to float.
def compute_metrics(p: EvalPrediction):
preds = p.predictions[0] if isinstance(p.predictions, tuple) else p.predictions
preds = np.squeeze(preds) if is_regression else np.argmax(preds, axis=1)
if data_args.task_name is not None:
result = metric.compute(predictions=preds, references=p.label_ids)
if len(result) > 1:
result["combined_score"] = np.mean(list(result.values())).item()
return result
elif is_regression:
return {"mse": ((preds - p.label_ids) ** 2).mean().item()}
else:
return {"accuracy": (preds == p.label_ids).astype(np.float32).mean().item()}
# Data collator will default to DataCollatorWithPadding, so we change it if we already did the padding.
if data_args.pad_to_max_length:
data_collator = default_data_collator
elif training_args.fp16:
data_collator = DataCollatorWithPadding(tokenizer, pad_to_multiple_of=8)
else:
data_collator = None
# Initialize our Trainer
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset if training_args.do_train else None,
eval_dataset=eval_dataset if training_args.do_eval else None,
compute_metrics=compute_metrics,
tokenizer=tokenizer,
data_collator=data_collator,
)
# Training
if training_args.do_train:
checkpoint = None
if last_checkpoint is not None:
checkpoint = last_checkpoint
elif os.path.isdir(model_args.model_name_or_path):
# Check the config from that potential checkpoint has the right number of labels before using it as a
# checkpoint.
if AutoConfig.from_pretrained(model_args.model_name_or_path).num_labels == num_labels:
checkpoint = model_args.model_name_or_path
train_result = trainer.train(resume_from_checkpoint=checkpoint)
metrics = train_result.metrics
max_train_samples = (
data_args.max_train_samples if data_args.max_train_samples is not None else len(train_dataset)
)
metrics["train_samples"] = min(max_train_samples, len(train_dataset))
trainer.save_model() # Saves the tokenizer too for easy upload
trainer.log_metrics("train", metrics)
trainer.save_metrics("train", metrics)
trainer.save_state()
# Evaluation
if training_args.do_eval:
logger.info("*** Evaluate ***")
# Loop to handle MNLI double evaluation (matched, mis-matched)
tasks = [data_args.task_name]
eval_datasets = [eval_dataset]
if data_args.task_name == "mnli":
tasks.append("mnli-mm")
eval_datasets.append(datasets["validation_mismatched"])
for eval_dataset, task in zip(eval_datasets, tasks):
metrics = trainer.evaluate(eval_dataset=eval_dataset)
max_val_samples = data_args.max_val_samples if data_args.max_val_samples is not None else len(eval_dataset)
metrics["eval_samples"] = min(max_val_samples, len(eval_dataset))
trainer.log_metrics("eval", metrics)
trainer.save_metrics("eval", metrics)
if training_args.do_predict:
logger.info("*** Test ***")
# Loop to handle MNLI double evaluation (matched, mis-matched)
tasks = [data_args.task_name]
test_datasets = [test_dataset]
if data_args.task_name == "mnli":
tasks.append("mnli-mm")
test_datasets.append(datasets["test_mismatched"])
for test_dataset, task in zip(test_datasets, tasks):
# Removing the `label` columns because it contains -1 and Trainer won't like that.
test_dataset.remove_columns_("label")
predictions = trainer.predict(test_dataset=test_dataset).predictions
predictions = np.squeeze(predictions) if is_regression else np.argmax(predictions, axis=1)
output_test_file = os.path.join(training_args.output_dir, f"test_results_{task}.txt")
if trainer.is_world_process_zero():
with open(output_test_file, "w") as writer:
logger.info(f"***** Test results {task} *****")
writer.write("index\tprediction\n")
for index, item in enumerate(predictions):
if is_regression:
writer.write(f"{index}\t{item:3.3f}\n")
else:
item = label_list[item]
writer.write(f"{index}\t{item}\n")
def _mp_fn(index):
# For xla_spawn (TPUs)
main()
if __name__ == "__main__":
main()
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)