31 changes: 31 additions & 0 deletions cmd/dra-example-kubeletplugin/checkpoint.go
@@ -3,14 +3,19 @@ package main
import (
	"encoding/json"

	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
@pohly (Contributor) commented on Apr 22, 2025:

k8s.io/kubernetes should not be imported. Any code inside it is considered internal and not meant for public consumption. There are some exceptions (most notably the scheduler framework, for building custom schedulers), but not this package.

It's not even a particularly good package. We had huge issues figuring out how checksumming was meant to be used, and what the purpose of checksumming was in the first place.

Can we perhaps use this opportunity to drop the dependency?

Author commented:

cc @pohly

> Can we perhaps use this opportunity to drop the dependency?

On the dependency, I'm on the same page. I will update the PR to introduce a simple checkpoint util to drop the dependency.
As for the checksum: do you mean we don't need a checksum here? There seem to have been some issues with dra_manager_state in the past. Or do you mean we need a well-designed checkpoint implementation along with a checksum? Since this is an example DRA driver, maybe simple checkpointing without a checksum is fine.

Contributor commented:

The only purpose of checksumming that I can imagine is detecting bit flips in the file. As a DRA driver author, is that important to you on top of whatever checksumming and error correction the OS might already do?

Are there other reasons for it?
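
The integrity check under discussion is just a checksum over the serialized file bytes. A minimal sketch, using CRC-32 purely for illustration; this is not the kubelet checkpointmanager's actual implementation:

package main

import (
	"fmt"
	"hash/crc32"
)

func main() {
	// A serialized checkpoint payload. If a checksum stored alongside it no
	// longer matches on re-read, the bytes changed on disk (e.g., bit flips
	// or a partial write).
	data := []byte(`{"v1":{"preparedClaims":{}}}`)
	fmt.Printf("checksum: %d\n", crc32.ChecksumIEEE(data))
}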

Author commented:

I haven't thought about it seriously.
The checkpoint manager already existed when I was involved with the resource managers in kubelet, around 2019.

It seems the checksum originated in the PodSandbox checkpointer of dockershim.

Approaching this conservatively, there seem to be a few possible cases:

  • The process could restart while writing a file (e.g., OOM, kill, restart).
  • The file might be corrupted unintentionally by a person or script.

Contributor commented:

> The process could restart while writing a file (e.g., OOM, kill, restart).

The usual approach is to write a temp file, sync, then rename. But I suppose a checksum is easier.

> The file might be corrupted unintentionally by a person or script.

True, albeit a bit unlikely.
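
A minimal sketch of that write-temp-then-rename pattern; the helper name atomicWriteFile and the example path are invented for illustration, not taken from this PR:

package main

import (
	"os"
	"path/filepath"
)

// atomicWriteFile writes data to a temp file in the destination directory,
// fsyncs it, closes it, and renames it over the target path. On POSIX
// filesystems the rename is atomic, so a crash mid-write leaves either the
// old file or the new one on disk, never a torn write.
func atomicWriteFile(path string, data []byte, perm os.FileMode) error {
	tmp, err := os.CreateTemp(filepath.Dir(path), filepath.Base(path)+".tmp-*")
	if err != nil {
		return err
	}
	defer os.Remove(tmp.Name()) // cleanup on failure; a no-op after the rename

	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Sync(); err != nil { // flush contents to stable storage
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	if err := os.Chmod(tmp.Name(), perm); err != nil {
		return err
	}
	return os.Rename(tmp.Name(), path)
}

func main() {
	_ = atomicWriteFile("checkpoint.json", []byte(`{"v1":{}}`), 0o600)
}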

Contributor commented:

> I will update the PR to introduce a simple checkpoint util to drop the dependency.

Gentle reminder that this is pending.

/lgtm cancel

Author commented:

cc @pohly
I will resume this PR soon. Before I do, I have a couple of quick questions:

  1. My understanding from your feedback is that checksums might be unnecessary here. Would implementing a simple checkpointing mechanism without checksums address this concern?

  2. Aside from that, do you have any additional suggestions for an ideal checkpointing mechanism?

Contributor commented:

I'm fine without a checkpoint checksum. For the "writing file fails" case, I think the "write tmp file, sync, close, remove, rename" approach would be useful. I don't know of other existing mechanisms that could be used here.

cc @nojnhuh
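
A dependency-free checkpoint util along those lines might look like the following sketch. saveCheckpoint and loadCheckpoint are hypothetical names, not the PR's final API; atomicWriteFile is the helper sketched in the earlier comment, Checkpoint and newCheckpoint are the types from this file, and "encoding/json" and "os" are assumed to be imported:

// saveCheckpoint marshals the checkpoint to JSON and writes it atomically,
// relying on the rename for crash safety instead of a checksum.
func saveCheckpoint(path string, cp *Checkpoint) error {
	data, err := json.Marshal(cp)
	if err != nil {
		return err
	}
	return atomicWriteFile(path, data, 0o600)
}

// loadCheckpoint reads a checkpoint back, treating a missing file as a
// fresh, empty checkpoint so that first startup works.
func loadCheckpoint(path string) (*Checkpoint, error) {
	data, err := os.ReadFile(path)
	if os.IsNotExist(err) {
		return newCheckpoint(), nil
	}
	if err != nil {
		return nil, err
	}
	var cp Checkpoint
	if err := json.Unmarshal(data, &cp); err != nil {
		return nil, err
	}
	return &cp, nil
}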

"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
)

type PreparedClaims map[string]PreparedDevices

type Checkpoint struct {
	Checksum checksum.Checksum `json:"checksum"`
	V1       *CheckpointV1     `json:"v1,omitempty"`
}

var _ checkpointmanager.Checkpoint = &Checkpoint{}

type CheckpointV1 struct {
	PreparedClaims PreparedClaims `json:"preparedClaims,omitempty"`
}
@@ -25,6 +30,32 @@ func newCheckpoint() *Checkpoint {
	return pc
}

// GetPreparedDevices returns the prepared devices recorded for the given
// claim UID, or nil if the claim has no checkpointed entry.
func (cp *Checkpoint) GetPreparedDevices(claimUID string) PreparedDevices {
	if cp.V1 == nil {
		return nil
	}
	if devices, ok := cp.V1.PreparedClaims[claimUID]; ok {
		return devices
	}
	return nil
}

// AddPreparedDevices records the prepared devices for a claim UID.
func (cp *Checkpoint) AddPreparedDevices(claimUID string, pds PreparedDevices) {
	if cp.V1 == nil {
		return
	}

	cp.V1.PreparedClaims[claimUID] = pds
}

// RemovePreparedDevices drops the entry for a claim UID, if present.
func (cp *Checkpoint) RemovePreparedDevices(claimUID string) {
	if cp.V1 == nil {
		return
	}

	delete(cp.V1.PreparedClaims, claimUID)
}

func (cp *Checkpoint) MarshalCheckpoint() ([]byte, error) {
	// Zero the checksum so it is computed over a payload whose checksum
	// field is empty.
	cp.Checksum = 0
	out, err := json.Marshal(*cp)
21 changes: 21 additions & 0 deletions cmd/dra-example-kubeletplugin/prepared_device.go
@@ -0,0 +1,21 @@
package main

import (
	drapbv1 "k8s.io/kubelet/pkg/apis/dra/v1beta1"
	cdiapi "tags.cncf.io/container-device-interface/pkg/cdi"
)

// PreparedDevice pairs a DRA device with the CDI container edits needed to
// expose that device to a container.
type PreparedDevice struct {
	drapbv1.Device
	ContainerEdits *cdiapi.ContainerEdits
}

type PreparedDevices []*PreparedDevice

// GetDevices returns just the DRA devices, without their CDI edits.
func (pds PreparedDevices) GetDevices() []*drapbv1.Device {
	var devices []*drapbv1.Device
	for _, pd := range pds {
		devices = append(devices, &pd.Device)
	}
	return devices
}
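
A hypothetical usage snippet, with the device name and the empty container edits invented for illustration:

pds := PreparedDevices{
	&PreparedDevice{
		Device:         drapbv1.Device{DeviceName: "gpu-0"},
		ContainerEdits: &cdiapi.ContainerEdits{},
	},
}
devices := pds.GetDevices() // the plain []*drapbv1.Device view for gRPC responses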
34 changes: 10 additions & 24 deletions cmd/dra-example-kubeletplugin/state.go
@@ -34,28 +34,14 @@ import (
 )
 
 type AllocatableDevices map[string]resourceapi.Device
-type PreparedDevices []*PreparedDevice
-type PreparedClaims map[string]PreparedDevices
 
 type PerDeviceCDIContainerEdits map[string]*cdiapi.ContainerEdits
 
 type OpaqueDeviceConfig struct {
 	Requests []string
 	Config   runtime.Object
 }
 
-type PreparedDevice struct {
-	drapbv1.Device
-	ContainerEdits *cdiapi.ContainerEdits
-}
-
-func (pds PreparedDevices) GetDevices() []*drapbv1.Device {
-	var devices []*drapbv1.Device
-	for _, pd := range pds {
-		devices = append(devices, &pd.Device)
-	}
-	return devices
-}
-
 type DeviceState struct {
 	sync.Mutex
 	cdi *CDIHandler
@@ -119,10 +105,10 @@ func (s *DeviceState) Prepare(claim *resourceapi.ResourceClaim) ([]*drapbv1.Device, error) {
 	if err := s.checkpointManager.GetCheckpoint(DriverPluginCheckpointFile, checkpoint); err != nil {
 		return nil, fmt.Errorf("unable to sync from checkpoint: %v", err)
 	}
-	preparedClaims := checkpoint.V1.PreparedClaims
 
-	if preparedClaims[claimUID] != nil {
-		return preparedClaims[claimUID].GetDevices(), nil
+	preparedDevices := checkpoint.GetPreparedDevices(claimUID)
+	if preparedDevices != nil {
+		return preparedDevices.GetDevices(), nil
 	}
 
 	preparedDevices, err := s.prepareDevices(claim)
@@ -134,12 +120,12 @@ func (s *DeviceState) Prepare(claim *resourceapi.ResourceClaim) ([]*drapbv1.Device, error) {
 		return nil, fmt.Errorf("unable to create CDI spec file for claim: %v", err)
 	}
 
-	preparedClaims[claimUID] = preparedDevices
+	checkpoint.AddPreparedDevices(claimUID, preparedDevices)
 	if err := s.checkpointManager.CreateCheckpoint(DriverPluginCheckpointFile, checkpoint); err != nil {
 		return nil, fmt.Errorf("unable to sync to checkpoint: %v", err)
 	}
 
-	return preparedClaims[claimUID].GetDevices(), nil
+	return preparedDevices.GetDevices(), nil
 }
 
 func (s *DeviceState) Unprepare(claimUID string) error {
@@ -150,13 +136,13 @@ func (s *DeviceState) Unprepare(claimUID string) error {
 	if err := s.checkpointManager.GetCheckpoint(DriverPluginCheckpointFile, checkpoint); err != nil {
 		return fmt.Errorf("unable to sync from checkpoint: %v", err)
 	}
-	preparedClaims := checkpoint.V1.PreparedClaims
 
-	if preparedClaims[claimUID] == nil {
+	preparedDevices := checkpoint.GetPreparedDevices(claimUID)
+	if preparedDevices == nil {
 		return nil
 	}
 
-	if err := s.unprepareDevices(claimUID, preparedClaims[claimUID]); err != nil {
+	if err := s.unprepareDevices(claimUID, preparedDevices); err != nil {
 		return fmt.Errorf("unprepare failed: %v", err)
 	}
 
@@ -165,7 +151,7 @@ func (s *DeviceState) Unprepare(claimUID string) error {
 		return fmt.Errorf("unable to delete CDI spec file for claim: %v", err)
 	}
 
-	delete(preparedClaims, claimUID)
+	checkpoint.RemovePreparedDevices(claimUID)
 	if err := s.checkpointManager.CreateCheckpoint(DriverPluginCheckpointFile, checkpoint); err != nil {
 		return fmt.Errorf("unable to sync to checkpoint: %v", err)
 	}