Skip to content

feat: add mark health device function #1241

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions internal/plugin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type nvidiaDevicePlugin struct {

socket string
server *grpc.Server
health chan *rm.Device
health chan *rm.DeviceEvent
stop chan interface{}

imexChannels imex.Channels
Expand Down Expand Up @@ -105,7 +105,7 @@ func getPluginSocketPath(resource spec.ResourceName) string {

// initialize prepares the plugin for serving: it creates a fresh gRPC server
// and allocates the health and stop channels.
func (plugin *nvidiaDevicePlugin) initialize() {
plugin.server = grpc.NewServer([]grpc.ServerOption{}...)
// NOTE(review): the two assignments to plugin.health below appear to be the
// before/after lines of the diff view. The later one (a channel of
// *rm.DeviceEvent with a buffer of 10) is the value left in effect.
plugin.health = make(chan *rm.Device)
plugin.health = make(chan *rm.DeviceEvent, 10)
plugin.stop = make(chan interface{})
}

Expand Down Expand Up @@ -272,11 +272,21 @@ func (plugin *nvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.D
return nil
case d := <-plugin.health:
// FIXME: there is no way to recover from the Unhealthy state.
d.Health = pluginapi.Unhealthy
klog.Infof("'%s' device marked unhealthy: %s", plugin.rm.Resource(), d.ID)
if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: plugin.apiDevices()}); err != nil {
return nil
if d.Event == rm.DeviceUnHalthy {
d.Device.Health = pluginapi.Unhealthy
klog.Infof("'%s' device marked unhealthy: %s", plugin.rm.Resource(), d.Device.ID)
if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: plugin.apiDevices()}); err != nil {
return nil
}
}
if d.Event == rm.DeviceHealthy {
d.Device.Health = pluginapi.Healthy
klog.Infof("'%s' device marked healthy: %s", plugin.rm.Resource(), d.Device.ID)
if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: plugin.apiDevices()}); err != nil {
return nil
}
}

}
}
}
Expand Down
12 changes: 12 additions & 0 deletions internal/rm/devices.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ type Device struct {
Replicas int
}

// DeviceEventType identifies the kind of health transition reported for a device.
type DeviceEventType int

const (
	// DeviceUnhealthy reports that a device should be marked unhealthy.
	// It is the zero value of DeviceEventType.
	DeviceUnhealthy DeviceEventType = iota
	// DeviceHealthy reports that a device has recovered and should be marked healthy.
	DeviceHealthy
)

// DeviceUnHalthy is the original, misspelled name of DeviceUnhealthy. It is
// kept with the same type and value so that existing references keep compiling.
//
// Deprecated: use DeviceUnhealthy instead.
const DeviceUnHalthy = DeviceUnhealthy

// DeviceEvent pairs a device with the health transition being reported for it.
type DeviceEvent struct {
// Device is the device the event refers to.
Device *Device
// Event is the kind of health transition being reported for Device.
Event DeviceEventType
}

