Skip to content

Commit 6127a7c

Browse files
committed
UPSTREAM: <carry>: add http logging for kubelet metrics endpoint
1 parent 24677bb commit 6127a7c

File tree

3 files changed

+256
-5
lines changed

3 files changed

+256
-5
lines changed

pkg/kubelet/metrics/collectors/resource_metrics_test.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@ package collectors
1919
import (
2020
"context"
2121
"fmt"
22+
"io"
23+
"net/http"
24+
"net/http/httptest"
2225
"strings"
2326
"testing"
2427
"time"
2528

2629
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30+
compbasemetrics "k8s.io/component-base/metrics"
2731
"k8s.io/component-base/metrics/testutil"
2832
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
2933
summaryprovidertest "k8s.io/kubernetes/pkg/kubelet/server/stats/testing"
@@ -419,3 +423,112 @@ func TestCollectResourceMetrics(t *testing.T) {
419423
// uint64Ptr returns a pointer to a freshly allocated copy of u.
func uint64Ptr(u uint64) *uint64 {
	p := new(uint64)
	*p = u
	return p
}
426+
427+
type fakeSummaryProvider struct {
428+
stats *statsapi.Summary
429+
}
430+
431+
func (p *fakeSummaryProvider) Get(ctx context.Context, updateStats bool) (*statsapi.Summary, error) {
432+
return nil, fmt.Errorf("should not be invoked")
433+
}
434+
435+
func (p *fakeSummaryProvider) GetCPUAndMemoryStats(ctx context.Context) (*statsapi.Summary, error) {
436+
return p.stats, nil
437+
}
438+
439+
func createSummary() *statsapi.Summary {
440+
staticTimestamp := time.Unix(0, 1624396278302091597)
441+
testTime := metav1.NewTime(staticTimestamp)
442+
443+
pods := make([]statsapi.PodStats, 0)
444+
for i := 0; i < 500; i++ {
445+
podstat := statsapi.PodStats{
446+
PodRef: statsapi.PodReference{
447+
Name: fmt.Sprintf("pod_%d", i),
448+
Namespace: fmt.Sprintf("namespace_%d", i),
449+
},
450+
CPU: &statsapi.CPUStats{
451+
Time: testTime,
452+
UsageCoreNanoSeconds: uint64Ptr(10000000000),
453+
},
454+
Memory: &statsapi.MemoryStats{
455+
Time: testTime,
456+
WorkingSetBytes: uint64Ptr(1000),
457+
},
458+
Swap: &statsapi.SwapStats{
459+
Time: testTime,
460+
SwapUsageBytes: uint64Ptr(5000),
461+
},
462+
}
463+
for j := 0; j < 10; j++ {
464+
podstat.Containers = append(podstat.Containers, statsapi.ContainerStats{
465+
Name: fmt.Sprintf("container_%d", j),
466+
StartTime: metav1.NewTime(staticTimestamp.Add(-30 * time.Second)),
467+
CPU: &statsapi.CPUStats{
468+
Time: testTime,
469+
UsageCoreNanoSeconds: uint64Ptr(10000000000),
470+
},
471+
Memory: &statsapi.MemoryStats{
472+
Time: testTime,
473+
WorkingSetBytes: uint64Ptr(1000),
474+
},
475+
Swap: &statsapi.SwapStats{
476+
Time: testTime,
477+
SwapUsageBytes: uint64Ptr(1000),
478+
},
479+
})
480+
}
481+
pods = append(pods, podstat)
482+
}
483+
484+
return &statsapi.Summary{
485+
Node: statsapi.NodeStats{
486+
CPU: &statsapi.CPUStats{
487+
Time: testTime,
488+
UsageCoreNanoSeconds: uint64Ptr(10000000000),
489+
},
490+
Memory: &statsapi.MemoryStats{
491+
Time: testTime,
492+
WorkingSetBytes: uint64Ptr(1000),
493+
},
494+
Swap: &statsapi.SwapStats{
495+
Time: testTime,
496+
SwapUsageBytes: uint64Ptr(500),
497+
},
498+
},
499+
Pods: pods,
500+
}
501+
}
502+
503+
func TestResourceMetricsCollector(t *testing.T) {
504+
provider := &fakeSummaryProvider{stats: createSummary()}
505+
collector := NewResourceMetricsCollector(provider)
506+
507+
registry := compbasemetrics.NewKubeRegistry()
508+
registry.CustomMustRegister(collector)
509+
510+
handler := compbasemetrics.HandlerFor(registry, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError})
511+
// handler = http.HandlerFunc(WithHTTPLogging(handler))
512+
513+
server := httptest.NewUnstartedServer(handler)
514+
defer server.Close()
515+
server.EnableHTTP2 = false
516+
server.StartTLS()
517+
518+
client := server.Client()
519+
req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, server.URL, nil)
520+
if err != nil {
521+
t.Errorf("failed to create a new http request - %v", err)
522+
return
523+
}
524+
525+
resp, err := client.Do(req)
526+
if err != nil {
527+
t.Fatalf("unexpected error from client.Do - %v", err)
528+
}
529+
bytes, err := io.ReadAll(resp.Body)
530+
if err != nil {
531+
t.Errorf("unexpected error while reading the response body - %v", err)
532+
}
533+
t.Logf("%v", len(bytes)/1024)
534+
}

