Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/kubelet/cm/cgroup_manager_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func ParseSystemdToCgroupName(name string) CgroupName {
driverName := path.Base(name)
driverName = strings.TrimSuffix(driverName, systemdSuffix)
parts := strings.Split(driverName, "-")
result := []string{}
result := make([]string, 0, len(parts))
for _, part := range parts {
result = append(result, unescapeSystemdCgroupName(part))
}
Expand Down
13 changes: 8 additions & 5 deletions pkg/kubelet/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,15 +474,18 @@ func (s *Server) InstallAuthNotRequiredHandlers() {
r.RawMustRegister(metrics.NewPrometheusCollector(prometheusHostAdapter{s.host}, containerPrometheusLabelsFunc(s.host), includedMetrics, clock.RealClock{}, cadvisorOpts))
}
s.restfulCont.Handle(cadvisorMetricsPath,
compbasemetrics.HandlerFor(r, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
)
WithHTTPLogging(
compbasemetrics.HandlerFor(r, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
))

s.addMetricsBucketMatcher("metrics/resource")
resourceRegistry := compbasemetrics.NewKubeRegistry()
resourceRegistry.CustomMustRegister(collectors.NewResourceMetricsCollector(s.resourceAnalyzer))
analyzer := &SummaryProviderTracker{ResourceAnalyzer: s.resourceAnalyzer}
resourceRegistry.CustomMustRegister(collectors.NewResourceMetricsCollector(analyzer))
s.restfulCont.Handle(resourceMetricsPath,
compbasemetrics.HandlerFor(resourceRegistry, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
)
WithHTTPLogging(
compbasemetrics.HandlerFor(resourceRegistry, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
))

// prober metrics are exposed under a different endpoint

Expand Down
147 changes: 147 additions & 0 deletions pkg/kubelet/server/server_patch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package server

import (
"context"
"fmt"
"net/http"
"time"

"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/endpoints/responsewriter"
"k8s.io/klog/v2"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/server/stats"

"github.com/google/uuid"
)

func WithHTTPLogging(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
logger := newHTTPLogger(w, req)
defer logger.log()

w = responsewriter.WrapForHTTP1Or2(logger)
handler.ServeHTTP(w, req)
})
}

func newHTTPLogger(w http.ResponseWriter, req *http.Request) *httpLogger {
auditID := audit.GetAuditIDTruncated(req.Context())
if len(auditID) == 0 {
auditID = uuid.New().String()
}
return &httpLogger{
w: w,
startedAt: time.Now(),
method: req.Method,
requestURI: req.RequestURI,
auditID: auditID,
userAgent: req.UserAgent(),
srcIP: req.RemoteAddr,
}
}

type httpLogger struct {
w http.ResponseWriter

method string
requestURI string
auditID string
userAgent string
srcIP string

startedAt time.Time
writeLatency time.Duration
flushLatency time.Duration
writeBytes int
statusRecorded bool
statusCode int
}

var _ http.ResponseWriter = &httpLogger{}
var _ responsewriter.UserProvidedDecorator = &httpLogger{}

func (l *httpLogger) Unwrap() http.ResponseWriter {
return l.w
}

// Header implements http.ResponseWriter.
func (l *httpLogger) Header() http.Header {
return l.w.Header()
}

// Write implements http.ResponseWriter.
func (l *httpLogger) Write(b []byte) (int, error) {
if !l.statusRecorded {
l.record(http.StatusOK) // Default if WriteHeader hasn't been called
}
now := time.Now()
var written int
defer func() {
l.writeLatency += time.Since(now)
l.writeBytes += written
}()
written, err := l.w.Write(b)
return written, err
}

func (l *httpLogger) Flush() {
now := time.Now()
defer func() {
l.flushLatency += time.Since(now)
}()
l.w.(http.Flusher).Flush()
}

// WriteHeader implements http.ResponseWriter.
func (l *httpLogger) WriteHeader(status int) {
l.record(status)
l.w.WriteHeader(status)
}

func (l *httpLogger) record(status int) {
l.statusCode = status
l.statusRecorded = true
}

func (l *httpLogger) log() {
latency := time.Since(l.startedAt)
kvs := []interface{}{
"startedAt", l.startedAt,
"method", l.method,
"URI", l.requestURI,
"latency", latency,
"userAgent", l.userAgent,
"audit-ID", l.auditID,
"srcIP", l.srcIP,
"status", l.statusCode,
"writeLatency", l.writeLatency,
"writtenBytes", fmt.Sprintf("%dK", l.writeBytes/1024),
"flushLatency", l.flushLatency,
}
klog.V(1).InfoSDepth(1, "HTTP", kvs...)
}

type SummaryProviderTracker struct {
stats.ResourceAnalyzer
}

func (t *SummaryProviderTracker) GetCPUAndMemoryStats(ctx context.Context) (*statsapi.Summary, error) {
now := time.Now()
defer func() {
if passed := time.Since(now); passed > time.Second {
klog.InfoS("GetCPUAndMemoryStats", "latency", passed)
}
}()
return t.ResourceAnalyzer.GetCPUAndMemoryStats(ctx)
}

func (t *SummaryProviderTracker) Get(ctx context.Context, updateStats bool) (*statsapi.Summary, error) {
now := time.Now()
defer func() {
if passed := time.Since(now); passed > time.Second {
klog.InfoS("Get", "latency", passed)
}
}()
return t.ResourceAnalyzer.Get(ctx, updateStats)
}
2 changes: 0 additions & 2 deletions pkg/kubelet/server/stats/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ type Provider interface {
// GetPodByName returns the spec of the pod with the name in the specified
// namespace.
GetPodByName(namespace, name string) (*v1.Pod, bool)
// GetNode returns the spec of the local node.
GetNode() (*v1.Node, error)
// GetNodeConfig returns the configuration of the local node.
GetNodeConfig() cm.NodeConfig
// ListVolumesForPod returns the stats of the volume used by the pod with
Expand Down
16 changes: 2 additions & 14 deletions pkg/kubelet/server/stats/summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,6 @@ func NewSummaryProvider(statsProvider Provider) SummaryProvider {
}

func (sp *summaryProviderImpl) Get(ctx context.Context, updateStats bool) (*statsapi.Summary, error) {
// TODO(timstclair): Consider returning a best-effort response if any of
// the following errors occur.
node, err := sp.provider.GetNode()
if err != nil {
return nil, fmt.Errorf("failed to get node info: %v", err)
}
nodeConfig := sp.provider.GetNodeConfig()
rootStats, networkStats, err := sp.provider.GetCgroupStats("/", updateStats)
if err != nil {
Expand Down Expand Up @@ -104,7 +98,7 @@ func (sp *summaryProviderImpl) Get(ctx context.Context, updateStats bool) (*stat
}

nodeStats := statsapi.NodeStats{
NodeName: node.Name,
NodeName: string(nodeConfig.NodeName),
CPU: rootStats.CPU,
Memory: rootStats.Memory,
Swap: rootStats.Swap,
Expand All @@ -126,12 +120,6 @@ func (sp *summaryProviderImpl) Get(ctx context.Context, updateStats bool) (*stat
}

func (sp *summaryProviderImpl) GetCPUAndMemoryStats(ctx context.Context) (*statsapi.Summary, error) {
// TODO(timstclair): Consider returning a best-effort response if any of
// the following errors occur.
node, err := sp.provider.GetNode()
if err != nil {
return nil, fmt.Errorf("failed to get node info: %v", err)
}
nodeConfig := sp.provider.GetNodeConfig()
rootStats, err := sp.provider.GetCgroupCPUAndMemoryStats("/", false)
if err != nil {
Expand All @@ -144,7 +132,7 @@ func (sp *summaryProviderImpl) GetCPUAndMemoryStats(ctx context.Context) (*stats
}

nodeStats := statsapi.NodeStats{
NodeName: node.Name,
NodeName: string(nodeConfig.NodeName),
CPU: rootStats.CPU,
Memory: rootStats.Memory,
Swap: rootStats.Swap,
Expand Down
5 changes: 2 additions & 3 deletions pkg/kubelet/server/stats/summary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
Expand All @@ -42,6 +43,7 @@ var (
rootFsStats = getFsStats()
node = &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "test-node"}}
nodeConfig = cm.NodeConfig{
NodeName: types.NodeName("test-node"),
RuntimeCgroupsName: "/runtime",
SystemCgroupsName: "/misc",
KubeletCgroupsName: "/kubelet",
Expand Down Expand Up @@ -77,7 +79,6 @@ func TestSummaryProviderGetStatsNoSplitFileSystem(t *testing.T) {

mockStatsProvider := statstest.NewMockProvider(t)

mockStatsProvider.EXPECT().GetNode().Return(node, nil)
mockStatsProvider.EXPECT().GetNodeConfig().Return(nodeConfig)
mockStatsProvider.EXPECT().GetPodCgroupRoot().Return(cgroupRoot)
mockStatsProvider.EXPECT().ListPodStats(ctx).Return(podStats, nil).Maybe()
Expand Down Expand Up @@ -178,7 +179,6 @@ func TestSummaryProviderGetStatsSplitImageFs(t *testing.T) {

mockStatsProvider := statstest.NewMockProvider(t)

mockStatsProvider.EXPECT().GetNode().Return(node, nil)
mockStatsProvider.EXPECT().GetNodeConfig().Return(nodeConfig)
mockStatsProvider.EXPECT().GetPodCgroupRoot().Return(cgroupRoot)
mockStatsProvider.EXPECT().ListPodStats(ctx).Return(podStats, nil).Maybe()
Expand Down Expand Up @@ -278,7 +278,6 @@ func TestSummaryProviderGetCPUAndMemoryStats(t *testing.T) {

mockStatsProvider := statstest.NewMockProvider(t)

mockStatsProvider.EXPECT().GetNode().Return(node, nil)
mockStatsProvider.EXPECT().GetNodeConfig().Return(nodeConfig)
mockStatsProvider.EXPECT().GetPodCgroupRoot().Return(cgroupRoot)
mockStatsProvider.EXPECT().ListPodCPUAndMemoryStats(ctx).Return(podStats, nil)
Expand Down
3 changes: 0 additions & 3 deletions pkg/kubelet/server/stats/summary_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/stretchr/testify/assert"
"sigs.k8s.io/randfill"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cm"
Expand All @@ -40,7 +39,6 @@ func TestSummaryProvider(t *testing.T) {
podStats = []statsapi.PodStats{*getPodStats()}
imageFsStats = getFsStats()
rootFsStats = getFsStats()
node = &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "test-node"}}
nodeConfig = cm.NodeConfig{}
cgroupRoot = "/kubepods"
cgroupStatsMap = map[string]struct {
Expand All @@ -55,7 +53,6 @@ func TestSummaryProvider(t *testing.T) {
assert := assert.New(t)

mockStatsProvider := statstest.NewMockProvider(t)
mockStatsProvider.EXPECT().GetNode().Return(node, nil).Maybe()
mockStatsProvider.EXPECT().GetNodeConfig().Return(nodeConfig).Maybe()
mockStatsProvider.EXPECT().GetPodCgroupRoot().Return(cgroupRoot).Maybe()
mockStatsProvider.EXPECT().ListPodStats(ctx).Return(podStats, nil).Maybe()
Expand Down
Loading