test: Add coverage for runner health monitor #39

Merged
merged 14 commits on Dec 2, 2024
4 changes: 2 additions & 2 deletions internal/commands/launch.go
@@ -103,11 +103,11 @@ func (l *LaunchCommand) Execute(cfg *config.Config) error {
return fmt.Errorf("failed to start runner process: %w", err)
}

go http.MonitorRunnerHealth(ctx, cmd, env.RunnerServerURI, &wg)
go http.ManageRunnerHealth(ctx, cmd, env.RunnerServerURI, &wg)

err = cmd.Wait()
if err != nil && err.Error() == "signal: killed" {
logs.Warn("Unhealthy runner process was terminated")
logs.Warn("Unresponsive runner process was terminated")
} else if err != nil {
logs.Errorf("Runner process exited with error: %v", err)
} else {
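Side note (not part of this PR): the "signal: killed" string that launch.go checks for is produced when exec.CommandContext kills the runner after the launch context is cancelled, which is also why ManageRunnerHealth below takes no action on StatusMonitoringCancelled. A minimal, self-contained sketch of that standard-library behaviour, with "sleep 60" standing in for the runner binary:

package main

import (
	"context"
	"fmt"
	"os/exec"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// "sleep 60" is a stand-in for the runner binary; the point is only the
	// context wiring.
	cmd := exec.CommandContext(ctx, "sleep", "60")
	if err := cmd.Start(); err != nil {
		panic(err)
	}

	// Cancel shortly after start; CommandContext then kills the child process.
	time.AfterFunc(100*time.Millisecond, cancel)

	err := cmd.Wait()
	fmt.Println("runner exited:", err) // prints "runner exited: signal: killed"
}
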
@@ -10,7 +10,7 @@ import (
"time"
)

const (
var (
// healthCheckTimeout is the timeout for the launcher's health
// check request to the runner.
healthCheckTimeout = 5 * time.Second
@@ -28,6 +28,23 @@ const (
initialDelay = 3 * time.Second
)

// HealthStatus represents the possible states of runner health monitoring
type HealthStatus int

const (
// StatusHealthy indicates the runner is responding to health checks
StatusHealthy HealthStatus = iota
// StatusUnhealthy indicates the runner has failed too many health checks
StatusUnhealthy
// StatusMonitoringCancelled indicates monitoring was cancelled via context
StatusMonitoringCancelled
)

// healthCheckResult contains the result of health monitoring
type healthCheckResult struct {
Status HealthStatus
}

// sendRunnerHealthCheckRequest sends a request to the runner's health check endpoint.
// Returns `nil` if the health check succeeds, or an error if it fails.
func sendRunnerHealthCheckRequest(runnerServerURI string) error {
@@ -50,12 +67,18 @@ func sendRunnerHealthCheckRequest(runnerServerURI string) error {
return nil
}

// MonitorRunnerHealth regularly checks the runner's health status. If the
// health check fails more times than allowed, we terminate the runner process.
func MonitorRunnerHealth(ctx context.Context, cmd *exec.Cmd, runnerServerURI string, wg *sync.WaitGroup) {
func monitorRunnerHealth(
ctx context.Context,
runnerServerURI string,
wg *sync.WaitGroup,
) chan healthCheckResult {
logs.Debug("Started monitoring runner health")
resultChan := make(chan healthCheckResult, 1)

wg.Add(1)
go func() {
defer wg.Done()
defer close(resultChan)

time.Sleep(initialDelay)

@@ -67,24 +90,47 @@ func MonitorRunnerHealth(ctx context.Context, cmd *exec.Cmd, runnerServerURI str
select {
case <-ctx.Done():
logs.Debug("Stopped monitoring runner health")
resultChan <- healthCheckResult{Status: StatusMonitoringCancelled}
return

case <-ticker.C:
if err := sendRunnerHealthCheckRequest(runnerServerURI); err != nil {
failureCount++
logs.Warnf("Found runner unresponsive (%d/%d)", failureCount, healthCheckMaxFailures)
if failureCount >= healthCheckMaxFailures {
logs.Warn("Reached max failures on runner health check, terminating runner...")
if err := cmd.Process.Kill(); err != nil {
panic(fmt.Errorf("failed to terminate runner process: %v", err))
}
logs.Debug("Stopped monitoring runner health")
resultChan <- healthCheckResult{Status: StatusUnhealthy}
return
}
} else {
failureCount = 0
logs.Debug("Found runner healthy")
failureCount = 0
}
}
}
}()

return resultChan
}

// ManageRunnerHealth monitors the runner's health and terminates the runner process if it becomes unhealthy.
func ManageRunnerHealth(
ctx context.Context,
cmd *exec.Cmd,
runnerServerURI string,
wg *sync.WaitGroup,
) {
resultChan := monitorRunnerHealth(ctx, runnerServerURI, wg)

go func() {
result := <-resultChan
switch result.Status {
case StatusUnhealthy:
logs.Warn("Found runner unresponsive too many times, terminating runner...")
if err := cmd.Process.Kill(); err != nil {
panic(fmt.Errorf("failed to terminate unhealthy runner process: %v", err))
}
case StatusMonitoringCancelled:
// On context cancellation, exec.CommandContext terminates the runner process, so no action is needed here.
}
}()
}
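
Design note on the split above (illustration, not part of this PR): monitorRunnerHealth only detects problems and reports a single final HealthStatus on a buffered channel of capacity 1 before closing it, while ManageRunnerHealth is the consumer that actually kills the process. A same-package sketch of an alternative consumer, just to show the channel contract (logFinalHealthStatus and its log messages are hypothetical):

// Hypothetical, same-package consumer of the monitorRunnerHealth contract.
func logFinalHealthStatus(ctx context.Context, runnerServerURI string, wg *sync.WaitGroup) {
	resultChan := monitorRunnerHealth(ctx, runnerServerURI, wg)

	go func() {
		// Exactly one result is sent before the channel is closed, and the
		// buffer of 1 means the monitoring goroutine never blocks on the send.
		result := <-resultChan
		switch result.Status {
		case StatusUnhealthy:
			logs.Warn("Runner is unhealthy; leaving process handling to the caller")
		case StatusMonitoringCancelled:
			logs.Debug("Health monitoring cancelled")
		}
	}()
}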
215 changes: 215 additions & 0 deletions internal/http/manage_runner_health_test.go
@@ -0,0 +1,215 @@
package http

import (
"context"
"net/http"
"net/http/httptest"
"os/exec"
"sync"
"syscall"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func init() {
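// These timing knobs are package-level variables (changed from const in this
// PR) so the tests can shrink them and finish in milliseconds.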
healthCheckTimeout = 20 * time.Millisecond
healthCheckInterval = 10 * time.Millisecond
initialDelay = 5 * time.Millisecond
healthCheckMaxFailures = 2
}

func TestSendRunnerHealthCheckRequest(t *testing.T) {
tests := []struct {
name string
serverResponse int
serverDelay time.Duration
expectError bool
}{
{
name: "successful health check",
serverResponse: http.StatusOK,
expectError: false,
},
{
name: "unhealthy response",
serverResponse: http.StatusServiceUnavailable,
expectError: true,
},
{
name: "timeout failure",
serverResponse: http.StatusOK,
serverDelay: healthCheckTimeout * 2,
expectError: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
if tt.serverDelay > 0 {
time.Sleep(tt.serverDelay)
}
w.WriteHeader(tt.serverResponse)
}))
defer srv.Close()

err := sendRunnerHealthCheckRequest(srv.URL)

if tt.expectError {
assert.Error(t, err, "expected error but got nil")
} else {
assert.NoError(t, err, "unexpected error")
}
})
}
}

func TestMonitorRunnerHealth(t *testing.T) {
tests := []struct {
name string
serverFn http.HandlerFunc
expectedStatus HealthStatus
timeout time.Duration
}{
{
name: "healthy runner",
serverFn: func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
},
expectedStatus: StatusMonitoringCancelled,
timeout: 200 * time.Millisecond,
},
{
name: "unhealthy runner",
serverFn: func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
},
expectedStatus: StatusUnhealthy,
timeout: 500 * time.Millisecond,
},
{
name: "alternating health status",
serverFn: func() http.HandlerFunc {
isHealthy := true
return func(w http.ResponseWriter, _ *http.Request) {
if isHealthy {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusServiceUnavailable)
}
isHealthy = !isHealthy
}
}(),
expectedStatus: StatusMonitoringCancelled,
timeout: 200 * time.Millisecond,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srv := httptest.NewServer(tt.serverFn)
defer srv.Close()

ctx, cancel := context.WithTimeout(context.Background(), tt.timeout)
defer cancel()

var wg sync.WaitGroup
resultChan := monitorRunnerHealth(ctx, srv.URL, &wg)

result := <-resultChan
assert.Equal(t, tt.expectedStatus, result.Status, "unexpected health status")

wg.Wait()
})
}
}

func TestManageRunnerHealth(t *testing.T) {
tests := []struct {
name string
serverFn http.HandlerFunc
expectKill bool
}{
{
name: "healthy runner not killed",
serverFn: func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
},
expectKill: false,
},
{
name: "unhealthy runner killed",
serverFn: func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
},
expectKill: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srv := httptest.NewServer(tt.serverFn)
defer srv.Close()

cmd := exec.Command("sleep", "60")
require.NoError(t, cmd.Start(), "Failed to start long-running dummy process")

done := make(chan error) // to help monitor process state
go func() {
done <- cmd.Wait()
}()

var wg sync.WaitGroup
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

ManageRunnerHealth(ctx, cmd, srv.URL, &wg)

// For a healthy runner, we wait long enough for 3 health checks to pass.
// For an unhealthy runner, we wait long enough for healthCheckMaxFailures
// checks to fail and trigger the kill. This sleep ensures we do not inspect
// the process state too early, i.e. before monitoring has had a chance to
// detect an unhealthy runner.
time.Sleep(healthCheckInterval * time.Duration(healthCheckMaxFailures+1))

// check if monitored process was killed or kept as expected
select {
case <-done:
assert.True(t, tt.expectKill, "Process was killed but should have been left running")

case <-time.After(100 * time.Millisecond):
if tt.expectKill {
err := cmd.Process.Signal(syscall.Signal(0))
assert.Error(t, err, "Expected process to be killed but it was still running")
if err == nil {
assert.NoError(t, cmd.Process.Kill(), "Failed to kill process during cleanup")
}
}
}

wg.Wait()
})
}
}

func TestContextCancellation(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer srv.Close()

ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup

resultChan := monitorRunnerHealth(ctx, srv.URL, &wg)

time.Sleep(20 * time.Millisecond) // let monitoring run briefly before cancelling the context
cancel()

result := <-resultChan
assert.Equal(t, StatusMonitoringCancelled, result.Status, "unexpected status after context cancellation")

wg.Wait()
}
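
To run just this suite locally (package path taken from the test file path above; -race is optional but useful for these goroutine-heavy tests):

go test -race -run 'TestSendRunnerHealthCheckRequest|TestMonitorRunnerHealth|TestManageRunnerHealth|TestContextCancellation' ./internal/http/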