pkg/kubelet/server/server.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -474,15 +474,18 @@ func (s *Server) InstallAuthNotRequiredHandlers() {
474474
r.RawMustRegister(metrics.NewPrometheusCollector(prometheusHostAdapter{s.host}, containerPrometheusLabelsFunc(s.host), includedMetrics, clock.RealClock{}, cadvisorOpts))
475475
}
476476
s.restfulCont.Handle(cadvisorMetricsPath,
477-
compbasemetrics.HandlerFor(r, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
478-
)
477+
WithHTTPLogging(
478+
compbasemetrics.HandlerFor(r, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
479+
))
479480

480481
s.addMetricsBucketMatcher("metrics/resource")
481482
resourceRegistry := compbasemetrics.NewKubeRegistry()
482-
resourceRegistry.CustomMustRegister(collectors.NewResourceMetricsCollector(s.resourceAnalyzer))
483+
analyzer := &SummaryProviderTracker{ResourceAnalyzer: s.resourceAnalyzer}
484+
resourceRegistry.CustomMustRegister(collectors.NewResourceMetricsCollector(analyzer))
483485
s.restfulCont.Handle(resourceMetricsPath,
484-
compbasemetrics.HandlerFor(resourceRegistry, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
485-
)
486+
WithHTTPLogging(
487+
compbasemetrics.HandlerFor(resourceRegistry, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
488+
))
486489

487490
// prober metrics are exposed under a different endpoint
488491

pkg/kubelet/server/server_patch.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package server
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
"time"
8+
9+
"k8s.io/apiserver/pkg/audit"
10+
"k8s.io/apiserver/pkg/endpoints/responsewriter"
11+
"k8s.io/klog/v2"
12+
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
13+
"k8s.io/kubernetes/pkg/kubelet/server/stats"
14+
15+
"github.com/google/uuid"
16+
)
17+
18+
func WithHTTPLogging(handler http.Handler) http.Handler {
19+
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
20+
logger := newHTTPLogger(w, req)
21+
defer logger.log()
22+
23+
w = responsewriter.WrapForHTTP1Or2(logger)
24+
handler.ServeHTTP(w, req)
25+
})
26+
}
27+
28+
func newHTTPLogger(w http.ResponseWriter, req *http.Request) *httpLogger {
29+
auditID := audit.GetAuditIDTruncated(req.Context())
30+
if len(auditID) == 0 {
31+
auditID = uuid.New().String()
32+
}
33+
return &httpLogger{
34+
w: w,
35+
startedAt: time.Now(),
36+
method: req.Method,
37+
requestURI: req.RequestURI,
38+
auditID: auditID,
39+
userAgent: req.UserAgent(),
40+
srcIP: req.RemoteAddr,
41+
}
42+
}
43+
44+
type httpLogger struct {
45+
w http.ResponseWriter
46+
47+
method string
48+
requestURI string
49+
auditID string
50+
userAgent string
51+
srcIP string
52+
53+
startedAt time.Time
54+
writeLatency time.Duration
55+
flushLatency time.Duration
56+
writeBytes int
57+
statusRecorded bool
58+
statusCode int
59+
}
60+
61+
var _ http.ResponseWriter = &httpLogger{}
62+
var _ responsewriter.UserProvidedDecorator = &httpLogger{}
63+
64+
func (l *httpLogger) Unwrap() http.ResponseWriter {
65+
return l.w
66+
}
67+
68+
// Header implements http.ResponseWriter.
69+
func (l *httpLogger) Header() http.Header {
70+
return l.w.Header()
71+
}
72+
73+
// Write implements http.ResponseWriter.
74+
func (l *httpLogger) Write(b []byte) (int, error) {
75+
if !l.statusRecorded {
76+
l.record(http.StatusOK) // Default if WriteHeader hasn't been called
77+
}
78+
now := time.Now()
79+
var written int
80+
defer func() {
81+
l.writeLatency += time.Since(now)
82+
l.writeBytes += written
83+
}()
84+
written, err := l.w.Write(b)
85+
return written, err
86+
}
87+
88+
func (l *httpLogger) Flush() {
89+
now := time.Now()
90+
defer func() {
91+
l.flushLatency += time.Since(now)
92+
}()
93+
l.w.(http.Flusher).Flush()
94+
}
95+
96+
// WriteHeader implements http.ResponseWriter.
97+
func (l *httpLogger) WriteHeader(status int) {
98+
l.record(status)
99+
l.w.WriteHeader(status)
100+
}
101+
102+
func (l *httpLogger) record(status int) {
103+
l.statusCode = status
104+
l.statusRecorded = true
105+
}
106+
107+
func (l *httpLogger) log() {
108+
latency := time.Since(l.startedAt)
109+
kvs := []interface{}{
110+
"startedAt", l.startedAt,
111+
"method", l.method,
112+
"URI", l.requestURI,
113+
"latency", latency,
114+
"userAgent", l.userAgent,
115+
"audit-ID", l.auditID,
116+
"srcIP", l.srcIP,
117+
"status", l.statusCode,
118+
"writeLatency", l.writeLatency,
119+
"writtenBytes", fmt.Sprintf("%dK", l.writeBytes/1024),
120+
"flushLatency", l.flushLatency,
121+
}
122+
klog.V(1).InfoSDepth(1, "HTTP", kvs...)
123+
}
124+
125+
type SummaryProviderTracker struct {
126+
stats.ResourceAnalyzer
127+
}
128+
129+
func (t *SummaryProviderTracker) GetCPUAndMemoryStats(ctx context.Context) (*statsapi.Summary, error) {
130+
now := time.Now()
131+
defer func() {
132+
klog.InfoS("GetCPUAndMemoryStats", "latency", time.Since(now))
133+
}()
134+
return t.ResourceAnalyzer.GetCPUAndMemoryStats(ctx)
135+
}

0 commit comments

Comments
 (0)