Conversation

DongheJin (Collaborator)

No description provided.

ep_size_ = parallel_args.ep_size();
ep_local_tp_size_ = parallel_args.world_size() / ep_size_;
CHECK_EQ(parallel_args.world_size(), ep_size_ * ep_local_tp_size_);
ep_local_tp_rank_ = parallel_args.rank() % ep_local_tp_size_;
num_experts_per_partition_ = model_args.n_routed_experts() / ep_size_;
if (FLAGS_enable_eplb) {
num_experts_per_partition_++;
Collaborator:

Would it be better to make the number of redundant experts a configurable parameter?

Collaborator (Author):

The redundant experts num has been modified to be a configurable parameter.

std::vector<int32_t> values;

for (const auto& [k, v] : expert_routing_map) {
keys.emplace_back(k);
Collaborator:

Can it be combined with the loop above?

  std::vector<int64_t> keys;
  std::vector<int32_t> values;
  for (auto& [key, indices] : expert_routing_map) {
    int num_of_duplications = indices.size();
    int selected_index = ep_rank_ % num_of_duplications;
    indices = {indices[selected_index]};

    keys.emplace_back(key);
    values.emplace_back(static_cast<int32_t>(indices[0]));
  }

Collaborator (Author):

Done. Thanks for the suggestion!

matches_pos.emplace_back(
std::distance(device_expert_list_.begin(), iter) - start_idx);
}
}
std::lock_guard<std::mutex> lock(experts_mutex_);
torch::Tensor tmp_tensor =
is_sharded ? get_sharded_tensor(state_dict,
Collaborator:

tmp_tensor and tmp_tensor_shm are the same tensor. We should be able to use just one of them; maybe there is no need to retrieve it again from the sharded tensor?

Collaborator (Author):

Done. Thanks for the suggestion!

bool transpose) {
auto merge_experts_weights_sart = std::chrono::high_resolution_clock::now();

Collaborator:

nit: The variable merge_experts_weights_sart is unused.

target_buffer = at_npu::native::npu_format_cast(target_buffer.contiguous(), 2)
.reshape({num_experts, gate_dim + up_dim, hidden_dim});

prepare_experts_weights_start = std::chrono::high_resolution_clock::now();
Collaborator:

nit: prepare_experts_weights_start is not used

{num_layers, num_device_experts},
torch::TensorOptions().dtype(torch::kInt64))
.clone());
layer_ids[worker_rank] = result.value().prepared_layer_id;
Collaborator:

I don't understand this part. The layer_ids array has size num_device_experts - 1, but the loop below iterates over results, where results.size() equals worker_clients_.size(). Are these two sizes guaranteed to match?

Collaborator (Author):

The redundant experts num has been modified to be a configurable parameter.

// For multi-node serving: all workers connect to engine_server_
// (the engine's brpc server), which sends each worker a UniqueId
// used to create the process group. Workers then report their own
// brpc server addresses to the engine, which creates a WorkerClient
// per worker and drives each step through these WorkerClients.
std::shared_ptr<DistManager> dist_manager_ = nullptr;

std::unique_ptr<EplbManager> eplb_manager_ = nullptr;
std::unique_ptr<EplbPolicy> eplb_policy_ = nullptr;
Collaborator:

nit: eplb_policy_ is only used by eplb_manager_; it may be more elegant to move it into eplb_manager_.

auto prev_max_val = torch::max(prev_load).item<double>() + 1e-6f;

current_load = (current_load / current_max_val).unsqueeze(0);
;
Collaborator:

nit: delete ;

state_.expert_load_queue.pop();
int64_t current_time = absl::ToUnixSeconds(absl::Now());
if (current_time - latest_record_time >= FLAGS_eplb_update_rate) {
latest_record_time = current_time;
Collaborator:

nit: FLAGS_eplb_update_rate -> FLAGS_eplb_update_interval ?

std::vector<int32_t>(pb_forward_input->eplb_info().expert_ids().begin(),
pb_forward_input->eplb_info().expert_ids().end());
eplb_info.update_layer_id = pb_forward_input->eplb_info().update_layer_id();
forward_inputs.eplb_info = eplb_info;
Collaborator:

nit: The line forward_inputs.eplb_info = eplb_info; is redundant, since eplb_info is a reference to forward_inputs.eplb_info.

@@ -236,4 +236,10 @@ struct JsonTool {
: type(tool_type), function(func) {}
};

struct EplbInfo {
Member:

add comments for struct / class and its public fields / methods please.

Collaborator (Author):

Done! Added detailed comments.

#pragma once

#include <torch/torch.h>

Member:

list declarations in alphabetical order please.

Collaborator (Author):

This is clang-format's output under our current rules. The ordering follows these priority rules:
1. Headers with a .h suffix (like torch/torch.h) get the highest priority
2. Other system headers (like functional) come next
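Such an ordering typically comes from clang-format's IncludeCategories mechanism, where each include is matched against regexes and sorted by category priority. A hypothetical .clang-format fragment that would produce the two rules described above (the repository's actual configuration may differ):

```yaml
SortIncludes: true
IncludeCategories:
  - Regex: '^<.*\.h>'   # system headers ending in .h, e.g. <torch/torch.h>
    Priority: 1
  - Regex: '^<.*>'      # other system headers, e.g. <functional>
    Priority: 2
  - Regex: '.*'         # project headers
    Priority: 3
```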

@DongheJin DongheJin force-pushed the feature/eplb branch 5 times, most recently from 6943a2d to bfd630d Compare August 29, 2025 08:07
@liutongxuan liutongxuan merged commit e25e7a6 into jd-opensource:main Aug 30, 2025
2 checks passed