Skip to content

Commit 4f9a769

Browse files
committed
feat: 添加ReportClientHandler以及LocationProvider插件
1 parent ab8fd43 commit 4f9a769

File tree

19 files changed

+577
-45
lines changed

19 files changed

+577
-45
lines changed

pkg/config/api.go

+4
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,10 @@ type APIConfig interface {
169169
GetRetryInterval() time.Duration
170170
// SetRetryInterval 设置api调用重试时间
171171
SetRetryInterval(time.Duration)
172+
// GetLocationProvider 获取地理位置的提供者插件名称
173+
GetLocationProvider() string
174+
// SetLocationProvider 设置地理位置的提供者插件名称
175+
SetLocationProvider(string)
172176
}
173177

174178
// StatReporterConfig 统计上报配置

pkg/config/default.go

+8
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ const (
127127
DefaultMapKeyValueSeparator = ":"
128128
// 默认Map组装str (key:value) 二元组分割符
129129
DefaultMapKVTupleSeparator = "|"
130+
// 默认实例地理位置提供者插件名称
131+
DefaultLocationProvider = ""
130132
)
131133

132134
// 默认埋点server的端口,与上面的IP一一对应
@@ -353,6 +355,9 @@ func (a *APIConfigImpl) Verify() error {
353355
if *a.RetryInterval < DefaultAPIRetryInterval {
354356
return fmt.Errorf("global.api.retryInterval must be greater than %v", DefaultAPIRetryInterval)
355357
}
358+
if len(a.LocationProvider) == 0 {
359+
return fmt.Errorf("global.api.locationProvider is empty")
360+
}
356361
return nil
357362
}
358363

@@ -373,6 +378,9 @@ func (a *APIConfigImpl) SetDefault() {
373378
if len(a.BindIP) > 0 {
374379
a.BindIPValue = a.BindIP
375380
}
381+
if len(a.LocationProvider) > 0 {
382+
a.LocationProvider = DefaultLocationProvider
383+
}
376384
}
377385

378386
// 检验globalConfig配置

pkg/config/impl.go

