From ccac7d7d0bb2f9d349742260291b54f0de68424e Mon Sep 17 00:00:00 2001 From: Steven Miller Date: Tue, 23 Dec 2025 11:42:14 -0500 Subject: [PATCH 1/7] Add snap / restore for qemu --- lib/hypervisor/qemu/process.go | 135 ++++++++++++++++++++++++++++++++- lib/hypervisor/qemu/qemu.go | 41 ++++++++-- lib/hypervisor/qemu/qmp.go | 57 ++++++++++++++ lib/instances/qemu_test.go | 122 +++++++++++++++++++++++++++++ lib/instances/standby.go | 20 ++--- 5 files changed, 353 insertions(+), 22 deletions(-) diff --git a/lib/hypervisor/qemu/process.go b/lib/hypervisor/qemu/process.go index a0ebfb90..acc70a01 100644 --- a/lib/hypervisor/qemu/process.go +++ b/lib/hypervisor/qemu/process.go @@ -3,6 +3,7 @@ package qemu import ( "context" + "encoding/json" "fmt" "net" "os" @@ -168,15 +169,145 @@ func (s *Starter) StartVM(ctx context.Context, p *paths.Paths, version string, s return 0, nil, fmt.Errorf("create client: %w", err) } + // Save config for potential restore later + // QEMU migration files only contain memory state, not device config + if err := saveVMConfig(instanceDir, config); err != nil { + // Non-fatal - restore just won't work + // Log would be nice but we don't have logger here + } + // Success - release cleanup to prevent killing the process cu.Release() return pid, hv, nil } // RestoreVM starts QEMU and restores VM state from a snapshot. -// Not yet implemented for QEMU. +// The VM is in paused state after restore; caller should call Resume() to continue execution. func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, socketPath string, snapshotPath string) (int, hypervisor.Hypervisor, error) { - return 0, nil, fmt.Errorf("restore not supported by QEMU implementation") + // Get binary path + binaryPath, err := s.GetBinaryPath(p, version) + if err != nil { + return 0, nil, fmt.Errorf("get binary: %w", err) + } + + // Check if socket is already in use + if isSocketInUse(socketPath) { + return 0, nil, fmt.Errorf("socket already in use, QEMU may be running at %s", socketPath) + } + + // Remove stale socket if exists + os.Remove(socketPath) + + // Load saved VM config from snapshot directory + // QEMU requires exact same command-line args as when snapshot was taken + config, err := loadVMConfig(snapshotPath) + if err != nil { + return 0, nil, fmt.Errorf("load vm config from snapshot: %w", err) + } + + instanceDir := filepath.Dir(socketPath) + + // Build command arguments: QMP socket + VM configuration + incoming migration + args := []string{ + "-chardev", fmt.Sprintf("socket,id=qmp,path=%s,server=on,wait=off", socketPath), + "-mon", "chardev=qmp,mode=control", + } + // Append VM configuration as command-line arguments + args = append(args, BuildArgs(config)...) + + // Add incoming migration flag to restore from snapshot + // The snapshot file is named "memory" in the snapshot directory + incomingURI := "file://" + filepath.Join(snapshotPath, "memory") + args = append(args, "-incoming", incomingURI) + + // Create command + cmd := exec.Command(binaryPath, args...) + + // Daemonize: detach from parent process group + cmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + } + + // Redirect stdout/stderr to VMM log file + logsDir := filepath.Join(instanceDir, "logs") + if err := os.MkdirAll(logsDir, 0755); err != nil { + return 0, nil, fmt.Errorf("create logs directory: %w", err) + } + + vmmLogFile, err := os.OpenFile( + filepath.Join(logsDir, "vmm.log"), + os.O_CREATE|os.O_WRONLY|os.O_APPEND, + 0644, + ) + if err != nil { + return 0, nil, fmt.Errorf("create vmm log: %w", err) + } + defer vmmLogFile.Close() + + cmd.Stdout = vmmLogFile + cmd.Stderr = vmmLogFile + + if err := cmd.Start(); err != nil { + return 0, nil, fmt.Errorf("start qemu: %w", err) + } + + pid := cmd.Process.Pid + + // Setup cleanup to kill the process if subsequent steps fail + cu := cleanup.Make(func() { + syscall.Kill(pid, syscall.SIGKILL) + }) + defer cu.Clean() + + // Wait for socket to be ready + if err := waitForSocket(socketPath, 10*time.Second); err != nil { + vmmLogPath := filepath.Join(logsDir, "vmm.log") + if logData, readErr := os.ReadFile(vmmLogPath); readErr == nil && len(logData) > 0 { + return 0, nil, fmt.Errorf("%w; vmm.log: %s", err, string(logData)) + } + return 0, nil, err + } + + // Create QMP client + hv, err := New(socketPath) + if err != nil { + return 0, nil, fmt.Errorf("create client: %w", err) + } + + // Success - release cleanup to prevent killing the process + cu.Release() + return pid, hv, nil +} + +// vmConfigFile is the name of the file where VM config is saved for restore. +const vmConfigFile = "qemu-config.json" + +// saveVMConfig saves the VM configuration to a file in the instance directory. +// This is needed for QEMU restore since migration files only contain memory state. +func saveVMConfig(instanceDir string, config hypervisor.VMConfig) error { + configPath := filepath.Join(instanceDir, vmConfigFile) + data, err := json.MarshalIndent(config, "", " ") + if err != nil { + return fmt.Errorf("marshal config: %w", err) + } + if err := os.WriteFile(configPath, data, 0644); err != nil { + return fmt.Errorf("write config: %w", err) + } + return nil +} + +// loadVMConfig loads the VM configuration from the instance directory. +func loadVMConfig(instanceDir string) (hypervisor.VMConfig, error) { + configPath := filepath.Join(instanceDir, vmConfigFile) + data, err := os.ReadFile(configPath) + if err != nil { + return hypervisor.VMConfig{}, fmt.Errorf("read config: %w", err) + } + var config hypervisor.VMConfig + if err := json.Unmarshal(data, &config); err != nil { + return hypervisor.VMConfig{}, fmt.Errorf("unmarshal config: %w", err) + } + return config, nil } // qemuBinaryName returns the QEMU binary name for the host architecture. diff --git a/lib/hypervisor/qemu/qemu.go b/lib/hypervisor/qemu/qemu.go index 10ed70ee..bf2b2049 100644 --- a/lib/hypervisor/qemu/qemu.go +++ b/lib/hypervisor/qemu/qemu.go @@ -3,6 +3,8 @@ package qemu import ( "context" "fmt" + "os" + "path/filepath" "time" "github.com/digitalocean/go-qemu/qemu" @@ -36,8 +38,8 @@ var _ hypervisor.Hypervisor = (*QEMU)(nil) // Capabilities returns the features supported by QEMU. func (q *QEMU) Capabilities() hypervisor.Capabilities { return hypervisor.Capabilities{ - SupportsSnapshot: false, // Not implemented in first pass - SupportsHotplugMemory: false, // Not implemented in first pass + SupportsSnapshot: true, // Uses QMP migrate file:// for snapshot + SupportsHotplugMemory: false, // Not implemented - balloon not configured SupportsPause: true, SupportsVsock: true, SupportsGPUPassthrough: true, @@ -119,10 +121,39 @@ func (q *QEMU) Resume(ctx context.Context) error { return nil } -// Snapshot creates a VM snapshot. -// Not implemented in first pass. +// Snapshot creates a VM snapshot using QEMU's migrate-to-file mechanism. +// The VM state is saved to destPath/memory file. +// The VM config is copied to destPath for restore (QEMU requires exact arg match). func (q *QEMU) Snapshot(ctx context.Context, destPath string) error { - return fmt.Errorf("snapshot not supported by QEMU implementation") + // QEMU uses migrate to file for snapshots + // The file URI must be absolute path + uri := "file://" + destPath + "/memory" + if err := q.client.Migrate(uri); err != nil { + Remove(q.socketPath) + return fmt.Errorf("migrate: %w", err) + } + + // Wait for migration to complete + if err := q.client.WaitMigration(ctx, 30*time.Second); err != nil { + Remove(q.socketPath) + return fmt.Errorf("wait migration: %w", err) + } + + // Copy VM config from instance dir to snapshot dir + // QEMU restore requires exact same command-line args as when snapshot was taken + instanceDir := filepath.Dir(q.socketPath) + srcConfig := filepath.Join(instanceDir, vmConfigFile) + dstConfig := filepath.Join(destPath, vmConfigFile) + + configData, err := os.ReadFile(srcConfig) + if err != nil { + return fmt.Errorf("read vm config for snapshot: %w", err) + } + if err := os.WriteFile(dstConfig, configData, 0644); err != nil { + return fmt.Errorf("write vm config to snapshot: %w", err) + } + + return nil } // ResizeMemory changes the VM's memory allocation. diff --git a/lib/hypervisor/qemu/qmp.go b/lib/hypervisor/qemu/qmp.go index 155ef1f4..54420900 100644 --- a/lib/hypervisor/qemu/qmp.go +++ b/lib/hypervisor/qemu/qmp.go @@ -1,6 +1,7 @@ package qemu import ( + "context" "fmt" "time" @@ -94,3 +95,59 @@ func (c *Client) Events() (chan qmp.Event, chan struct{}, error) { func (c *Client) Run(cmd qmp.Command) ([]byte, error) { return c.domain.Run(cmd) } + +// Migrate initiates a migration to the given URI (typically "file:///path"). +// This is used for saving VM state to a file for snapshot/standby. +func (c *Client) Migrate(uri string) error { + // Migrate(uri, blk, inc, detach) - we use nil for optional params + return c.raw.Migrate(uri, nil, nil, nil) +} + +// QueryMigration returns the current migration status. +func (c *Client) QueryMigration() (raw.MigrationInfo, error) { + return c.raw.QueryMigrate() +} + +// WaitMigration polls until migration completes or times out. +// Returns nil if migration completed successfully, error otherwise. +func (c *Client) WaitMigration(ctx context.Context, timeout time.Duration) error { + deadline := time.Now().Add(timeout) + pollInterval := 50 * time.Millisecond + + for time.Now().Before(deadline) { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + info, err := c.QueryMigration() + if err != nil { + return fmt.Errorf("query migration: %w", err) + } + + // Check migration status (Status is a pointer in MigrationInfo) + if info.Status == nil { + // Status not available yet, continue polling + time.Sleep(pollInterval) + continue + } + + switch *info.Status { + case raw.MigrationStatusCompleted: + return nil + case raw.MigrationStatusFailed: + return fmt.Errorf("migration failed") + case raw.MigrationStatusCancelled: + return fmt.Errorf("migration cancelled") + case raw.MigrationStatusActive, raw.MigrationStatusSetup, raw.MigrationStatusPreSwitchover, raw.MigrationStatusDevice: + // Still in progress, continue polling + default: + // Unknown or "none" status - might not have started yet + } + + time.Sleep(pollInterval) + } + + return fmt.Errorf("migration timeout after %v", timeout) +} diff --git a/lib/instances/qemu_test.go b/lib/instances/qemu_test.go index b7f5a36b..31e3045e 100644 --- a/lib/instances/qemu_test.go +++ b/lib/instances/qemu_test.go @@ -536,3 +536,125 @@ func TestQEMUBasicEndToEnd(t *testing.T) { t.Log("QEMU instance lifecycle test complete!") } + +// TestQEMUStandbyAndRestore tests the standby/restore cycle with QEMU. +// This tests QEMU's migrate-to-file snapshot mechanism. +func TestQEMUStandbyAndRestore(t *testing.T) { + // Require KVM access + if _, err := os.Stat("/dev/kvm"); os.IsNotExist(err) { + t.Fatal("/dev/kvm not available - ensure KVM is enabled and user is in 'kvm' group (sudo usermod -aG kvm $USER)") + } + + // Require QEMU to be installed + starter := qemu.NewStarter() + if _, err := starter.GetBinaryPath(nil, ""); err != nil { + t.Fatalf("QEMU not available: %v", err) + } + + manager, tmpDir := setupTestManagerForQEMU(t) + ctx := context.Background() + p := paths.New(tmpDir) + + // Get the image manager for image operations + imageManager, err := images.NewManager(p, 1, nil) + require.NoError(t, err) + + // Pull nginx image + t.Log("Pulling nginx:alpine image...") + nginxImage, err := imageManager.CreateImage(ctx, images.CreateImageRequest{ + Name: "docker.io/library/nginx:alpine", + }) + require.NoError(t, err) + + // Wait for image to be ready + t.Log("Waiting for image build to complete...") + imageName := nginxImage.Name + for i := 0; i < 60; i++ { + img, err := imageManager.GetImage(ctx, imageName) + if err == nil && img.Status == images.StatusReady { + nginxImage = img + break + } + if err == nil && img.Status == images.StatusFailed { + t.Fatalf("Image build failed: %s", *img.Error) + } + time.Sleep(1 * time.Second) + } + require.Equal(t, images.StatusReady, nginxImage.Status, "Image should be ready after 60 seconds") + t.Log("Nginx image ready") + + // Ensure system files + systemManager := system.NewManager(p) + t.Log("Ensuring system files...") + err = systemManager.EnsureSystemFiles(ctx) + require.NoError(t, err) + t.Log("System files ready") + + // Create instance with QEMU hypervisor (no network for simpler test) + req := CreateInstanceRequest{ + Name: "test-qemu-standby", + Image: "docker.io/library/nginx:alpine", + Size: 2 * 1024 * 1024 * 1024, // 2GB + HotplugSize: 512 * 1024 * 1024, // 512MB (unused by QEMU) + OverlaySize: 10 * 1024 * 1024 * 1024, // 10GB + Vcpus: 1, + NetworkEnabled: false, // No network for simpler standby test + Hypervisor: hypervisor.TypeQEMU, + Env: map[string]string{}, + } + + t.Log("Creating QEMU instance...") + inst, err := manager.CreateInstance(ctx, req) + require.NoError(t, err) + require.NotNil(t, inst) + assert.Equal(t, StateRunning, inst.State) + assert.Equal(t, hypervisor.TypeQEMU, inst.HypervisorType) + t.Logf("Instance created: %s (hypervisor: %s)", inst.Id, inst.HypervisorType) + + // Wait for VM to be fully running before standby + err = waitForQEMUReady(ctx, inst.SocketPath, 10*time.Second) + require.NoError(t, err, "QEMU VM should reach running state") + + // Standby instance + t.Log("Standing by instance...") + inst, err = manager.StandbyInstance(ctx, inst.Id) + require.NoError(t, err) + assert.Equal(t, StateStandby, inst.State) + assert.True(t, inst.HasSnapshot) + t.Log("Instance in standby") + + // Verify snapshot exists + snapshotDir := p.InstanceSnapshotLatest(inst.Id) + assert.DirExists(t, snapshotDir) + assert.FileExists(t, filepath.Join(snapshotDir, "memory"), "QEMU snapshot memory file should exist") + assert.FileExists(t, filepath.Join(snapshotDir, "qemu-config.json"), "QEMU config should be saved in snapshot") + + // Log snapshot files + t.Log("Snapshot files:") + entries, _ := os.ReadDir(snapshotDir) + for _, entry := range entries { + info, _ := entry.Info() + t.Logf(" - %s (size: %d bytes)", entry.Name(), info.Size()) + } + + // Restore instance + t.Log("Restoring instance...") + inst, err = manager.RestoreInstance(ctx, inst.Id) + require.NoError(t, err) + assert.Equal(t, StateRunning, inst.State) + t.Log("Instance restored and running") + + // Wait for VM to be running again + err = waitForQEMUReady(ctx, inst.SocketPath, 10*time.Second) + require.NoError(t, err, "QEMU VM should reach running state after restore") + + // Cleanup + t.Log("Cleaning up...") + err = manager.DeleteInstance(ctx, inst.Id) + require.NoError(t, err) + + // Verify cleanup + assert.NoDirExists(t, p.InstanceDir(inst.Id)) + + t.Log("QEMU standby/restore test complete!") +} diff --git a/lib/instances/standby.go b/lib/instances/standby.go index 72ce467c..c09e6a33 100644 --- a/lib/instances/standby.go +++ b/lib/instances/standby.go @@ -71,24 +71,14 @@ func (m *manager) standbyInstance( return nil, fmt.Errorf("hypervisor %s does not support standby (snapshots)", stored.HypervisorType) } - // 6. Reduce memory to base size (virtio-mem hotplug) if supported - // Wait for memory to stabilize so the snapshot is as small as possible - if hv.Capabilities().SupportsHotplugMemory { - log.DebugContext(ctx, "reducing VM memory before snapshot", "instance_id", id, "base_size", inst.Size) - if err := hv.ResizeMemoryAndWait(ctx, inst.Size, 5*time.Second); err != nil { - // Log warning but continue - snapshot will just be larger - log.WarnContext(ctx, "failed to reduce memory, snapshot will be larger", "instance_id", id, "error", err) - } - } - - // 7. Transition: Running → Paused + // 6. Transition: Running → Paused log.DebugContext(ctx, "pausing VM", "instance_id", id) if err := hv.Pause(ctx); err != nil { log.ErrorContext(ctx, "failed to pause VM", "instance_id", id, "error", err) return nil, fmt.Errorf("pause vm failed: %w", err) } - // 8. Create snapshot + // 7. Create snapshot snapshotDir := m.paths.InstanceSnapshotLatest(id) log.DebugContext(ctx, "creating snapshot", "instance_id", id, "snapshot_dir", snapshotDir) if err := createSnapshot(ctx, hv, snapshotDir); err != nil { @@ -98,14 +88,14 @@ func (m *manager) standbyInstance( return nil, fmt.Errorf("create snapshot: %w", err) } - // 9. Stop VMM gracefully (snapshot is complete) + // 8. Stop VMM gracefully (snapshot is complete) log.DebugContext(ctx, "shutting down hypervisor", "instance_id", id) if err := m.shutdownHypervisor(ctx, &inst); err != nil { // Log but continue - snapshot was created successfully log.WarnContext(ctx, "failed to shutdown hypervisor gracefully, snapshot still valid", "instance_id", id, "error", err) } - // 10. Release network allocation (delete TAP device) + // 9. Release network allocation (delete TAP device) // TAP devices with explicit Owner/Group fields do NOT auto-delete when VMM exits // They must be explicitly deleted if inst.NetworkEnabled { @@ -116,7 +106,7 @@ func (m *manager) standbyInstance( } } - // 11. Update timestamp and clear PID (hypervisor no longer running) + // 10. Update timestamp and clear PID (hypervisor no longer running) now := time.Now() stored.StoppedAt = &now stored.HypervisorPID = nil From f536cd2e7cd4c930373449c3657aac149ef17677 Mon Sep 17 00:00:00 2001 From: Steven Miller Date: Tue, 23 Dec 2025 13:10:20 -0500 Subject: [PATCH 2/7] Fix restore on qemu, test passing --- lib/hypervisor/qemu/process.go | 47 ++++++++++++++++++++++++++++++++-- lib/hypervisor/qemu/qemu.go | 5 ++-- 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/lib/hypervisor/qemu/process.go b/lib/hypervisor/qemu/process.go index acc70a01..057ca1c1 100644 --- a/lib/hypervisor/qemu/process.go +++ b/lib/hypervisor/qemu/process.go @@ -14,6 +14,7 @@ import ( "syscall" "time" + "github.com/digitalocean/go-qemu/qmp/raw" "github.com/onkernel/hypeman/lib/hypervisor" "github.com/onkernel/hypeman/lib/paths" "gvisor.dev/gvisor/pkg/cleanup" @@ -216,8 +217,9 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, args = append(args, BuildArgs(config)...) // Add incoming migration flag to restore from snapshot - // The snapshot file is named "memory" in the snapshot directory - incomingURI := "file://" + filepath.Join(snapshotPath, "memory") + // The "file:" protocol is deprecated in QEMU 7.2+, use "exec:cat < path" instead + memoryFile := filepath.Join(snapshotPath, "memory") + incomingURI := "exec:cat < " + memoryFile args = append(args, "-incoming", incomingURI) // Create command @@ -274,11 +276,52 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, return 0, nil, fmt.Errorf("create client: %w", err) } + // Wait for incoming migration to complete + // QEMU loads the migration data from the exec subprocess + // After loading, VM is in paused state and ready for 'cont' + if err := waitForMigrationComplete(hv.client, 30*time.Second); err != nil { + return 0, nil, fmt.Errorf("wait for migration: %w", err) + } + // Success - release cleanup to prevent killing the process cu.Release() return pid, hv, nil } +// waitForMigrationComplete waits for incoming migration to finish loading +func waitForMigrationComplete(client *Client, timeout time.Duration) error { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + info, err := client.QueryMigration() + if err != nil { + // Ignore errors during migration + time.Sleep(100 * time.Millisecond) + continue + } + + if info.Status == nil { + // No migration status yet, might be loading + time.Sleep(100 * time.Millisecond) + continue + } + + switch *info.Status { + case raw.MigrationStatusCompleted: + return nil + case raw.MigrationStatusFailed: + return fmt.Errorf("migration failed") + case raw.MigrationStatusCancelled: + return fmt.Errorf("migration cancelled") + case raw.MigrationStatusNone: + // No active migration - incoming may have completed + return nil + } + + time.Sleep(100 * time.Millisecond) + } + return fmt.Errorf("migration timeout") +} + // vmConfigFile is the name of the file where VM config is saved for restore. const vmConfigFile = "qemu-config.json" diff --git a/lib/hypervisor/qemu/qemu.go b/lib/hypervisor/qemu/qemu.go index bf2b2049..f63a71b9 100644 --- a/lib/hypervisor/qemu/qemu.go +++ b/lib/hypervisor/qemu/qemu.go @@ -126,8 +126,9 @@ func (q *QEMU) Resume(ctx context.Context) error { // The VM config is copied to destPath for restore (QEMU requires exact arg match). func (q *QEMU) Snapshot(ctx context.Context, destPath string) error { // QEMU uses migrate to file for snapshots - // The file URI must be absolute path - uri := "file://" + destPath + "/memory" + // The "file:" protocol is deprecated in QEMU 7.2+, use "exec:cat > path" instead + memoryFile := destPath + "/memory" + uri := "exec:cat > " + memoryFile if err := q.client.Migrate(uri); err != nil { Remove(q.socketPath) return fmt.Errorf("migrate: %w", err) From 09af36ce7d1df2413cbf8d804623eb2e3372a554 Mon Sep 17 00:00:00 2001 From: Steven Miller Date: Tue, 23 Dec 2025 13:16:53 -0500 Subject: [PATCH 3/7] More observability --- lib/hypervisor/cloudhypervisor/process.go | 10 ++++++++ lib/hypervisor/qemu/process.go | 13 ++++++++++ lib/instances/restore.go | 29 ++++++++++++++++++++++- 3 files changed, 51 insertions(+), 1 deletion(-) diff --git a/lib/hypervisor/cloudhypervisor/process.go b/lib/hypervisor/cloudhypervisor/process.go index bd48718b..c2217560 100644 --- a/lib/hypervisor/cloudhypervisor/process.go +++ b/lib/hypervisor/cloudhypervisor/process.go @@ -4,8 +4,10 @@ import ( "context" "fmt" "syscall" + "time" "github.com/onkernel/hypeman/lib/hypervisor" + "github.com/onkernel/hypeman/lib/logger" "github.com/onkernel/hypeman/lib/paths" "github.com/onkernel/hypeman/lib/vmm" "gvisor.dev/gvisor/pkg/cleanup" @@ -100,6 +102,9 @@ func (s *Starter) StartVM(ctx context.Context, p *paths.Paths, version string, s // RestoreVM starts Cloud Hypervisor and restores VM state from a snapshot. // The VM is in paused state after restore; caller should call Resume() to continue execution. func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, socketPath string, snapshotPath string) (int, hypervisor.Hypervisor, error) { + log := logger.FromContext(ctx) + startTime := time.Now() + // Validate version chVersion := vmm.CHVersion(version) if !vmm.IsVersionSupported(chVersion) { @@ -107,10 +112,12 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, } // 1. Start the Cloud Hypervisor process + processStartTime := time.Now() pid, err := vmm.StartProcess(ctx, p, chVersion, socketPath) if err != nil { return 0, nil, fmt.Errorf("start process: %w", err) } + log.DebugContext(ctx, "CH process started", "pid", pid, "duration_ms", time.Since(processStartTime).Milliseconds()) // Setup cleanup to kill the process if subsequent steps fail cu := cleanup.Make(func() { @@ -125,6 +132,7 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, } // 3. Restore from snapshot via HTTP API + restoreAPIStart := time.Now() sourceURL := "file://" + snapshotPath restoreConfig := vmm.RestoreConfig{ SourceUrl: sourceURL, @@ -137,9 +145,11 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, if resp.StatusCode() != 204 { return 0, nil, fmt.Errorf("restore failed with status %d: %s", resp.StatusCode(), string(resp.Body)) } + log.DebugContext(ctx, "CH restore API complete", "duration_ms", time.Since(restoreAPIStart).Milliseconds()) // Success - release cleanup to prevent killing the process cu.Release() + log.DebugContext(ctx, "CH restore complete", "pid", pid, "total_duration_ms", time.Since(startTime).Milliseconds()) return pid, hv, nil } diff --git a/lib/hypervisor/qemu/process.go b/lib/hypervisor/qemu/process.go index 057ca1c1..1a4ef1b7 100644 --- a/lib/hypervisor/qemu/process.go +++ b/lib/hypervisor/qemu/process.go @@ -16,6 +16,7 @@ import ( "github.com/digitalocean/go-qemu/qmp/raw" "github.com/onkernel/hypeman/lib/hypervisor" + "github.com/onkernel/hypeman/lib/logger" "github.com/onkernel/hypeman/lib/paths" "gvisor.dev/gvisor/pkg/cleanup" ) @@ -185,6 +186,9 @@ func (s *Starter) StartVM(ctx context.Context, p *paths.Paths, version string, s // RestoreVM starts QEMU and restores VM state from a snapshot. // The VM is in paused state after restore; caller should call Resume() to continue execution. func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, socketPath string, snapshotPath string) (int, hypervisor.Hypervisor, error) { + log := logger.FromContext(ctx) + startTime := time.Now() + // Get binary path binaryPath, err := s.GetBinaryPath(p, version) if err != nil { @@ -201,10 +205,12 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, // Load saved VM config from snapshot directory // QEMU requires exact same command-line args as when snapshot was taken + configLoadStart := time.Now() config, err := loadVMConfig(snapshotPath) if err != nil { return 0, nil, fmt.Errorf("load vm config from snapshot: %w", err) } + log.DebugContext(ctx, "loaded VM config from snapshot", "duration_ms", time.Since(configLoadStart).Milliseconds()) instanceDir := filepath.Dir(socketPath) @@ -249,11 +255,13 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, cmd.Stdout = vmmLogFile cmd.Stderr = vmmLogFile + processStartTime := time.Now() if err := cmd.Start(); err != nil { return 0, nil, fmt.Errorf("start qemu: %w", err) } pid := cmd.Process.Pid + log.DebugContext(ctx, "QEMU process started", "pid", pid, "duration_ms", time.Since(processStartTime).Milliseconds()) // Setup cleanup to kill the process if subsequent steps fail cu := cleanup.Make(func() { @@ -262,6 +270,7 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, defer cu.Clean() // Wait for socket to be ready + socketWaitStart := time.Now() if err := waitForSocket(socketPath, 10*time.Second); err != nil { vmmLogPath := filepath.Join(logsDir, "vmm.log") if logData, readErr := os.ReadFile(vmmLogPath); readErr == nil && len(logData) > 0 { @@ -269,6 +278,7 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, } return 0, nil, err } + log.DebugContext(ctx, "QMP socket ready", "duration_ms", time.Since(socketWaitStart).Milliseconds()) // Create QMP client hv, err := New(socketPath) @@ -279,12 +289,15 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, // Wait for incoming migration to complete // QEMU loads the migration data from the exec subprocess // After loading, VM is in paused state and ready for 'cont' + migrationWaitStart := time.Now() if err := waitForMigrationComplete(hv.client, 30*time.Second); err != nil { return 0, nil, fmt.Errorf("wait for migration: %w", err) } + log.DebugContext(ctx, "migration complete", "duration_ms", time.Since(migrationWaitStart).Milliseconds()) // Success - release cleanup to prevent killing the process cu.Release() + log.DebugContext(ctx, "QEMU restore complete", "pid", pid, "total_duration_ms", time.Since(startTime).Milliseconds()) return pid, hv, nil } diff --git a/lib/instances/restore.go b/lib/instances/restore.go index 5f7af9b6..69590b56 100644 --- a/lib/instances/restore.go +++ b/lib/instances/restore.go @@ -56,16 +56,33 @@ func (m *manager) restoreInstance( // 4. Recreate TAP device if network enabled if stored.NetworkEnabled { + var networkSpan trace.Span + if m.metrics != nil && m.metrics.tracer != nil { + ctx, networkSpan = m.metrics.tracer.Start(ctx, "RestoreNetwork") + } log.DebugContext(ctx, "recreating network for restore", "instance_id", id, "network", "default") if err := m.networkManager.RecreateAllocation(ctx, id); err != nil { + if networkSpan != nil { + networkSpan.End() + } log.ErrorContext(ctx, "failed to recreate network", "instance_id", id, "error", err) return nil, fmt.Errorf("recreate network: %w", err) } + if networkSpan != nil { + networkSpan.End() + } } // 5. Transition: Standby → Paused (start hypervisor + restore) - log.DebugContext(ctx, "restoring from snapshot", "instance_id", id, "snapshot_dir", snapshotDir) + var restoreSpan trace.Span + if m.metrics != nil && m.metrics.tracer != nil { + ctx, restoreSpan = m.metrics.tracer.Start(ctx, "RestoreFromSnapshot") + } + log.DebugContext(ctx, "restoring from snapshot", "instance_id", id, "snapshot_dir", snapshotDir, "hypervisor", stored.HypervisorType) pid, hv, err := m.restoreFromSnapshot(ctx, stored, snapshotDir) + if restoreSpan != nil { + restoreSpan.End() + } if err != nil { log.ErrorContext(ctx, "failed to restore from snapshot", "instance_id", id, "error", err) // Cleanup network on failure @@ -80,8 +97,15 @@ func (m *manager) restoreInstance( stored.HypervisorPID = &pid // 6. Transition: Paused → Running (resume) + var resumeSpan trace.Span + if m.metrics != nil && m.metrics.tracer != nil { + ctx, resumeSpan = m.metrics.tracer.Start(ctx, "ResumeVM") + } log.DebugContext(ctx, "resuming VM", "instance_id", id) if err := hv.Resume(ctx); err != nil { + if resumeSpan != nil { + resumeSpan.End() + } log.ErrorContext(ctx, "failed to resume VM", "instance_id", id, "error", err) // Cleanup on failure hv.Shutdown(ctx) @@ -91,6 +115,9 @@ func (m *manager) restoreInstance( } return nil, fmt.Errorf("resume vm failed: %w", err) } + if resumeSpan != nil { + resumeSpan.End() + } // 8. Delete snapshot after successful restore log.DebugContext(ctx, "deleting snapshot after successful restore", "instance_id", id) From 6b7dabd9b0f1c87d77dbde75c6c10bb64c727423 Mon Sep 17 00:00:00 2001 From: Steven Miller Date: Tue, 23 Dec 2025 14:00:35 -0500 Subject: [PATCH 4/7] Move sleeps --- lib/hypervisor/qemu/process.go | 38 ++++++++++++++++++++++++++-------- lib/hypervisor/qemu/qemu.go | 2 +- lib/hypervisor/qemu/qmp.go | 16 ++++++++++---- 3 files changed, 42 insertions(+), 14 deletions(-) diff --git a/lib/hypervisor/qemu/process.go b/lib/hypervisor/qemu/process.go index 1a4ef1b7..e0200735 100644 --- a/lib/hypervisor/qemu/process.go +++ b/lib/hypervisor/qemu/process.go @@ -21,6 +21,24 @@ import ( "gvisor.dev/gvisor/pkg/cleanup" ) +// Timeout constants for QEMU operations +const ( + // socketWaitTimeout is how long to wait for QMP socket to become available after process start + socketWaitTimeout = 10 * time.Second + + // migrationTimeout is how long to wait for incoming migration to complete during restore + migrationTimeout = 30 * time.Second + + // migrationPollInterval is how often to poll migration status + migrationPollInterval = 50 * time.Millisecond + + // socketPollInterval is how often to check if socket is ready + socketPollInterval = 50 * time.Millisecond + + // socketDialTimeout is timeout for individual socket connection attempts + socketDialTimeout = 100 * time.Millisecond +) + func init() { hypervisor.RegisterSocketName(hypervisor.TypeQEMU, "qemu.sock") } @@ -94,6 +112,8 @@ func (s *Starter) GetVersion(p *paths.Paths) (string, error) { // StartVM launches QEMU with the VM configuration and returns a Hypervisor client. // QEMU receives all configuration via command-line arguments at process start. func (s *Starter) StartVM(ctx context.Context, p *paths.Paths, version string, socketPath string, config hypervisor.VMConfig) (int, hypervisor.Hypervisor, error) { + log := logger.FromContext(ctx) + // Get binary path binaryPath, err := s.GetBinaryPath(p, version) if err != nil { @@ -157,7 +177,7 @@ func (s *Starter) StartVM(ctx context.Context, p *paths.Paths, version string, s defer cu.Clean() // Wait for socket to be ready - if err := waitForSocket(socketPath, 10*time.Second); err != nil { + if err := waitForSocket(socketPath, socketWaitTimeout); err != nil { vmmLogPath := filepath.Join(logsDir, "vmm.log") if logData, readErr := os.ReadFile(vmmLogPath); readErr == nil && len(logData) > 0 { return 0, nil, fmt.Errorf("%w; vmm.log: %s", err, string(logData)) @@ -175,7 +195,7 @@ func (s *Starter) StartVM(ctx context.Context, p *paths.Paths, version string, s // QEMU migration files only contain memory state, not device config if err := saveVMConfig(instanceDir, config); err != nil { // Non-fatal - restore just won't work - // Log would be nice but we don't have logger here + log.WarnContext(ctx, "failed to save VM config for restore", "error", err) } // Success - release cleanup to prevent killing the process @@ -290,7 +310,7 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, // QEMU loads the migration data from the exec subprocess // After loading, VM is in paused state and ready for 'cont' migrationWaitStart := time.Now() - if err := waitForMigrationComplete(hv.client, 30*time.Second); err != nil { + if err := waitForMigrationComplete(hv.client, migrationTimeout); err != nil { return 0, nil, fmt.Errorf("wait for migration: %w", err) } log.DebugContext(ctx, "migration complete", "duration_ms", time.Since(migrationWaitStart).Milliseconds()) @@ -308,13 +328,13 @@ func waitForMigrationComplete(client *Client, timeout time.Duration) error { info, err := client.QueryMigration() if err != nil { // Ignore errors during migration - time.Sleep(100 * time.Millisecond) + time.Sleep(migrationPollInterval) continue } if info.Status == nil { // No migration status yet, might be loading - time.Sleep(100 * time.Millisecond) + time.Sleep(migrationPollInterval) continue } @@ -330,7 +350,7 @@ func waitForMigrationComplete(client *Client, timeout time.Duration) error { return nil } - time.Sleep(100 * time.Millisecond) + time.Sleep(migrationPollInterval) } return fmt.Errorf("migration timeout") } @@ -392,7 +412,7 @@ func qemuInstallHint() string { // isSocketInUse checks if a Unix socket is actively being used func isSocketInUse(socketPath string) bool { - conn, err := net.DialTimeout("unix", socketPath, 100*time.Millisecond) + conn, err := net.DialTimeout("unix", socketPath, socketDialTimeout) if err != nil { return false } @@ -404,12 +424,12 @@ func isSocketInUse(socketPath string) bool { func waitForSocket(socketPath string, timeout time.Duration) error { deadline := time.Now().Add(timeout) for time.Now().Before(deadline) { - conn, err := net.DialTimeout("unix", socketPath, 100*time.Millisecond) + conn, err := net.DialTimeout("unix", socketPath, socketDialTimeout) if err == nil { conn.Close() return nil } - time.Sleep(50 * time.Millisecond) + time.Sleep(socketPollInterval) } return fmt.Errorf("timeout waiting for socket") } diff --git a/lib/hypervisor/qemu/qemu.go b/lib/hypervisor/qemu/qemu.go index f63a71b9..a77dd71e 100644 --- a/lib/hypervisor/qemu/qemu.go +++ b/lib/hypervisor/qemu/qemu.go @@ -135,7 +135,7 @@ func (q *QEMU) Snapshot(ctx context.Context, destPath string) error { } // Wait for migration to complete - if err := q.client.WaitMigration(ctx, 30*time.Second); err != nil { + if err := q.client.WaitMigration(ctx, migrationTimeout); err != nil { Remove(q.socketPath) return fmt.Errorf("wait migration: %w", err) } diff --git a/lib/hypervisor/qemu/qmp.go b/lib/hypervisor/qemu/qmp.go index 54420900..1a5b45c6 100644 --- a/lib/hypervisor/qemu/qmp.go +++ b/lib/hypervisor/qemu/qmp.go @@ -10,6 +10,15 @@ import ( "github.com/digitalocean/go-qemu/qmp/raw" ) +// QMP client timeout constants +const ( + // qmpConnectTimeout is the timeout for connecting to the QMP socket + qmpConnectTimeout = 1 * time.Second + + // qmpMigrationPollInterval is how often to poll migration status in WaitMigration + qmpMigrationPollInterval = 50 * time.Millisecond +) + // Client wraps go-qemu's Domain and raw.Monitor with convenience methods. type Client struct { domain *qemu.Domain @@ -19,7 +28,7 @@ type Client struct { // NewClient creates a new QEMU client connected to the given socket. func NewClient(socketPath string) (*Client, error) { - mon, err := qmp.NewSocketMonitor("unix", socketPath, 2*time.Second) + mon, err := qmp.NewSocketMonitor("unix", socketPath, qmpConnectTimeout) if err != nil { return nil, fmt.Errorf("create socket monitor: %w", err) } @@ -112,7 +121,6 @@ func (c *Client) QueryMigration() (raw.MigrationInfo, error) { // Returns nil if migration completed successfully, error otherwise. func (c *Client) WaitMigration(ctx context.Context, timeout time.Duration) error { deadline := time.Now().Add(timeout) - pollInterval := 50 * time.Millisecond for time.Now().Before(deadline) { select { @@ -129,7 +137,7 @@ func (c *Client) WaitMigration(ctx context.Context, timeout time.Duration) error // Check migration status (Status is a pointer in MigrationInfo) if info.Status == nil { // Status not available yet, continue polling - time.Sleep(pollInterval) + time.Sleep(qmpMigrationPollInterval) continue } @@ -146,7 +154,7 @@ func (c *Client) WaitMigration(ctx context.Context, timeout time.Duration) error // Unknown or "none" status - might not have started yet } - time.Sleep(pollInterval) + time.Sleep(qmpMigrationPollInterval) } return fmt.Errorf("migration timeout after %v", timeout) From 3afbd744e2db061c6b9f6e918714fd55f84429f0 Mon Sep 17 00:00:00 2001 From: Steven Miller Date: Tue, 23 Dec 2025 14:11:52 -0500 Subject: [PATCH 5/7] Consolidated waiting --- lib/hypervisor/qemu/process.go | 42 ++-------------------------------- lib/hypervisor/qemu/qmp.go | 13 ++++++++--- 2 files changed, 12 insertions(+), 43 deletions(-) diff --git a/lib/hypervisor/qemu/process.go b/lib/hypervisor/qemu/process.go index e0200735..a77c6f7c 100644 --- a/lib/hypervisor/qemu/process.go +++ b/lib/hypervisor/qemu/process.go @@ -14,7 +14,6 @@ import ( "syscall" "time" - "github.com/digitalocean/go-qemu/qmp/raw" "github.com/onkernel/hypeman/lib/hypervisor" "github.com/onkernel/hypeman/lib/logger" "github.com/onkernel/hypeman/lib/paths" @@ -26,12 +25,9 @@ const ( // socketWaitTimeout is how long to wait for QMP socket to become available after process start socketWaitTimeout = 10 * time.Second - // migrationTimeout is how long to wait for incoming migration to complete during restore + // migrationTimeout is how long to wait for migration to complete migrationTimeout = 30 * time.Second - // migrationPollInterval is how often to poll migration status - migrationPollInterval = 50 * time.Millisecond - // socketPollInterval is how often to check if socket is ready socketPollInterval = 50 * time.Millisecond @@ -310,7 +306,7 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, // QEMU loads the migration data from the exec subprocess // After loading, VM is in paused state and ready for 'cont' migrationWaitStart := time.Now() - if err := waitForMigrationComplete(hv.client, migrationTimeout); err != nil { + if err := hv.client.WaitMigration(ctx, migrationTimeout); err != nil { return 0, nil, fmt.Errorf("wait for migration: %w", err) } log.DebugContext(ctx, "migration complete", "duration_ms", time.Since(migrationWaitStart).Milliseconds()) @@ -321,40 +317,6 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, return pid, hv, nil } -// waitForMigrationComplete waits for incoming migration to finish loading -func waitForMigrationComplete(client *Client, timeout time.Duration) error { - deadline := time.Now().Add(timeout) - for time.Now().Before(deadline) { - info, err := client.QueryMigration() - if err != nil { - // Ignore errors during migration - time.Sleep(migrationPollInterval) - continue - } - - if info.Status == nil { - // No migration status yet, might be loading - time.Sleep(migrationPollInterval) - continue - } - - switch *info.Status { - case raw.MigrationStatusCompleted: - return nil - case raw.MigrationStatusFailed: - return fmt.Errorf("migration failed") - case raw.MigrationStatusCancelled: - return fmt.Errorf("migration cancelled") - case raw.MigrationStatusNone: - // No active migration - incoming may have completed - return nil - } - - time.Sleep(migrationPollInterval) - } - return fmt.Errorf("migration timeout") -} - // vmConfigFile is the name of the file where VM config is saved for restore. const vmConfigFile = "qemu-config.json" diff --git a/lib/hypervisor/qemu/qmp.go b/lib/hypervisor/qemu/qmp.go index 1a5b45c6..98a9c7e0 100644 --- a/lib/hypervisor/qemu/qmp.go +++ b/lib/hypervisor/qemu/qmp.go @@ -118,6 +118,7 @@ func (c *Client) QueryMigration() (raw.MigrationInfo, error) { } // WaitMigration polls until migration completes or times out. +// Works for both outgoing (snapshot) and incoming (restore) migrations. // Returns nil if migration completed successfully, error otherwise. func (c *Client) WaitMigration(ctx context.Context, timeout time.Duration) error { deadline := time.Now().Add(timeout) @@ -131,7 +132,9 @@ func (c *Client) WaitMigration(ctx context.Context, timeout time.Duration) error info, err := c.QueryMigration() if err != nil { - return fmt.Errorf("query migration: %w", err) + // Ignore transient errors during migration, keep polling + time.Sleep(qmpMigrationPollInterval) + continue } // Check migration status (Status is a pointer in MigrationInfo) @@ -144,14 +147,18 @@ func (c *Client) WaitMigration(ctx context.Context, timeout time.Duration) error switch *info.Status { case raw.MigrationStatusCompleted: return nil + case raw.MigrationStatusNone: + // No active migration - for incoming this means complete, for outgoing it transitions quickly + return nil case raw.MigrationStatusFailed: + if info.ErrorDesc != nil && *info.ErrorDesc != "" { + return fmt.Errorf("migration failed: %s", *info.ErrorDesc) + } return fmt.Errorf("migration failed") case raw.MigrationStatusCancelled: return fmt.Errorf("migration cancelled") case raw.MigrationStatusActive, raw.MigrationStatusSetup, raw.MigrationStatusPreSwitchover, raw.MigrationStatusDevice: // Still in progress, continue polling - default: - // Unknown or "none" status - might not have started yet } time.Sleep(qmpMigrationPollInterval) From 6bbb7a790959dfa73e0448a055f29d0a99f9d2de Mon Sep 17 00:00:00 2001 From: Steven Miller Date: Tue, 23 Dec 2025 15:45:55 -0500 Subject: [PATCH 6/7] Deduplicate arguments building --- lib/hypervisor/qemu/process.go | 145 +++++++++++---------------------- 1 file changed, 48 insertions(+), 97 deletions(-) diff --git a/lib/hypervisor/qemu/process.go b/lib/hypervisor/qemu/process.go index a77c6f7c..682d65c1 100644 --- a/lib/hypervisor/qemu/process.go +++ b/lib/hypervisor/qemu/process.go @@ -105,33 +105,34 @@ func (s *Starter) GetVersion(p *paths.Paths) (string, error) { return "", fmt.Errorf("could not parse QEMU version from: %s", string(output)) } -// StartVM launches QEMU with the VM configuration and returns a Hypervisor client. -// QEMU receives all configuration via command-line arguments at process start. -func (s *Starter) StartVM(ctx context.Context, p *paths.Paths, version string, socketPath string, config hypervisor.VMConfig) (int, hypervisor.Hypervisor, error) { +// buildQMPArgs returns the base QMP socket arguments for QEMU. +func buildQMPArgs(socketPath string) []string { + return []string{ + "-chardev", fmt.Sprintf("socket,id=qmp,path=%s,server=on,wait=off", socketPath), + "-mon", "chardev=qmp,mode=control", + } +} + +// startQEMUProcess handles the common QEMU process startup logic. +// Returns the PID, hypervisor client, and a cleanup function. +// The cleanup function must be called on error; call cleanup.Release() on success. +func (s *Starter) startQEMUProcess(ctx context.Context, p *paths.Paths, version string, socketPath string, args []string) (int, *QEMU, *cleanup.Cleanup, error) { log := logger.FromContext(ctx) // Get binary path binaryPath, err := s.GetBinaryPath(p, version) if err != nil { - return 0, nil, fmt.Errorf("get binary: %w", err) + return 0, nil, nil, fmt.Errorf("get binary: %w", err) } // Check if socket is already in use if isSocketInUse(socketPath) { - return 0, nil, fmt.Errorf("socket already in use, QEMU may be running at %s", socketPath) + return 0, nil, nil, fmt.Errorf("socket already in use, QEMU may be running at %s", socketPath) } // Remove stale socket if exists os.Remove(socketPath) - // Build command arguments: QMP socket + VM configuration - args := []string{ - "-chardev", fmt.Sprintf("socket,id=qmp,path=%s,server=on,wait=off", socketPath), - "-mon", "chardev=qmp,mode=control", - } - // Append VM configuration as command-line arguments - args = append(args, BuildArgs(config)...) - // Create command cmd := exec.Command(binaryPath, args...) @@ -144,7 +145,7 @@ func (s *Starter) StartVM(ctx context.Context, p *paths.Paths, version string, s instanceDir := filepath.Dir(socketPath) logsDir := filepath.Join(instanceDir, "logs") if err := os.MkdirAll(logsDir, 0755); err != nil { - return 0, nil, fmt.Errorf("create logs directory: %w", err) + return 0, nil, nil, fmt.Errorf("create logs directory: %w", err) } vmmLogFile, err := os.OpenFile( @@ -153,48 +154,71 @@ func (s *Starter) StartVM(ctx context.Context, p *paths.Paths, version string, s 0644, ) if err != nil { - return 0, nil, fmt.Errorf("create vmm log: %w", err) + return 0, nil, nil, fmt.Errorf("create vmm log: %w", err) } defer vmmLogFile.Close() cmd.Stdout = vmmLogFile cmd.Stderr = vmmLogFile + processStartTime := time.Now() if err := cmd.Start(); err != nil { - return 0, nil, fmt.Errorf("start qemu: %w", err) + return 0, nil, nil, fmt.Errorf("start qemu: %w", err) } pid := cmd.Process.Pid + log.DebugContext(ctx, "QEMU process started", "pid", pid, "duration_ms", time.Since(processStartTime).Milliseconds()) // Setup cleanup to kill the process if subsequent steps fail cu := cleanup.Make(func() { syscall.Kill(pid, syscall.SIGKILL) }) - defer cu.Clean() // Wait for socket to be ready + socketWaitStart := time.Now() if err := waitForSocket(socketPath, socketWaitTimeout); err != nil { + cu.Clean() vmmLogPath := filepath.Join(logsDir, "vmm.log") if logData, readErr := os.ReadFile(vmmLogPath); readErr == nil && len(logData) > 0 { - return 0, nil, fmt.Errorf("%w; vmm.log: %s", err, string(logData)) + return 0, nil, nil, fmt.Errorf("%w; vmm.log: %s", err, string(logData)) } - return 0, nil, err + return 0, nil, nil, err } + log.DebugContext(ctx, "QMP socket ready", "duration_ms", time.Since(socketWaitStart).Milliseconds()) // Create QMP client hv, err := New(socketPath) if err != nil { - return 0, nil, fmt.Errorf("create client: %w", err) + cu.Clean() + return 0, nil, nil, fmt.Errorf("create client: %w", err) } + return pid, hv, &cu, nil +} + +// StartVM launches QEMU with the VM configuration and returns a Hypervisor client. +// QEMU receives all configuration via command-line arguments at process start. +func (s *Starter) StartVM(ctx context.Context, p *paths.Paths, version string, socketPath string, config hypervisor.VMConfig) (int, hypervisor.Hypervisor, error) { + log := logger.FromContext(ctx) + + // Build command arguments: QMP socket + VM configuration + args := buildQMPArgs(socketPath) + args = append(args, BuildArgs(config)...) + + pid, hv, cu, err := s.startQEMUProcess(ctx, p, version, socketPath, args) + if err != nil { + return 0, nil, err + } + defer cu.Clean() + // Save config for potential restore later // QEMU migration files only contain memory state, not device config + instanceDir := filepath.Dir(socketPath) if err := saveVMConfig(instanceDir, config); err != nil { // Non-fatal - restore just won't work log.WarnContext(ctx, "failed to save VM config for restore", "error", err) } - // Success - release cleanup to prevent killing the process cu.Release() return pid, hv, nil } @@ -205,20 +229,6 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, log := logger.FromContext(ctx) startTime := time.Now() - // Get binary path - binaryPath, err := s.GetBinaryPath(p, version) - if err != nil { - return 0, nil, fmt.Errorf("get binary: %w", err) - } - - // Check if socket is already in use - if isSocketInUse(socketPath) { - return 0, nil, fmt.Errorf("socket already in use, QEMU may be running at %s", socketPath) - } - - // Remove stale socket if exists - os.Remove(socketPath) - // Load saved VM config from snapshot directory // QEMU requires exact same command-line args as when snapshot was taken configLoadStart := time.Now() @@ -228,14 +238,8 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, } log.DebugContext(ctx, "loaded VM config from snapshot", "duration_ms", time.Since(configLoadStart).Milliseconds()) - instanceDir := filepath.Dir(socketPath) - // Build command arguments: QMP socket + VM configuration + incoming migration - args := []string{ - "-chardev", fmt.Sprintf("socket,id=qmp,path=%s,server=on,wait=off", socketPath), - "-mon", "chardev=qmp,mode=control", - } - // Append VM configuration as command-line arguments + args := buildQMPArgs(socketPath) args = append(args, BuildArgs(config)...) // Add incoming migration flag to restore from snapshot @@ -244,63 +248,11 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, incomingURI := "exec:cat < " + memoryFile args = append(args, "-incoming", incomingURI) - // Create command - cmd := exec.Command(binaryPath, args...) - - // Daemonize: detach from parent process group - cmd.SysProcAttr = &syscall.SysProcAttr{ - Setpgid: true, - } - - // Redirect stdout/stderr to VMM log file - logsDir := filepath.Join(instanceDir, "logs") - if err := os.MkdirAll(logsDir, 0755); err != nil { - return 0, nil, fmt.Errorf("create logs directory: %w", err) - } - - vmmLogFile, err := os.OpenFile( - filepath.Join(logsDir, "vmm.log"), - os.O_CREATE|os.O_WRONLY|os.O_APPEND, - 0644, - ) + pid, hv, cu, err := s.startQEMUProcess(ctx, p, version, socketPath, args) if err != nil { - return 0, nil, fmt.Errorf("create vmm log: %w", err) - } - defer vmmLogFile.Close() - - cmd.Stdout = vmmLogFile - cmd.Stderr = vmmLogFile - - processStartTime := time.Now() - if err := cmd.Start(); err != nil { - return 0, nil, fmt.Errorf("start qemu: %w", err) - } - - pid := cmd.Process.Pid - log.DebugContext(ctx, "QEMU process started", "pid", pid, "duration_ms", time.Since(processStartTime).Milliseconds()) - - // Setup cleanup to kill the process if subsequent steps fail - cu := cleanup.Make(func() { - syscall.Kill(pid, syscall.SIGKILL) - }) - defer cu.Clean() - - // Wait for socket to be ready - socketWaitStart := time.Now() - if err := waitForSocket(socketPath, 10*time.Second); err != nil { - vmmLogPath := filepath.Join(logsDir, "vmm.log") - if logData, readErr := os.ReadFile(vmmLogPath); readErr == nil && len(logData) > 0 { - return 0, nil, fmt.Errorf("%w; vmm.log: %s", err, string(logData)) - } return 0, nil, err } - log.DebugContext(ctx, "QMP socket ready", "duration_ms", time.Since(socketWaitStart).Milliseconds()) - - // Create QMP client - hv, err := New(socketPath) - if err != nil { - return 0, nil, fmt.Errorf("create client: %w", err) - } + defer cu.Clean() // Wait for incoming migration to complete // QEMU loads the migration data from the exec subprocess @@ -311,7 +263,6 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, } log.DebugContext(ctx, "migration complete", "duration_ms", time.Since(migrationWaitStart).Milliseconds()) - // Success - release cleanup to prevent killing the process cu.Release() log.DebugContext(ctx, "QEMU restore complete", "pid", pid, "total_duration_ms", time.Since(startTime).Milliseconds()) return pid, hv, nil From c998b6491d0d4a1218c774ede4053136f4ea7e93 Mon Sep 17 00:00:00 2001 From: Steven Miller Date: Tue, 23 Dec 2025 15:54:51 -0500 Subject: [PATCH 7/7] Fix possible polling error in standby / resume --- lib/hypervisor/qemu/process.go | 11 +++--- lib/hypervisor/qemu/qmp.go | 65 +++++++++++++++++++++++++++------- 2 files changed, 58 insertions(+), 18 deletions(-) diff --git a/lib/hypervisor/qemu/process.go b/lib/hypervisor/qemu/process.go index 682d65c1..e0c53734 100644 --- a/lib/hypervisor/qemu/process.go +++ b/lib/hypervisor/qemu/process.go @@ -254,14 +254,13 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, } defer cu.Clean() - // Wait for incoming migration to complete - // QEMU loads the migration data from the exec subprocess - // After loading, VM is in paused state and ready for 'cont' + // Wait for VM to be ready after loading migration data + // QEMU transitions from "inmigrate" to "paused" when loading completes migrationWaitStart := time.Now() - if err := hv.client.WaitMigration(ctx, migrationTimeout); err != nil { - return 0, nil, fmt.Errorf("wait for migration: %w", err) + if err := hv.client.WaitVMReady(ctx, migrationTimeout); err != nil { + return 0, nil, fmt.Errorf("wait for vm ready: %w", err) } - log.DebugContext(ctx, "migration complete", "duration_ms", time.Since(migrationWaitStart).Milliseconds()) + log.DebugContext(ctx, "VM ready", "duration_ms", time.Since(migrationWaitStart).Milliseconds()) cu.Release() log.DebugContext(ctx, "QEMU restore complete", "pid", pid, "total_duration_ms", time.Since(startTime).Milliseconds()) diff --git a/lib/hypervisor/qemu/qmp.go b/lib/hypervisor/qemu/qmp.go index 98a9c7e0..f28fcb72 100644 --- a/lib/hypervisor/qemu/qmp.go +++ b/lib/hypervisor/qemu/qmp.go @@ -15,8 +15,8 @@ const ( // qmpConnectTimeout is the timeout for connecting to the QMP socket qmpConnectTimeout = 1 * time.Second - // qmpMigrationPollInterval is how often to poll migration status in WaitMigration - qmpMigrationPollInterval = 50 * time.Millisecond + // qmpPollInterval is how often to poll status in WaitMigration and WaitVMReady + qmpPollInterval = 50 * time.Millisecond ) // Client wraps go-qemu's Domain and raw.Monitor with convenience methods. @@ -117,8 +117,8 @@ func (c *Client) QueryMigration() (raw.MigrationInfo, error) { return c.raw.QueryMigrate() } -// WaitMigration polls until migration completes or times out. -// Works for both outgoing (snapshot) and incoming (restore) migrations. +// WaitMigration polls until an outgoing migration completes or times out. +// Used for snapshot/standby operations where we initiate the migration. // Returns nil if migration completed successfully, error otherwise. func (c *Client) WaitMigration(ctx context.Context, timeout time.Duration) error { deadline := time.Now().Add(timeout) @@ -133,23 +133,20 @@ func (c *Client) WaitMigration(ctx context.Context, timeout time.Duration) error info, err := c.QueryMigration() if err != nil { // Ignore transient errors during migration, keep polling - time.Sleep(qmpMigrationPollInterval) + time.Sleep(qmpPollInterval) continue } // Check migration status (Status is a pointer in MigrationInfo) if info.Status == nil { // Status not available yet, continue polling - time.Sleep(qmpMigrationPollInterval) + time.Sleep(qmpPollInterval) continue } switch *info.Status { case raw.MigrationStatusCompleted: return nil - case raw.MigrationStatusNone: - // No active migration - for incoming this means complete, for outgoing it transitions quickly - return nil case raw.MigrationStatusFailed: if info.ErrorDesc != nil && *info.ErrorDesc != "" { return fmt.Errorf("migration failed: %s", *info.ErrorDesc) @@ -157,12 +154,56 @@ func (c *Client) WaitMigration(ctx context.Context, timeout time.Duration) error return fmt.Errorf("migration failed") case raw.MigrationStatusCancelled: return fmt.Errorf("migration cancelled") - case raw.MigrationStatusActive, raw.MigrationStatusSetup, raw.MigrationStatusPreSwitchover, raw.MigrationStatusDevice: - // Still in progress, continue polling + case raw.MigrationStatusNone, raw.MigrationStatusActive, raw.MigrationStatusSetup, raw.MigrationStatusPreSwitchover, raw.MigrationStatusDevice: + // Still in progress or not started yet, continue polling } - time.Sleep(qmpMigrationPollInterval) + time.Sleep(qmpPollInterval) } return fmt.Errorf("migration timeout after %v", timeout) } + +// WaitVMReady polls until the VM is ready after an incoming migration. +// Used for restore operations where QEMU was started with -incoming. +// The VM transitions from "inmigrate" to "paused" when migration data is loaded. +// Returns nil when VM is ready for resume, error on timeout or failure. +func (c *Client) WaitVMReady(ctx context.Context, timeout time.Duration) error { + deadline := time.Now().Add(timeout) + + for time.Now().Before(deadline) { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + status, err := c.Status() + if err != nil { + // Ignore transient errors, keep polling + time.Sleep(qmpPollInterval) + continue + } + + switch status { + case qemu.StatusPaused, qemu.StatusPostMigrate: + // VM has finished loading migration data and is ready for resume + return nil + case qemu.StatusInMigrate, qemu.StatusRestoreVM: + // Still loading migration data, continue polling + case qemu.StatusRunning: + // Already running (shouldn't happen, but not an error) + return nil + case qemu.StatusGuestPanicked, qemu.StatusInternalError, qemu.StatusIOError: + return fmt.Errorf("VM in error state: %v", status) + case qemu.StatusShutdown: + return fmt.Errorf("VM shut down during migration") + default: + // Other states - keep polling + } + + time.Sleep(qmpPollInterval) + } + + return fmt.Errorf("timeout waiting for VM ready after %v", timeout) +}