Commit fc535d1

Add endpoint slice watcher (#1528)
1 parent d326656 commit fc535d1

8 files changed: +854 -66 lines changed
@@ -0,0 +1,292 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package resolver

import (
	"fmt"
	"sync"

	"go.uber.org/zap"
	discv1 "k8s.io/api/discovery/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/tools/cache"
)

// endpointSliceWatcher watches EndpointSlices and builds:
//  1. ip/ip:port -> "workload@namespace"
//  2. service@namespace -> "workload@namespace"
type endpointSliceWatcher struct {
	logger            *zap.Logger
	informer          cache.SharedIndexInformer
	ipToWorkload      *sync.Map // key: "ip" or "ip:port", val: "workload@ns"
	serviceToWorkload *sync.Map // key: "service@namespace", val: "workload@ns"

	// For bookkeeping, so we can remove old mappings upon EndpointSlice deletion:
	// map[sliceUID string] -> []string of the keys we inserted, each of which is
	// "ip", "ip:port", or "service@namespace".
	sliceToKeysMap sync.Map

	deleter Deleter
}

// kvPair holds one mapping from key -> value. The isService flag
// indicates whether the key is a Service key or an IP/IP:port key.
type kvPair struct {
	key       string // "ip", "ip:port", or "service@namespace"
	value     string // "workload@namespace"
	isService bool   // true if key = "service@namespace"
}
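
// For illustration (hypothetical values), the two maps end up holding entries
// shaped like:
//
//	ipToWorkload:      "10.0.1.15" -> "checkout@store", "10.0.1.15:8080" -> "checkout@store"
//	serviceToWorkload: "checkout-svc@store" -> "checkout@store"
//
// where "checkout" is the workload name inferred from a Pod name such as
// "checkout-6d5f7c9b8d-x2x7k" (see inferWorkloadName, defined elsewhere in this change).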

// newEndpointSliceWatcher creates an EndpointSlice watcher for the new approach (when USE_LIST_POD=false).
func newEndpointSliceWatcher(
	logger *zap.Logger,
	factory informers.SharedInformerFactory,
	deleter Deleter,
) *endpointSliceWatcher {

	esInformer := factory.Discovery().V1().EndpointSlices().Informer()
	err := esInformer.SetTransform(minimizeEndpointSlice)
	if err != nil {
		logger.Error("failed to minimize EndpointSlice objects", zap.Error(err))
	}

	return &endpointSliceWatcher{
		logger:            logger,
		informer:          esInformer,
		ipToWorkload:      &sync.Map{},
		serviceToWorkload: &sync.Map{},
		deleter:           deleter,
	}
}
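
// Note: the SetTransform call above trims each EndpointSlice before it is stored
// in the informer cache (see minimizeEndpointSlice at the bottom of this file),
// which reduces the memory held per cached object.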

// Run starts the endpointSliceWatcher.
func (w *endpointSliceWatcher) Run(stopCh chan struct{}) {
	w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			w.handleSliceAdd(obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			w.handleSliceUpdate(oldObj, newObj)
		},
		DeleteFunc: func(obj interface{}) {
			w.handleSliceDelete(obj)
		},
	})
	go w.informer.Run(stopCh)
}

func (w *endpointSliceWatcher) waitForCacheSync(stopCh chan struct{}) {
	if !cache.WaitForNamedCacheSync("endpointSliceWatcher", stopCh, w.informer.HasSynced) {
		w.logger.Fatal("timed out waiting for endpointSliceWatcher cache to sync")
	}
	w.logger.Info("endpointSliceWatcher: Cache synced")
}

// extractEndpointSliceKeyValuePairs computes the relevant mappings from an EndpointSlice.
//
// It returns a list of kvPair:
//   - All IP and IP:port keys (isService=false) -> "workload@ns"
//   - The Service name key (isService=true) -> first "workload@ns" found
//
// This function does NOT modify ipToWorkload or serviceToWorkload. It's purely for computing
// the pairs, so it can be reused by both the add and update methods.
func (w *endpointSliceWatcher) extractEndpointSliceKeyValuePairs(slice *discv1.EndpointSlice) []kvPair {
	var pairs []kvPair

	isFirstPod := true
	for _, endpoint := range slice.Endpoints {
		if endpoint.TargetRef != nil {
			if endpoint.TargetRef.Kind != "Pod" {
				continue
			}

			podName := endpoint.TargetRef.Name
			ns := endpoint.TargetRef.Namespace

			derivedWorkload := inferWorkloadName(podName)
			if derivedWorkload == "" {
				w.logger.Warn("failed to infer workload name from Pod name", zap.String("podName", podName))
				continue
			}
			fullWl := derivedWorkload + "@" + ns

			// Build IP and IP:port pairs
			for _, addr := range endpoint.Addresses {
				// "ip" -> "workload@namespace"
				pairs = append(pairs, kvPair{
					key:       addr,
					value:     fullWl,
					isService: false,
				})

				// "ip:port" -> "workload@namespace" for each port
				for _, portDef := range slice.Ports {
					if portDef.Port != nil {
						ipPort := fmt.Sprintf("%s:%d", addr, *portDef.Port)
						pairs = append(pairs, kvPair{
							key:       ipPort,
							value:     fullWl,
							isService: false,
						})
					}
				}
			}

			// Build service name -> "workload@namespace" pair from the first pod
			if isFirstPod {
				isFirstPod = false
				svcName := slice.Labels["kubernetes.io/service-name"]
				if svcName != "" {
					pairs = append(pairs, kvPair{
						key:       svcName + "@" + ns,
						value:     fullWl,
						isService: true,
					})
				}
			}
		}
	}

	return pairs
}
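
// For illustration (hypothetical values): a slice labeled
// kubernetes.io/service-name=checkout-svc with endpoint addresses "10.0.1.15"
// and "10.0.1.16" (both targeting Pods of workload "checkout" in namespace
// "store") and a single port 8080 yields five pairs: two "ip" keys, two
// "ip:port" keys, and one service key "checkout-svc@store", the last taken
// from the first pod only.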

// handleSliceAdd handles a new EndpointSlice that wasn't seen before.
// It computes all keys and directly stores them. Then it records those keys
// in sliceToKeysMap so that we can remove them later upon deletion.
func (w *endpointSliceWatcher) handleSliceAdd(obj interface{}) {
	newSlice := obj.(*discv1.EndpointSlice)
	sliceUID := string(newSlice.UID)

	// Compute all key-value pairs for this new slice.
	pairs := w.extractEndpointSliceKeyValuePairs(newSlice)

	// Insert them into ipToWorkload / serviceToWorkload, and track the keys.
	keys := make([]string, 0, len(pairs))
	for _, kv := range pairs {
		if kv.isService {
			w.serviceToWorkload.Store(kv.key, kv.value)
		} else {
			w.ipToWorkload.Store(kv.key, kv.value)
		}
		keys = append(keys, kv.key)
	}

	// Save these keys so we can remove them on delete.
	w.sliceToKeysMap.Store(sliceUID, keys)
}

// handleSliceUpdate handles an update from oldSlice -> newSlice.
// Instead of blindly removing all old keys and adding new ones, it diffs them:
//   - remove only keys that no longer exist,
//   - add only new keys that didn't exist before,
//   - keep those that haven't changed.
func (w *endpointSliceWatcher) handleSliceUpdate(oldObj, newObj interface{}) {
	oldSlice := oldObj.(*discv1.EndpointSlice)
	newSlice := newObj.(*discv1.EndpointSlice)

	oldUID := string(oldSlice.UID)
	newUID := string(newSlice.UID)

	// 1) Fetch old keys from sliceToKeysMap (if present).
	var oldKeys []string
	if val, ok := w.sliceToKeysMap.Load(oldUID); ok {
		oldKeys = val.([]string)
	}

	// 2) Compute fresh pairs (and thus keys) from the new slice.
	newPairs := w.extractEndpointSliceKeyValuePairs(newSlice)
	var newKeys []string
	for _, kv := range newPairs {
		newKeys = append(newKeys, kv.key)
	}

	// Convert oldKeys/newKeys to sets for an easy diff.
	oldKeysSet := make(map[string]struct{}, len(oldKeys))
	for _, k := range oldKeys {
		oldKeysSet[k] = struct{}{}
	}
	newKeysSet := make(map[string]struct{}, len(newKeys))
	for _, k := range newKeys {
		newKeysSet[k] = struct{}{}
	}

	// 3) For each key in oldKeys that doesn't exist in newKeys, remove it.
	for k := range oldKeysSet {
		if _, stillPresent := newKeysSet[k]; !stillPresent {
			w.deleter.DeleteWithDelay(w.ipToWorkload, k)
			w.deleter.DeleteWithDelay(w.serviceToWorkload, k)
		}
	}

	// 4) For each key in newKeys that wasn't in oldKeys, store it in the
	// appropriate sync.Map, looking up the value from newPairs.
	for _, kv := range newPairs {
		if _, alreadyHad := oldKeysSet[kv.key]; !alreadyHad {
			if kv.isService {
				w.serviceToWorkload.Store(kv.key, kv.value)
			} else {
				w.ipToWorkload.Store(kv.key, kv.value)
			}
		}
	}

	// 5) Update sliceToKeysMap for the new slice UID.
	// (An object's UID does not change across updates, but handle it defensively.)
	w.sliceToKeysMap.Delete(oldUID)
	w.sliceToKeysMap.Store(newUID, newKeys)
}
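
// Design note: diffing old keys against new keys (instead of removing everything
// and re-adding it) keeps unchanged ip/ip:port mappings resolvable throughout
// endpoint churn. Removals go through the deleter which, judging by the
// DeleteWithDelay name, presumably defers the actual deletion for a short grace
// period.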

// handleSliceDelete removes any IP->workload or service->workload keys that were created by this slice.
func (w *endpointSliceWatcher) handleSliceDelete(obj interface{}) {
	slice := obj.(*discv1.EndpointSlice)
	w.removeSliceKeys(slice)
}

func (w *endpointSliceWatcher) removeSliceKeys(slice *discv1.EndpointSlice) {
	sliceUID := string(slice.UID)
	val, ok := w.sliceToKeysMap.Load(sliceUID)
	if !ok {
		return
	}

	keys := val.([]string)
	for _, k := range keys {
		w.deleter.DeleteWithDelay(w.ipToWorkload, k)
		w.deleter.DeleteWithDelay(w.serviceToWorkload, k)
	}
	w.sliceToKeysMap.Delete(sliceUID)
}

// minimizeEndpointSlice removes fields that are not required by our mapping logic,
// retaining only the minimal set of fields needed (ObjectMeta.Name, Namespace, UID, Labels,
// Endpoints (with their Addresses and TargetRef) and Ports).
func minimizeEndpointSlice(obj interface{}) (interface{}, error) {
	eps, ok := obj.(*discv1.EndpointSlice)
	if !ok {
		return obj, fmt.Errorf("object is not an EndpointSlice")
	}

	// Minimize metadata: we only really need Name, Namespace, UID and Labels.
	eps.Annotations = nil
	eps.ManagedFields = nil
	eps.Finalizers = nil

	// The watcher only uses:
	//   - eps.Labels["kubernetes.io/service-name"]
	//   - eps.Namespace (from metadata)
	//   - eps.UID (from metadata)
	//   - eps.Endpoints: for each endpoint, its Addresses and TargetRef.
	//   - eps.Ports: each port's Port (and optionally Name/Protocol)
	//
	// For each endpoint, clear fields that we don't use.
	for i := range eps.Endpoints {
		// We only need Addresses and TargetRef. Hostname, NodeName, and Zone are not used.
		eps.Endpoints[i].Hostname = nil
		eps.Endpoints[i].NodeName = nil
		eps.Endpoints[i].Zone = nil
		eps.Endpoints[i].DeprecatedTopology = nil
		eps.Endpoints[i].Hints = nil
	}

	// No transformation is needed for eps.Ports because we use them directly.
	return eps, nil
}
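
For context, here is a minimal, hypothetical wiring sketch showing how the watcher might be driven. It is not part of this commit: noopDeleter is an illustrative stand-in, and the sketch assumes Deleter is satisfied by a DeleteWithDelay(*sync.Map, string) method and that inferWorkloadName comes from the other files in this change.

// Hypothetical usage sketch, not part of this commit. It assumes it sits in
// the resolver package next to the watcher.
package resolver

import (
	"sync"
	"time"

	"go.uber.org/zap"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
)

// noopDeleter is an illustrative stand-in that deletes immediately; the real
// deleter in this change presumably defers the deletion.
type noopDeleter struct{}

func (noopDeleter) DeleteWithDelay(m *sync.Map, key string) { m.Delete(key) }

func exampleRun(clientset kubernetes.Interface, logger *zap.Logger) {
	// Build the shared informer factory the watcher draws its informer from.
	factory := informers.NewSharedInformerFactory(clientset, 5*time.Minute)
	w := newEndpointSliceWatcher(logger, factory, noopDeleter{})

	stopCh := make(chan struct{})
	defer close(stopCh)

	// Run registers the event handlers and starts the informer itself, so no
	// separate factory.Start call is needed for this informer.
	w.Run(stopCh)
	w.waitForCacheSync(stopCh)

	// Resolve a pod IP (or "ip:port") to its owning workload.
	if wl, ok := w.ipToWorkload.Load("10.0.1.15:8080"); ok {
		logger.Info("resolved workload", zap.String("workload", wl.(string)))
	}
}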
