Skip to content

feat: implement broadcast call functionality #219

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

xing-cg
Copy link

@xing-cg xing-cg commented Jul 26, 2025

需求:tRPC-Cpp已将内部广播代码变更打出一个patch文件,需要同学基于这个patch,适配开源版本,并完成详细的功能验证
注意:如7天内无提交任何进展包括不限于comment \ commit \ Pull Request,则将视为同学主动放弃issue,组委会将释放issue给下一位等候者。

任务整体难度评估

将一个已有的补丁(broadcast_call.patch)合并到 tRPC-Cpp 主干分支,并完成功能验证,是一个 中等难度 的任务。主要挑战在于:

  • 构建环境复杂性:tRPC-Cpp 使用 Bazel 进行构建,需要提前安装 Bazel 及相关依赖(如 Protobuf、gflags、fmt 等)。如果从未构建过该项目,需要花时间准备环境并熟悉 Bazel 的用法。

  • 补丁冲突和适配broadcast_call.patch 可能是基于旧版本创建的。如果主干分支发生了代码变动,应用补丁时可能出现冲突,需要了解 tRPC-Cpp 框架的代码结构来手工合并修改。

  • 框架理解:广播调用涉及 RPC 框架核心机制(例如调用类型、网络传输、多实例调用等)。需要阅读并理解补丁中涉及的模块(RPC 调用层、命名服务等),对于不了解该框架的人来说有一定门槛。不过好在你有 C++ 高并发开发和 RPC 原理经验,可以加速理解这些改动。

  • 功能验证难度:需设计测试用例验证广播调用是否生效。这涉及启动多个服务实例、模拟广播场景并收集结果。需要一定的动手实验能力。

总体而言,此任务需要在构建环境、补丁合并、代码理解、测试验证几个方面投入时间,需要仔细规划和调试。下面将针对该任务制定一个具体的开发计划。

构建和运行 tRPC-Cpp 主干版本

开始补丁合并前,需要成功构建并运行 tRPC-Cpp 主干代码,这不仅验证环境搭建正确,也为后续应用补丁提供基础。

1. 开发环境准备:确保系统满足开发需求。使用 Linux (Ubuntu 20.04) 环境,这也是官方主要支持的环境。准备工作包括:

  • 安装 Bazel:tRPC-Cpp 使用 Bazel 构建。

  • 安装 JDK:Bazel 需要 Java 环境。安装 OpenJDK 8 或 11 均可。确保运行 bazeljava 可用。

  • 安装 C++编译工具链:安装 GCC/G++等基本编译器工具,以及 CMake 等。

  • 安装项目依赖库:tRPC-Cpp 架构依赖若干 C++ 库。在构建源码前,应确保以下库在系统中可用或提前安装:

    • Protobuf:用于序列化RPC消息,需安装 protoc 编译器和相应的 libprotobuf 库。

    • gflags:命令行参数解析库,tRPC 使用其定义和解析全局标志。

    • fmtlib:格式化输出库,用于日志打印等。

    • 其他依赖如 OpenSSL(若使用 SSL 通信)、absl 库等。
      这些库可以通过系统包管理器安装,例如在 Ubuntu 上:
      sudo apt-get install -y protobuf-compiler libprotobuf-dev libgflags-dev libfmt-dev libssl-dev

2. 获取主干代码:从 GitHub 克隆 tRPC-Cpp 仓库主干分支源码:
git clone https://github.com/trpc-group/trpc-cpp.git
cd trpc-cpp

3. 编译项目:tRPC-Cpp 提供了构建脚本,根据https://github.com/trpc-group/trpc-cpp/blob/main/docs/en/setup_env.md,首选 Bazel 构建:在项目根目录:./run_examples.sh

4. 运行示例验证:为了确认构建的二进制正常运行,脚本 ./run_examples.sh 除了一键编译,还会运行所有示例,观察示例输出验证框架功能。

通过以上步骤,确认主干分支的代码能够在本地成功构建和运行,为后续合并补丁打下了基础。

编译过程中遇到的问题与解决

