Skip to content

Commit 3ed3ff9

Browse files
tkashemrphillips
authored andcommitted
UPSTREAM: <carry>: add http logging for kubelet metrics endpoint
1 parent 24677bb commit 3ed3ff9

File tree

2 files changed

+143
-5
lines changed

2 files changed

+143
-5
lines changed

pkg/kubelet/server/server.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -474,15 +474,18 @@ func (s *Server) InstallAuthNotRequiredHandlers() {
474474
r.RawMustRegister(metrics.NewPrometheusCollector(prometheusHostAdapter{s.host}, containerPrometheusLabelsFunc(s.host), includedMetrics, clock.RealClock{}, cadvisorOpts))
475475
}
476476
s.restfulCont.Handle(cadvisorMetricsPath,
477-
compbasemetrics.HandlerFor(r, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
478-
)
477+
WithHTTPLogging(
478+
compbasemetrics.HandlerFor(r, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
479+
))
479480

480481
s.addMetricsBucketMatcher("metrics/resource")
481482
resourceRegistry := compbasemetrics.NewKubeRegistry()
482-
resourceRegistry.CustomMustRegister(collectors.NewResourceMetricsCollector(s.resourceAnalyzer))
483+
analyzer := &SummaryProviderTracker{ResourceAnalyzer: s.resourceAnalyzer}
484+
resourceRegistry.CustomMustRegister(collectors.NewResourceMetricsCollector(analyzer))
483485
s.restfulCont.Handle(resourceMetricsPath,
484-
compbasemetrics.HandlerFor(resourceRegistry, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
485-
)
486+
WithHTTPLogging(
487+
compbasemetrics.HandlerFor(resourceRegistry, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
488+
))
486489

487490
// prober metrics are exposed under a different endpoint
488491

pkg/kubelet/server/server_patch.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package server
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
"time"
8+
9+
"k8s.io/apiserver/pkg/audit"
10+
"k8s.io/apiserver/pkg/endpoints/responsewriter"
11+
"k8s.io/klog/v2"
12+
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
13+
"k8s.io/kubernetes/pkg/kubelet/server/stats"
14+
15+
"github.com/google/uuid"
16+
)
17+
18+
func WithHTTPLogging(handler http.Handler) http.Handler {
19+
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
20+
logger := newHTTPLogger(w, req)
21+
defer logger.log()
22+
23+
w = responsewriter.WrapForHTTP1Or2(logger)
24+
handler.ServeHTTP(w, req)
25+
})
26+
}
27+
28+
func newHTTPLogger(w http.ResponseWriter, req *http.Request) *httpLogger {
29+
auditID := audit.GetAuditIDTruncated(req.Context())
30+
if len(auditID) == 0 {
31+
auditID = uuid.New().String()
32+
}
33+
return &httpLogger{
34+
w: w,
35+
startedAt: time.Now(),
36+
method: req.Method,
37+
requestURI: req.RequestURI,
38+
auditID: auditID,
39+
userAgent: req.UserAgent(),
40+
srcIP: req.RemoteAddr,
41+
}
42+
}
43+
44+
type httpLogger struct {
45+
w http.ResponseWriter
46+
47+
method string
48+
requestURI string
49+
auditID string
50+
userAgent string
51+
srcIP string
52+
53+
startedAt time.Time
54+
writeLatency time.Duration
55+
flushLatency time.Duration
56+
writeBytes int
57+
statusRecorded bool
58+
statusCode int
59+
}
60+
61+
var _ http.ResponseWriter = &httpLogger{}
62+
var _ responsewriter.UserProvidedDecorator = &httpLogger{}
63+
64+
func (l *httpLogger) Unwrap() http.ResponseWriter {
65+
return l.w
66+
}
67+
68+
// Header implements http.ResponseWriter.
69+
func (l *httpLogger) Header() http.Header {
70+
return l.w.Header()
71+
}
72+
73+
// Write implements http.ResponseWriter.
74+
func (l *httpLogger) Write(b []byte) (int, error) {
75+
if !l.statusRecorded {
76+
l.record(http.StatusOK) // Default if WriteHeader hasn't been called
77+
}
78+
now := time.Now()
79+
var written int
80+
defer func() {
81+
l.writeLatency += time.Since(now)
82+
l.writeBytes += written
83+
}()
84+
written, err := l.w.Write(b)
85+
return written, err
86+
}
87+
88+
func (l *httpLogger) Flush() {
89+
now := time.Now()
90+
defer func() {
91+
l.flushLatency += time.Since(now)
92+
}()
93+
l.w.(http.Flusher).Flush()
94+
}
95+
96+
// WriteHeader implements http.ResponseWriter.
97+
func (l *httpLogger) WriteHeader(status int) {
98+
l.record(status)
99+
l.w.WriteHeader(status)
100+
}
101+
102+
func (l *httpLogger) record(status int) {
103+
l.statusCode = status
104+
l.statusRecorded = true
105+
}
106+
107+
func (l *httpLogger) log() {
108+
latency := time.Since(l.startedAt)
109+
kvs := []interface{}{
110+
"startedAt", l.startedAt,
111+
"method", l.method,
112+
"URI", l.requestURI,
113+
"latency", latency,
114+
"userAgent", l.userAgent,
115+
"audit-ID", l.auditID,
116+
"srcIP", l.srcIP,
117+
"status", l.statusCode,
118+
"writeLatency", l.writeLatency,
119+
"writtenBytes", fmt.Sprintf("%dK", l.writeBytes/1024),
120+
"flushLatency", l.flushLatency,
121+
}
122+
klog.V(1).InfoSDepth(1, "HTTP", kvs...)
123+
}
124+
125+
type SummaryProviderTracker struct {
126+
stats.ResourceAnalyzer
127+
}
128+
129+
func (t *SummaryProviderTracker) GetCPUAndMemoryStats(ctx context.Context) (*statsapi.Summary, error) {
130+
now := time.Now()
131+
defer func() {
132+
klog.InfoS("GetCPUAndMemoryStats", "latency", time.Since(now))
133+
}()
134+
return t.ResourceAnalyzer.GetCPUAndMemoryStats(ctx)
135+
}

0 commit comments

Comments
 (0)