Skip to content

Commit fd1da0b

Browse files
authored
Merge pull request #104 from nojnhuh/liveness-probe
Add livenessProbe to kubelet plugin
2 parents 871042d + ef100f1 commit fd1da0b

File tree

6 files changed

+192
-6
lines changed

6 files changed

+192
-6
lines changed

cmd/dra-example-kubeletplugin/driver.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,10 @@ import (
3232
)
3333

3434
type driver struct {
35-
client coreclientset.Interface
36-
helper *kubeletplugin.Helper
37-
state *DeviceState
35+
client coreclientset.Interface
36+
helper *kubeletplugin.Helper
37+
state *DeviceState
38+
healthcheck *healthcheck
3839
}
3940

4041
func NewDriver(ctx context.Context, config *Config) (*driver, error) {
@@ -78,14 +79,22 @@ func NewDriver(ctx context.Context, config *Config) (*driver, error) {
7879
},
7980
}
8081

82+
driver.healthcheck, err = startHealthcheck(ctx, config)
83+
if err != nil {
84+
return nil, fmt.Errorf("start healthcheck: %w", err)
85+
}
86+
8187
if err := helper.PublishResources(ctx, resources); err != nil {
8288
return nil, err
8389
}
8490

8591
return driver, nil
8692
}
8793

88-
func (d *driver) Shutdown() error {
94+
func (d *driver) Shutdown(logger klog.Logger) error {
95+
if d.healthcheck != nil {
96+
d.healthcheck.Stop(logger)
97+
}
8998
d.helper.Stop()
9099
return nil
91100
}
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Copyright 2025 The Kubernetes Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"net"
23+
"net/url"
24+
"path"
25+
"strconv"
26+
"sync"
27+
28+
"google.golang.org/grpc"
29+
"google.golang.org/grpc/codes"
30+
"google.golang.org/grpc/credentials/insecure"
31+
"google.golang.org/grpc/health/grpc_health_v1"
32+
"google.golang.org/grpc/status"
33+
"k8s.io/klog/v2"
34+
drapb "k8s.io/kubelet/pkg/apis/dra/v1beta1"
35+
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
36+
37+
"sigs.k8s.io/dra-example-driver/pkg/consts"
38+
)
39+
40+
type healthcheck struct {
41+
grpc_health_v1.UnimplementedHealthServer
42+
43+
server *grpc.Server
44+
wg sync.WaitGroup
45+
46+
regClient registerapi.RegistrationClient
47+
draClient drapb.DRAPluginClient
48+
}
49+
50+
func startHealthcheck(ctx context.Context, config *Config) (*healthcheck, error) {
51+
log := klog.FromContext(ctx)
52+
53+
port := config.flags.healthcheckPort
54+
if port < 0 {
55+
return nil, nil
56+
}
57+
58+
addr := net.JoinHostPort("", strconv.Itoa(port))
59+
lis, err := net.Listen("tcp", addr)
60+
if err != nil {
61+
return nil, fmt.Errorf("failed to listen for healthcheck service at %s: %w", addr, err)
62+
}
63+
64+
regSockPath := (&url.URL{
65+
Scheme: "unix",
66+
// TODO: this needs to adapt when seamless upgrades
67+
// are enabled and the filename includes a uid.
68+
Path: path.Join(config.flags.kubeletRegistrarDirectoryPath, consts.DriverName+"-reg.sock"),
69+
}).String()
70+
log.Info("connecting to registration socket", "path", regSockPath)
71+
regConn, err := grpc.NewClient(
72+
regSockPath,
73+
grpc.WithTransportCredentials(insecure.NewCredentials()),
74+
)
75+
if err != nil {
76+
return nil, fmt.Errorf("connect to registration socket: %w", err)
77+
}
78+
79+
draSockPath := (&url.URL{
80+
Scheme: "unix",
81+
Path: path.Join(config.DriverPluginPath(), "dra.sock"),
82+
}).String()
83+
log.Info("connecting to DRA socket", "path", draSockPath)
84+
draConn, err := grpc.NewClient(
85+
draSockPath,
86+
grpc.WithTransportCredentials(insecure.NewCredentials()),
87+
)
88+
if err != nil {
89+
return nil, fmt.Errorf("connect to DRA socket: %w", err)
90+
}
91+
92+
server := grpc.NewServer()
93+
healthcheck := &healthcheck{
94+
server: server,
95+
regClient: registerapi.NewRegistrationClient(regConn),
96+
draClient: drapb.NewDRAPluginClient(draConn),
97+
}
98+
grpc_health_v1.RegisterHealthServer(server, healthcheck)
99+
100+
healthcheck.wg.Add(1)
101+
go func() {
102+
defer healthcheck.wg.Done()
103+
log.Info("starting healthcheck service", "addr", lis.Addr().String())
104+
if err := server.Serve(lis); err != nil {
105+
log.Error(err, "failed to serve healthcheck service", "addr", addr)
106+
}
107+
}()
108+
109+
return healthcheck, nil
110+
}
111+
112+
func (h *healthcheck) Stop(logger klog.Logger) {
113+
if h.server != nil {
114+
logger.Info("stopping healthcheck service")
115+
h.server.GracefulStop()
116+
}
117+
h.wg.Wait()
118+
}
119+
120+
// Check implements [grpc_health_v1.HealthServer].
121+
func (h *healthcheck) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
122+
log := klog.FromContext(ctx)
123+
124+
knownServices := map[string]struct{}{"": {}, "liveness": {}}
125+
if _, known := knownServices[req.GetService()]; !known {
126+
return nil, status.Error(codes.NotFound, "unknown service")
127+
}
128+
129+
status := &grpc_health_v1.HealthCheckResponse{
130+
Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING,
131+
}
132+
133+
info, err := h.regClient.GetInfo(ctx, &registerapi.InfoRequest{})
134+
if err != nil {
135+
log.Error(err, "failed to call GetInfo")
136+
return status, nil
137+
}
138+
log.V(5).Info("Successfully invoked GetInfo", "info", info)
139+
140+
_, err = h.draClient.NodePrepareResources(ctx, &drapb.NodePrepareResourcesRequest{})
141+
if err != nil {
142+
log.Error(err, "failed to call NodePrepareResources")
143+
return status, nil
144+
}
145+
log.V(5).Info("Successfully invoked NodePrepareResources")
146+
147+
status.Status = grpc_health_v1.HealthCheckResponse_SERVING
148+
return status, nil
149+
}

