Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/altmount/cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ func runServe(cmd *cobra.Command, args []string) error {

// Register health system config change handler for dynamic enable/disable
if healthWorker != nil && librarySyncWorker != nil {
healthController := health.NewHealthSystemController(healthWorker, librarySyncWorker, ctx)
healthController.RegisterConfigChangeHandler(configManager)
healthController := health.NewHealthSystemController(healthWorker, librarySyncWorker)
healthController.RegisterConfigChangeHandler(ctx, configManager)

// Trigger initial metadata date sync if health is enabled
if cfg.Health.Enabled != nil && *cfg.Health.Enabled {
Expand Down
1 change: 0 additions & 1 deletion cmd/altmount/cmd/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,6 @@ func startHealthWorker(
poolManager,
configManager.GetConfigGetter(),
rcloneClient,
nil, // No event handler for now
)

healthWorker := health.NewHealthWorker(
Expand Down
1 change: 0 additions & 1 deletion internal/api/health_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,6 @@ func (s *Server) handleGetHealthWorkerStatus(c *fiber.Ctx) error {
TotalFilesCorrupted: stats.TotalFilesCorrupted,
CurrentRunStartTime: stats.CurrentRunStartTime,
CurrentRunFilesChecked: stats.CurrentRunFilesChecked,
PendingManualChecks: stats.PendingManualChecks,
LastError: stats.LastError,
ErrorCount: stats.ErrorCount,
}
Expand Down
1 change: 0 additions & 1 deletion internal/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,6 @@ type HealthWorkerStatusResponse struct {
TotalFilesCorrupted int64 `json:"total_files_corrupted"`
CurrentRunStartTime *time.Time `json:"current_run_start_time,omitempty"`
CurrentRunFilesChecked int `json:"current_run_files_checked"`
PendingManualChecks int `json:"pending_manual_checks"`
LastError *string `json:"last_error,omitempty"`
ErrorCount int64 `json:"error_count"`
}
Expand Down
34 changes: 4 additions & 30 deletions internal/health/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,30 +38,13 @@ type HealthEvent struct {
SourceNzb *string
}

// EventHandler handles health events
type EventHandler func(event HealthEvent)

// HealthConfig holds unified configuration for health checking
type HealthConfig struct {
// Worker settings
Enabled bool // Whether health worker is enabled
CheckInterval time.Duration // How often to run health checks
MaxConcurrentJobs int // How many files to check in each batch

// Health check settings
MaxRetries int // Maximum retries before marking as permanently corrupted
MaxSegmentConnections int // Maximum concurrent connections for segment checking
EventHandler EventHandler // Optional event handler for notifications
}

// HealthChecker manages file health checking logic
type HealthChecker struct {
healthRepo *database.HealthRepository
metadataService *metadata.MetadataService
poolManager pool.Manager
configGetter config.ConfigGetter
rcloneClient rclonecli.RcloneRcClient // Optional rclone client for VFS notifications
eventHandler EventHandler // Optional event handler for notifications
}

// NewHealthChecker creates a new health checker
Expand All @@ -71,26 +54,16 @@ func NewHealthChecker(
poolManager pool.Manager,
configGetter config.ConfigGetter,
rcloneClient rclonecli.RcloneRcClient,
eventHandler EventHandler,
) *HealthChecker {
return &HealthChecker{
healthRepo: healthRepo,
metadataService: metadataService,
poolManager: poolManager,
configGetter: configGetter,
rcloneClient: rcloneClient,
eventHandler: eventHandler,
}
}

func (hc *HealthChecker) getMaxConnectionsForHealthChecks() int {
return hc.configGetter().GetMaxConnectionsForHealthChecks()
}

func (hc *HealthChecker) getSegmentSamplePercentage() int {
return hc.configGetter().GetSegmentSamplePercentage()
}

// CheckFile checks the health of a specific file
func (hc *HealthChecker) CheckFile(ctx context.Context, filePath string) HealthEvent {
// Get file metadata
Expand Down Expand Up @@ -136,15 +109,16 @@ func (hc *HealthChecker) checkSingleFile(ctx context.Context, filePath string, f
return event
}

slog.InfoContext(ctx, "Checking segment availability", "file_path", filePath, "total_segments", len(fileMeta.SegmentData), "sample_percentage", hc.getSegmentSamplePercentage())
cfg := hc.configGetter()
slog.InfoContext(ctx, "Checking segment availability", "file_path", filePath, "total_segments", len(fileMeta.SegmentData), "sample_percentage", cfg.GetSegmentSamplePercentage())

// Validate segment availability using detailed validation logic
result, err := usenet.ValidateSegmentAvailabilityDetailed(
ctx,
fileMeta.SegmentData,
hc.poolManager,
hc.getMaxConnectionsForHealthChecks(),
hc.getSegmentSamplePercentage(),
cfg.GetMaxConnectionsForHealthChecks(),
cfg.GetSegmentSamplePercentage(),
nil, // No progress callback for health checks
30*time.Second,
)
Expand Down
27 changes: 12 additions & 15 deletions internal/health/config.go → internal/health/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,17 @@ import (
type HealthSystemController struct {
healthWorker *HealthWorker
librarySyncWorker *LibrarySyncWorker
ctx context.Context
mu sync.Mutex
}

// NewHealthSystemController creates a new health system controller
func NewHealthSystemController(
healthWorker *HealthWorker,
librarySyncWorker *LibrarySyncWorker,
ctx context.Context,
) *HealthSystemController {
return &HealthSystemController{
healthWorker: healthWorker,
librarySyncWorker: librarySyncWorker,
ctx: ctx,
}
}

Expand Down Expand Up @@ -87,7 +84,7 @@ func (hsc *HealthSystemController) SyncMetadataDates(ctx context.Context) {
}

// RegisterConfigChangeHandler registers a callback to handle health system enable/disable changes
func (hsc *HealthSystemController) RegisterConfigChangeHandler(configManager *config.Manager) {
func (hsc *HealthSystemController) RegisterConfigChangeHandler(ctx context.Context, configManager *config.Manager) {
configManager.OnConfigChange(func(oldConfig, newConfig *config.Config) {
hsc.mu.Lock()
defer hsc.mu.Unlock()
Expand All @@ -103,43 +100,43 @@ func (hsc *HealthSystemController) RegisterConfigChangeHandler(configManager *co

if newEnabled {
// Health system was disabled, now enabled - start workers
slog.InfoContext(hsc.ctx, "Health system enabled via config change, starting workers")
slog.InfoContext(ctx, "Health system enabled via config change, starting workers")

// Start health worker
if !hsc.healthWorker.IsRunning() {
if err := hsc.healthWorker.Start(hsc.ctx); err != nil {
slog.ErrorContext(hsc.ctx, "Failed to start health worker", "error", err)
if err := hsc.healthWorker.Start(ctx); err != nil {
slog.ErrorContext(ctx, "Failed to start health worker", "error", err)
return
}
}

// Start library sync worker
if !hsc.librarySyncWorker.IsRunning() {
hsc.librarySyncWorker.StartLibrarySync(hsc.ctx)
hsc.librarySyncWorker.StartLibrarySync(ctx)
}

// Run background backfill
hsc.SyncMetadataDates(hsc.ctx)
hsc.SyncMetadataDates(ctx)

slog.InfoContext(hsc.ctx, "Health system started successfully")
slog.InfoContext(ctx, "Health system started successfully")
} else {
// Health system was enabled, now disabled - stop workers
slog.InfoContext(hsc.ctx, "Health system disabled via config change, stopping workers")
slog.InfoContext(ctx, "Health system disabled via config change, stopping workers")

// Stop library sync worker first
if hsc.librarySyncWorker.IsRunning() {
hsc.librarySyncWorker.Stop(hsc.ctx)
hsc.librarySyncWorker.Stop(ctx)
}

// Stop health worker
if hsc.healthWorker.IsRunning() {
if err := hsc.healthWorker.Stop(hsc.ctx); err != nil {
slog.ErrorContext(hsc.ctx, "Failed to stop health worker", "error", err)
if err := hsc.healthWorker.Stop(ctx); err != nil {
slog.ErrorContext(ctx, "Failed to stop health worker", "error", err)
return
}
}

slog.InfoContext(hsc.ctx, "Health system stopped successfully")
slog.InfoContext(ctx, "Health system stopped successfully")
}
})
}
Loading
Loading