Skip to content

Commit 1eef760

Browse files
authored
[ISSUE polarismesh#1047] support envoy gateway to get XDS rule (polarismesh#1046)
1 parent d914b43 commit 1eef760

28 files changed

+1726
-433
lines changed

.licenserc.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ header: # `header` section is configurations for source codes license header.
5757
- "common/api/protoc"
5858
- "deploy"
5959
- "release"
60-
- "**/testdata/**"
60+
- "test/data/xds"
6161

6262
# single file
6363
- "LICENSE"

apiserver/xdsserverv3/cache.go

+112
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/**
2+
* Tencent is pleased to support the open source community by making Polaris available.
3+
*
4+
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
5+
*
6+
* Licensed under the BSD 3-Clause License (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* https://opensource.org/licenses/BSD-3-Clause
11+
*
12+
* Unless required by applicable law or agreed to in writing, software distributed
13+
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14+
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
15+
* specific language governing permissions and limitations under the License.
16+
*/
17+
18+
package xdsserverv3
19+
20+
import (
21+
"context"
22+
23+
cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
24+
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
25+
)
26+
27+
type (
28+
snapshotCache struct {
29+
hook CacheHook
30+
// hash is the hashing function for Envoy nodes
31+
hash cachev3.NodeHash
32+
xdsCache cachev3.SnapshotCache
33+
}
34+
35+
// CacheHook
36+
CacheHook interface {
37+
// OnCreateWatch
38+
OnCreateWatch(request *cachev3.Request, streamState stream.StreamState,
39+
value chan cachev3.Response)
40+
// OnCreateDeltaWatch
41+
OnCreateDeltaWatch(request *cachev3.DeltaRequest, state stream.StreamState,
42+
value chan cachev3.DeltaResponse)
43+
// OnFetch
44+
OnFetch(ctx context.Context, request *cachev3.Request)
45+
}
46+
)
47+
48+
// NewSnapshotCache create a XDS SnapshotCache to proxy cachev3.SnapshotCache
49+
func NewSnapshotCache(xdsCache cachev3.SnapshotCache, hook CacheHook) cachev3.SnapshotCache {
50+
return newSnapshotCache(xdsCache, hook)
51+
}
52+
53+
func newSnapshotCache(xdsCache cachev3.SnapshotCache, hook CacheHook) *snapshotCache {
54+
cache := &snapshotCache{
55+
hook: hook,
56+
xdsCache: xdsCache,
57+
}
58+
return cache
59+
}
60+
61+
// SetSnapshotCacheContext updates a snapshot for a node.
62+
func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string,
63+
snapshot cachev3.ResourceSnapshot) error {
64+
return cache.xdsCache.SetSnapshot(ctx, node, snapshot)
65+
}
66+
67+
// GetSnapshots gets the snapshot for a node, and returns an error if not found.
68+
func (cache *snapshotCache) GetSnapshot(node string) (cachev3.ResourceSnapshot, error) {
69+
return cache.xdsCache.GetSnapshot(node)
70+
}
71+
72+
// ClearSnapshot clears snapshot and info for a node.
73+
func (cache *snapshotCache) ClearSnapshot(node string) {
74+
cache.xdsCache.ClearSnapshot(node)
75+
}
76+
77+
// CreateWatch returns a watch for an xDS request.
78+
func (cache *snapshotCache) CreateWatch(request *cachev3.Request, streamState stream.StreamState,
79+
value chan cachev3.Response) func() {
80+
if cache.hook != nil {
81+
cache.hook.OnCreateWatch(request, streamState, value)
82+
}
83+
return cache.xdsCache.CreateWatch(request, streamState, value)
84+
}
85+
86+
// CreateDeltaWatch returns a watch for a delta xDS request which implements the Simple SnapshotCache.
87+
func (cache *snapshotCache) CreateDeltaWatch(request *cachev3.DeltaRequest, state stream.StreamState,
88+
value chan cachev3.DeltaResponse) func() {
89+
if cache.hook != nil {
90+
cache.hook.OnCreateDeltaWatch(request, state, value)
91+
}
92+
return cache.xdsCache.CreateDeltaWatch(request, state, value)
93+
}
94+
95+
// Fetch implements the cache fetch function.
96+
// Fetch is called on multiple streams, so responding to individual names with the same version works.
97+
func (cache *snapshotCache) Fetch(ctx context.Context, request *cachev3.Request) (cachev3.Response, error) {
98+
if cache.hook != nil {
99+
cache.hook.OnFetch(ctx, request)
100+
}
101+
return cache.xdsCache.Fetch(ctx, request)
102+
}
103+
104+
// GetStatusInfo retrieves the status info for the node.
105+
func (cache *snapshotCache) GetStatusInfo(node string) cachev3.StatusInfo {
106+
return cache.xdsCache.GetStatusInfo(node)
107+
}
108+
109+
// GetStatusKeys retrieves all node IDs in the status map.
110+
func (cache *snapshotCache) GetStatusKeys() []string {
111+
return cache.xdsCache.GetStatusKeys()
112+
}

