Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 40 additions & 14 deletions cmd/api/api/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"net/http"

"github.com/c2h5oh/datasize"
"github.com/onkernel/hypeman/lib/instances"
Expand Down Expand Up @@ -236,40 +236,66 @@ func (s *ApiService) RestoreInstance(ctx context.Context, request oapi.RestoreIn
return oapi.RestoreInstance200JSONResponse(instanceToOAPI(*inst)), nil
}

// GetInstanceLogs streams instance logs
func (s *ApiService) GetInstanceLogs(ctx context.Context, request oapi.GetInstanceLogsRequestObject) (oapi.GetInstanceLogsResponseObject, error) {
log := logger.FromContext(ctx)
// logsStreamResponse implements oapi.GetInstanceLogsResponseObject by writing
// log lines as Server-Sent Events and flushing after each one, so lines reach
// the client as they are produced.
type logsStreamResponse struct {
// logChan yields one log line per receive; the response ends once it is closed.
logChan <-chan string
}

follow := false
if request.Params.Follow != nil {
follow = *request.Params.Follow
// VisitGetInstanceLogsResponse writes the log stream to w as Server-Sent
// Events, one "data:" event per line received from r.logChan, flushing after
// every event.
//
// It returns an error before any header or body byte is written when w does
// not implement http.Flusher. Once streaming has started it returns the first
// write error (typically a disconnected client). It returns nil after logChan
// has been closed and fully drained.
func (r logsStreamResponse) VisitGetInstanceLogsResponse(w http.ResponseWriter) error {
	// Check streaming support first: once WriteHeader runs, the 200 status is
	// committed and the failure can no longer be reported to the client.
	flusher, ok := w.(http.Flusher)
	if !ok {
		return fmt.Errorf("streaming not supported")
	}

	h := w.Header()
	h.Set("Content-Type", "text/event-stream")
	h.Set("Cache-Control", "no-cache")
	h.Set("Connection", "keep-alive")
	h.Set("X-Accel-Buffering", "no") // Disable nginx buffering
	w.WriteHeader(http.StatusOK)
	// Push the headers immediately so the client sees the stream open even
	// when no log line is available yet (e.g. following an idle instance).
	flusher.Flush()

	for line := range r.logChan {
		// Stop on the first failed write instead of formatting events nobody
		// will receive. NOTE(review): this leaves the producer to stop on its
		// own; it appears to select on the request context, confirm.
		if _, err := fmt.Fprintf(w, "data: %s\n\n", line); err != nil {
			return fmt.Errorf("write log event: %w", err)
		}
		flusher.Flush()
	}
	return nil
}

// GetInstanceLogs streams instance logs via SSE
// With follow=false (default), streams last N lines then closes
// With follow=true, streams last N lines then continues following new output
func (s *ApiService) GetInstanceLogs(ctx context.Context, request oapi.GetInstanceLogsRequestObject) (oapi.GetInstanceLogsResponseObject, error) {
tail := 100
if request.Params.Tail != nil {
tail = *request.Params.Tail
}

logs, err := s.InstanceManager.GetInstanceLogs(ctx, request.Id, follow, tail)
follow := false
if request.Params.Follow != nil {
follow = *request.Params.Follow
}

logChan, err := s.InstanceManager.StreamInstanceLogs(ctx, request.Id, tail, follow)
if err != nil {
switch {
case errors.Is(err, instances.ErrNotFound):
return oapi.GetInstanceLogs404JSONResponse{
Code: "not_found",
Message: "instance not found",
}, nil
case errors.Is(err, instances.ErrTailNotFound):
return oapi.GetInstanceLogs500JSONResponse{
Code: "dependency_missing",
Message: "tail command not found on server - required for log streaming",
}, nil
default:
log.Error("failed to get instance logs", "error", err, "id", request.Id)
return oapi.GetInstanceLogs500JSONResponse{
Code: "internal_error",
Message: "failed to get instance logs",
Message: "failed to stream logs",
}, nil
}
}

return oapi.GetInstanceLogs200TexteventStreamResponse{
Body: strings.NewReader(logs),
ContentLength: int64(len(logs)),
}, nil
return logsStreamResponse{logChan: logChan}, nil
}

// AttachVolume attaches a volume to an instance (not yet implemented)
Expand Down
6 changes: 6 additions & 0 deletions cmd/api/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ type Config struct {
DNSServer string
MaxConcurrentBuilds int
MaxOverlaySize string
LogMaxSize string
LogMaxFiles int
LogRotateInterval string
}

// Load loads configuration from environment variables
Expand All @@ -37,6 +40,9 @@ func Load() *Config {
DNSServer: getEnv("DNS_SERVER", "1.1.1.1"),
MaxConcurrentBuilds: getEnvInt("MAX_CONCURRENT_BUILDS", 1),
MaxOverlaySize: getEnv("MAX_OVERLAY_SIZE", "100GB"),
LogMaxSize: getEnv("LOG_MAX_SIZE", "50MB"),
LogMaxFiles: getEnvInt("LOG_MAX_FILES", 1),
LogRotateInterval: getEnv("LOG_ROTATE_INTERVAL", "5m"),
}

return cfg
Expand Down
31 changes: 31 additions & 0 deletions cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"syscall"
"time"

"github.com/c2h5oh/datasize"
"github.com/getkin/kin-openapi/openapi3filter"
"github.com/ghodss/yaml"
"github.com/go-chi/chi/v5"
Expand Down Expand Up @@ -50,6 +51,16 @@ func run() error {
logger.Warn("JWT_SECRET not configured - API authentication will fail")
}

// Validate log rotation config up front so a bad value fails startup rather
// than surfacing later inside the rotation scheduler goroutine.
var logMaxSize datasize.ByteSize
if err := logMaxSize.UnmarshalText([]byte(app.Config.LogMaxSize)); err != nil {
	return fmt.Errorf("invalid LOG_MAX_SIZE %q: %w", app.Config.LogMaxSize, err)
}
logRotateInterval, err := time.ParseDuration(app.Config.LogRotateInterval)
if err != nil {
	return fmt.Errorf("invalid LOG_ROTATE_INTERVAL %q: %w", app.Config.LogRotateInterval, err)
}
// time.NewTicker panics on a non-positive duration, and values such as "0s"
// or "-5m" parse without error, so reject them here.
if logRotateInterval <= 0 {
	return fmt.Errorf("invalid LOG_ROTATE_INTERVAL %q: must be greater than zero", app.Config.LogRotateInterval)
}

// Ensure system files (kernel, initrd) exist before starting server
logger.Info("Ensuring system files...")
if err := app.SystemManager.EnsureSystemFiles(app.Ctx); err != nil {
Expand Down Expand Up @@ -177,6 +188,26 @@ func run() error {
return nil
})

// Log rotation scheduler
grp.Go(func() error {
ticker := time.NewTicker(logRotateInterval)
defer ticker.Stop()

logger.Info("log rotation scheduler started", "interval", app.Config.LogRotateInterval, "max_size", logMaxSize, "max_files", app.Config.LogMaxFiles)
for {
select {
case <-gctx.Done():
return nil
case <-ticker.C:
if err := app.InstanceManager.RotateLogs(gctx, int64(logMaxSize), app.Config.LogMaxFiles); err != nil {
logger.Error("log rotation failed", "error", err)
} else {
logger.Info("log rotation completed", "max_size", logMaxSize, "max_files", app.Config.LogMaxFiles)
}
}
}
})

return grp.Wait()
}

Expand Down
155 changes: 97 additions & 58 deletions lib/instances/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,92 +4,131 @@ import (
"bufio"
"context"
"fmt"
"io"
"os"
"os/exec"
"strconv"

"github.com/onkernel/hypeman/lib/logger"
)

// getInstanceLogs returns the last N lines of instance console logs
func (m *manager) getInstanceLogs(
ctx context.Context,
id string,
follow bool,
tail int,
) (string, error) {
// ErrTailNotFound is returned by log streaming when the external "tail"
// binary cannot be located on PATH. Callers can match it with errors.Is.
var ErrTailNotFound = fmt.Errorf("tail command not found: required for log streaming")

// StreamInstanceLogs streams instance console logs
// Returns last N lines, then continues following if follow=true
func (m *manager) streamInstanceLogs(ctx context.Context, id string, tail int, follow bool) (<-chan string, error) {
log := logger.FromContext(ctx)
log.DebugContext(ctx, "getting instance logs", "id", id, "follow", follow, "tail", tail)

// 1. Verify instance exists
_, err := m.loadMetadata(id)
if err != nil {
log.ErrorContext(ctx, "failed to load instance metadata", "id", id, "error", err)
return "", err
}
log.DebugContext(ctx, "starting log stream", "id", id, "tail", tail, "follow", follow)

logPath := m.paths.InstanceConsoleLog(id)
// Verify tail command is available
if _, err := exec.LookPath("tail"); err != nil {
return nil, ErrTailNotFound
}

// 2. Check if log file exists
if _, err := os.Stat(logPath); os.IsNotExist(err) {
log.DebugContext(ctx, "no log file exists yet", "id", id)
return "", nil // No logs yet
if _, err := m.loadMetadata(id); err != nil {
return nil, err
}

// 3. For now, only support tail (not follow)
logPath := m.paths.InstanceConsoleLog(id)

// Build tail command
args := []string{"-n", strconv.Itoa(tail)}
if follow {
log.WarnContext(ctx, "follow mode not yet implemented", "id", id)
return "", fmt.Errorf("follow not yet implemented")
args = append(args, "-f")
}
args = append(args, logPath)

cmd := exec.CommandContext(ctx, "tail", args...)

// 4. Read last N lines
result, err := tailFile(logPath, tail)
stdout, err := cmd.StdoutPipe()
if err != nil {
log.ErrorContext(ctx, "failed to read log file", "id", id, "error", err)
return "", err
return nil, fmt.Errorf("create stdout pipe: %w", err)
}

if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("start tail: %w", err)
}

log.DebugContext(ctx, "retrieved instance logs", "id", id, "bytes", len(result))
return result, nil

out := make(chan string, 100)

go func() {
defer close(out)
defer cmd.Process.Kill()

scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
select {
case <-ctx.Done():
log.DebugContext(ctx, "log stream cancelled", "id", id)
return
case out <- scanner.Text():
}
}

if err := scanner.Err(); err != nil {
log.ErrorContext(ctx, "scanner error", "id", id, "error", err)
}

// Wait for tail to exit (important for non-follow mode)
cmd.Wait()
}()

return out, nil
}

// tailFile reads the last n lines from a file efficiently
func tailFile(path string, n int) (string, error) {
file, err := os.Open(path)
// rotateLogIfNeeded performs copytruncate rotation if file exceeds maxBytes
// Keeps up to maxFiles old backups (.1, .2, etc.)
func rotateLogIfNeeded(path string, maxBytes int64, maxFiles int) error {
info, err := os.Stat(path)
if err != nil {
return "", fmt.Errorf("open log file: %w", err)
if os.IsNotExist(err) {
return nil // Nothing to rotate
}
return fmt.Errorf("stat log file: %w", err)
}
defer file.Close()

// For simplicity, read entire file and take last N lines
// TODO: Optimize for very large log files with reverse reading
var lines []string
scanner := bufio.NewScanner(file)
if info.Size() < maxBytes {
return nil // Under limit, nothing to do
}

for scanner.Scan() {
lines = append(lines, scanner.Text())
// Shift old backups (.1 -> .2, .2 -> .3, etc.)
for i := maxFiles; i >= 1; i-- {
oldPath := fmt.Sprintf("%s.%d", path, i)
newPath := fmt.Sprintf("%s.%d", path, i+1)

if i == maxFiles {
// Delete the oldest backup
os.Remove(oldPath)
} else {
// Shift to next number
os.Rename(oldPath, newPath)
}
}

if err := scanner.Err(); err != nil {
return "", fmt.Errorf("read log file: %w", err)
// Copy current log to .1
src, err := os.Open(path)
if err != nil {
return fmt.Errorf("open log for rotation: %w", err)
}

// Take last n lines
start := 0
if len(lines) > n {
start = len(lines) - n
dst, err := os.Create(path + ".1")
if err != nil {
src.Close()
return fmt.Errorf("create backup: %w", err)
}

result := ""
for _, line := range lines[start:] {
result += line + "\n"
_, err = io.Copy(dst, src)
src.Close()
dst.Close()
if err != nil {
return fmt.Errorf("copy to backup: %w", err)
}

return result, nil
}
// Truncate original (keeps file descriptor valid for writers)
if err := os.Truncate(path, 0); err != nil {
return fmt.Errorf("truncate log: %w", err)
}

// followLogFile streams log file contents (for SSE implementation)
// Returns a channel that emits new log lines
func followLogFile(ctx context.Context, path string) (<-chan string, error) {
// TODO: Implement with fsnotify or tail -f equivalent
return nil, fmt.Errorf("not implemented")
return nil
}

32 changes: 25 additions & 7 deletions lib/instances/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ type Manager interface {
DeleteInstance(ctx context.Context, id string) error
StandbyInstance(ctx context.Context, id string) (*Instance, error)
RestoreInstance(ctx context.Context, id string) (*Instance, error)
GetInstanceLogs(ctx context.Context, id string, follow bool, tail int) (string, error)
StreamInstanceLogs(ctx context.Context, id string, tail int, follow bool) (<-chan string, error)
RotateLogs(ctx context.Context, maxBytes int64, maxFiles int) error
AttachVolume(ctx context.Context, id string, volumeId string, req AttachVolumeRequest) (*Instance, error)
DetachVolume(ctx context.Context, id string, volumeId string) (*Instance, error)
}
Expand Down Expand Up @@ -107,12 +108,29 @@ func (m *manager) GetInstance(ctx context.Context, id string) (*Instance, error)
return m.getInstance(ctx, id)
}

// GetInstanceLogs returns instance console logs
func (m *manager) GetInstanceLogs(ctx context.Context, id string, follow bool, tail int) (string, error) {
lock := m.getInstanceLock(id)
lock.RLock()
defer lock.RUnlock()
return m.getInstanceLogs(ctx, id, follow, tail)
// StreamInstanceLogs streams instance console logs.
// It returns the last N lines (tail), then continues following if follow=true.
func (m *manager) StreamInstanceLogs(ctx context.Context, id string, tail int, follow bool) (<-chan string, error) {
// Note: No instance lock is held during streaming; the log file is read
// continuously for the lifetime of the stream.
// NOTE(review): this comment used to call the file append-only, but
// rotateLogIfNeeded truncates the console log in place (copytruncate), so a
// follower can see the file shrink. Confirm that is acceptable.
return m.streamInstanceLogs(ctx, id, tail, follow)
}

// RotateLogs rotates the console log of every instance whose log has reached
// maxBytes, passing maxFiles through as the number of backups to keep.
//
// Rotation is best-effort: a failure for one instance does not stop the
// others. If any rotation fails, the returned error reports how many
// instances failed and wraps the first failure, tagged with its instance ID,
// so errors.Is/As still work on it. If ctx is cancelled between instances the
// loop stops early and ctx's error is returned.
func (m *manager) RotateLogs(ctx context.Context, maxBytes int64, maxFiles int) error {
	instances, err := m.listInstances(ctx)
	if err != nil {
		return fmt.Errorf("list instances for rotation: %w", err)
	}

	var (
		firstErr error
		failed   int
	)
	for _, inst := range instances {
		// Do not keep copying large files once shutdown has been requested.
		if err := ctx.Err(); err != nil {
			return err
		}

		logPath := m.paths.InstanceConsoleLog(inst.Id)
		if err := rotateLogIfNeeded(logPath, maxBytes, maxFiles); err != nil {
			// Continue with other instances, but remember what went wrong
			// and for which instance.
			failed++
			if firstErr == nil {
				firstErr = fmt.Errorf("instance %v: %w", inst.Id, err)
			}
		}
	}

	if firstErr != nil {
		return fmt.Errorf("rotate logs: %d of %d instances failed; first error: %w", failed, len(instances), firstErr)
	}
	return nil
}

// AttachVolume attaches a volume to an instance (not yet implemented)
Expand Down
Loading
Loading