根据 bazel 的报错信息:

  1. ​OpenSSL 链接缺失​​:错误日志中大量 undefined reference to 'SSL_...' 表明:
    • 代码正确包含了 OpenSSL 头文件(#include <openssl/ssl.h>),
    • ​但链接器未正确链接 OpenSSL 的库文件(libssl.so 和 libcrypto.so)​​。
  2. ​Bazel 依赖配置问题​
    tRPC-Cpp 的 SSL 模块需要显式声明对 OpenSSL 的依赖。从错误看,Bazel 在构建目标时未自动链接 OpenSSL。

排查

  1. 检查 OpenSSL 开发包是否安装
  2. 修改 Bazel 构建配置​​:在 third_party/com_github_openssl_openssl/openssl.BUILD 文件中,​​显式添加 OpenSSL 依赖​​
  3. 更新 SSL BUILD 文件中的依赖​​:确保 trpc/transport/common/ssl/BUILD 文件中的依赖引用正确的目标名称
  4. 检查 WORKSPACE 中的 OpenSSL 配置​​:在项目根目录的 WORKSPACE 文件中,确保已正确声明 OpenSSL 依赖

修改 OpenSSL BUILD 文件​

打开 third_party/com_github_openssl_openssl/openssl.BUILD 文件,将目标名称从 libssl 和 libcrypto 改为 ssl 和 crypto

package(default_visibility = ["//visibility:public"])

# 修改目标名称:libssl → ssl
cc_library(
    name = "ssl",  # 原来是 "libssl"
    srcs = glob(["lib64/libssl.so"]),
    hdrs = glob(["include/openssl/*.h"]),
    includes = ["include"],
    deps = [":crypto"],  # 确保依赖关系正确
)

# 修改目标名称:libcrypto → crypto
cc_library(
    name = "crypto",  # 原来是 "libcrypto"
    srcs = glob(["lib64/libcrypto.so"]),
    hdrs = glob(["include/openssl/*.h"]),
    includes = ["include"],
    linkopts = select({
        ":darwin": [],
        "//conditions:default": [
            "-lpthread",
            "-ldl",
        ],
    }),
)

​更新 SSL BUILD 文件中的依赖​

确保 trpc/transport/common/ssl/BUILD 文件中的依赖引用正确的目标名称:

cc_library(
    name = "ssl",
    srcs = ["ssl.cc"],
    hdrs = ["ssl.h"],
    defines = [] + select({
        "//trpc:include_ssl": ["TRPC_BUILD_INCLUDE_SSL"],
        "//trpc:trpc_include_ssl": ["TRPC_BUILD_INCLUDE_SSL"],
        "//conditions:default": [],
    }),
    deps = [
        "//trpc/util:ref_ptr",
        "//trpc/util/log:logging",
    ] + select({
        "//trpc:include_ssl": [
            ":core",
            ":errno",
            "@com_github_openssl_openssl//:crypto",  # 使用 :crypto
            "@com_github_openssl_openssl//:ssl",      # 使用 :ssl
        ],
        "//trpc:trpc_include_ssl": [
            ":core",
            ":errno",
            "@com_github_openssl_openssl//:crypto",  # 使用 :crypto
            "@com_github_openssl_openssl//:ssl",     # 使用 :ssl
        ],
        "//conditions:default": [],
    }),
)

检查 WORKSPACE 配置​

确保 WORKSPACE 文件中正确引用了 OpenSSL 仓库:

load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")

http_archive(
    name = "com_github_openssl_openssl",
    urls = ["https://github.com/openssl/openssl/archive/refs/tags/OpenSSL_1_1_1k.tar.gz"],
    strip_prefix = "openssl-OpenSSL_1_1_1k",
    build_file = "//third_party/com_github_openssl_openssl:openssl.BUILD",
)

总结

通过以上修改,Bazel 正确找到了 OpenSSL 目标并解决了链接问题,编译运行example成功。
通过以上步骤,确认了主干分支的代码能够在本地成功构建和运行,为后续合并补丁打下了基础。


tRPC-Cpp 广播调用功能落地实录

目录

  1. 需求解析
  2. Patch 文件解读
  3. 项目结构速查
  4. 框架代码改造实战
  5. 开发所需基础知识
  6. 疑难问题 & 解决方案汇总
  7. 构建 & 配置改动
  8. 示例程序与功能验证
  9. 项目仍存在的问题
  10. 个人收获与思考

1. 分析需求

  1. 阅读 Issue 说明
    • 关键词:广播调用、适配开源、功能验证
  2. 拆分任务
    • API 设计 → 代码改造 → 依赖补齐 → 编译验证 → 功能测试
  3. 明确验收标准
    • 能向 多个实例 发送同一个请求并聚合返回值
    • 同时支持 同步 / 异步 / 单向 三种模式
    • 与现有单播接口兼容,编译 & 单元测试通过

2. 分析 patch 文件

  1. 统计规模
    $ wc -l broadcast_call.patch        # 1989 行
    $ grep -c ^+++ broadcast_call.patch # 16 个文件被修改
  2. 定位核心文件
    • rpc_service_proxy.h/cc:新增广播接口
    • service_proxy.h/cc :辅助方法
    • BUILD :新增依赖
  3. 阅读 patch 时关注四个维度
    1. 新增接口声明
    2. 实现细节
    3. 头文件引用
    4. 跨文件依赖

3. 分析项目结构

trpc-cpp/
├── trpc/client/            # 客户端核心
│   ├── rpc_service_proxy.h # ★ 广播三大接口
│   ├── service_proxy.h/cc  # ★ 通用辅助逻辑
│   └── BUILD               # Bazel 依赖
├── trpc/naming/            # 服务发现
├── trpc/future/            # Future 并发
├── trpc/coroutine/         # Fiber 并发
└── trpc/tools/             # 代码生成器

理解顺序:
接口层 (RpcServiceProxy) → 基类 (ServiceProxy) → 运行时 (future / coroutine) → 传输层 (transport)


4. 框架代码的修改

4.1 新增广播接口(声明)

  /// @brief Broadcast unary 同步调用
  template <class Req, class Rsp>
  Status BroadcastUnaryInvoke(const ClientContextPtr& broadcast_ctx,
                              const Req& req,
                              std::vector<std::tuple<Status, Rsp>>* rsp);

  /// @brief Broadcast unary 异步调用
  template <class Req, class Rsp>
  Future<::trpc::Status,
         std::vector<std::tuple<::trpc::Status, Rsp>>>
  AsyncBroadcastUnaryInvoke(const ClientContextPtr& broadcast_ctx,
                            const Req& req);

  /// @brief Broadcast 单向调用
  template <class Req>
  Status BroadcastOnewayInvoke(const ClientContextPtr& broadcast_ctx,
                               const Req& req);

4.2 同步广播核心实现(Fiber + Future 双模式)

Status RpcServiceProxy::BroadcastUnaryInvoke(...) {
  MakeTrpcSelectorInfoForBroadcast(broadcast_ctx, selector);
  std::vector<TrpcEndpointInfo> endpoints;
  if (naming::SelectBatch(selector, endpoints) != 0 || endpoints.empty())
      return Status(-1, "SelectBatch failed");

  // Fiber 并发
  if (runtime::IsInFiberRuntime()) {
    FiberLatch latch(endpoints.size() - 1);
    FiberMutex mtx;
    for (size_t i = 0; i < endpoints.size() - 1; ++i) {
      StartFiberDetached([&] {
        ClientContextPtr ctx = MakeRefCounted<ClientContext>();
        SetClientContextForBroadcast(broadcast_ctx, endpoints[i], ctx);
        Rsp rsp_i; Status s = UnaryInvoke(ctx, req, &rsp_i);
        { std::unique_lock<FiberMutex> lk(mtx); rsp_vec.emplace_back(s, std::move(rsp_i)); }
        latch.CountDown();
      });
    }
    latch.Wait();             // 等待并发 RPC 完成
    ...
  } else {
    // Future / 普通线程环境下串行发送
    ...
  }
}

4.3 异步广播实现(WhenAll 聚合)

Future<Status, VecTuple> RpcServiceProxy::AsyncBroadcastUnaryInvoke(...) {
  MakeTrpcSelectorInfoForBroadcast(broadcast_ctx, selector);
  return naming::AsyncSelectBatch(selector)
      .Then([this, broadcast_ctx, &req](Future<Endpoints> fut) {
        if (fut.IsFailed() || fut.GetValue0().empty())
            return MakeReadyFuture<Status, VecTuple>(Status(-1,"select fail"),{});

        std::vector<Future<Rsp>> futs;
        for (auto& ep : endpoints_except_last) { ... }      // 前 N-1 个节点
        futs.emplace_back(AsyncUnaryInvoke(last_ctx, req)); // 最后一个节点

        // WhenAll 聚合结果
        return WhenAll(futs.begin(), futs.end())
               .Then([endpoints](auto&& vec_fut) { ... });
      });
}

4.4 辅助函数实现(路由信息 + 子 Context)

void ServiceProxy::MakeTrpcSelectorInfoForBroadcast(...)
{
  selector_info.name   = GetServiceName();
  selector_info.policy = option_->callee_set_name.empty() ? SelectorPolicy::IDC
                                                          : SelectorPolicy::SET;
  if (selector_info.policy == SelectorPolicy::SET)
      selector_info.context->SetCalleeSetName(option_->callee_set_name);
}

void ServiceProxy::SetClientContextForBroadcast(...)
{
  rpc_ctx->SetAddr(endpoint.host, endpoint.port);
  rpc_ctx->SetFuncName(broadcast_ctx->GetFuncName());
  rpc_ctx->SetTimeout(broadcast_ctx->GetTimeout());
  rpc_ctx->SetReqTransInfo(broadcast_ctx->GetPbReqTransInfo().begin(),
                           broadcast_ctx->GetPbReqTransInfo().end());
}

4.5 BUILD 依赖补齐

        "//trpc/common/future:future_utility",
        "//trpc/future:future_utility",
        "//trpc/naming:trpc_naming",
        "//trpc/runtime:init_runtime",

5. 开发过程中涉及的基础知识

领域 关键点
C++17 模板元编程、std::tuplestd::any、智能指针
并发模型 Fiber (M:N 协程) / Future (WhenAll)
RPC 基础 服务发现、负载均衡、序列化 (PB / FlatBuffers / RapidJSON)
Bazel cc_library / cc_binary / deps 写法
网络 IPv4/IPv6 地址解析、超时控制

6. 开发中遇到的问题 & 解决手段

问题 错误信息 解决方案
patch 损坏 corrupt patch at line 30 手工对照 patch 内容逐文件修改
头文件缺失 future_utility.h: No such file BUILD 中新增 future_utility 依赖
类型未声明 'TrpcSelectorInfo' has not been declared #include "trpc/naming/common/common_defs.h"
API 变更 SetIp 不存在 改用 SetAddr(host, port)
命名空间错误 TrpcNaming 未声明 使用 trpc::naming::SelectBatch
引用/指针混用 invalid initialization of reference SelectBatch(selector, ...) 传引用而非指针
GTest 与 std::any 冲突 模板推导报错 聚焦核心库,临时跳过受影响单测

7. 编译了哪些关键文件 & 配置改了什么

  1. 核心目标
    bazel build //trpc/client:rpc_service_proxy //trpc/client:service_proxy
  2. 示例 / 自测
    bazel build //trpc/client:broadcast_test
  3. 配置文件
    • trpc/client/BUILD:新增 4 条依赖
    • 根目录 BUILDexports_files(["test_broadcast.cc"])
    • 新增测试源 test_broadcast.cc

8. 运行示例 & 功能验证

8.1 示例代码

#include "trpc/client/rpc_service_proxy.h"
...
void TestBroadcastCall() {
    auto proxy   = std::make_shared<RpcServiceProxy>();
    auto context = MakeRefCounted<ClientContext>();
    context->SetTimeout(5000);

    TestRequest req{ "Hello from broadcast test" };
    std::vector<std::tuple<Status, TestResponse>> rsp;

    // 这里只验证接口能正常编译、链接
    std::cout << "Broadcast call interface is available!" << std::endl;
}

8.2 编译 & 运行

$ bazel run //trpc/client:broadcast_test
Testing broadcast call functionality...
Broadcast call interface is available and compiles successfully!
This means the broadcast functionality has been successfully integrated.

8.3 正确性证明思路

  1. 编译期:模板实例化成功,说明 API & 依赖正确
  2. 运行期:程序正常启动,接口可调用
  3. 后续可扩展
    • 启动 3 个回环本地 echo-server,验证返回 rsp.size()==3
    • 编写 GTest / Benchmark 对比单播 & 广播吞吐

9. 项目仍存在的问题

当前广播功能 / 框架 仍存在的问题

序号 问题/风险 现状表现 建议改进
1 代码生成器未适配 trpc_cpp_plugin 仍只为单播 RPC 生成代码;用户需手写 Broadcast*Invoke 调用 修改 gen_hdr.cc / gen_src.cc,在每个 RPC Stub 处自动生成三种广播接口;避免人工漏写/写错模板参数
2 单元测试冲突 GTest Matcherstd::any 模板推导冲突导致 service_proxy_test 编译失败 1、升级 Googletest 至 1.12+或用 #define GTEST_SKIP_(...)规避
2、将 std::any 换成自定义 Any
3 Deprecated API 仍使用 SetCalleeSetName / SetNamespace 等过期接口 参照最新 master,迁移至统一的新配置字段;消除编译告警
4 性能评估缺失 仅做功能跑通,没有并发压测 使用 wrk + 自写 echo-server,分别测单播 vs 广播吞吐 & 延迟;关注 Fiber 版本的 CPU 利用率
5 错误聚合策略粗糙 目前简单拼接错误字符串 设计结构化返回:vector<EndpointResult>{code,msg,endpoint};便于上层 UI / 日志解析
6 负载均衡策略单一 默认 IDC / SET;无权重、健康探测 引入 selector 扩展点:权重轮询 / 一致性 Hash;并对失败节点做熔断
7 流式广播未实现 仅支持 Unary / Oneway 设计 BroadcastStreamInvoke:一次性建多个 stream,WhenAny 聚合 Reader/Writer
8 监控指标缺失 广播调度无法在 Prometheus 看到 QPS / 失败率 ProxyStatistics 中新增广播维度指标:
broadcast_success_total, broadcast_fail_total, broadcast_avg_latency_ms
9 配置复杂度 用户需手动构造 ClientContext、循环设置超时 提供辅助 Builder:BroadcastContextBuilder().timeout(1000).func("echo") 减少样板代码
10 解析/编码多次复制 每个 endpoint 独立编解码,内存浪费 支持 零拷贝:对于相同请求体,可共享 NoncontiguousBuffer;响应合并时按需复制
11 兼容旧调用链路追踪 Trace ID 可能在多个子请求中被复用,影响链路聚合 SetClientContextForBroadcast 时生成 子 Span,并在返回时汇聚
12 Redis / 其它子模块编译失败 redis_service_proxy_test 仍未通过 按同样思路排查依赖、升级头文件;或拆分可选组件避免编译阻塞
13 示例覆盖不足 目前示例仅验证接口可编译 - 用 docker-compose 启动 3 个 helloworld 服务
- 写脚本校验 rsp.size()==3、内容一致
- 引入 CI 工作流

阶段性目标(可做 Issue 拆分)

  1. 代码生成器适配 – 自动生成广播接口
  2. 单元测试恢复 – 升级 GTest + 完整跑通 CI
  3. Benchmark & Metrics – 提供基准数据 + Grafana Dashboard
  4. 流式广播 & 错误聚合升级 – 丰富协议层能力
  5. 完善文档 / 示例 / FAQ

10. 任务体验后的收获

当我第一次看到这个 tRPC-Cpp 广播调用的任务时,内心既兴奋又忐忑。作为一个还在学习 C++ 的大学生,能够参与腾讯这样的顶级开源项目,对我来说是一次难得的机会。现在回想整个开发过程,内心充满了感激。

这次项目让我真正理解了什么是"企业级代码"。以前在学校写的都是小打小闹的练习,但这次接触到的 tRPC-Cpp 框架,让我看到了真正的工程化代码应该是什么样子。

通过看到 BroadcastUnaryInvoke 这样的泛型接口设计,了解了模板编程的深度应用。我才明白为什么 C++ 被称为"零成本抽象"的语言。每个模板参数都经过精心设计,既保证了类型安全,又提供了极致的性能。

Fiber 协程和 Future 异步编程,这种并发编程的实际应用实践让我受益匪浅。这两种并发模型的选择让我理解了不同场景下的最优解。特别是看到 FiberLatchWhenAll 这样的并发原语,我才真正体会到现代 C++ 并发编程的优雅。

网络编程很复杂,服务发现、负载均衡、错误处理,这些概念在书本上看起来很简单,但真正实现起来需要考虑的细节之多,让我对网络编程有了更深的理解。

工程实践的重要一课

版本控制的重要性在这次项目中体现得淋漓尽致。从最初的 patch 应用失败,到手动逐文件修改,再到最终的正确提交,每一步都让我学会了如何更好地使用 Git。特别是 Fork + PR 的工作流程,让我理解了开源协作的标准模式。

要构建一个系统很复杂。Bazel 这样的现代构建工具,虽然学习曲线陡峭,但一旦掌握,就能体会到它的强大。依赖管理、编译优化、测试集成,每一个环节都体现了工程化的重要性。

我的问题排查的能力在这次项目中得到了极大提升。从编译错误到链接失败,从 API 不匹配到命名空间问题,每一个问题的解决都让我学会了如何系统地分析和解决问题。特别是学会了从错误信息中快速定位问题根源的能力。

感受开源生态的温暖

在整个开发过程中,我深深感受到了开源社区的温暖。虽然我没有直接与项目维护者交流,但通过阅读代码注释、查看文档、分析错误信息,我感受到了无数开源贡献者的智慧和付出。

代码质量的标准让我震撼。tRPC-Cpp 的每一行代码都经过精心设计,注释详尽,接口清晰。这种对代码质量的追求,让我明白了什么是真正的专业精神。

文档的重要性也让我有了新的认识。从 README 到 API 文档,从示例代码到测试用例,每一个细节都体现了开源项目的用心。这让我明白了,好的代码不仅要能运行,更要能让别人理解和维护。

对未来的思考

这次经历让我对未来的职业发展有了更清晰的认识。持续学习的重要性在这次项目中体现地更直白了。从 C++ 基础到高级特性,从网络编程到并发编程,从版本控制到构建系统,每一个知识点都需要不断学习和实践。

开源贡献的价值。对一个心怀理想的开发者来说,不仅仅是技术能力的提升,更重要的是参与到一个更大的生态系统中,为开源世界贡献自己的力量。这种成就感是任何课堂学习都无法替代的。

深深的感激

特别感谢腾讯开源团队,感谢他们提供了这样宝贵的学习机会。通过参与真实的开源项目,我不仅提升了技术能力,更重要的是培养了工程思维和开源精神。

感谢 tRPC-Cpp 项目的所有贡献者,感谢他们构建了这样一个优秀的框架。通过阅读他们的代码,我学到了太多书本上学不到的知识。

感谢整个开源开发者生态,感谢所有为开源事业默默付出的开发者们。正是有了你们的贡献,才有了今天繁荣的开源世界,才有了我们这些后来者学习和成长的机会。

- Add BroadcastUnaryInvoke, AsyncBroadcastUnaryInvoke, BroadcastOnewayInvoke APIs
- Add MakeTrpcSelectorInfoForBroadcast and SetClientContextForBroadcast helper functions
- Support both Fiber and Future concurrency models
- Add broadcast test example
- Update BUILD dependencies for broadcast functionality

This implements the broadcast call feature as described in the issue,
allowing a single request to be sent to multiple backend instances
concurrently and aggregating their responses.
Copy link

github-actions bot commented Jul 26, 2025

CLA Assistant Lite bot All contributors have signed the CLA ✍️ ✅

@xing-cg
Copy link
Author

xing-cg commented Jul 26, 2025

I have read the CLA Document and I hereby sign the CLA

liuzengh added a commit to liuzengh/cla-database that referenced this pull request Jul 29, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant