Skip to content

Commit 872be2a

Browse files
committed
Feat: Support circuit breaking for faulty nodes in direct and domain selector modes
1 parent bacbc06 commit 872be2a

39 files changed

+2424
-350
lines changed

trpc/common/config/BUILD

-28
Original file line numberDiff line numberDiff line change
@@ -288,34 +288,6 @@ cc_test(
288288
],
289289
)
290290

291-
cc_library(
292-
name = "domain_naming_conf",
293-
srcs = ["domain_naming_conf.cc"],
294-
hdrs = ["domain_naming_conf.h"],
295-
deps = [
296-
"//trpc/util/log:logging",
297-
"@com_github_jbeder_yaml_cpp//:yaml-cpp",
298-
],
299-
)
300-
301-
cc_library(
302-
name = "domain_naming_conf_parser",
303-
hdrs = ["domain_naming_conf_parser.h"],
304-
deps = [
305-
":domain_naming_conf",
306-
],
307-
)
308-
309-
cc_test(
310-
name = "domain_naming_conf_test",
311-
srcs = ["domain_naming_conf_test.cc"],
312-
deps = [
313-
":domain_naming_conf",
314-
":domain_naming_conf_parser",
315-
"@com_google_googletest//:gtest_main",
316-
],
317-
)
318-
319291
cc_library(
320292
name = "local_file_provider_conf",
321293
srcs = ["local_file_provider_conf.cc"],

trpc/naming/BUILD

+2
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,8 @@ cc_library(
173173
":selector_factory",
174174
"//trpc/filter:filter_manager",
175175
"//trpc/naming:load_balance_factory",
176+
"//trpc/naming/common/util/circuit_break:circuit_breaker_creator_factory",
177+
"//trpc/naming/common/util/circuit_break:default_circuit_breaker",
176178
"//trpc/naming/direct:direct_selector_filter",
177179
"//trpc/naming/direct:selector_direct",
178180
"//trpc/naming/domain:domain_selector_filter",
+139
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
licenses(["notice"])
2+
3+
package(default_visibility = ["//visibility:public"])
4+
5+
cc_library(
6+
name = "bucket_circular_array",
7+
srcs = [
8+
"bucket_circular_array.cc",
9+
],
10+
hdrs = [
11+
"bucket_circular_array.h",
12+
],
13+
visibility = [
14+
"//visibility:public",
15+
],
16+
deps = [
17+
"//trpc/util/log:logging",
18+
],
19+
)
20+
21+
cc_library(
22+
name = "circuit_break_whitelist",
23+
srcs = [
24+
"circuit_break_whitelist.cc",
25+
],
26+
hdrs = [
27+
"circuit_break_whitelist.h",
28+
],
29+
visibility = [
30+
"//visibility:public",
31+
],
32+
deps = [
33+
"//trpc/codec/trpc",
34+
],
35+
)
36+
37+
cc_test(
38+
name = "circuit_break_whitelist_test",
39+
srcs = [
40+
"circuit_break_whitelist_test.cc",
41+
],
42+
deps = [
43+
":circuit_break_whitelist",
44+
"@com_google_googletest//:gtest_main",
45+
],
46+
)
47+
48+
cc_library(
49+
name = "circuit_breaker_config",
50+
hdrs = [
51+
"circuit_breaker_config.h",
52+
],
53+
deps = [
54+
"//trpc/util/log:logging",
55+
"@com_github_jbeder_yaml_cpp//:yaml-cpp",
56+
],
57+
)
58+
59+
cc_library(
60+
name = "default_circuit_breaker_config",
61+
hdrs = [
62+
"default_circuit_breaker_config.h",
63+
],
64+
visibility = [
65+
"//visibility:public",
66+
],
67+
deps = [
68+
"@com_github_jbeder_yaml_cpp//:yaml-cpp",
69+
],
70+
)
71+
72+
cc_library(
73+
name = "default_circuit_breaker",
74+
srcs = [
75+
"default_circuit_breaker.cc",
76+
],
77+
hdrs = [
78+
"default_circuit_breaker.h",
79+
],
80+
visibility = [
81+
"//visibility:public",
82+
],
83+
deps = [
84+
":bucket_circular_array",
85+
":circuit_breaker",
86+
":default_circuit_breaker_config",
87+
"//trpc/util/log:logging",
88+
],
89+
)
90+
91+
cc_library(
92+
name = "circuit_breaker",
93+
srcs = [],
94+
hdrs = [
95+
"circuit_breaker.h",
96+
],
97+
visibility = [
98+
"//visibility:public",
99+
],
100+
deps = [
101+
"//trpc/naming/common:common_defs",
102+
],
103+
)
104+
105+
cc_library(
106+
name = "circuit_breaker_creator_factory",
107+
srcs = [],
108+
hdrs = [
109+
"circuit_breaker_creator_factory.h",
110+
],
111+
visibility = [
112+
"//visibility:public",
113+
],
114+
deps = [
115+
":circuit_breaker",
116+
"@com_github_jbeder_yaml_cpp//:yaml-cpp",
117+
],
118+
)
119+
120+
cc_test(
121+
name = "default_circuit_breaker_test",
122+
srcs = ["default_circuit_breaker_test.cc"],
123+
deps = [
124+
":default_circuit_breaker",
125+
"//trpc/util:time",
126+
"@com_google_googletest//:gtest",
127+
"@com_google_googletest//:gtest_main",
128+
],
129+
)
130+
131+
cc_test(
132+
name = "default_circuit_beaker_config_test",
133+
srcs = ["default_circuit_beaker_config_test.cc"],
134+
deps = [
135+
":default_circuit_breaker_config",
136+
"@com_google_googletest//:gtest",
137+
"@com_google_googletest//:gtest_main",
138+
],
139+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
//
2+
//
3+
// Tencent is pleased to support the open source community by making tRPC available.
4+
//
5+
// Copyright (C) 2023 THL A29 Limited, a Tencent company.
6+
// All rights reserved.
7+
//
8+
// If you have downloaded a copy of the tRPC source code from Tencent,
9+
// please note that tRPC source code is licensed under the Apache 2.0 License,
10+
// A copy of the Apache 2.0 License is included in this file.
11+
//
12+
//
13+
14+
#include "trpc/naming/common/util/circuit_break/bucket_circular_array.h"
15+
16+
#include "trpc/util/log/logging.h"
17+
18+
namespace trpc::naming {
19+
20+
BucketCircularArray::BucketCircularArray(uint32_t stat_window_ms, uint32_t buckets_num)
21+
: buckets_num_(buckets_num), stat_window_ms_per_bucket_(stat_window_ms / buckets_num), buckets_(buckets_num) {
22+
for (auto& bucket : buckets_) {
23+
bucket.bucket_time.store(0, std::memory_order_relaxed);
24+
bucket.total_count.store(0, std::memory_order_relaxed);
25+
bucket.error_count.store(0, std::memory_order_relaxed);
26+
}
27+
}
28+
29+
void BucketCircularArray::AddMetrics(uint64_t current_ms, bool success) {
30+
uint64_t bucket_time = current_ms / stat_window_ms_per_bucket_;
31+
int bucket_index = bucket_time % buckets_num_;
32+
auto& bucket = buckets_[bucket_index];
33+
// If it is data from the previous round, reset the data for that window.
34+
uint64_t store_bucket_time = bucket.bucket_time;
35+
if (bucket_time != store_bucket_time) {
36+
if (bucket.bucket_time.compare_exchange_weak(store_bucket_time, bucket_time, std::memory_order_relaxed)) {
37+
bucket.total_count.store(0, std::memory_order_relaxed);
38+
bucket.error_count.store(0, std::memory_order_relaxed);
39+
}
40+
}
41+
42+
bucket.total_count.fetch_add(1, std::memory_order_relaxed);
43+
if (!success) {
44+
bucket.error_count.fetch_add(1, std::memory_order_relaxed);
45+
}
46+
}
47+
48+
void BucketCircularArray::ClearMetrics() {
49+
// Since the time of data is checked when adding metrics, here we only need to reset the bucket_time.
50+
for (auto& bucket : buckets_) {
51+
bucket.bucket_time = 0;
52+
}
53+
}
54+
55+
float BucketCircularArray::GetErrorRate(uint64_t current_ms, uint32_t request_volume_threshold) {
56+
uint64_t bucket_time = current_ms / stat_window_ms_per_bucket_;
57+
uint64_t error_count = 0;
58+
uint64_t total_count = 0;
59+
for (auto& bucket : buckets_) {
60+
// Only collect data from the most recent round.
61+
if (bucket.bucket_time.load(std::memory_order_relaxed) > (bucket_time - buckets_num_)) {
62+
total_count += bucket.total_count.load(std::memory_order_relaxed);
63+
error_count += bucket.error_count.load(std::memory_order_relaxed);
64+
}
65+
}
66+
67+
if (total_count >= request_volume_threshold) {
68+
return static_cast<float>(error_count) / total_count;
69+
}
70+
71+
// If the minimum number of requests is not reached, return a failure rate of 0.
72+
return 0;
73+
}
74+
75+
} // namespace trpc::naming
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
2+
//
3+
//
4+
// Tencent is pleased to support the open source community by making tRPC available.
5+
//
6+
// Copyright (C) 2023 THL A29 Limited, a Tencent company.
7+
// All rights reserved.
8+
//
9+
// If you have downloaded a copy of the tRPC source code from Tencent,
10+
// please note that tRPC source code is licensed under the Apache 2.0 License,
11+
// A copy of the Apache 2.0 License is included in this file.
12+
//
13+
//
14+
15+
#pragma once
16+
17+
#include <atomic>
18+
#include <cstdint>
19+
#include <vector>
20+
21+
namespace trpc::naming {
22+
23+
/// @brief A thread-safe class for tracking invocation statistics using a sliding window implementation.
24+
class BucketCircularArray {
25+
public:
26+
/// @brief Construct a bucket circular array
27+
/// @note It is necessary to ensure that stat_window_ms is divisible by buckets_num.
28+
BucketCircularArray(uint32_t stat_window_ms, uint32_t buckets_num);
29+
30+
/// @brief Add statistical data
31+
void AddMetrics(uint64_t current_ms, bool success);
32+
33+
/// @brief Clear statistical data
34+
void ClearMetrics();
35+
36+
/// @brief Retrieve the failure rate within the statistical time period
37+
float GetErrorRate(uint64_t current_ms, uint32_t request_volume_threshold);
38+
39+
private:
40+
struct Metrics {
41+
std::atomic<uint64_t> bucket_time{0}; // The start time of the bucket
42+
std::atomic<uint32_t> total_count{0}; // The request count during current time period
43+
std::atomic<uint32_t> error_count{0}; // The error count during current time period
44+
};
45+
46+
uint32_t buckets_num_;
47+
uint32_t stat_window_ms_per_bucket_;
48+
std::vector<Metrics> buckets_;
49+
};
50+
51+
} // namespace trpc::naming
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
//
2+
//
3+
// Tencent is pleased to support the open source community by making tRPC available.
4+
//
5+
// Copyright (C) 2023 THL A29 Limited, a Tencent company.
6+
// All rights reserved.
7+
//
8+
// If you have downloaded a copy of the tRPC source code from Tencent,
9+
// please note that tRPC source code is licensed under the Apache 2.0 License,
10+
// A copy of the Apache 2.0 License is included in this file.
11+
//
12+
//
13+
14+
#include "trpc/naming/common/util/circuit_break/circuit_break_whitelist.h"
15+
16+
#include "trpc/codec/trpc/trpc.pb.h"
17+
18+
namespace trpc::naming {
19+
20+
CircuitBreakWhiteList::CircuitBreakWhiteList() {
21+
// Add default error code to whitelist
22+
circuitbreak_whitelist_.Writer().insert(TrpcRetCode::TRPC_SERVER_OVERLOAD_ERR);
23+
circuitbreak_whitelist_.Writer().insert(TrpcRetCode::TRPC_SERVER_LIMITED_ERR);
24+
circuitbreak_whitelist_.Swap();
25+
}
26+
27+
void CircuitBreakWhiteList::SetCircuitBreakWhiteList(const std::vector<int>& retcodes) {
28+
auto& writer = circuitbreak_whitelist_.Writer();
29+
writer.clear();
30+
writer.insert(retcodes.begin(), retcodes.end());
31+
circuitbreak_whitelist_.Swap();
32+
}
33+
34+
bool CircuitBreakWhiteList::Contains(int retcode) {
35+
auto& reader = circuitbreak_whitelist_.Reader();
36+
if (reader.find(retcode) != reader.end()) {
37+
return true;
38+
}
39+
40+
return false;
41+
}
42+
43+
} // namespace trpc::naming

0 commit comments

Comments
 (0)