// deviceInfo defines the information required to construct a Device
type deviceInfo interface {
GetUUID() (string, error)
Expand Down
72 changes: 58 additions & 14 deletions internal/rm/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,14 @@ const (
// this is in addition to the Application errors that are already ignored.
envDisableHealthChecks = "DP_DISABLE_HEALTHCHECKS"
allHealthChecks = "xids"

nvmlEventTypeGpuRecoveryAction = 0x0000000000008000 // from https://docs.nvidia.com/deploy/nvml-api/group__nvmlEventType.html?

nvmlEventTypeGpuUnavailableError = 0x0000000000004000
)

// checkHealth performs health checks on a set of devices, writing a health event to the 'unhealthy' channel whenever a device becomes unhealthy or recovers
func (r *nvmlResourceManager) checkHealth(stop <-chan interface{}, devices Devices, unhealthy chan<- *Device) error {
func (r *nvmlResourceManager) checkHealth(stop <-chan interface{}, devices Devices, unhealthy chan<- *DeviceEvent) error {
disableHealthChecks := strings.ToLower(os.Getenv(envDisableHealthChecks))
if disableHealthChecks == "all" {
disableHealthChecks = allHealthChecks
Expand Down Expand Up @@ -92,12 +96,15 @@ func (r *nvmlResourceManager) checkHealth(stop <-chan interface{}, devices Devic
deviceIDToGiMap := make(map[string]uint32)
deviceIDToCiMap := make(map[string]uint32)

eventMask := uint64(nvml.EventTypeXidCriticalError | nvml.EventTypeDoubleBitEccError | nvml.EventTypeSingleBitEccError)
eventMask := uint64(nvml.EventTypeXidCriticalError | nvml.EventTypeDoubleBitEccError | nvml.EventTypeSingleBitEccError | nvmlEventTypeGpuUnavailableError | nvmlEventTypeGpuRecoveryAction)
for _, d := range devices {
uuid, gi, ci, err := r.getDevicePlacement(d)
if err != nil {
klog.Warningf("Could not determine device placement for %v: %v; Marking it unhealthy.", d.ID, err)
unhealthy <- d
unhealthy <- &DeviceEvent{
Device: d,
Event: DeviceUnHalthy,
}
continue
}
deviceIDToGiMap[d.ID] = gi
Expand All @@ -107,14 +114,20 @@ func (r *nvmlResourceManager) checkHealth(stop <-chan interface{}, devices Devic
gpu, ret := r.nvml.DeviceGetHandleByUUID(uuid)
if ret != nvml.SUCCESS {
klog.Infof("unable to get device handle from UUID: %v; marking it as unhealthy", ret)
unhealthy <- d
unhealthy <- &DeviceEvent{
Device: d,
Event: DeviceUnHalthy,
}
continue
}

supportedEvents, ret := gpu.GetSupportedEventTypes()
if ret != nvml.SUCCESS {
klog.Infof("unable to determine the supported events for %v: %v; marking it as unhealthy", d.ID, ret)
unhealthy <- d
unhealthy <- &DeviceEvent{
Device: d,
Event: DeviceUnHalthy,
}
continue
}

Expand All @@ -124,7 +137,10 @@ func (r *nvmlResourceManager) checkHealth(stop <-chan interface{}, devices Devic
}
if ret != nvml.SUCCESS {
klog.Infof("Marking device %v as unhealthy: %v", d.ID, ret)
unhealthy <- d
unhealthy <- &DeviceEvent{
Device: d,
Event: DeviceUnHalthy,
}
}
}

Expand All @@ -142,16 +158,14 @@ func (r *nvmlResourceManager) checkHealth(stop <-chan interface{}, devices Devic
if ret != nvml.SUCCESS {
klog.Infof("Error waiting for event: %v; Marking all devices as unhealthy", ret)
for _, d := range devices {
unhealthy <- d
unhealthy <- &DeviceEvent{
Device: d,
Event: DeviceUnHalthy,
}
}
continue
}

if e.EventType != nvml.EventTypeXidCriticalError {
klog.Infof("Skipping non-nvmlEventTypeXidCriticalError event: %+v", e)
continue
}

if skippedXids[e.EventData] {
klog.Infof("Skipping event %+v", e)
continue
Expand All @@ -163,7 +177,10 @@ func (r *nvmlResourceManager) checkHealth(stop <-chan interface{}, devices Devic
// If we cannot reliably determine the device UUID, we mark all devices as unhealthy.
klog.Infof("Failed to determine uuid for event %v: %v; Marking all devices as unhealthy.", e, ret)
for _, d := range devices {
unhealthy <- d
unhealthy <- &DeviceEvent{
Device: d,
Event: DeviceUnHalthy,
}
}
continue
}
Expand All @@ -173,6 +190,30 @@ func (r *nvmlResourceManager) checkHealth(stop <-chan interface{}, devices Devic
klog.Infof("Ignoring event for unexpected device: %v", eventUUID)
continue
}
// nvmlEventTypeGpuRecoveryAction is a special case, where we mark the device as healthy.
if e.EventType == nvmlEventTypeGpuRecoveryAction {
klog.Infof("Gpu recovery event: %+v", e)
unhealthy <- &DeviceEvent{
Device: d,
Event: DeviceHealthy,
}
continue

}

if e.EventType == nvmlEventTypeGpuUnavailableError {
klog.Infof("Gpu unavailable event: %+v", e)
unhealthy <- &DeviceEvent{
Device: d,
Event: DeviceUnHalthy,
}
continue
}

if e.EventType != nvml.EventTypeXidCriticalError {
klog.Infof("Skipping non-nvmlEventTypeXidCriticalError event: %+v", e)
continue
}

if d.IsMigDevice() && e.GpuInstanceId != 0xFFFFFFFF && e.ComputeInstanceId != 0xFFFFFFFF {
gi := deviceIDToGiMap[d.ID]
Expand All @@ -184,7 +225,10 @@ func (r *nvmlResourceManager) checkHealth(stop <-chan interface{}, devices Devic
}

klog.Infof("XidCriticalError: Xid=%d on Device=%s; marking device as unhealthy.", e.EventData, d.ID)
unhealthy <- d
unhealthy <- &DeviceEvent{
Device: d,
Event: DeviceUnHalthy,
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/rm/nvml_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (r *nvmlResourceManager) GetDevicePaths(ids []string) []string {
}

// CheckHealth performs health checks on the resource manager's own devices
// (r.devices) by delegating to checkHealth, passing through the 'stop' and
// 'unhealthy' channels. It returns whatever error checkHealth returns.
func (r *nvmlResourceManager) CheckHealth(stop <-chan interface{}, unhealthy chan<- *Device) error {
func (r *nvmlResourceManager) CheckHealth(stop <-chan interface{}, unhealthy chan<- *DeviceEvent) error {
return r.checkHealth(stop, r.devices, unhealthy)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/rm/rm.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type ResourceManager interface {
Devices() Devices
GetDevicePaths([]string) []string
GetPreferredAllocation(available, required []string, size int) ([]string, error)
CheckHealth(stop <-chan interface{}, unhealthy chan<- *Device) error
CheckHealth(stop <-chan interface{}, unhealthy chan<- *DeviceEvent) error
ValidateRequest(AnnotatedIDs) error
}

Expand Down
2 changes: 1 addition & 1 deletion internal/rm/tegra_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,6 @@ func (r *tegraResourceManager) GetDevicePaths(ids []string) []string {
}

// CheckHealth is disabled for the tegraResourceManager: it never writes to
// the 'unhealthy' channel, ignores 'stop', and returns nil immediately.
func (r *tegraResourceManager) CheckHealth(stop <-chan interface{}, unhealthy chan<- *Device) error {
func (r *tegraResourceManager) CheckHealth(stop <-chan interface{}, unhealthy chan<- *DeviceEvent) error {
return nil
}