【LLMs篇】18:基于EasyR1的Qwen2.5-VL GRPO训练
本文档详细描述了使用EasyR1框架在Geometry3K数据集上运行Qwen2.5-VL GRPO训练的完整流程
·

概述
本文档详细描述了使用 EasyR1 框架在 Geometry3K 数据集上运行 Qwen2.5-VL-7B-Instruct 模型进行 GRPO (Group Robust Policy Optimization) 训练的完整流程,包括算法原理、数据处理、模型集成和训练过程。
1. 训练启动脚本分析
1.1 脚本文件:examples/qwen2_5_vl_7b_geo3k_grpo.sh
#!/bin/bash
set -x
export PYTHONUNBUFFERED=1
MODEL_PATH=/home/ubuntu/.cache/huggingface/hub/models--Qwen--Qwen2.5-VL-7B-Instruct/snapshots/cc594898137f460bfe9f0759e9844b3ce807cfb5
python3 -m verl.trainer.main \
config=examples/config.yaml \
data.train_files=hiyouga/geometry3k@train \
data.val_files=hiyouga/geometry3k@test \
worker.actor.model.model_path=${MODEL_PATH} \
trainer.experiment_name=qwen2_5_vl_7b_geo_grpo \
trainer.n_gpus_per_node=4
1.2 关键参数说明
- 模型路径: 指定本地 Qwen2.5-VL-7B-Instruct 模型位置
- 数据集: 使用 HuggingFace 的
hiyouga/geometry3k数据集 - GPU配置: 使用 4 GPUs per node
- 实验名称:
qwen2_5_vl_7b_geo_grpo
2. 配置文件详细解析
2.1 数据配置 (config.yaml)
data:
train_files: hiyouga/math12k@train # 被脚本覆盖为geometry3k
val_files: hiyouga/math12k@test # 被脚本覆盖为geometry3k
prompt_key: problem # 几何问题文本
answer_key: answer # 标准答案
image_key: images # 图像数据列表
video_key: videos # 视频数据列表
max_prompt_length: 2048 # 最大prompt长度
max_response_length: 2048 # 最大回复长度
rollout_batch_size: 512 # 数据生成批大小
val_batch_size: 1024 # 验证批大小
format_prompt: ./examples/format_prompt/math.jinja # Prompt模板
min_pixels: 262144 # 图像最小像素数
max_pixels: 4194304 # 图像最大像素数
filter_overlong_prompts: true # 过滤超长prompt
2.2 算法配置
algorithm:
adv_estimator: grpo # 使用GRPO算法
disable_kl: false # 启用KL散度
use_kl_loss: true # 使用KL损失
kl_penalty: low_var_kl # KL惩罚类型
kl_coef: 1.0e-2 # KL系数
online_filtering: false # 在线过滤
filter_key: overall # 过滤指标
2.3 Worker配置
Actor (策略网络)
worker.actor:
global_batch_size: 128 # 全局批大小
micro_batch_size_per_device_for_update: 1 # 更新时每设备微批大小
micro_batch_size_per_device_for_experience: 2 # 体验生成时每设备微批大小
max_grad_norm: 1.0 # 梯度裁剪
padding_free: true # 无填充训练
dynamic_batching: true # 动态批处理
model:
model_path: Qwen/Qwen2.5-7B-Instruct # 模型路径
enable_gradient_checkpointing: true # 梯度检查点
freeze_vision_tower: false # 不冻结视觉塔
optim:
lr: 1.0e-6 # 学习率
weight_decay: 1.0e-2 # 权重衰减
strategy: adamw # 优化器策略
fsdp:
enable_full_shard: true # 启用完全分片
enable_cpu_offload: false # CPU卸载
Rollout (推理生成)
worker.rollout:
n: 5 # 每个prompt生成5个回复
temperature: 1.0 # 采样温度
top_p: 1.0 # Top-p采样
gpu_memory_utilization: 0.6 # GPU内存利用率
tensor_parallel_size: 2 # 张量并行大小
val_override_config:
temperature: 0.6 # 验证时温度
top_p: 0.95 # 验证时top-p
n: 1 # 验证时每个prompt生成1个回复
Reward (奖励函数)
worker.reward:
reward_type: batch # 批量奖励
reward_function: ./examples/reward_function/math.py:compute_score # 奖励函数
2.4 训练器配置
trainer:
total_epochs: 15 # 总训练轮数
project_name: easy_r1 # 项目名称
experiment_name: qwen2_5_7b_math_grpo # 实验名称
nnodes: 1 # 节点数
n_gpus_per_node: 8 # 每个节点GPU数
val_freq: 5 # 验证频率
val_before_train: true # 训练前验证
save_freq: 5 # 保存频率
save_limit: 3 # 保存限制
3. GRPO算法原理与实现
3.1 GRPO核心思想
Group Robust Policy Optimization (GRPO) 是一种针对强化学习中奖励分布不均匀问题的优化算法。其核心思想是:
- 组内标准化: 对于同一个prompt生成的多个回复,在组内进行标准化
- 相对比较: 重点关注同一prompt下不同回复的相对质量
- 鲁棒性: 减少不同prompt间奖励分布差异的影响
3.2 算法实现 (verl/trainer/core_algos.py:171-212)
def compute_grpo_outcome_advantage(
token_level_rewards: torch.Tensor, # token级奖励
response_mask: torch.Tensor, # 回复掩码
index: torch.Tensor, # 数据索引(用于分组)
eps: float = 1e-6 # 数值稳定性参数
) -> torch.Tensor:
"""
GRPO优势计算函数
算法步骤:
1. 计算每个回复的总奖励分数
2. 根据index将回复分组
3. 计算每组的均值和标准差
4. 进行组内标准化: (score - group_mean) / (group_std + eps)
5. 将标准化结果广播到所有token位置
"""
# 计算每个回复的总分数
scores = token_level_rewards.sum(dim=-1) # [batch_size]
# 按index分组并计算统计量
id2score = {}
for i, idx in enumerate(index):
idx = idx.item()
if idx not in id2score:
id2score[idx] = []
id2score[idx].append(scores[i].item())
# 计算每组的均值和标准差
id2mean, id2std = {}, {}
for idx in id2score:
id2mean[idx] = torch.mean(torch.tensor(id2score[idx], device=scores.device))
id2std[idx] = torch.std(torch.tensor(id2score[idx], device=scores.device))
# 标准化并广播
advantages = torch.zeros_like(token_level_rewards)
for i in range(len(index)):
idx = index[i].item()
normalized_score = (scores[i] - id2mean[idx]) / (id2std[idx] + eps)
# 将标准化分数广播到所有token位置
advantages[i] = normalized_score * response_mask[i]
return advantages
3.3 算法要求和特点
- 多样本要求:
rollout.n > 1(配置中设为5) - 结果导向: 只考虑最终奖励,不使用token级奖励
- 稳定性: 通过组内标准化提供更稳定的梯度信号
- 适用场景: 特别适合数学推理等有明确正确答案的任务
4. Geometry3K数据集处理流程
4.1 数据集结构
Geometry3K数据集包含几何问题及其对应的图像:
{
"problem": "几何问题描述,包含<image>标记",
"answer": "数值答案,如 48",
"images": [PIL.Image对象列表]
}
4.2 数据加载与预处理 (verl/utils/dataset.py)
4.2.1 图像处理流程
def process_image(image, processor, min_pixels=262144, max_pixels=4194304):
"""
图像预处理函数
处理步骤:
1. 支持多种输入格式:路径字符串、字典、PIL.Image对象
2. 转换为RGB格式
3. 基于像素数限制调整图像尺寸
4. 返回处理后的PIL.Image对象
"""
if isinstance(image, str):
# 从路径加载图像
image = Image.open(image).convert("RGB")
elif isinstance(image, dict):
# 处理字典格式
if "bytes" in image:
image = Image.open(io.BytesIO(image["bytes"])).convert("RGB")
elif "path" in image:
image = Image.open(image["path"]).convert("RGB")
# 调整图像尺寸以符合像素限制
width, height = image.size
current_pixels = width * height
if current_pixels > max_pixels:
# 缩小图像
scale_factor = (max_pixels / current_pixels) ** 0.5
new_width = int(width * scale_factor)
new_height = int(height * scale_factor)
image = image.resize((new_width, new_height), Image.LANCZOS)
elif current_pixels < min_pixels:
# 放大图像
scale_factor = (min_pixels / current_pixels) ** 0.5
new_width = int(width * scale_factor)
new_height = int(height * scale_factor)
image = image.resize((new_width, new_height), Image.LANCZOS)
return image
4.2.2 多模态消息构建
def _build_messages(self, data_point):
"""
构建多模态消息
处理流程:
1. 解析问题文本中的<image>标记
2. 提取对应的图像数据
3. 构建content_list,交替包含文本和图像
4. 生成符合HuggingFace格式的消息列表
"""
problem = data_point[self.prompt_key]
images = data_point.get(self.image_key, [])
# 解析<image>标记并构建content列表
content_list = []
parts = problem.split("<image>")
for i, part in enumerate(parts):
if part.strip():
content_list.append({"type": "text", "text": part.strip()})
# 在每个文本部分后添加对应的图像(如果存在)
if i < len(images):
processed_image = self.process_image(images[i])
content_list.append({"type": "image", "image": processed_image})
messages = [{"role": "user", "content": content_list}]
return messages, [img for img in images if img is not None]
4.3 Prompt格式化 (examples/format_prompt/math.jinja)
{{ content | trim }} You FIRST think about the reasoning process as an internal monologue and then provide the final answer. The reasoning process MUST BE enclosed within <think> </think> tags. The final answer MUST BE put in \boxed{}.
这个模板要求模型:
- 思考过程: 在
<think>标签内进行推理 - 最终答案: 在
\boxed{}内给出答案
4.4 数据转换流程
原始Geometry3K数据
↓
{problem: "在三角形ABC中...<image>...求面积", answer: "48", images: [PIL.Image]}
↓
应用math.jinja模板
↓
"在三角形ABC中...<image>...求面积 You FIRST think about the reasoning process..."
↓
构建多模态消息
↓
[{"role": "user", "content": [
{"type": "text", "text": "在三角形ABC中..."},
{"type": "image", "image": <PIL.Image>},
{"type": "text", "text": "...求面积 You FIRST think..."}
]}]
↓
Qwen2VL处理器处理
↓
{
"input_ids": tensor([...]),
"attention_mask": tensor([...]),
"position_ids": tensor(3, seq_len), # 3D位置编码
"pixel_values": tensor([...]), # 图像像素值
"image_grid_thw": tensor([...]) # 图像网格信息
}
5. Qwen2.5-VL模型集成详解
5.1 模型加载 (verl/workers/fsdp_workers.py)
def load_model():
"""
加载Qwen2.5-VL多模态模型
"""
model = AutoModelForImageTextToText.from_pretrained(
model_path,
torch_dtype=torch.bfloat16,
attn_implementation="flash_attention_2", # 使用Flash Attention
trust_remote_code=False,
device_map=None # 由FSDP管理设备分配
)
# 启用梯度检查点以节省内存
if enable_gradient_checkpointing:
model.gradient_checkpointing_enable()
# 视觉塔冻结选项
if freeze_vision_tower:
if hasattr(model, "model") and hasattr(model.model, "visual"):
model.model.visual.requires_grad_(False)
elif hasattr(model, "visual"):
model.visual.requires_grad_(False)
return model
5.2 多模态位置编码 (verl/models/transformers/qwen2_vl.py)
Qwen2.5-VL使用特殊的3D旋转位置编码(MRoPE)处理多模态输入:
def get_rope_index(input_ids, image_grid_thw, video_grid_thw, attention_mask):
"""
生成多模态旋转位置编码索引
返回形状:[3, seq_len]
- 第0维:时间维度位置
- 第1维:高度维度位置
- 第2维:宽度维度位置
"""
position_ids = torch.zeros(3, seq_len, dtype=torch.long, device=device)
# 文本token使用标准位置编码
text_positions = attention_mask.cumsum(dim=-1) - 1
position_ids[0] = text_positions # 时间维度
position_ids[1] = text_positions # 高度维度
position_ids[2] = text_positions # 宽度维度
# 图像token使用2D位置编码
for image_info in image_grid_thw:
t, h, w = image_info
for i in range(h):
for j in range(w):
pos = get_image_token_position(i, j)
position_ids[0, pos] = 0 # 时间=0
position_ids[1, pos] = i # 高度位置
position_ids[2, pos] = j # 宽度位置
return position_ids
5.3 FSDP集成和内存优化
# FSDP配置
fsdp_config = FSDPConfig(
enable_full_shard=True, # 完全分片
enable_cpu_offload=False, # CPU卸载
enable_rank0_init=True, # rank0初始化
mixed_precision=torch.bfloat16 # 混合精度
)
# 参数和优化器卸载
offload_config = OffloadConfig(
offload_params=True, # 参数卸载到CPU
offload_optimizer=True # 优化器状态卸载到CPU
)
# 模型包装
model = FSDP(
model,
fsdp_config=fsdp_config,
offload_config=offload_config
)
5.4 vLLM推理集成 (verl/workers/rollout/vllm_rollout_spmd.py)
def setup_vllm_engine():
"""
配置vLLM推理引擎以支持多模态推理
"""
engine_args = EngineArgs(
model=model_path,
tensor_parallel_size=2, # 张量并行
gpu_memory_utilization=0.6, # GPU内存利用率
enforce_eager=False, # 启用CUDA图优化
enable_chunked_prefill=False, # 分块预填充
limit_mm_per_prompt={"image": 10}, # 限制每个prompt的图像数量
trust_remote_code=False
)
engine = LLMEngine.from_engine_args(engine_args)
return engine
def generate_responses(prompts, images_list, sampling_params):
"""
使用vLLM生成多模态响应
"""
# 构建多模态输入
multi_modal_inputs = []
for images in images_list:
if images:
multi_modal_inputs.append({"image": images})
else:
multi_modal_inputs.append({})
# 批量生成
outputs = engine.generate(
prompts=prompts,
sampling_params=sampling_params,
multi_modal_data=multi_modal_inputs,
use_tqdm=not disable_tqdm
)
return [output.outputs[0].text for output in outputs]
6. 奖励函数设计 (examples/reward_function/math.py)
6.1 奖励函数组成
def compute_score(reward_inputs: list[dict[str, Any]], format_weight: float = 0.1):
"""
计算综合奖励分数
奖励组成:
1. format_reward: 格式奖励(是否包含<think>标签和\boxed{}答案)
2. accuracy_reward: 准确性奖励(答案是否正确)
3. overall: 综合得分 = (1-format_weight) * accuracy + format_weight * format
"""
scores = []
for reward_input in reward_inputs:
response = reward_input["response"]
ground_truth = reward_input["ground_truth"]
# 格式奖励:检查是否有思考过程和boxed答案
format_score = format_reward(response)
# 准确性奖励:使用mathruler库验证答案
accuracy_score = accuracy_reward(response, ground_truth)
# 综合得分
overall_score = (1 - format_weight) * accuracy_score + format_weight * format_score
scores.append({
"overall": overall_score,
"format": format_score,
"accuracy": accuracy_score
})
return scores
6.2 格式检查
def format_reward(response: str) -> float:
"""
检查回复格式是否正确
要求:
1. 包含<think>...</think>思考过程
2. 包含\boxed{}最终答案
"""
pattern = re.compile(r"<think>.*</think>.*\\boxed\{.*\}.*", re.DOTALL)
format_match = re.fullmatch(pattern, response)
return 1.0 if format_match else 0.0
6.3 准确性检查
def accuracy_reward(response: str, ground_truth: str) -> float:
"""
检查答案准确性
使用mathruler库:
1. 从回复中提取\boxed{}中的答案
2. 与标准答案进行比较
"""
answer = extract_boxed_content(response)
return 1.0 if grade_answer(answer, ground_truth) else 0.0
7. 完整训练流程
7.1 训练循环概览
def training_loop():
"""
GRPO训练主循环
"""
for epoch in range(total_epochs):
for batch in dataloader:
# 1. 数据准备阶段
prompts, ground_truths, images = prepare_batch(batch)
# 2. Rollout阶段:生成多个回复
responses = generate_multiple_responses(
prompts=prompts,
images=images,
n_responses=5, # 每个prompt生成5个回复
temperature=1.0,
top_p=1.0
)
# 3. 奖励计算阶段
rewards = compute_batch_rewards(responses, ground_truths)
# 4. GRPO优势估计
advantages = compute_grpo_outcome_advantage(
token_level_rewards=rewards,
response_mask=response_masks,
index=batch_indices
)
# 5. 策略更新
actor_loss = compute_policy_loss(
log_probs=response_log_probs,
advantages=advantages,
old_log_probs=old_response_log_probs
)
# 6. KL散度惩罚
kl_loss = compute_kl_loss(
new_log_probs=response_log_probs,
old_log_probs=old_response_log_probs
)
total_loss = actor_loss + kl_coef * kl_loss
# 7. 反向传播和优化
total_loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
optimizer.step()
optimizer.zero_grad()
# 8. 验证和保存
if step % val_freq == 0:
validate_model()
if step % save_freq == 0:
save_checkpoint()
7.2 关键训练阶段详解
7.2.1 Rollout阶段
def rollout_phase(actor_model, prompts, images, n_responses=5):
"""
使用vLLM进行高效的多回复生成
"""
all_responses = []
all_log_probs = []
# 为每个prompt生成n个回复
for prompt, image_list in zip(prompts, images):
responses = vllm_engine.generate(
prompts=[prompt] * n_responses,
multi_modal_data=[{"image": image_list}] * n_responses,
sampling_params=SamplingParams(
temperature=1.0,
top_p=1.0,
max_tokens=2048
)
)
# 计算回复的log概率(用于后续策略更新)
log_probs = actor_model.compute_log_probs(prompt, responses, image_list)
all_responses.extend(responses)
all_log_probs.extend(log_probs)
return all_responses, all_log_probs
7.2.2 奖励计算阶段
def reward_phase(responses, ground_truths):
"""
批量计算奖励
"""
reward_inputs = []
for response, ground_truth in zip(responses, ground_truths):
reward_inputs.append({
"response": response,
"ground_truth": ground_truth
})
# 使用批量奖励函数
scores = compute_score(reward_inputs, format_weight=0.1)
# 转换为tensor
overall_rewards = torch.tensor([s["overall"] for s in scores])
format_rewards = torch.tensor([s["format"] for s in scores])
accuracy_rewards = torch.tensor([s["accuracy"] for s in scores])
return overall_rewards, format_rewards, accuracy_rewards
7.2.3 GRPO优势计算阶段
def grpo_advantage_phase(rewards, batch_indices):
"""
计算GRPO优势
"""
# 将奖励reshape为token级别(但值在每个position相同)
token_rewards = rewards.unsqueeze(-1).expand(-1, max_seq_len)
# 计算response mask(标识回复部分的token)
response_masks = create_response_masks(responses, tokenizer)
# 调用GRPO算法
advantages = compute_grpo_outcome_advantage(
token_level_rewards=token_rewards,
response_mask=response_masks,
index=batch_indices,
eps=1e-6
)
return advantages
7.3 分布式训练协调
def distributed_training_setup():
"""
分布式训练设置
"""
# Ray集群初始化
ray.init(address="auto")
# FSDP进程组初始化
torch.distributed.init_process_group("nccl")
# 工作角色分配
actor_workers = create_fsdp_workers(
model_path=model_path,
role="actor",
n_gpus=4
)
rollout_workers = create_vllm_workers(
model_path=model_path,
role="rollout",
tensor_parallel_size=2
)
ref_workers = create_fsdp_workers(
model_path=model_path,
role="reference",
n_gpus=2,
cpu_offload=True
)
reward_workers = create_reward_workers(
reward_function_path="examples/reward_function/math.py:compute_score"
)
return actor_workers, rollout_workers, ref_workers, reward_workers
7.4 内存和性能优化
7.4.1 内存优化策略
- FSDP完全分片: 将模型参数分片到多个GPU
- CPU卸载: 将不活跃参数卸载到CPU内存
- 梯度检查点: 重新计算部分前向传递以节省显存
- 混合精度: 使用bfloat16减少内存使用
- 动态批处理: 根据序列长度动态调整批大小
7.4.2 性能优化技术
- Flash Attention: 高效的注意力机制实现
- vLLM推理: 高吞吐量的推理引擎
- 张量并行: 在推理阶段使用多GPU并行
- Padding-Free训练: 避免不必要的填充token
- Chunked Prefill: 分块处理长序列的prefill阶段
8. 实验配置和预期结果
8.1 硬件要求
- GPU: 4x A100 80GB 或同等性能GPU
- 内存: 每GPU至少64GB系统内存
- 存储: 高速SSD存储,至少500GB可用空间
8.2 训练时间估算
- 数据集大小: Geometry3K约3000个样本
- 每轮训练时间: 约2-3小时(4xA100)
- 总训练时间: 15轮约30-45小时
- 验证频率: 每5轮进行一次验证
8.3 预期性能指标
- 格式准确率: >95% (模型学会使用正确的输出格式)
- 答案准确率: 在Geometry3K测试集上预期达到70-80%
- 综合得分: overall score预期达到0.75-0.85
- 收敛性: 通常在10-15轮内收敛
8.4 监控指标
-
训练指标:
- Actor loss (策略损失)
- KL divergence (与初始模型的KL散度)
- Reward scores (奖励分数分布)
- Gradient norms (梯度范数)
-
验证指标:
- Validation accuracy (验证集准确率)
- Format compliance (格式符合率)
- Response quality (回复质量)
-
系统指标:
- GPU memory utilization (GPU内存使用率)
- Training speed (训练速度)
- Convergence rate (收敛速率)
9. 故障排除和调试
9.1 常见问题
- OOM错误: 减小batch size或启用更多CPU卸载
- 收敛问题: 调整学习率或KL系数
- 格式问题: 检查prompt模板和奖励函数
- 性能问题: 优化数据加载和预处理流程
9.2 调试技巧
- 日志分析: 监控训练日志中的损失变化
- 样本检查: 定期检查生成的样本质量
- 梯度监控: 检查梯度的范数和分布
- 资源监控: 监控GPU和CPU资源使用情况
10. 总结
本文档详细描述了使用EasyR1框架在Geometry3K数据集上运行Qwen2.5-VL GRPO训练的完整流程。该流程集成了最新的多模态学习、强化学习和分布式训练技术,展现了现代AI系统的复杂性和先进性。
主要特色:
- 多模态RLHF: 支持图像+文本的强化学习训练
- GRPO算法: 提供更稳定的策略梯度优化
- 分布式高效: 通过FSDP和vLLM实现高效训练和推理
- 内存优化: 多层次的内存管理策略
- 结果导向: 专门为数学推理任务优化的奖励设计
这个系统为多模态推理任务的强化学习训练提供了一个完整的解决方案,可以作为类似任务的参考实现。
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)