Skip to content

Commit 0bcd204

Browse files
authored
fix:修复熔断规则atomic.Value使用存在panic问题 (polarismesh#191)
1 parent ca98b72 commit 0bcd204

File tree

16 files changed

+528
-70
lines changed

16 files changed

+528
-70
lines changed

api/config_file.go

+7
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,18 @@ package api
1919

2020
import "github.com/polarismesh/polaris-go/pkg/model"
2121

22+
type GetConfigFileRequest struct {
23+
*model.GetConfigFileRequest
24+
}
25+
2226
// ConfigFileAPI 配置文件的 API
2327
type ConfigFileAPI interface {
2428
SDKOwner
29+
// Deprecated: please use FetchConfigFile
2530
// GetConfigFile 获取配置文件
2631
GetConfigFile(namespace, fileGroup, fileName string) (model.ConfigFile, error)
32+
// FetchConfigFile 获取配置文件
33+
FetchConfigFile(*GetConfigFileRequest) (model.ConfigFile, error)
2734
// CreateConfigFile 创建配置文件
2835
CreateConfigFile(namespace, fileGroup, fileName, content string) error
2936
// UpdateConfigFile 更新配置文件

api/config_file_impl.go

+11-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,17 @@ func newConfigFileAPIBySDKContext(context SDKContext) ConfigFileAPI {
5454

5555
// GetConfigFile 获取配置文件
5656
func (c *configFileAPI) GetConfigFile(namespace, fileGroup, fileName string) (model.ConfigFile, error) {
57-
return c.context.GetEngine().SyncGetConfigFile(namespace, fileGroup, fileName)
57+
return c.context.GetEngine().SyncGetConfigFile(&model.GetConfigFileRequest{
58+
Namespace: namespace,
59+
FileGroup: fileGroup,
60+
FileName: fileName,
61+
Subscribe: true,
62+
})
63+
}
64+
65+
// FetchConfigFile 获取配置文件
66+
func (c *configFileAPI) FetchConfigFile(req *GetConfigFileRequest) (model.ConfigFile, error) {
67+
return c.context.GetEngine().SyncGetConfigFile(req.GetConfigFileRequest)
5868
}
5969

6070
// CreateConfigFile 创建配置文件

pkg/flow/configuration/config_flow.go

+10-8
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,11 @@ func (c *ConfigFileFlow) Destroy() {
8787
}
8888

8989
// GetConfigFile 获取配置文件
90-
func (c *ConfigFileFlow) GetConfigFile(namespace, fileGroup, fileName string) (model.ConfigFile, error) {
90+
func (c *ConfigFileFlow) GetConfigFile(req *model.GetConfigFileRequest) (model.ConfigFile, error) {
9191
configFileMetadata := &model.DefaultConfigFileMetadata{
92-
Namespace: namespace,
93-
FileGroup: fileGroup,
94-
FileName: fileName,
92+
Namespace: req.Namespace,
93+
FileGroup: req.FileGroup,
94+
FileName: req.FileName,
9595
}
9696

9797
cacheKey := genCacheKeyByMetadata(configFileMetadata)
@@ -116,11 +116,13 @@ func (c *ConfigFileFlow) GetConfigFile(namespace, fileGroup, fileName string) (m
116116
if err != nil {
117117
return nil, err
118118
}
119-
c.addConfigFileToLongPollingPool(fileRepo)
120-
c.repos = append(c.repos, fileRepo)
121-
122119
configFile = newDefaultConfigFile(configFileMetadata, fileRepo)
123-
c.configFileCache[cacheKey] = configFile
120+
121+
if req.Subscribe {
122+
c.addConfigFileToLongPollingPool(fileRepo)
123+
c.repos = append(c.repos, fileRepo)
124+
c.configFileCache[cacheKey] = configFile
125+
}
124126
return configFile, nil
125127
}
126128

pkg/flow/sync_flow.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -594,8 +594,8 @@ func (e *Engine) realInitCalleeService(req *model.InitCalleeServiceRequest,
594594
}
595595

596596
// SyncGetConfigFile 同步获取配置文件
597-
func (e *Engine) SyncGetConfigFile(namespace, fileGroup, fileName string) (model.ConfigFile, error) {
598-
return e.configFlow.GetConfigFile(namespace, fileGroup, fileName)
597+
func (e *Engine) SyncGetConfigFile(req *model.GetConfigFileRequest) (model.ConfigFile, error) {
598+
return e.configFlow.GetConfigFile(req)
599599
}
600600

601601
// SyncGetConfigGroup 同步获取配置文件

pkg/model/config.go

+7
Original file line numberDiff line numberDiff line change
@@ -117,3 +117,10 @@ type ConfigFileGroup interface {
117117
// AddChangeListener 增加配置文件变更监听器
118118
AddChangeListener(cb OnConfigGroupChange)
119119
}
120+
121+
type GetConfigFileRequest struct {
122+
Namespace string
123+
FileGroup string
124+
FileName string
125+
Subscribe bool
126+
}

pkg/model/engine.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ type Engine interface {
109109
// InitCalleeService 所需的被调初始化
110110
InitCalleeService(req *InitCalleeServiceRequest) error
111111
// SyncGetConfigFile 同步获取配置文件
112-
SyncGetConfigFile(namespace, fileGroup, fileName string) (ConfigFile, error)
112+
SyncGetConfigFile(req *GetConfigFileRequest) (ConfigFile, error)
113113
// SyncGetConfigGroup 同步获取配置文件
114114
SyncGetConfigGroup(namespace, fileGroup string) (ConfigFileGroup, error)
115115
// SyncCreateConfigFile 同步创建配置文件

pkg/plugin/healthcheck/healthcheck.go

+8-7
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import (
3131
type HealthChecker interface {
3232
plugin.Plugin
3333
// DetectInstance 对单个实例进行探测,返回探测结果, 每个探测方法自己去判断当前周期是否需要探测,如果无需探测,则返回nil
34-
DetectInstance(model.Instance) (DetectResult, error)
34+
DetectInstance(model.Instance, *fault_tolerance.FaultDetectRule) (DetectResult, error)
3535
// Protocol .
3636
Protocol() fault_tolerance.FaultDetectRule_Protocol
3737
}
@@ -57,9 +57,7 @@ type DetectResultImp struct {
5757
Success bool
5858
DetectTime time.Time // 探测时间
5959
DetectInstance model.Instance // 探测的实例
60-
delay time.Duration
61-
code string
62-
status model.RetStatus
60+
Code string
6361
}
6462

6563
// IsSuccess 探测类型,与探测插件名相同
@@ -79,16 +77,19 @@ func (r *DetectResultImp) GetDetectInstance() model.Instance {
7977

8078
// GetCode() return code
8179
func (r *DetectResultImp) GetCode() string {
82-
return r.code
80+
return r.Code
8381
}
8482

8583
// GetDelay
8684
func (r *DetectResultImp) GetDelay() time.Duration {
87-
return r.delay
85+
return time.Since(r.GetDetectTime())
8886
}
8987

9088
func (r *DetectResultImp) GetRetStatus() model.RetStatus {
91-
return r.status
89+
if r.IsSuccess() {
90+
return model.RetSuccess
91+
}
92+
return model.RetFail
9293
}
9394

9495
// init 初始化

pkg/plugin/healthcheck/proxy.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ func (p *Proxy) SetRealPlugin(plug plugin.Plugin, engine model.Engine) {
3838
}
3939

4040
// DetectInstance proxy HealthChecker DetectInstance
41-
func (p *Proxy) DetectInstance(inst model.Instance) (DetectResult, error) {
42-
result, err := p.HealthChecker.DetectInstance(inst)
41+
func (p *Proxy) DetectInstance(inst model.Instance, rule *fault_tolerance.FaultDetectRule) (DetectResult, error) {
42+
result, err := p.HealthChecker.DetectInstance(inst, rule)
4343
return result, err
4444
}
4545

pkg/plugin/register/plugins.go

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
_ "github.com/polarismesh/polaris-go/plugin/configfilter/crypto/aes"
3737
_ "github.com/polarismesh/polaris-go/plugin/healthcheck/http"
3838
_ "github.com/polarismesh/polaris-go/plugin/healthcheck/tcp"
39+
_ "github.com/polarismesh/polaris-go/plugin/healthcheck/udp"
3940
_ "github.com/polarismesh/polaris-go/plugin/loadbalancer/hash"
4041
_ "github.com/polarismesh/polaris-go/plugin/loadbalancer/maglev"
4142
_ "github.com/polarismesh/polaris-go/plugin/loadbalancer/ringhash"

plugin/circuitbreaker/composite/checker.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func (c *ResourceHealthChecker) doCheck(ins model.Instance, protocol fault_toler
191191
ins.GetHost(), ins.GetPort(), c.resource.String(), protocol.String())
192192
return false
193193
}
194-
ret, err := checker.DetectInstance(ins)
194+
ret, err := checker.DetectInstance(ins, rule)
195195
if err != nil {
196196
return false
197197
}

plugin/circuitbreaker/composite/counter.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -122,15 +122,18 @@ func (rc *ResourceCounters) CurrentActiveRule() *fault_tolerance.CircuitBreakerR
122122
}
123123

124124
func (rc *ResourceCounters) updateCircuitBreakerStatus(status model.CircuitBreakerStatus) {
125-
rc.statusRef.Store(status)
125+
rc.statusRef.Store(&circuitBreakerStatusWrapper{
126+
val: status,
127+
})
126128
}
127129

128130
func (rc *ResourceCounters) CurrentCircuitBreakerStatus() model.CircuitBreakerStatus {
129131
val := rc.statusRef.Load()
130132
if val == nil {
131133
return nil
132134
}
133-
return val.(model.CircuitBreakerStatus)
135+
wrapper := val.(circuitBreakerStatusWrapper)
136+
return wrapper.val
134137
}
135138

136139
func (rc *ResourceCounters) CloseToOpen(breaker string) {
@@ -301,3 +304,7 @@ func buildFallbackInfo(rule *fault_tolerance.CircuitBreakerRule) *model.Fallback
301304
}
302305
return ret
303306
}
307+
308+
type circuitBreakerStatusWrapper struct {
309+
val model.CircuitBreakerStatus
310+
}

plugin/healthcheck/http/http.go

+74-38
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,12 @@
1818
package http
1919

2020
import (
21+
"bytes"
22+
"context"
23+
"fmt"
2124
"net/http"
22-
"net/url"
25+
"strconv"
26+
"strings"
2327
"time"
2428

2529
"github.com/polarismesh/specification/source/go/api/v1/fault_tolerance"
@@ -30,14 +34,18 @@ import (
3034
"github.com/polarismesh/polaris-go/pkg/plugin"
3135
"github.com/polarismesh/polaris-go/pkg/plugin/common"
3236
"github.com/polarismesh/polaris-go/pkg/plugin/healthcheck"
33-
"github.com/polarismesh/polaris-go/plugin/healthcheck/utils"
3437
)
3538

39+
type HttpSender interface {
40+
Do(req *http.Request) (*http.Response, error)
41+
}
42+
3643
// Detector TCP协议的实例健康探测器
3744
type Detector struct {
3845
*plugin.PluginBase
3946
cfg *Config
4047
timeout time.Duration
48+
client HttpSender
4149
}
4250

4351
// Type 插件类型
@@ -57,6 +65,7 @@ func (g *Detector) Init(ctx *plugin.InitContext) (err error) {
5765
if cfgValue != nil {
5866
g.cfg = cfgValue.(*Config)
5967
}
68+
g.client = &http.Client{}
6069
g.timeout = ctx.Config.GetConsumer().GetHealthCheck().GetTimeout()
6170
return nil
6271
}
@@ -67,15 +76,26 @@ func (g *Detector) Destroy() error {
6776
}
6877

6978
// DetectInstance 探测服务实例健康
70-
func (g *Detector) DetectInstance(ins model.Instance) (result healthcheck.DetectResult, err error) {
79+
func (g *Detector) DetectInstance(ins model.Instance, rule *fault_tolerance.FaultDetectRule) (result healthcheck.DetectResult, err error) {
7180
start := time.Now()
81+
timeout := g.timeout
82+
if rule != nil && rule.Protocol == fault_tolerance.FaultDetectRule_HTTP {
83+
timeout = time.Duration(rule.GetTimeout()) * time.Millisecond
84+
}
85+
86+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
87+
defer cancel()
7288
// 得到Http address
73-
address := utils.GetAddressByInstance(ins)
74-
success := g.doHttpDetect(address)
89+
detReq, err := g.generateHttpRequest(ctx, ins, rule)
90+
if err != nil {
91+
return nil, err
92+
}
93+
code, success := g.doHttpDetect(detReq, rule)
7594
result = &healthcheck.DetectResultImp{
7695
Success: success,
7796
DetectTime: start,
7897
DetectInstance: ins,
98+
Code: code,
7999
}
80100
return result, nil
81101
}
@@ -86,50 +106,66 @@ func (g *Detector) IsEnable(cfg config.Configuration) bool {
86106
}
87107

88108
// doHttpDetect 执行一次健康探测逻辑
89-
func (g *Detector) doHttpDetect(address string) bool {
90-
c := &http.Client{
91-
Timeout: g.timeout,
92-
}
93-
request := &http.Request{
94-
Method: http.MethodGet,
95-
URL: &url.URL{
96-
Scheme: "http",
97-
Host: address,
98-
Path: g.cfg.Path,
99-
},
100-
}
101-
header := http.Header{}
102-
if len(g.cfg.Host) > 0 {
103-
header.Add("Host", g.cfg.Host)
104-
}
105-
if len(g.cfg.RequestHeadersToAdd) > 0 {
106-
for _, requestHeader := range g.cfg.RequestHeadersToAdd {
107-
header.Add(requestHeader.Key, requestHeader.Value)
108-
}
109-
}
110-
if len(header) > 0 {
111-
request.Header = header
112-
}
113-
resp, err := c.Do(request)
109+
func (g *Detector) doHttpDetect(detReq *http.Request, rule *fault_tolerance.FaultDetectRule) (string, bool) {
110+
resp, err := g.client.Do(detReq)
114111
if err != nil {
115-
log.GetDetectLogger().Errorf("[HealthCheck][http] fail to check %s, err is %v", address, err)
116-
return false
112+
log.GetDetectLogger().Errorf("[HealthCheck][http] fail to check %+v, err is %v", detReq.URL, err)
113+
return "", false
117114
}
118115
defer resp.Body.Close()
119-
code := resp.StatusCode
120-
for _, statusCodeRange := range g.cfg.ExpectedStatuses {
121-
if code >= statusCodeRange.Start && code < statusCodeRange.End {
122-
return true
123-
}
116+
if code := resp.StatusCode; code >= 200 && code < 500 {
117+
return strconv.Itoa(resp.StatusCode), true
124118
}
125-
return false
119+
return strconv.Itoa(resp.StatusCode), false
126120
}
127121

128122
// Protocol .
129123
func (g *Detector) Protocol() fault_tolerance.FaultDetectRule_Protocol {
130124
return fault_tolerance.FaultDetectRule_HTTP
131125
}
132126

127+
func (g *Detector) generateHttpRequest(ctx context.Context, ins model.Instance, rule *fault_tolerance.FaultDetectRule) (*http.Request, error) {
128+
var (
129+
address string
130+
customUrl = g.cfg.Path
131+
port = ins.GetPort()
132+
)
133+
header := http.Header{}
134+
if rule == nil {
135+
customUrl = strings.TrimPrefix(customUrl, "/")
136+
if len(g.cfg.Host) > 0 {
137+
header.Add("Host", g.cfg.Host)
138+
}
139+
if len(g.cfg.RequestHeadersToAdd) > 0 {
140+
for _, requestHeader := range g.cfg.RequestHeadersToAdd {
141+
header.Add(requestHeader.Key, requestHeader.Value)
142+
}
143+
}
144+
} else {
145+
if rule.GetPort() > 0 {
146+
port = rule.Port
147+
}
148+
customUrl = rule.GetHttpConfig().GetUrl()
149+
customUrl = strings.TrimPrefix(customUrl, "/")
150+
ruleHeaders := rule.GetHttpConfig().GetHeaders()
151+
for i := range ruleHeaders {
152+
header.Add(ruleHeaders[i].Key, ruleHeaders[i].Value)
153+
}
154+
}
155+
address = fmt.Sprintf("http://%s:%d/%s", ins.GetHost(), port, customUrl)
156+
157+
request, err := http.NewRequestWithContext(ctx, rule.GetHttpConfig().Method, address, bytes.NewBufferString(rule.HttpConfig.GetBody()))
158+
if err != nil {
159+
log.GetDetectLogger().Errorf("[HealthCheck][http] fail to build request %+v, err is %v", address, err)
160+
return nil, err
161+
}
162+
163+
if len(header) > 0 {
164+
request.Header = header
165+
}
166+
return request, nil
167+
}
168+
133169
// init 注册插件信息
134170
func init() {
135171
plugin.RegisterConfigurablePlugin(&Detector{}, &Config{})

0 commit comments

Comments
 (0)