+18-7
Original file line numberDiff line numberDiff line change
@@ -247,13 +247,14 @@ func ServiceClusterToServiceKey(config ServerClusterConfig) model.ServiceKey {
247247

248248
// APIConfigImpl API访问相关的配置
249249
type APIConfigImpl struct {
250-
Timeout *time.Duration `yaml:"timeout" json:"timeout"`
251-
BindIntf string `yaml:"bindIf" json:"bindIf"`
252-
BindIP string `yaml:"bindIP" json:"bindIP"`
253-
BindIPValue string `yaml:"-" json:"-"`
254-
ReportInterval *time.Duration `yaml:"reportInterval" json:"reportInterval"`
255-
MaxRetryTimes int `yaml:"maxRetryTimes" json:"maxRetryTimes"`
256-
RetryInterval *time.Duration `yaml:"retryInterval" json:"retryInterval"`
250+
Timeout *time.Duration `yaml:"timeout" json:"timeout"`
251+
BindIntf string `yaml:"bindIf" json:"bindIf"`
252+
BindIP string `yaml:"bindIP" json:"bindIP"`
253+
BindIPValue string `yaml:"-" json:"-"`
254+
ReportInterval *time.Duration `yaml:"reportInterval" json:"reportInterval"`
255+
MaxRetryTimes int `yaml:"maxRetryTimes" json:"maxRetryTimes"`
256+
RetryInterval *time.Duration `yaml:"retryInterval" json:"retryInterval"`
257+
LocationProvider string `yaml:"locationProvider" json:"locationProvider"`
257258
}
258259

259260
// GetTimeout 默认调用超时时间
@@ -316,6 +317,16 @@ func (a *APIConfigImpl) SetRetryInterval(interval time.Duration) {
316317
a.RetryInterval = &interval
317318
}
318319

320+
// GetLocationProvider 获取地理位置的提供者插件名称
321+
func (a *APIConfigImpl) GetLocationProvider() string {
322+
return a.LocationProvider
323+
}
324+
325+
// SetLocationProvider 设置地理位置的提供者插件名称
326+
func (a *APIConfigImpl) SetLocationProvider(provider string) {
327+
a.LocationProvider = provider
328+
}
329+
319330
// NewDefaultConfiguration 创建默认配置对象
320331
func NewDefaultConfiguration(addresses []string) *ConfigurationImpl {
321332
cfg := &ConfigurationImpl{}

pkg/flow/data/util.go

+21
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/polarismesh/polaris-go/pkg/plugin/healthcheck"
3232
"github.com/polarismesh/polaris-go/pkg/plugin/loadbalancer"
3333
"github.com/polarismesh/polaris-go/pkg/plugin/localregistry"
34+
"github.com/polarismesh/polaris-go/pkg/plugin/reporthandler"
3435
"github.com/polarismesh/polaris-go/pkg/plugin/serverconnector"
3536
"github.com/polarismesh/polaris-go/pkg/plugin/servicerouter"
3637
"github.com/polarismesh/polaris-go/pkg/plugin/statreporter"
@@ -156,6 +157,26 @@ func GetLoadBalancerByLbType(lbType string, supplier plugin.Supplier) (loadbalan
156157
return targetPlugin.(loadbalancer.LoadBalancer), nil
157158
}
158159

160+
// GetReportChain 获取ReportClient处理链
161+
func GetReportChain(cfg config.Configuration, supplier plugin.Supplier) (*reporthandler.ReportHandlerChain, error) {
162+
chain := &reporthandler.ReportHandlerChain{
163+
Chain: make([]reporthandler.ReportHandler, 0),
164+
}
165+
166+
pluginNames := supplier.GetPluginsByType(common.TypeReportHandler)
167+
168+
for i := range pluginNames {
169+
name := pluginNames[i]
170+
p, err := supplier.GetPlugin(common.TypeReportHandler, name)
171+
if err != nil {
172+
return nil, err
173+
}
174+
chain.Chain = append(chain.Chain, p.(reporthandler.ReportHandler))
175+
}
176+
177+
return chain, nil
178+
}
179+
159180
// SingleInvoke 同步调用的通用方法定义
160181
type SingleInvoke func(request interface{}) (interface{}, error)
161182

pkg/flow/startup/client_report.go

+17-24
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
namingpb "github.com/polarismesh/polaris-go/pkg/model/pb/v1"
3030
"github.com/polarismesh/polaris-go/pkg/plugin"
3131
"github.com/polarismesh/polaris-go/pkg/plugin/localregistry"
32+
"github.com/polarismesh/polaris-go/pkg/plugin/reporthandler"
3233
"github.com/polarismesh/polaris-go/pkg/plugin/serverconnector"
3334
"github.com/polarismesh/polaris-go/pkg/version"
3435
)
@@ -44,6 +45,9 @@ func NewReportClientCallBack(
4445
if callback.registry, err = data.GetRegistry(cfg, supplier); err != nil {
4546
return nil, err
4647
}
48+
if callback.reportChain, err = data.GetReportChain(cfg, supplier); err != nil {
49+
return nil, err
50+
}
4751
callback.configuration = cfg
4852
callback.globalCtx = globalCtx
4953
callback.interval = cfg.GetGlobal().GetAPI().GetReportInterval()
@@ -58,6 +62,7 @@ type ReportClientCallBack struct {
5862
configuration config.Configuration
5963
globalCtx model.ValueContext
6064
interval time.Duration
65+
reportChain *reporthandler.ReportHandlerChain
6166
}
6267

6368
const (
@@ -74,12 +79,13 @@ func (r *ReportClientCallBack) loadLocalClientReportResult() {
7479
log.GetBaseLogger().Warnf("fail to load local region info from %s, err is %v", cachedFile, err)
7580
return
7681
}
77-
location := resp.GetClient().GetLocation()
78-
r.updateLocation(&model.Location{
79-
Region: location.GetRegion().GetValue(),
80-
Zone: location.GetZone().GetValue(),
81-
Campus: location.GetCampus().GetValue(),
82-
}, nil)
82+
83+
client := resp.Client
84+
85+
for i := range r.reportChain.Chain {
86+
handler := r.reportChain.Chain[i]
87+
handler.InitLocal(client)
88+
}
8389
}
8490

8591
// reportClientRequest 客户端上报的请求
@@ -113,31 +119,18 @@ func (r *ReportClientCallBack) Process(
113119
reportClientResp, err := r.connector.ReportClient(reportClientReq)
114120
if err != nil {
115121
log.GetBaseLogger().Errorf("report client info:%+v, error:%v", reportClientReq, err)
116-
r.updateLocation(nil, err.(model.SDKError))
117122
// 发生错误也要重试,直到获取到地域信息为止
118123
return model.CONTINUE
119124
}
120-
r.updateLocation(&model.Location{
121-
Region: reportClientResp.Region,
122-
Zone: reportClientResp.Zone,
123-
Campus: reportClientResp.Campus,
124-
}, nil)
125+
126+
for i := range r.reportChain.Chain {
127+
handler := r.reportChain.Chain[i]
128+
handler.HandleResponse(reportClientResp)
129+
}
125130
return model.CONTINUE
126131
}
127132

128133
// OnTaskEvent 任务事件回调
129134
func (r *ReportClientCallBack) OnTaskEvent(event model.TaskEvent) {
130135

131136
}
132-
133-
// updateLocation 更新区域属性
134-
func (r *ReportClientCallBack) updateLocation(location *model.Location, lastErr model.SDKError) {
135-
if nil != location {
136-
// 已获取到客户端的地域信息,更新到全局上下文
137-
log.GetBaseLogger().Infof("current client area info is {Region:%s, Zone:%s, Campus:%s}",
138-
location.Region, location.Zone, location.Campus)
139-
}
140-
if r.globalCtx.SetCurrentLocation(location, lastErr) {
141-
log.GetBaseLogger().Infof("client area info is ready")
142-
}
143-
}

pkg/flow/sync_flow.go

+6
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,12 @@ func (e *Engine) SyncRegister(instance *model.InstanceRegisterRequest) (*model.I
338338
// 方法开始时间
339339
startTime := e.globalCtx.Now()
340340
svcKey := model.ServiceKey{Namespace: instance.Namespace, Service: instance.Service}
341+
342+
// 如果注册请求没有设置 Location 信息,则由内部自动设置
343+
if instance.Location == nil {
344+
instance.Location = e.globalCtx.GetCurrentLocation().GetLocation()
345+
}
346+
341347
resp, err := data.RetrySyncCall("register", &svcKey, instance, func(request interface{}) (interface{}, error) {
342348
return e.connector.RegisterInstance(request.(*model.InstanceRegisterRequest))
343349
}, param)

pkg/model/service.go

+12
Original file line numberDiff line numberDiff line change
@@ -1120,6 +1120,8 @@ type InstanceRegisterRequest struct {
11201120
// ttl超时时间,如果节点要调用heartbeat上报,则必须填写,否则会400141错误码,单位:秒
11211121
TTL *int
11221122

1123+
Location *Location
1124+
11231125
// 可选,单次查询超时时间,默认直接获取全局的超时配置
11241126
// 用户总最大超时时间为(1+RetryCount) * Timeout
11251127
Timeout *time.Duration
@@ -1157,6 +1159,11 @@ func (g *InstanceRegisterRequest) SetTTL(ttl int) {
11571159
g.TTL = &ttl
11581160
}
11591161

1162+
// SetLocation 设置服务实例的地理信息
1163+
func (g *InstanceRegisterRequest) SetLocation(loc *Location) {
1164+
g.Location = loc
1165+
}
1166+
11601167
// GetTimeoutPtr 获取超时值指针
11611168
func (g *InstanceRegisterRequest) GetTimeoutPtr() *time.Duration {
11621169
return g.Timeout
@@ -1167,6 +1174,11 @@ func (g *InstanceRegisterRequest) GetRetryCountPtr() *int {
11671174
return g.RetryCount
11681175
}
11691176

1177+
// GetLocation 获取实例的地址信息
1178+
func (g *InstanceRegisterRequest) GetLocation() *Location {
1179+
return g.Location
1180+
}
1181+
11701182
// validateMetadata 校验元数据的key是否为空
11711183
func validateMetadata(prefix string, metadata map[string]string) error {
11721184
if len(metadata) > 0 {

pkg/plugin/common/plugin.go

+20-12
Original file line numberDiff line numberDiff line change
@@ -55,21 +55,27 @@ const (
5555
TypeRateLimiter Type = 0x1010
5656
// TypeSubScribe .
5757
TypeSubScribe Type = 0x1011
58+
// TypeLocationProvider 实例地理信息获取扩展点
59+
TypeLocationProvider Type = 0x1012
60+
// TypeReportHandler ReportClient 请求、响应处理器
61+
TypeReportHandler Type = 0x1013
5862
)
5963

6064
var typeToPresent = map[Type]string{
61-
TypePluginBase: "TypePluginBase",
62-
TypeServerConnector: "serverConnector",
63-
TypeLocalRegistry: "localRegistry",
64-
TypeServiceRouter: "serviceRouter",
65-
TypeLoadBalancer: "loadBalancer",
66-
TypeHealthCheck: "healthChecker",
67-
TypeCircuitBreaker: "circuitBreaker",
68-
TypeWeightAdjuster: "weightAdjuster",
69-
TypeStatReporter: "statReporter",
70-
TypeAlarmReporter: "alarmReporter",
71-
TypeRateLimiter: "rateLimiter",
72-
TypeSubScribe: "subScribe",
65+
TypePluginBase: "TypePluginBase",
66+
TypeServerConnector: "serverConnector",
67+
TypeLocalRegistry: "localRegistry",
68+
TypeServiceRouter: "serviceRouter",
69+
TypeLoadBalancer: "loadBalancer",
70+
TypeHealthCheck: "healthChecker",
71+
TypeCircuitBreaker: "circuitBreaker",
72+
TypeWeightAdjuster: "weightAdjuster",
73+
TypeStatReporter: "statReporter",
74+
TypeAlarmReporter: "alarmReporter",
75+
TypeRateLimiter: "rateLimiter",
76+
TypeSubScribe: "subScribe",
77+
TypeLocationProvider: "locationProvider",
78+
TypeReportHandler: "reportHandler",
7379
}
7480

7581
// ToString方法
@@ -235,4 +241,6 @@ var LoadedPluginTypes = []Type{
235241
TypeLocalRegistry,
236242
TypeRateLimiter,
237243
TypeSubScribe,
244+
TypeLocationProvider,
245+
TypeReportHandler,
238246
}

pkg/plugin/location/location.go

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/**
2+
* Tencent is pleased to support the open source community by making polaris-go available.
3+
*
4+
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
5+
*
6+
* Licensed under the BSD 3-Clause License (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* https://opensource.org/licenses/BSD-3-Clause
11+
*
12+
* Unless required by applicable law or agreed to in writing, software distributed
13+
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14+
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
15+
* specific language governing permissions and limitations under the License.
16+
*/
17+
18+
package location
19+
20+
import (
21+
"github.com/polarismesh/polaris-go/pkg/model"
22+
"github.com/polarismesh/polaris-go/pkg/plugin"
23+
"github.com/polarismesh/polaris-go/pkg/plugin/common"
24+
)
25+
26+
// Location 实例地址位置获取插件
27+
type LocationProvider interface {
28+
plugin.Plugin
29+
30+
// GetLocation 获取实例地理位置信息
31+
GetLocation() (*model.Location, error)
32+
}
33+
34+
// init 初始化
35+
func init() {
36+
plugin.RegisterPluginInterface(common.TypeLocationProvider, new(LocationProvider))
37+
}

pkg/plugin/location/proxy.go

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/**
2+
* Tencent is pleased to support the open source community by making polaris-go available.
3+
*
4+
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
5+
*
6+
* Licensed under the BSD 3-Clause License (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* https://opensource.org/licenses/BSD-3-Clause
11+
*
12+
* Unless required by applicable law or agreed to in writing, software distributed
13+
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14+
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
15+
* specific language governing permissions and limitations under the License.
16+
*/
17+
18+
package location
19+
20+
import (
21+
"github.com/polarismesh/polaris-go/pkg/model"
22+
"github.com/polarismesh/polaris-go/pkg/plugin"
23+
"github.com/polarismesh/polaris-go/pkg/plugin/common"
24+
)
25+
26+
type Proxy struct {
27+
LocationProvider
28+
engine model.Engine
29+
}
30+
31+
// SetRealPlugin 设置
32+
func (p *Proxy) SetRealPlugin(plug plugin.Plugin, engine model.Engine) {
33+
p.LocationProvider = plug.(LocationProvider)
34+
p.engine = engine
35+
}
36+
37+
// GetLocation 获取实例地理位置信息
38+
func (proxy *Proxy) GetLocation() (*model.Location, error) {
39+
return proxy.LocationProvider.GetLocation()
40+
}
41+
42+
// init 注册proxy
43+
func init() {
44+
plugin.RegisterPluginProxy(common.TypeLocationProvider, &Proxy{})
45+
}

pkg/plugin/register/plugins.go

+6
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,10 @@ import (
6161
_ "github.com/polarismesh/polaris-go/plugin/statreporter/serviceroute"
6262
_ "github.com/polarismesh/polaris-go/plugin/subscribe/localchannel"
6363
_ "github.com/polarismesh/polaris-go/plugin/weightadjuster/ratedelay"
64+
65+
// 注册 report 插件
66+
_ "github.com/polarismesh/polaris-go/plugin/reporthandler/location"
67+
68+
// 注册 location 地址插件
69+
_ "github.com/polarismesh/polaris-go/plugin/location/tencent"
6470
)

0 commit comments

Comments
 (0)