-
Notifications
You must be signed in to change notification settings - Fork 103
【腾讯犀牛鸟计划】体验trpc RPC 流式服务开发 #218
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
CLA Assistant Lite bot All contributors have signed the CLA ✍️ ✅ |
I have read the CLA Document and I hereby sign the CLA |
调用方的 P99 延时在 10ms 左右时,测量服务的 QPS为50.8W |
运行
几秒后输出
阅读run.sh源码,发现其启动stream_server.cc const auto& config = ::trpc::TrpcConfig::GetInstance()->GetServerConfig();
service_name = fmt::format("{}.{}.{}.{}", "trpc", config.app, config.server, "RawDataStreamService");
if (status.OK()) {
++request_counter;
request_bytes += request.msg().size();
TRPC_FMT_INFO("server got request: {}", request.msg());
continue;
}
if (status.StreamEof()) {
std::stringstream reply_msg;
reply_msg << "server got EOF, reply to client, server got request"
<< ", count:" << request_counter << ", received bytes:" << request_bytes;
reply->set_msg(reply_msg.str());
TRPC_FMT_INFO("reply to the client: {}", reply_msg.str());
status = ::trpc::Status{0, 0, "OK"};
break;
}
调用 // Client streaming RPC.
::trpc::Status StreamGreeterServiceImpl::ClientStreamSayHello(
const ::trpc::ServerContextPtr& context,
const ::trpc::stream::StreamReader<::trpc::test::helloworld::HelloRequest>& reader,
::trpc::test::helloworld::HelloReply* reply) {
::trpc::Status status{};
uint32_t request_counter{0};
uint32_t request_bytes{0};
for (;;) {
::trpc::test::helloworld::HelloRequest request{};
status = reader.Read(&request, 3000);
if (status.OK()) {
++request_counter;
request_bytes += request.msg().size();
TRPC_FMT_INFO("server got request: {}", request.msg());
continue;
}
if (status.StreamEof()) {
std::stringstream reply_msg;
reply_msg << "server got EOF, reply to client, server got request"
<< ", count:" << request_counter << ", received bytes:" << request_bytes;
reply->set_msg(reply_msg.str());
TRPC_FMT_INFO("reply to the client: {}", reply_msg.str());
status = ::trpc::Status{0, 0, "OK"};
break;
}
TRPC_FMT_ERROR("stream got error: {}", status.ToString());
break;
}
return status;
} 客户端流式RPC完整代码 for (int i = 0; i < request_count; ++i) {
std::stringstream reply_msg;
reply_msg << " reply: " << request.msg() << "#" << (i + 1);
::trpc::test::helloworld::HelloReply reply{};
reply.set_msg(reply_msg.str());
status = writer->Write(reply);
if (status.OK()) {
continue;
}
TRPC_FMT_ERROR("stream got error: {}", status.ToString());
break;
// Server streaming RPC.
::trpc::Status StreamGreeterServiceImpl::ServerStreamSayHello(
const ::trpc::ServerContextPtr& context,
const ::trpc::test::helloworld::HelloRequest& request, // NO LINT
::trpc::stream::StreamWriter<::trpc::test::helloworld::HelloReply>* writer) {
::trpc::Status status{};
// A simple case, try to reply 10 response messages to the client.
int request_count = 10;
for (int i = 0; i < request_count; ++i) {
std::stringstream reply_msg;
reply_msg << " reply: " << request.msg() << "#" << (i + 1);
::trpc::test::helloworld::HelloReply reply{};
reply.set_msg(reply_msg.str());
status = writer->Write(reply);
if (status.OK()) {
continue;
}
TRPC_FMT_ERROR("stream got error: {}", status.ToString());
break;
}
return status;
} 完整服务端流式代码 |
客户端流式处理 RPC,其中客户端写入一系列消息并将其发送到服务器,再次使用提供的流。客户端完成消息写入后,它会等待服务器读取所有消息并返回其响应。客户端发送一个或者多个请求消息,服务端发送一个响应消息。 // Client Streaming RPC (客户端流式)
rpc ClientStreamSayHello (stream Request) returns (Reply) {} 服务器端流式处理 RPC,其中客户端向服务器发送请求并获取流以读回一系列消息。客户端从返回的流中读取,直到没有更多消息。客户端发送一个请求消息,服务端发送一个或多个响应消息。 // Server Streaming RPC (服务端流式)
rpc ServerStreamSayHello (Request) returns (stream Reply) {} 双向流式处理 RPC,其中双方都使用读写流发送一系列消息。这两个流独立运行,因此客户端和服务器可以按他们喜欢的任何顺序进行读取和写入。客户端发送一个或者多个请求消息,服务端发送一个或者多个响应消息。 // Bidirectional Streaming RPC (双向流式)
rpc BidiStreamSayHello(stream Request) returns (stream Reply) {}
int GetRequestCount(int request_count) {
if (request_count > 0) {
return std::min(100, FLAGS_request_count);
}
return std::numeric_limits<int>::max();
} 若 |
syntax = "proto3";
package trpc.test.shopping;
// 定义商品抢购请求消息
message FlashSaleRequest {
int32 user_id = 1;
int32 product_id = 2;
int32 quantity = 3;
}
// 定义商品抢购响应消息
message FlashSaleReply {
bool success = 1;
string message = 2;
}
// 定义商品抢购服务
service FlashSaleService {
// 商品抢购 RPC 方法
rpc FlashSale (FlashSaleRequest) returns (FlashSaleReply) {}
} |
服务端 class StreamGreeterServiceImpl final : public ::trpc::test::helloworld::StreamGreeter {
public:
::trpc::Status FlashSale(::trpc::ServerContextPtr context,
const ::trpc::test::helloworld::FlashSaleRequest* request,
::trpc::test::helloworld::FlashSaleReply* reply) override {
int32_t user_id = request->user_id();
int32_t product_id = request->product_id();
int32_t quantity = request->quantity();
TRPC_FMT_INFO("User {} is trying to flash sale product {} with quantity {}", user_id, product_id, quantity);
// 这里简单模拟抢购成功,实际需实现库存检查等逻辑
reply->set_success(true);
reply->set_message("Flash sale successful");
return ::trpc::kSuccStatus;
}
}; |
客户端 bool CallFlashSale(const StreamGreeterServiceProxyPtr& proxy, int user_id, int product_id, int quantity) {
auto context = ::trpc::MakeClientContext(proxy);
::trpc::test::helloworld::FlashSaleRequest request;
::trpc::test::helloworld::FlashSaleReply reply;
// 设置请求参数
request.set_user_id(user_id);
request.set_product_id(product_id);
request.set_quantity(quantity);
// 调用商品抢购服务
::trpc::Status status = proxy->FlashSale(context, &request, &reply);
if (status.OK()) {
if (reply.success()) {
std::cout << "Flash sale successful. Message: " << reply.message() << std::endl;
return true;
} else {
std::cerr << "Flash sale failed. Message: " << reply.message() << std::endl;
}
} else {
std::cerr << "RPC call failed: " << status.ToString() << std::endl;
}
return false;
}
std::string calling_name{""};
std::function<bool()> calling_executor{nullptr};
if (rpc_method == "ClientStreamSayHello") {
calling_name = "Streaming RPC, ClientStreamSayHello";
calling_executor = [&stream_greeter_proxy, request_count]() {
return CallClientStreamSayHello(stream_greeter_proxy, request_count);
};
} else if (rpc_method == "ServerStreamSayHello") {
calling_name = "Streaming RPC, ServerStreamSayHello";
calling_executor = [&stream_greeter_proxy]() { return CallServerStreamSayHello(stream_greeter_proxy); };
} else if (rpc_method == "BidiStreamSayHello") {
calling_name = "Streaming RPC, BidiStreamSayHello";
calling_executor = [&stream_greeter_proxy, request_count]() {
return CallBidiStreamSayHello(stream_greeter_proxy, request_count);
};
} else if (rpc_method == "FlashSale") {
calling_name = "Flash Sale RPC";
calling_executor = [&stream_greeter_proxy]() {
return CallFlashSale(stream_greeter_proxy, FLAGS_user_id, FLAGS_product_id, FLAGS_quantity);
};
} else {
std::cout << "RPC method is invalid, nothing todo" << std::endl;
return 0;
}
// Executing multiple cases is to send concurrent requests.
for (int i = 0; i < 8; i++) {
callings.push_back({calling_name + std::to_string(i + 1), calling_executor, false});
}
auto latch_count = static_cast<std::ptrdiff_t>(callings.size());
::trpc::FiberLatch callings_latch{latch_count};
for (auto& c : callings) {
::trpc::StartFiberDetached([&callings_latch, &c]() {
c.ok = c.calling_executor();
callings_latch.CountDown();
});
}
callings_latch.Wait();
for (const auto& c : callings) {
final_ok &= c.ok;
std::cout << "name: " << c.calling_name << ", ok: " << c.ok << std::endl;
}
std::cout << "final result of streaming RPC calling: " << final_ok << std::endl;
return final_ok ? 0 : -1;
} |
SseEventSseEvent结构 /// @brief SSE event structure
struct SseEvent {`
std::string event_type; // event field
std::string data; // data field
std::string id; // id field
std::optional<int> retry; // retry field (milliseconds) 处理带有event的字段核心代码 if (!event_type.empty()) {
result += "event: " + event_type + "\n";
} 处理字段核心代码 if (!data.empty()) {
// Handle multi-line data
std::istringstream iss(data);
std::string line;
while (std::getline(iss, line)) {
result += "data: " + line + "\n";
}
} 测试夹具 class SseEventTest : public ::testing::Test {
protected:
void SetUp() override {}
void TearDown() override {}
}; 典型测试用例,参考Sse规范文档 TEST_F(SseEventTest, ToStringBasicMessage) {
SseEvent event;
event.data = "This is the first message.";
std::string result = event.ToString();
std::string expected = "data: This is the first message.\n\n";
EXPECT_EQ(result, expected);
} BUILD构建测试 cc_library(
name = "http_sse_event",
hdrs = ["http_sse_event.h"],
visibility = ["//visibility:public"],
deps = [
],
)
cc_test(
name = "http_sse_event_test",
srcs = ["http_sse_event_test.cc"],
deps = [
":http_sse_event",
"@com_google_googletest//:gtest",
"@com_google_googletest//:gtest_main",
],
) |
SseParserSseParseer结构 static std::vector<SseEvent> Parse(const std::string& text) {
std::vector<SseEvent> events;
std::istringstream stream(text);
std::string line; 统一不同系统换行符 / Remove \\r if present (handle \\r\\n line endings)
if (!line.empty() && line.back() == '\\r') {
line.pop_back();
} 空行判断是否有效 if (line.empty()) {
if (!data_lines.empty() || !current_event.event_type.empty() || !current_event.id.empty() ||
current_event.retry.has_value()) {
// Join data lines with newlines
`if (!data_lines.empty()) {`
current_event.data = JoinDataLines(data_lines);
} 设置默认message if (current_event.event_type.empty()) {
current_event.event_type = "message";
} SSE格式判断核心 auto colon_pos = line.find(':');
if (colon_pos == std::string::npos) {
continue; // Skip malformed lines
} 处理解析出的字段值,去除值前的空格 if (!value.empty() && value[0] == ' ') {
value = value.substr(1);
}
if (field == "data") {
data_lines.push_back(value);
} else if (field == "event") {
current_event.event_type = value;
} else if (field == "id") {
current_event.id = value;
} else if (field == "retry") {
try {
current_event.retry = std::stoi(value);
} catch (const std::exception&) {
// Ignore invalid retry values
}
} |
在scql的开源实践中,采用trpc的设计方式添加explain 功能 // Copyright 2023 Ant Group Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
syntax = "proto3";
package scql.pb;
import "api/core.proto";
import "api/common.proto";
import "api/status.proto";
import "google/api/annotations.proto";
import "google/api/field_behavior.proto";
import "google/protobuf/empty.proto";
option go_package = "proto-gen/scql";
// SCDBService provides a collection of APIs,
// that client-user can connect to the SCQL system, execute queries and fetch
// results.
service SCDBService {
// Submit
//
// Asynchronous query interface.
// Submit the query (DDL/DCL/DQL) to SCQL, and return immediately.
// It will allocate a new `scdb_session_id` for the query, and set it in
// response.
rpc Submit(SCDBQueryRequest) returns (SCDBSubmitResponse) {
option (google.api.http) = {
post: "/public/submit_query"
body: "*"
};
}
// Fetch
//
// Fetch the result of the query submitted asynchronously.
// It will return `NOT_READY` status code if the query is still running.
rpc Fetch(SCDBFetchRequest) returns (SCDBQueryResultResponse) {
option (google.api.http) = {
post: "/public/fetch_result"
body: "*"
};
}
// SubmitAndGet
//
// The synchronous query interface allows users to submit a query,
// wait for it to finish, and get the query result in one RPC.
// This interface is suitable for executing fast queries,
// such as DDL, DCL, and simple DQL. However,
// if the query takes a long time to run, it may result in a timeout.
// Therefore, it is recommended to use the synchronous query API to run
// complex queries.
rpc SubmitAndGet(SCDBQueryRequest) returns (SCDBQueryResultResponse) {
option (google.api.http) = {
post: "/public/submit_and_get"
body: "*"
};
}
}
// SCDBQueryResultCallback defines an API that SCQL could use it to notify the
// caller when the query result is ready, either because it has finished
// successfully or an error has occurred.
service SCDBQueryResultCallback {
// ReportQueryResult reports the query result once the query job done.
rpc ReportQueryResult(SCDBQueryResultResponse)
returns (google.protobuf.Empty);
}
// SCDBQueryRequest designed for Client(Biz Service) which allow callback url
// and traceid
message SCDBQueryRequest {
RequestHeader header = 1;
// User information
SCDBCredential user = 2 [(google.api.field_behavior) = REQUIRED];
// SCQL query to be run.
string query = 3 [(google.api.field_behavior) = REQUIRED];
// Optional call back URL to report query result.
// If provided, it should implements the
// `SCDBQueryResultCallback.ReportQueryResult` method.
string query_result_callback_url = 4;
// Biz request id(trace_id provided by the biz client), which often be unique
// per biz action, e.g. can be value of order_id, transaction_id, etc.
string biz_request_id = 5;
// Current database name
string db_name = 6;
// Indicates whether to execute an EXPLAIN statement and return the execution graph in Graphviz DOT format.
bool explain = 7;
}
message SCDBSubmitResponse {
// Status of response
Status status = 1;
// Scdb session id
string scdb_session_id = 2;
}
message SCDBFetchRequest {
RequestHeader header = 1;
SCDBCredential user = 2 [(google.api.field_behavior) = REQUIRED];
// Scdb session id
string scdb_session_id = 3 [(google.api.field_behavior) = REQUIRED];
}
// SCDB query result representation (table view by columns).
message SCDBQueryResultResponse {
// Status of response
Status status = 1;
// Output columns.
repeated Tensor out_columns = 2;
// Scdb session id
string scdb_session_id = 3;
// The number of rows affected by a select into, update, insert, or delete
int64 affected_rows = 4;
// Warnings for the query
repeated SQLWarning warnings = 5;
}
message User {
enum AccountSystemType {
UNKNOWN = 0;
NATIVE_USER = 1;
}
message NativeUser {
// User name, e.g. "zhang_san"
string name = 1;
// Password, e.g. "123456"
string password = 2;
}
AccountSystemType account_system_type = 1;
oneof user {
NativeUser native_user = 2;
}
}
message SCDBCredential {
User user = 1;
} |
能够参与腾讯这样的顶级开源项目,对我来说是一次难得的机会。 我懂得了追求将自己负责的部分做到极致,之前参与的小公司实习或学校项目只追求效率,实现就可以;在腾讯的开源实践中,无论是社区导师,开源学长,竞争对手,都追求将自己的负责部分做到想能最好,完全理解 还进一步了解fiber纤程,c++并发编程;此前我只接触过go的并发编程和连接池,通过测试,发现c++并发性能稍优于golang 同时在SSE实现的issue中,我系统学习AI时代下的最佳通信实践,对计算机网络的理解进一步加深,之前背八股,优化通信速率方法,only想到缓存,合并,压缩这些传统方式;SSE通信方式是AI时代最新力作,让我们能平滑体验AI 特别感谢腾讯开源团队及贡献者,感谢提供宝贵的学习机会。 希望两年后能加入腾讯!基础微信团队是我的目标! |
基于tRPC框架电商场景,实现了抢购的功能;测试QPS和latency