
Conversation

JYMiracle305 (Contributor) commented Nov 4, 2025

1. Main changes:

Added support for the Pipeline Parallel (PP) feature: the original model is split evenly, by its outermost structure, across multiple ranks, with each rank holding a subset of the layers and their corresponding parameters. During training, the forward pass follows the computation graph from rank 0 to rank n, the backward pass runs from rank n back to rank 0, and ranks exchange data via point-to-point communication (a toy sketch of this flow follows the component list below).

net.cc: based on PP_size and pp_rank, builds only the sub-blocks and corresponding parameters belonging to the current rank at model-construction time.
pipeline_parallel.cc: PipelineParallel wraps the model; each rank owns one PipelineParallel instance, which ties together the PipelineSchedule, PipelineStage, and Optimizers, and exposes TrainStep as the training entry point, which in turn calls the scheduler's Step method.
pipeline_schedule.cc: PipelineSchedule is the scheduler base class, whose Step method runs one complete training iteration; ScheduleGPipe is the GPipe scheduler subclass (diagram below), with StepMicrobatches implementing the actual scheduling.
[Figure: GPipe schedule diagram]
pipeline_stage.cc: PipelineStage represents the sub-graph held by the current rank and provides ForwardOneChunk to run the forward computation inside that sub-graph.
send_recv.cc: ISend and IRecv are two autograd nodes used to send tensors between specific ranks. They rely on the autograd mechanism: the last step of rank x's backward pass sends the gradient to rank x-1, after which ISend::Backward on rank x-1 receives the gradient and starts rank x-1's backward pass.
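
For readers new to this scheme, here is a self-contained toy sketch of the data flow just described: all micro-batch forwards run from stage 0 to stage n-1, then gradients flow back from stage n-1 to stage 0. ToyStage and the scalar "activations" are illustrative stand-ins only, not the PR's PipelineStage/ScheduleGPipe/ISend/IRecv API, and the sketch ignores the actual overlap between stages.

```cpp
// Toy sketch of the pipeline-parallel flow described above (hypothetical types).
#include <cstdio>
#include <vector>

struct ToyStage {
    double weight = 1.0;                                   // stands in for this rank's parameters
    double Forward(double x) const { return weight * x; }  // this rank's sub-module
    double Backward(double g) const { return weight * g; } // gradient w.r.t. this stage's input
};

int main() {
    const int num_stages = 3;
    const int num_microbatches = 4;
    std::vector<ToyStage> stages(num_stages);

    // Forward phase: every micro-batch flows from stage 0 to stage n-1;
    // "sending" an activation to the next rank is modelled by indexing into acts.
    std::vector<std::vector<double>> acts(num_stages + 1,
                                          std::vector<double>(num_microbatches));
    for (int mb = 0; mb < num_microbatches; ++mb) {
        acts[0][mb] = 1.0 + mb; // toy input micro-batch
        for (int s = 0; s < num_stages; ++s) {
            acts[s + 1][mb] = stages[s].Forward(acts[s][mb]);
        }
    }

    // Backward phase: gradients flow from stage n-1 back to stage 0,
    // mirroring the ISend/IRecv gradient handoff between ranks.
    for (int mb = 0; mb < num_microbatches; ++mb) {
        double grad = 1.0; // dLoss/dOutput for this micro-batch
        for (int s = num_stages - 1; s >= 0; --s) {
            grad = stages[s].Backward(grad);
        }
        std::printf("microbatch %d: grad at stage 0 input = %f\n", mb, grad);
    }
    return 0;
}
```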

2. Command-line parameters:

--pipeline_parallel  # uint32; enables pipeline parallelism, with the value giving the number of parallel devices, i.e. the number of stages

Example:
./llama3 --input_bin <input_path> --llmc_filepath <model_path> --device cuda --nthread_per_process 8 --batch_size 10 --total_batch_size 5120 --num_iteration 10 --pipeline_parallel 8

JYMiracle305 force-pushed the add_pp branch 2 times, most recently from 7552d8e to 1083190 on November 5, 2025 at 09:20
ifs.seekg(base + std::streamoff(len * sizeof(float)));
}

std::vector<int> GetPipelineParallelGroupRanks(int pp_world_size) {
Collaborator:

Move this function to infini_train/src/nn/parallel/utils.cc, to stay consistent with ddp/tp.

Contributor Author:

OK


void ReadVectorShardFloat(std::ifstream &ifs, float *dst, int64_t len, int64_t start, int64_t cnt);

std::vector<int> GetPipelineParallelGroupRanks(int rank);
Collaborator:

Move this to infini_train/src/nn/parallel/utils.h.

Contributor Author:

OK

DEFINE_bool(sequence_parallel, false, "Whether to enable Sequence Parallel");
DEFINE_uint32(
pipeline_parallel, 1,
"Pipeline Parallel world size, will always use device=cuda and use all cuda visible devices when set to true");
Collaborator:

use all cuda visible devices when set to true

PP can now be configured with the number of devices to use, right? Was this description just left un-updated?

Contributor Author:

Right, I forgot to update this comment.

auto optimizer_factory = [lr](const std::vector<std::shared_ptr<Tensor>> &params) {
return std::make_shared<optimizers::SGD>(params, lr);
};
auto optimizer = optimizer_factory(model->Parameters());
Collaborator:

Why not initialize the optimizer directly here?

Contributor Author:

PP takes an optimizer_factory parameter, so this was done to stay consistent with PP. Now that the model is chunked first, PP should also be able to take the optimizer directly.
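
For context, the direct-initialization form being discussed would look like the line below, using the same optimizers::SGD constructor as the snippet above:

```cpp
// Direct initialization; equivalent to calling optimizer_factory(model->Parameters()) once.
auto optimizer = std::make_shared<optimizers::SGD>(model->Parameters(), lr);
```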

auto num_microbatches = FLAGS_total_batch_size / (FLAGS_batch_size * FLAGS_sequence_length * ddp_world_size);
DistributedDataLoader train_loader(std::make_shared<TinyShakespeareDataset>(FLAGS_input_bin, FLAGS_sequence_length),
FLAGS_batch_size, ddp_rank, ddp_world_size);
FLAGS_batch_size * num_microbatches, ddp_rank, ddp_world_size);
Collaborator:

This needs a dedicated PP case: in the non-PP path, num_microbatches equals grad_accum_steps, and non-PP gradient accumulation only reads batch_size worth of data each time, relying on the outer loop running multiple times to accumulate gradients.
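
A minimal sketch of the distinction being made here, with a hypothetical helper name (LoaderBatchSize is not part of the PR):

```cpp
#include <cstdint>

// PP wants one loader read to cover all micro-batches so the schedule can split them
// inside TrainStep; non-PP gradient accumulation keeps each read at batch_size and
// repeats the read grad_accum_steps times in the outer loop.
int64_t LoaderBatchSize(bool pipeline_parallel, int64_t batch_size, int64_t num_microbatches) {
    return pipeline_parallel ? batch_size * num_microbatches : batch_size;
}
```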

}

void ISend::SetupContext(const std::vector<std::shared_ptr<Tensor>> &input_tensors,
const std::vector<std::shared_ptr<Tensor>> &output_tensors) {}
Collaborator:

If it does nothing, there is no need to override this method.

Contributor Author:

OK

const std::vector<std::shared_ptr<Tensor>> &output_tensors) {}

std::vector<std::shared_ptr<Tensor>> ISend::Backward(const std::vector<std::shared_ptr<Tensor>> &grad_outputs) {
auto shapes = shapes_;
Collaborator:

If shapes is never modified, just use shapes_ directly.

Contributor Author:

OK

auto shapes = shapes_;
std::vector<std::shared_ptr<Tensor>> recv_tensors;
for (int shape_i = 0; shape_i < shapes.size(); ++shape_i) {
auto r_tensor = std::make_shared<Tensor>(shapes[shape_i], DataType::kFLOAT32, input_device_);
Collaborator:

Won't hard-coding fp32 cause problems when running in lower precision?
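
One possible direction, sketched against the snippet above: record the sent tensors' dtypes next to shapes_ (the dtypes_ member here is hypothetical, not taken from the PR) and use them when allocating the receive buffers.

```cpp
// Sketch only: allocate each receive buffer with the dtype of the tensor that was
// actually sent, so bf16/fp16 runs get gradients back in the matching precision.
std::vector<std::shared_ptr<Tensor>> recv_tensors;
recv_tensors.reserve(shapes_.size());
for (size_t i = 0; i < shapes_.size(); ++i) {
    recv_tensors.push_back(std::make_shared<Tensor>(shapes_[i], dtypes_[i], input_device_));
}
```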

}

std::vector<std::shared_ptr<Tensor>> IRecv::Forward(const std::vector<std::shared_ptr<Tensor>> &recv_tensors) {
CHECK_NE(src_device_, nullptr) << "src_device_ must be set";
Collaborator:

Use CHECK_NOTNULL here.

Contributor Author:

OK


void SGD::Step() {
for (auto param : params_) {
// FIXME(jym): skip parameters with empty gradients
Collaborator:

Isn't the FIXME unnecessary here, since the skip is already done below?

Contributor Author:

OK
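
For illustration, the skip being discussed might look roughly like the sketch below; the grad() accessor and the loop body are assumptions, not the PR's actual implementation.

```cpp
// Sketch only: with PP each rank holds a subset of parameters, so any parameter
// without a populated gradient is simply skipped during the update.
void SGD::Step() {
    for (auto &param : params_) {
        if (param->grad() == nullptr) { // no gradient produced on this rank
            continue;
        }
        // ... apply the SGD update to param using param->grad() ...
    }
}
```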

JYMiracle305 force-pushed the add_pp branch 4 times, most recently from adcd7ab to 74e517f on November 14, 2025 at 10:19