Skip to content

Commit 554c4c6

Browse files
feat: eureka注册发现支持命名空间隔离 (polarismesh#1068)
1 parent 3d5090a commit 554c4c6

11 files changed

+408
-114
lines changed

apiserver/eurekaserver/access.go

+82-50
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,12 @@ import (
3131
)
3232

3333
const (
34-
ParamAppId string = "appId"
35-
ParamInstId string = "instId"
36-
ParamValue string = "value"
37-
ParamVip string = "vipAddress"
38-
ParamSVip string = "svipAddress"
34+
ParamAppId string = "appId"
35+
ParamInstId string = "instId"
36+
ParamValue string = "value"
37+
ParamVip string = "vipAddress"
38+
ParamSVip string = "svipAddress"
39+
HeaderNamespace string = "x-namespace"
3940
)
4041

4142
// GetEurekaServer eureka web server
@@ -104,7 +105,7 @@ func (h *EurekaServer) addDiscoverAccess(ws *restful.WebService) {
104105
ws.Route(ws.GET(fmt.Sprintf("/instances/{%s}", ParamInstId)).To(h.GetInstance)).
105106
Param(ws.PathParameter(ParamInstId, "instanceId").DataType("string"))
106107
// Update metadata
107-
ws.Route(ws.GET(fmt.Sprintf("/apps/{%s}/{%s}/metadata", ParamAppId, ParamInstId)).To(h.UpdateMetadata)).
108+
ws.Route(ws.PUT(fmt.Sprintf("/apps/{%s}/{%s}/metadata", ParamAppId, ParamInstId)).To(h.UpdateMetadata)).
108109
Param(ws.PathParameter(ParamAppId, "applicationId").DataType("string")).
109110
Param(ws.PathParameter(ParamInstId, "instanceId").DataType("string"))
110111
// Query for all instances under a particular vip address
@@ -131,7 +132,8 @@ func parseAcceptValue(acceptValue string) map[string]bool {
131132

132133
// GetAllApplications 全量拉取服务实例信息
133134
func (h *EurekaServer) GetAllApplications(req *restful.Request, rsp *restful.Response) {
134-
appsRespCache := h.worker.GetCachedAppsWithLoad()
135+
namespace := readNamespaceFromRequest(req, h.namespace)
136+
appsRespCache := h.workers.Get(namespace).GetCachedAppsWithLoad()
135137
remoteAddr := req.Request.RemoteAddr
136138
acceptValue := getParamFromEurekaRequestHeader(req, restful.HEADER_Accept)
137139
if err := writeResponse(parseAcceptValue(acceptValue), appsRespCache, req, rsp); nil != err {
@@ -148,7 +150,8 @@ func (h *EurekaServer) GetApplication(req *restful.Request, rsp *restful.Respons
148150
appId := readAppIdFromRequest(req)
149151

150152
remoteAddr := req.Request.RemoteAddr
151-
appsRespCache := h.worker.GetCachedAppsWithLoad()
153+
namespace := readNamespaceFromRequest(req, h.namespace)
154+
appsRespCache := h.workers.Get(namespace).GetCachedAppsWithLoad()
152155
apps := appsRespCache.AppsResp.Applications
153156
app := apps.GetApplication(appId)
154157
if app == nil {
@@ -190,7 +193,8 @@ func (h *EurekaServer) GetAppInstance(req *restful.Request, rsp *restful.Respons
190193
writeHeader(http.StatusBadRequest, rsp)
191194
return
192195
}
193-
appsRespCache := h.worker.GetCachedAppsWithLoad()
196+
namespace := readNamespaceFromRequest(req, h.namespace)
197+
appsRespCache := h.workers.Get(namespace).GetCachedAppsWithLoad()
194198
apps := appsRespCache.AppsResp.Applications
195199
app := apps.GetApplication(appId)
196200
if app == nil {
@@ -271,13 +275,15 @@ func writeResponse(acceptValues map[string]bool, appsRespCache *ApplicationsResp
271275

272276
// GetDeltaApplications 增量拉取服务实例信息
273277
func (h *EurekaServer) GetDeltaApplications(req *restful.Request, rsp *restful.Response) {
274-
appsRespCache := h.worker.GetDeltaApps()
278+
namespace := readNamespaceFromRequest(req, h.namespace)
279+
work := h.workers.Get(namespace)
280+
appsRespCache := work.GetDeltaApps()
275281
if nil == appsRespCache {
276-
ctx := h.worker.StartWorker()
282+
ctx := work.StartWorker()
277283
if nil != ctx {
278284
<-ctx.Done()
279285
}
280-
appsRespCache = h.worker.GetDeltaApps()
286+
appsRespCache = work.GetDeltaApps()
281287
}
282288
remoteAddr := req.Request.RemoteAddr
283289
acceptValue := getParamFromEurekaRequestHeader(req, restful.HEADER_Accept)
@@ -378,18 +384,24 @@ func (h *EurekaServer) RegisterApplication(req *restful.Request, rsp *restful.Re
378384

379385
ctx := context.WithValue(context.Background(), utils.ContextAuthTokenKey, token)
380386

381-
log.Infof("[EUREKA-SERVER]received instance register request, client: %s, instId: %s, appId: %s, ipAddr: %s",
382-
remoteAddr, registrationRequest.Instance.InstanceId, appId, registrationRequest.Instance.IpAddr)
383-
code := h.registerInstances(ctx, appId, registrationRequest.Instance, false)
387+
namespace := readNamespaceFromRequest(req, h.namespace)
388+
log.Infof(
389+
"[EUREKA-SERVER]received instance register request, "+
390+
"client: %s, namespace: %s, instId: %s, appId: %s, ipAddr: %s",
391+
remoteAddr, namespace, registrationRequest.Instance.InstanceId, appId, registrationRequest.Instance.IpAddr)
392+
code := h.registerInstances(ctx, namespace, appId, registrationRequest.Instance, false)
384393
if code == api.ExecuteSuccess || code == api.ExistedResource || code == api.SameInstanceRequest {
385-
log.Infof("[EUREKA-SERVER]instance (instId=%s, appId=%s) has been registered successfully, code is %d",
386-
registrationRequest.Instance.InstanceId, appId, code)
394+
log.Infof(
395+
"[EUREKA-SERVER]instance (namespace=%s, instId=%s, appId=%s) has been registered successfully,"+
396+
" code is %d",
397+
namespace, registrationRequest.Instance.InstanceId, appId, code)
387398
writePolarisStatusCode(req, code)
388399
writeHeader(http.StatusNoContent, rsp)
389400
return
390401
}
391-
log.Errorf("[EUREKA-SERVER]instance (instId=%s, appId=%s) has been registered failed, code is %d",
392-
registrationRequest.Instance.InstanceId, appId, code)
402+
log.Errorf("[EUREKA-SERVER]instance (namespace=%s, instId=%s, appId=%s) has been registered failed, "+
403+
"code is %d",
404+
namespace, registrationRequest.Instance.InstanceId, appId, code)
393405
writePolarisStatusCode(req, code)
394406
writeHeader(int(code/1000), rsp)
395407
}
@@ -414,23 +426,27 @@ func (h *EurekaServer) UpdateStatus(req *restful.Request, rsp *restful.Response)
414426
return
415427
}
416428
status := req.QueryParameter(ParamValue)
417-
log.Infof("[EUREKA-SERVER]received instance updateStatus request, client: %s, instId: %s, appId: %s, status: %s",
418-
remoteAddr, instId, appId, status)
429+
namespace := readNamespaceFromRequest(req, h.namespace)
430+
log.Infof("[EUREKA-SERVER]received instance updateStatus request, "+
431+
"client: %s, namespace: %s, instId: %s, appId: %s, status: %s",
432+
remoteAddr, namespace, instId, appId, status)
419433
// check status
420434
if status == StatusUnknown {
421435
writePolarisStatusCode(req, api.ExecuteSuccess)
422436
writeHeader(http.StatusOK, rsp)
423437
return
424438
}
425-
code := h.updateStatus(context.Background(), appId, instId, status, false)
439+
code := h.updateStatus(context.Background(), namespace, appId, instId, status, false)
426440
writePolarisStatusCode(req, code)
427441
if code == api.ExecuteSuccess {
428-
log.Infof("[EUREKA-SERVER]instance (instId=%s, appId=%s) has been updated successfully", instId, appId)
442+
log.Infof("[EUREKA-SERVER]instance (namespace=%s, instId=%s, appId=%s) has been updated successfully",
443+
namespace, instId, appId)
429444
writeHeader(http.StatusOK, rsp)
430445
return
431446
}
432-
log.Errorf("[EUREKA-SERVER]instance (instId=%s, appId=%s) has been updated failed, code is %d",
433-
instId, appId, code)
447+
log.Errorf("[EUREKA-SERVER]instance ((namespace=%s, instId=%s, appId=%s) has been updated failed, "+
448+
"code is %d",
449+
namespace, instId, appId, code)
434450
if code == api.NotFoundResource {
435451
writeHeader(http.StatusNotFound, rsp)
436452
return
@@ -458,19 +474,24 @@ func (h *EurekaServer) DeleteStatus(req *restful.Request, rsp *restful.Response)
458474
return
459475
}
460476

461-
log.Infof("[EUREKA-SERVER]received instance status delete request, client: %s, instId=%s, appId=%s",
462-
remoteAddr, instId, appId)
477+
namespace := readNamespaceFromRequest(req, h.namespace)
463478

464-
code := h.updateStatus(context.Background(), appId, instId, StatusUp, false)
479+
log.Infof("[EUREKA-SERVER]received instance status delete request, "+
480+
"client: %s,namespace=%s, instId=%s, appId=%s",
481+
remoteAddr, namespace, instId, appId)
482+
483+
code := h.updateStatus(context.Background(), namespace, appId, instId, StatusUp, false)
465484
writePolarisStatusCode(req, code)
466485
if code == api.ExecuteSuccess {
467-
log.Infof("[EUREKA-SERVER]instance status (instId=%s, appId=%s) has been deleted successfully",
468-
instId, appId)
486+
log.Infof("[EUREKA-SERVER]instance status (namespace=%s, instId=%s, appId=%s) "+
487+
"has been deleted successfully",
488+
namespace, instId, appId)
469489
writeHeader(http.StatusOK, rsp)
470490
return
471491
}
472-
log.Errorf("[EUREKA-SERVER]instance status (instId=%s, appId=%s) has been deleted failed, code is %d",
473-
instId, appId, code)
492+
log.Errorf("[EUREKA-SERVER]instance status (namespace=%s, instId=%s, appId=%s) "+
493+
"has been deleted failed, code is %d",
494+
namespace, instId, appId, code)
474495
if code == api.NotFoundResource {
475496
writeHeader(http.StatusNotFound, rsp)
476497
return
@@ -497,14 +518,15 @@ func (h *EurekaServer) RenewInstance(req *restful.Request, rsp *restful.Response
497518
writeHeader(http.StatusBadRequest, rsp)
498519
return
499520
}
500-
code := h.renew(context.Background(), appId, instId, false)
521+
namespace := readNamespaceFromRequest(req, h.namespace)
522+
code := h.renew(context.Background(), namespace, appId, instId, false)
501523
writePolarisStatusCode(req, code)
502524
if code == api.ExecuteSuccess || code == api.HeartbeatExceedLimit {
503525
writeHeader(http.StatusOK, rsp)
504526
return
505527
}
506-
log.Errorf("[EUREKA-SERVER]instance (instId=%s, appId=%s) heartbeat failed, code is %d",
507-
instId, appId, code)
528+
log.Errorf("[EUREKA-SERVER]instance (namespace=%s, instId=%s, appId=%s) heartbeat failed, code is %d",
529+
namespace, instId, appId, code)
508530
if code == api.NotFoundResource {
509531
writeHeader(http.StatusNotFound, rsp)
510532
return
@@ -531,18 +553,22 @@ func (h *EurekaServer) CancelInstance(req *restful.Request, rsp *restful.Respons
531553
writeHeader(http.StatusBadRequest, rsp)
532554
return
533555
}
534-
log.Infof("[EUREKA-SERVER]received instance deregistered request, client: %s, instId: %s, appId: %s",
535-
remoteAddr, instId, appId)
536-
code := h.deregisterInstance(context.Background(), appId, instId, false)
556+
namespace := readNamespaceFromRequest(req, h.namespace)
557+
log.Infof("[EUREKA-SERVER]received instance deregistered request, "+
558+
"client: %s, namespace: %s, instId: %s, appId: %s",
559+
remoteAddr, namespace, instId, appId)
560+
code := h.deregisterInstance(context.Background(), namespace, appId, instId, false)
537561
writePolarisStatusCode(req, code)
538562
if code == api.ExecuteSuccess || code == api.NotFoundResource || code == api.SameInstanceRequest {
539563
writeHeader(http.StatusOK, rsp)
540-
log.Infof("[EUREKA-SERVER]instance (instId=%s, appId=%s) has been deregistered successfully, code is %d",
541-
instId, appId, code)
564+
log.Infof("[EUREKA-SERVER]instance (namespace=%s, instId=%s, appId=%s) "+
565+
"has been deregistered successfully, code is %d",
566+
namespace, instId, appId, code)
542567
return
543568
}
544-
log.Errorf("[EUREKA-SERVER]instance (instId=%s, appId=%s) has been deregistered failed, code is %d",
545-
instId, appId, code)
569+
log.Errorf("[EUREKA-SERVER]instance (namespace=%s, instId=%s, appId=%s) has been deregistered failed,"+
570+
" code is %d",
571+
namespace, instId, appId, code)
546572
writeHeader(int(code/1000), rsp)
547573
}
548574

@@ -557,7 +583,8 @@ func (h *EurekaServer) GetInstance(req *restful.Request, rsp *restful.Response)
557583
writeHeader(http.StatusBadRequest, rsp)
558584
return
559585
}
560-
appsRespCache := h.worker.GetCachedAppsWithLoad()
586+
namespace := readNamespaceFromRequest(req, h.namespace)
587+
appsRespCache := h.workers.Get(namespace).GetCachedAppsWithLoad()
561588
apps := appsRespCache.AppsResp.Applications
562589
instance := apps.GetInstance(instId)
563590
if nil == instance {
@@ -596,6 +623,7 @@ func (h *EurekaServer) UpdateMetadata(req *restful.Request, rsp *restful.Respons
596623
writeHeader(http.StatusBadRequest, rsp)
597624
return
598625
}
626+
namespace := readNamespaceFromRequest(req, h.namespace)
599627
queryValues := req.Request.URL.Query()
600628
metadataMap := make(map[string]string, len(queryValues))
601629
for key, values := range queryValues {
@@ -605,16 +633,17 @@ func (h *EurekaServer) UpdateMetadata(req *restful.Request, rsp *restful.Respons
605633
}
606634
metadataMap[key] = values[0]
607635
}
608-
code := h.updateMetadata(context.Background(), appId, instId, metadataMap)
636+
code := h.updateMetadata(context.Background(), namespace, appId, instId, metadataMap)
609637
writePolarisStatusCode(req, code)
610638
if code == api.ExecuteSuccess {
611-
log.Infof("[EUREKA-SERVER]instance metadata (instId=%s, appId=%s) has been updated successfully",
612-
instId, appId)
639+
log.Infof("[EUREKA-SERVER]instance metadata (namespace=%s, instId=%s, appId=%s) has been updated successfully",
640+
namespace, instId, appId)
613641
writeHeader(http.StatusOK, rsp)
614642
return
615643
}
616-
log.Errorf("[EUREKA-SERVER]instance metadata (instId=%s, appId=%s) has been updated failed, code is %d",
617-
instId, appId, code)
644+
log.Errorf("[EUREKA-SERVER]instance metadata (namespace=%s, instId=%s, appId=%s) has been updated failed, "+
645+
"code is %d",
646+
namespace, instId, appId, code)
618647
if code == api.NotFoundResource {
619648
writeHeader(http.StatusNotFound, rsp)
620649
return
@@ -633,7 +662,9 @@ func (h *EurekaServer) QueryByVipAddress(req *restful.Request, rsp *restful.Resp
633662
writeHeader(http.StatusBadRequest, rsp)
634663
return
635664
}
636-
appsRespCache := h.worker.GetVipApps(VipCacheKey{
665+
666+
namespace := readNamespaceFromRequest(req, h.namespace)
667+
appsRespCache := h.workers.Get(namespace).GetVipApps(VipCacheKey{
637668
entityType: entityTypeVip,
638669
targetVipAddress: formatReadName(vipAddress),
639670
})
@@ -654,7 +685,8 @@ func (h *EurekaServer) QueryBySVipAddress(req *restful.Request, rsp *restful.Res
654685
writeHeader(http.StatusBadRequest, rsp)
655686
return
656687
}
657-
appsRespCache := h.worker.GetVipApps(VipCacheKey{
688+
namespace := readNamespaceFromRequest(req, h.namespace)
689+
appsRespCache := h.workers.Get(namespace).GetVipApps(VipCacheKey{
658690
entityType: entityTypeSVip,
659691
targetVipAddress: formatReadName(vipAddress),
660692
})

apiserver/eurekaserver/access_test.go

+10-6
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,9 @@ func batchBuildInstances(appId string, host string, port int, lease *LeaseInfo,
7575
return instances
7676
}
7777

78-
func batchCreateInstance(t *testing.T, eurekaSvr *EurekaServer, instances []*InstanceInfo) {
78+
func batchCreateInstance(t *testing.T, eurekaSvr *EurekaServer, namespace string, instances []*InstanceInfo) {
7979
for _, instance := range instances {
80-
code := eurekaSvr.registerInstances(context.Background(), instance.AppName, instance, false)
80+
code := eurekaSvr.registerInstances(context.Background(), namespace, instance.AppName, instance, false)
8181
assert.Equal(t, api.ExecuteSuccess, code)
8282
}
8383
}
@@ -151,9 +151,10 @@ func TestCreateInstance(t *testing.T) {
151151
options := map[string]interface{}{optionRefreshInterval: 5, optionDeltaExpireInterval: 120}
152152
eurekaSrv, err := createEurekaServerForTest(discoverSuit, options)
153153
assert.Nil(t, err)
154-
eurekaSrv.worker = NewApplicationsWorker(eurekaSrv.refreshInterval, eurekaSrv.deltaExpireInterval,
154+
eurekaSrv.workers = NewApplicationsWorkers(eurekaSrv.refreshInterval, eurekaSrv.deltaExpireInterval,
155155
eurekaSrv.enableSelfPreservation, eurekaSrv.namingServer, eurekaSrv.healthCheckServer, eurekaSrv.namespace)
156156

157+
namespace := "default"
157158
appId := "TESTAPP"
158159
startPort := 8900
159160
host := "127.0.1.1"
@@ -162,10 +163,13 @@ func TestCreateInstance(t *testing.T) {
162163
RenewalIntervalInSecs: 30,
163164
DurationInSecs: 120,
164165
}, total)
165-
batchCreateInstance(t, eurekaSrv, instances)
166+
batchCreateInstance(t, eurekaSrv, namespace, instances)
166167

167168
time.Sleep(10 * time.Second)
168-
httpRequest := &http.Request{Header: map[string][]string{restful.HEADER_Accept: []string{restful.MIME_JSON}}}
169+
httpRequest := &http.Request{Header: map[string][]string{
170+
restful.HEADER_Accept: {restful.MIME_JSON},
171+
HeaderNamespace: {namespace},
172+
}}
169173
req := restful.NewRequest(httpRequest)
170174
mockWriter := newMockResponseWriter()
171175
resp := &restful.Response{ResponseWriter: mockWriter}
@@ -180,7 +184,7 @@ func TestCreateInstance(t *testing.T) {
180184

181185
time.Sleep(5 * time.Second)
182186
instanceId := fmt.Sprintf("%s_%s_%d", appId, host, startPort)
183-
code := eurekaSrv.deregisterInstance(context.Background(), appId, instanceId, false)
187+
code := eurekaSrv.deregisterInstance(context.Background(), namespace, appId, instanceId, false)
184188
assert.Equal(t, api.ExecuteSuccess, code)
185189
time.Sleep(20 * time.Second)
186190

apiserver/eurekaserver/delta_worker.go

+59
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,65 @@ func (l *Lease) Expired(curTimeSec int64, deltaExpireInterval time.Duration) boo
4545
return curTimeSec-l.lastUpdateTimeSec >= deltaExpireInterval.Milliseconds()/1000
4646
}
4747

48+
type ApplicationsWorkers struct {
49+
interval time.Duration
50+
deltaExpireInterval time.Duration
51+
enableSelfPreservation bool
52+
namingServer service.DiscoverServer
53+
healthCheckServer *healthcheck.Server
54+
workers map[string]*ApplicationsWorker
55+
rwMutex *sync.RWMutex
56+
}
57+
58+
func NewApplicationsWorkers(interval time.Duration,
59+
deltaExpireInterval time.Duration, enableSelfPreservation bool,
60+
namingServer service.DiscoverServer, healthCheckServer *healthcheck.Server,
61+
namespaces ...string) *ApplicationsWorkers {
62+
workers := make(map[string]*ApplicationsWorker)
63+
for _, namespace := range namespaces {
64+
work := NewApplicationsWorker(interval, deltaExpireInterval, enableSelfPreservation,
65+
namingServer, healthCheckServer, namespace)
66+
workers[namespace] = work
67+
}
68+
return &ApplicationsWorkers{
69+
interval: interval,
70+
deltaExpireInterval: deltaExpireInterval,
71+
enableSelfPreservation: enableSelfPreservation,
72+
namingServer: namingServer,
73+
healthCheckServer: healthCheckServer,
74+
workers: workers,
75+
rwMutex: &sync.RWMutex{},
76+
}
77+
}
78+
79+
func (a *ApplicationsWorkers) Get(namespace string) *ApplicationsWorker {
80+
a.rwMutex.RLock()
81+
work, exist := a.workers[namespace]
82+
a.rwMutex.RUnlock()
83+
if exist {
84+
return work
85+
}
86+
a.rwMutex.Lock()
87+
defer a.rwMutex.Unlock()
88+
89+
work, exist = a.workers[namespace]
90+
if exist {
91+
return work
92+
}
93+
94+
work = NewApplicationsWorker(a.interval, a.deltaExpireInterval, a.enableSelfPreservation,
95+
a.namingServer, a.healthCheckServer, namespace)
96+
a.workers[namespace] = work
97+
return work
98+
99+
}
100+
101+
func (a *ApplicationsWorkers) Stop() {
102+
for _, v := range a.workers {
103+
v.Stop()
104+
}
105+
}
106+
48107
// ApplicationsWorker 应用缓存协程
49108
type ApplicationsWorker struct {
50109
mutex *sync.Mutex

0 commit comments

Comments
 (0)