apiserver/xdsserverv3/callback.go

+14-10
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ import (
2727
)
2828

2929
type Callbacks struct {
30-
log *commonlog.Scope
30+
log *commonlog.Scope
31+
nodeMgr *XDSNodeManager
3132
}
3233

3334
func (cb *Callbacks) Report() {
@@ -45,6 +46,7 @@ func (cb *Callbacks) OnStreamClosed(id int64) {
4546
if cb.log.DebugEnabled() {
4647
cb.log.Debugf("stream %d closed", id)
4748
}
49+
cb.nodeMgr.DelNode(id)
4850
}
4951

5052
func (cb *Callbacks) OnDeltaStreamOpen(_ context.Context, id int64, typ string) error {
@@ -66,6 +68,7 @@ func (cb *Callbacks) OnStreamRequest(id int64, req *discovery.DiscoveryRequest)
6668
str, _ := marshaler.MarshalToString(req)
6769
cb.log.Debugf("on stream %d type %s request %s ", id, req.TypeUrl, str)
6870
}
71+
cb.nodeMgr.AddNodeIfAbsent(id, req.GetNode())
6972
return nil
7073
}
7174

@@ -79,23 +82,24 @@ func (cb *Callbacks) OnStreamResponse(_ context.Context, id int64, req *discover
7982
}
8083
}
8184

82-
func (cb *Callbacks) OnStreamDeltaResponse(id int64, req *discovery.DeltaDiscoveryRequest,
83-
resp *discovery.DeltaDiscoveryResponse) {
85+
func (cb *Callbacks) OnStreamDeltaRequest(id int64, req *discovery.DeltaDiscoveryRequest) error {
8486
if cb.log.DebugEnabled() {
8587
marshaler := jsonpb.Marshaler{}
86-
reqstr, _ := marshaler.MarshalToString(req)
87-
respstr, _ := marshaler.MarshalToString(resp)
88-
cb.log.Debugf("on delta stream %d type %s request %s response %s", id, req.TypeUrl, reqstr, respstr)
88+
str, _ := marshaler.MarshalToString(req)
89+
cb.log.Debugf("on stream %d delta type %s request %s", id, req.TypeUrl, str)
8990
}
91+
cb.nodeMgr.AddNodeIfAbsent(id, req.GetNode())
92+
return nil
9093
}
9194

92-
func (cb *Callbacks) OnStreamDeltaRequest(id int64, req *discovery.DeltaDiscoveryRequest) error {
95+
func (cb *Callbacks) OnStreamDeltaResponse(id int64, req *discovery.DeltaDiscoveryRequest,
96+
resp *discovery.DeltaDiscoveryResponse) {
9397
if cb.log.DebugEnabled() {
9498
marshaler := jsonpb.Marshaler{}
95-
str, _ := marshaler.MarshalToString(req)
96-
cb.log.Debugf("on stream %d delta type %s request %s", id, req.TypeUrl, str)
99+
reqstr, _ := marshaler.MarshalToString(req)
100+
respstr, _ := marshaler.MarshalToString(resp)
101+
cb.log.Debugf("on delta stream %d type %s request %s response %s", id, req.TypeUrl, reqstr, respstr)
97102
}
98-
return nil
99103
}
100104

101105
func (cb *Callbacks) OnFetchRequest(_ context.Context, req *discovery.DiscoveryRequest) error {

0 commit comments

Comments
 (0)