Multi-GPU Parallel Training of a Large ViT Model with Colossal-AI (Code + Workflow)
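(I) Downloading Colossal-AI
(1) Search for Colossal-AI on GitHub and download the code to your machine: hpcaitech/ColossalAI: Making large AI models cheaper, faster and more accessible (github.com)
The source tree is large, and GitHub is sometimes unreachable or slow, so the resources have also been mirrored on CSDN for easier downloading. Download link: [Free] Colossal-AI parallel training system code for accelerating the training of models such as DeepSeek and LLaMA (includes the system source, dependency files, sample model-training code, and an installation and usage guide), CSDN Library resource
(2) Enter the repository root and install the dependencies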
pip install -r requirements/requirements.txt
(3) Install Colossal-AI
pip install .
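After installation it can be worth confirming what actually got installed before moving on. The sketch below is a minimal check written for this walkthrough, not part of Colossal-AI: the helper names `installed_version` and `module_available` are hypothetical, and the assumption that the distribution is registered under the name `colossalai` should be verified in your environment. The training script later in this post imports `colossalai.core` and `colossalai.trainer`, so the check also reports whether those modules can be found in the installed version.

```python
import importlib.util
from importlib import metadata


def installed_version(dist_name):
    """Return the installed version string of a distribution, or None if absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


def module_available(module_name):
    """Return True if the module can be located (imports parent packages to look)."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except (ImportError, ValueError):
        # Raised when a parent package is missing or cannot be imported.
        return False


if __name__ == "__main__":
    # "colossalai" as the distribution name is an assumption; adjust if needed.
    print("colossalai version:", installed_version("colossalai"))
    # Modules imported by the training script in this post.
    for name in ("colossalai.core", "colossalai.trainer", "titans"):
        print(f"{name}: {'found' if module_available(name) else 'missing'}")
```

If any of these modules is reported as missing, the installed version probably does not match the one this script was written against.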
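(II) Training the ViT model
(1) Configure config.py. The core settings to edit by hand are:
TENSOR_PARALLEL_SIZE (number of GPUs used for tensor parallelism; affects per-GPU memory; set it according to the number of GPUs available)
HIDDEN_SIZE (hidden layer size; affects memory; a multiple of 8 is recommended)
BATCH_SIZE (batch size; affects memory; a multiple of 8 is recommended)
MLP_RATIO (the MLP intermediate layer has MLP_RATIO*HIDDEN_SIZE neurons; affects memory)
NUM_CLASSES (number of output classes; set it to match the dataset, e.g. 10 for CIFAR-10)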
from colossalai.amp import AMP_TYPE
# hyperparameters
# BATCH_SIZE is per GPU
# global batch size = BATCH_SIZE x data parallel size
BATCH_SIZE = 64
LEARNING_RATE = 3e-3
#LEARNING_RATE = 0.01
WEIGHT_DECAY = 0.3
NUM_EPOCHS = 120
WARMUP_EPOCHS = 3
# model config
IMG_SIZE = 32
PATCH_SIZE = 4
HIDDEN_SIZE = 2048
DEPTH = 12
NUM_HEADS = 16
MLP_RATIO = 4
NUM_CLASSES = 10
CHECKPOINT = False
SEQ_LENGTH = (IMG_SIZE // PATCH_SIZE)**2 + 1 # add 1 for cls token
# parallel setting
TENSOR_PARALLEL_SIZE = 8
TENSOR_PARALLEL_MODE = '1d'
parallel = dict(
    pipeline=1,
    tensor=dict(mode=TENSOR_PARALLEL_MODE, size=TENSOR_PARALLEL_SIZE),
)
#fp16 = dict(mode=AMP_TYPE.NAIVE)
clip_grad_norm = 1.0
#gradient_accumulation = 16
# pipeline config
NUM_MICRO_BATCHES = parallel['pipeline']
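To get a feel for how HIDDEN_SIZE, DEPTH and MLP_RATIO drive memory, the following sketch estimates the parameter count implied by the config above. It assumes a standard ViT layout (fused QKV projection with bias, a two-layer MLP, two LayerNorms per block, a convolutional patch embedding over 3 input channels, a learned position embedding, and a linear head); the actual titans model may differ in detail, so treat the result as an order-of-magnitude figure. The function name `vit_param_estimate` is hypothetical.

```python
def vit_param_estimate(img_size=32, patch_size=4, hidden_size=2048, depth=12,
                       mlp_ratio=4, num_classes=10, in_chans=3):
    """Rough parameter count for a standard ViT (assumed layout, not read from titans)."""
    seq_length = (img_size // patch_size) ** 2 + 1  # +1 for the cls token
    h = hidden_size

    # Per transformer block:
    #   QKV projection:        3*h*h weights + 3*h biases
    #   attention output proj: h*h weights + h biases
    #   MLP (two linears):     2*mlp_ratio*h*h weights + mlp_ratio*h + h biases
    #   two LayerNorms:        4*h
    per_block = (4 + 2 * mlp_ratio) * h * h + (9 + mlp_ratio) * h

    patch_embed = in_chans * patch_size * patch_size * h + h
    cls_and_pos = h + seq_length * h
    final_norm = 2 * h
    head = h * num_classes + num_classes

    total = depth * per_block + patch_embed + cls_and_pos + final_norm + head
    return {"seq_length": seq_length, "per_block": per_block, "total": total}


if __name__ == "__main__":
    est = vit_param_estimate()
    print(f"sequence length: {est['seq_length']}")
    print(f"parameters per block: {est['per_block']:,}")
    print(f"total parameters: {est['total']:,}")
```

Under these assumptions the config describes a model of roughly 0.6 billion parameters. Almost all of them sit in the per-block weight matrices, which scale with HIDDEN_SIZE squared, and those are the tensors that 1D tensor parallelism splits across the TENSOR_PARALLEL_SIZE GPUs.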
(2) Write the training script standard_trainer_with_cifar10.py
import os
import time
import colossalai
import torch
from colossalai.context import ParallelMode
from colossalai.core import global_context as gpc
from colossalai.logging import get_dist_logger
from colossalai.nn import Accuracy, CrossEntropyLoss
from colossalai.trainer import Trainer, hooks
from colossalai.nn.lr_scheduler import CosineAnnealingWarmupLR
from colossalai.utils import is_using_pp, get_dataloader, MultiTimer
from colossalai.pipeline.pipelinable import PipelinableContext
from titans.model.vit.vit import _create_vit_model
from tqdm import tqdm
from titans.dataloader.cifar10 import build_cifar
def main():
    # initialize distributed setting
    parser = colossalai.get_default_parser()
    args = parser.parse_args()

    # launch from torch
    colossalai.launch_from_torch(config=args.config)

    # get logger
    logger = get_dist_logger()
    logger.info("initialized distributed environment", ranks=[0])
    if hasattr(gpc.config, 'LOG_PATH'):
        if gpc.get_global_rank() == 0:
            log_path = gpc.config.LOG_PATH
            if not os.path.exists(log_path):
                os.mkdir(log_path)
            logger.log_to_file(log_path)

    use_pipeline = is_using_pp()

    # create dataloaders
    root = os.environ.get('DATA', './data')
    train_dataloader, test_dataloader = build_cifar(gpc.config.BATCH_SIZE, root, pad_if_needed=True)

    # zx: set iterations_per_epoch
    #iterations_per_epoch = len(train_dataloader) + len(test_dataloader)
    #gpc.set_iterations_per_epoch(iterations_per_epoch)

    # create model
    # num_classes is read from the config so that NUM_CLASSES actually takes effect
    model_kwargs = dict(img_size=gpc.config.IMG_SIZE,
                        patch_size=gpc.config.PATCH_SIZE,
                        hidden_size=gpc.config.HIDDEN_SIZE,
                        depth=gpc.config.DEPTH,
                        num_heads=gpc.config.NUM_HEADS,
                        mlp_ratio=gpc.config.MLP_RATIO,
                        num_classes=gpc.config.NUM_CLASSES,
                        init_method='torch',
                        checkpoint=gpc.config.CHECKPOINT)

    if use_pipeline:
        pipelinable = PipelinableContext()
        with pipelinable:
            model = _create_vit_model(**model_kwargs)
        pipelinable.to_layer_list()
        pipelinable.policy = "uniform"
        model = pipelinable.partition(1, gpc.pipeline_parallel_size, gpc.get_local_rank(ParallelMode.PIPELINE))
    else:
        model = _create_vit_model(**model_kwargs)

    # count number of parameters
    total_numel = 0
    for p in model.parameters():
        total_numel += p.numel()
    if not gpc.is_initialized(ParallelMode.PIPELINE):
        pipeline_stage = 0
    else:
        pipeline_stage = gpc.get_local_rank(ParallelMode.PIPELINE)
    logger.info(f"number of parameters: {total_numel} on pipeline stage {pipeline_stage}")

    # create loss function
    criterion = CrossEntropyLoss(label_smoothing=0.1)

    # create optimizer
    optimizer = torch.optim.AdamW(model.parameters(), lr=gpc.config.LEARNING_RATE, weight_decay=gpc.config.WEIGHT_DECAY)

    # create lr scheduler (stepped once per epoch, see LRSchedulerHook below)
    lr_scheduler = CosineAnnealingWarmupLR(optimizer=optimizer,
                                           total_steps=gpc.config.NUM_EPOCHS,
                                           warmup_steps=gpc.config.WARMUP_EPOCHS)

    # initialize
    engine, train_dataloader, test_dataloader, _ = colossalai.initialize(model=model,
                                                                         optimizer=optimizer,
                                                                         criterion=criterion,
                                                                         train_dataloader=train_dataloader,
                                                                         test_dataloader=test_dataloader)
    logger.info("Engine is built", ranks=[0])

    # True when 1D tensor parallelism is active; not used further in this script
    scatter_gather = gpc.is_initialized(ParallelMode.PARALLEL_1D)

    # create timer
    timer = MultiTimer()

    # create trainer
    trainer = Trainer(engine=engine, logger=logger, timer=timer)
    logger.info("Trainer is built", ranks=[0])

    # create a list of useful hooks
    hook_list = [
        hooks.LogMetricByEpochHook(logger=logger),
        hooks.LogMetricByStepHook(),
        hooks.AccuracyHook(accuracy_func=Accuracy()),
        hooks.LossHook(),
        hooks.ThroughputHook(),
        hooks.LRSchedulerHook(lr_scheduler=lr_scheduler, by_epoch=True)
    ]

    # measure total training time
    start_total = time.time()
    trainer.fit(
        train_dataloader=train_dataloader,
        test_dataloader=test_dataloader,
        epochs=gpc.config.NUM_EPOCHS,
        hooks=hook_list,
        display_progress=True,
        test_interval=1,
    )
    logger.info(f"total training time: {time.time() - start_total:.2f} s", ranks=[0])


if __name__ == '__main__':
    main()
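The script passes NUM_EPOCHS and WARMUP_EPOCHS to CosineAnnealingWarmupLR and steps the scheduler once per epoch (by_epoch=True). To see what learning rates that implies, here is a plain-Python sketch of a generic linear-warmup plus cosine-decay schedule using the values from config.py. It illustrates the general shape only: the function name `warmup_cosine_lr`, the `min_lr` parameter, and the exact warmup formula are assumptions, and the library's own implementation may differ in detail.

```python
import math


def warmup_cosine_lr(epoch, base_lr=3e-3, warmup_epochs=3, total_epochs=120, min_lr=0.0):
    """Learning rate at a given epoch: linear warmup, then cosine decay to min_lr."""
    if epoch < warmup_epochs:
        # Ramp linearly so that the last warmup epoch reaches base_lr.
        return base_lr * (epoch + 1) / warmup_epochs
    # Fraction of the post-warmup phase completed, in [0, 1].
    progress = (epoch - warmup_epochs) / max(1, total_epochs - warmup_epochs)
    return min_lr + 0.5 * (base_lr - min_lr) * (1.0 + math.cos(math.pi * progress))


if __name__ == "__main__":
    for epoch in (0, 1, 2, 3, 30, 60, 90, 119):
        print(f"epoch {epoch:3d}: lr = {warmup_cosine_lr(epoch):.6f}")
```

Printing a few epochs like this is a cheap way to sanity-check LEARNING_RATE and WARMUP_EPOCHS before committing eight GPUs to a 120-epoch run.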
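(3) Run the training script
--nproc_per_node: number of GPUs to use on this node
--config: path to the configuration file (the command below uses a file named maxoneconfig.py; substitute the name of your own config file, such as the config.py from step (1))
standard_trainer_with_cifar10.py: the training script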
colossalai run --nproc_per_node 8 standard_trainer_with_cifar10.py --config maxoneconfig.py
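The GPU count passed to --nproc_per_node has to be compatible with the parallel settings in the config. The sketch below works through the arithmetic, assuming a single node and the usual convention that the data parallel size is the total number of processes divided by the tensor parallel size times the pipeline parallel size; the global batch size then follows the formula in the config comment (BATCH_SIZE x data parallel size). The helper names are hypothetical.

```python
def data_parallel_size(world_size, tensor_size, pipeline_size=1):
    """Number of data-parallel replicas left after tensor and pipeline parallelism."""
    model_parallel = tensor_size * pipeline_size
    if world_size % model_parallel != 0:
        raise ValueError(
            f"world size {world_size} is not divisible by "
            f"tensor x pipeline = {model_parallel}"
        )
    return world_size // model_parallel


def global_batch_size(batch_size, world_size, tensor_size, pipeline_size=1):
    """Global batch size = per-GPU BATCH_SIZE x data parallel size."""
    return batch_size * data_parallel_size(world_size, tensor_size, pipeline_size)


if __name__ == "__main__":
    # Values from this post: 8 GPUs, TENSOR_PARALLEL_SIZE = 8, pipeline = 1, BATCH_SIZE = 64.
    print("data parallel size:", data_parallel_size(8, 8, 1))
    print("global batch size:", global_batch_size(64, 8, 8, 1))
```

With 8 GPUs and TENSOR_PARALLEL_SIZE = 8, all GPUs cooperate on a single model replica, so the global batch size equals BATCH_SIZE. Lowering TENSOR_PARALLEL_SIZE on the same hardware trades per-GPU memory for more data-parallel replicas.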