cmd/dra-example-kubeletplugin/main.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ type Flags struct {
4848
numDevices int
4949
kubeletRegistrarDirectoryPath string
5050
kubeletPluginsDirectoryPath string
51+
healthcheckPort int
5152
}
5253

5354
type Config struct {
@@ -106,6 +107,13 @@ func newApp() *cli.App {
106107
Destination: &flags.kubeletPluginsDirectoryPath,
107108
EnvVars: []string{"KUBELET_PLUGINS_DIRECTORY_PATH"},
108109
},
110+
&cli.IntFlag{
111+
Name: "healthcheck-port",
112+
Usage: "Port to start a gRPC healthcheck service. When positive, a literal port number. When zero, a random port is allocated. When negative, the healthcheck service is disabled.",
113+
Value: -1,
114+
Destination: &flags.healthcheckPort,
115+
EnvVars: []string{"HEALTHCHECK_PORT"},
116+
},
109117
}
110118
cliFlags = append(cliFlags, flags.kubeClientConfig.Flags()...)
111119
cliFlags = append(cliFlags, flags.loggingConfig.Flags()...)
@@ -180,7 +188,7 @@ func RunPlugin(ctx context.Context, config *Config) error {
180188
logger.Error(err, "error from context")
181189
}
182190

183-
err = driver.Shutdown()
191+
err = driver.Shutdown(logger)
184192
if err != nil {
185193
logger.Error(err, "Unable to cleanly shutdown driver")
186194
}

deployments/helm/dra-example-driver/templates/kubeletplugin.yaml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,19 @@ spec:
4545
command: ["dra-example-kubeletplugin"]
4646
resources:
4747
{{- toYaml .Values.kubeletPlugin.containers.plugin.resources | nindent 10 }}
48+
{{/*
49+
A literal "0" will allocate a random port. Don't configure the probe
50+
with the same literal "0" since that won't match where the service is
51+
actually running.
52+
*/}}
53+
{{- if (gt (int .Values.kubeletPlugin.containers.plugin.healthcheckPort) 0) }}
54+
livenessProbe:
55+
grpc:
56+
port: {{ .Values.kubeletPlugin.containers.plugin.healthcheckPort }}
57+
service: liveness
58+
failureThreshold: 3
59+
periodSeconds: 10
60+
{{- end }}
4861
env:
4962
- name: CDI_ROOT
5063
value: /var/run/cdi
@@ -63,6 +76,10 @@ spec:
6376
# Simulated number of devices the example driver will pretend to have.
6477
- name: NUM_DEVICES
6578
value: "9"
79+
{{- if .Values.kubeletPlugin.containers.plugin.healthcheckPort }}
80+
- name: HEALTHCHECK_PORT
81+
value: {{ .Values.kubeletPlugin.containers.plugin.healthcheckPort | quote }}
82+
{{- end }}
6683
volumeMounts:
6784
- name: plugins-registry
6885
mountPath: {{ .Values.kubeletPlugin.kubeletRegistrarDirectoryPath | quote }}

deployments/helm/dra-example-driver/values.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ kubeletPlugin:
6363
securityContext:
6464
privileged: true
6565
resources: {}
66+
# Port running a gRPC health service checked by a livenessProbe.
67+
# Set to a negative value to disable the service and the probe.
68+
healthcheckPort: 51515
6669

6770
webhook:
6871
enabled: false

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
github.com/spf13/pflag v1.0.5
1010
github.com/stretchr/testify v1.10.0
1111
github.com/urfave/cli/v2 v2.25.3
12+
google.golang.org/grpc v1.68.1
1213
k8s.io/api v0.33.0
1314
k8s.io/apimachinery v0.33.0
1415
k8s.io/client-go v0.33.0
@@ -72,7 +73,6 @@ require (
7273
golang.org/x/text v0.23.0 // indirect
7374
golang.org/x/time v0.9.0 // indirect
7475
google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576 // indirect
75-
google.golang.org/grpc v1.68.1 // indirect
7676
google.golang.org/protobuf v1.36.5 // indirect
7777
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
7878
gopkg.in/inf.v0 v0.9.1 // indirect

0 commit comments

Comments
 (0)