diff --git a/.github/workflows/nodejs.yml b/.github/workflows/nodejs.yml
index d018b6b..71be1be 100644
--- a/.github/workflows/nodejs.yml
+++ b/.github/workflows/nodejs.yml
@@ -33,6 +33,8 @@ jobs:
           version="$(bash .github/workflows/determine_docker_image_tag.sh)"
           export MINIO_ROOT_USER="minioadmin"
           export MINIO_ROOT_PASSWORD=$(openssl rand -base64 32)
+          export LOG_AGGREGATOR_URL="http://log-aggregator:8080"
+          export LOG_AGGREGATOR_ENABLED="true"
           bash k8/generate.sh $version
         env:
           WORKER_COUNT: 1
@@ -50,7 +52,9 @@ jobs:
           echo "Host: $FRONTEND_URL"
           npx wait-on "$FRONTEND_URL/service/control/health"
           kubectl wait --timeout 10m --for=condition=ready pod -l role=worker
-          ROOT_TEST_URL=$FRONTEND_URL npm run test
+          AGG_PORT=$(kubectl get svc log-aggregator -o=jsonpath='{.spec.ports[?(@.port==8080)].nodePort}')
+          LOG_AGGREGATOR_URL="http://127.0.0.1:$AGG_PORT"
+          ROOT_TEST_URL=$FRONTEND_URL LOG_AGGREGATOR_URL=$LOG_AGGREGATOR_URL npm run test
         env:
           FLAKINESS_ACCESS_TOKEN: ${{ secrets.FLAKINESS_ACCESS_TOKEN }}
       - name: Upload playwright-report
@@ -75,6 +79,7 @@ jobs:
         - file-service
         - frontend
         - control-service
+        - log-aggregator
         - squid
     steps:
     - uses: actions/checkout@v4
diff --git a/control-service/Dockerfile b/control-service/Dockerfile
index 1f2064c..8c6a050 100644
--- a/control-service/Dockerfile
+++ b/control-service/Dockerfile
@@ -1,4 +1,4 @@
-FROM golang:1.25-alpine as builder
+FROM golang:1.25-alpine AS builder
 WORKDIR /root
 COPY go.mod /root/
 COPY go.sum /root/
diff --git a/control-service/main.go b/control-service/main.go
index 838c019..5103345 100644
--- a/control-service/main.go
+++ b/control-service/main.go
@@ -1,10 +1,11 @@
 package main
 
 import (
+	"bytes"
 	"context"
 	"errors"
 	"fmt"
-	"io/ioutil"
+	"io"
 	"math/rand"
 	"net/http"
 	"os"
@@ -15,12 +16,14 @@ import (
 
 	"github.com/labstack/echo/v4"
 	"github.com/mxschmitt/try-playwright/internal/echoutils"
+	"github.com/mxschmitt/try-playwright/internal/logagg"
 	"github.com/mxschmitt/try-playwright/internal/workertypes"
 	log "github.com/sirupsen/logrus"
 
 	"github.com/getsentry/sentry-go"
 	sentryecho "github.com/getsentry/sentry-go/echo"
+	"github.com/google/uuid"
 	"github.com/streadway/amqp"
 	clientv3 "go.etcd.io/etcd/client/v3"
 	"k8s.io/client-go/kubernetes"
 
@@ -36,8 +39,11 @@ const (
 
 func init() {
 	rand.Seed(time.Now().UTC().UnixNano())
-	log.SetFormatter(&log.TextFormatter{
-		TimestampFormat: time.StampMilli,
+	log.SetFormatter(&log.JSONFormatter{
+		TimestampFormat: time.RFC3339Nano,
+		FieldMap: log.FieldMap{
+			log.FieldKeyMsg: "message",
+		},
 	})
 }
 
@@ -133,44 +139,78 @@ func getTurnstileIP(c echo.Context) string {
 	return c.RealIP()
 }
 
+func respondError(c echo.Context, status int, requestID, testID string, logBuffer *bytes.Buffer, msg string) error {
+	return c.JSON(status, echo.Map{
+		"error":     msg,
+		"requestId": requestID,
+		"testId":    testID,
+		"logs": echo.Map{
+			"control": logBuffer.String(),
+		},
+	})
+}
+
 func (s *server) handleRun(c echo.Context) error {
+	requestID := uuid.New().String()
+	testID := c.Request().Header.Get("X-Test-ID")
+	if testID == "" {
+		testID = requestID
+	}
+	c.Set("requestId", requestID)
+	c.Set("testId", testID)
+	c.Response().Header().Set("X-Request-ID", requestID)
+	c.Response().Header().Set("X-Test-ID", testID)
+	logBuffer := &bytes.Buffer{}
+	defer logagg.DeferPost("control", &testID, &requestID, logBuffer)()
+	requestScopedLogger := log.New()
+	requestScopedLogger.SetFormatter(log.StandardLogger().Formatter)
+	requestScopedLogger.SetLevel(log.GetLevel())
+	requestScopedLogger.SetOutput(io.MultiWriter(os.Stdout, logBuffer))
+	logger := requestScopedLogger.WithFields(log.Fields{
+		"request-id": requestID,
+		"testId":     testID,
+		"service":    "control",
+	})
+	logger.Logger.AddHook(logagg.NewHook())
+
 	var req *workertypes.WorkerRequestPayload
 	if err := c.Bind(&req); err != nil {
-		return c.JSON(http.StatusBadRequest, echo.Map{
-			"error": "could not decode request body",
-		})
+		return respondError(c, http.StatusBadRequest, requestID, testID, logBuffer, "could not decode request body")
+	}
+	req.RequestID = requestID
+	if req.TestID == "" {
+		req.TestID = testID
 	}
 	if !req.Language.IsValid() {
-		return c.JSON(http.StatusBadRequest, echo.Map{
-			"error": "could not recognize language",
-		})
+		return respondError(c, http.StatusBadRequest, requestID, testID, logBuffer, "could not recognize language")
 	}
-	log.Printf("Validating turnstile")
+	logger.Printf("Validating turnstile")
 	if err := ValidateTurnstile(c.Request().Context(), req.Token, getTurnstileIP(c), os.Getenv("TURNSTILE_SECRET_KEY")); err != nil {
-		log.Printf("Could not validate turnstile: %v", err)
-		return c.JSON(http.StatusUnauthorized, echo.Map{
-			"error": err.Error(),
-		})
+		logger.Printf("Could not validate turnstile: %v", err)
+		return respondError(c, http.StatusUnauthorized, requestID, testID, logBuffer, err.Error())
 	}
-	log.Printf("Validated turnstile successfully")
-	log.Printf("Obtaining worker")
+	logger = logger.WithField("request-id", requestID)
+	logger.Printf("Validated turnstile successfully")
+	logger.Printf("Obtaining worker")
 	var worker *Worker
 	select {
 	case worker = <-s.workers[req.Language].GetCh():
 	case <-time.After(WORKER_TIMEOUT * time.Second):
-		log.Println("Got Worker timeout, was not able to get a worker!")
-		return c.JSON(http.StatusServiceUnavailable, echo.Map{
-			"error": "Timeout in getting a worker!",
-		})
+		logger.Println("Got Worker timeout, was not able to get a worker!")
+		return respondError(c, http.StatusServiceUnavailable, requestID, testID, logBuffer, "Timeout in getting a worker!")
 	}
-	logger := log.WithField("worker-id", worker.id)
+	logger = logger.WithFields(log.Fields{
+		"worker-id": worker.id,
+		"testId":    testID,
+	})
 	logger.Infof("Received code: '%s'", req.Code)
 	logger.Info("Obtained worker successfully")
 	logger.Info("Publishing job")
-	if err := worker.Publish(req.Code); err != nil {
-		return fmt.Errorf("could not create new worker job: %w", err)
+	if err := worker.Publish(req.Code, req.RequestID, req.TestID); err != nil {
+		logger.Errorf("could not create new worker job: %v", err)
+		return respondError(c, http.StatusInternalServerError, requestID, testID, logBuffer, "could not create new worker job")
 	}
 	logger.Println("Published message")
@@ -205,13 +245,22 @@ func (s *server) handleRun(c echo.Context) error {
 
 	if timeout {
 		return c.JSON(http.StatusServiceUnavailable, echo.Map{
-			"error": "Execution timeout!",
+			"error":     "Execution timeout!",
+			"requestId": requestID,
+			"testId":    testID,
+			"logs": echo.Map{
+				"control": logBuffer.String(),
+			},
 		})
 	}
+	payload.RequestID = requestID
+	payload.TestID = testID
 	if !payload.Success {
 		return c.JSON(http.StatusBadRequest, payload)
 	}
+	c.Response().Header().Set("X-Request-ID", requestID)
+	c.Response().Header().Set("X-Test-ID", testID)
 	return c.JSON(http.StatusOK, payload)
 }
@@ -228,7 +277,7 @@ func (s *server) handleShareGet(c echo.Context) error {
 }
 
 func (s *server) handleShareCreate(c echo.Context) error {
-	code, err := ioutil.ReadAll(http.MaxBytesReader(c.Response().Writer, c.Request().Body, 1<<20))
+	code, err := io.ReadAll(http.MaxBytesReader(c.Response().Writer, c.Request().Body, 1<<20))
 	if err != nil {
 		return fmt.Errorf("could not read request body: %w", err)
 	}
diff --git a/control-service/workers.go b/control-service/workers.go
index 1e85b40..1efc2ec 100644
--- a/control-service/workers.go
+++ b/control-service/workers.go
@@ -191,6 +191,14 @@ func (w *Worker) createPod() error {
 					Name:  "FILE_SERVICE_URL",
 					Value: "http://file:8080",
 				},
+				{
+					Name:  "LOG_AGGREGATOR_URL",
+					Value: os.Getenv("LOG_AGGREGATOR_URL"),
+				},
+				{
+					Name:  "LOG_AGGREGATOR_ENABLED",
+					Value: os.Getenv("LOG_AGGREGATOR_ENABLED"),
+				},
 			},
 			Resources: v1.ResourceRequirements{
 				Limits: v1.ResourceList{
@@ -219,9 +227,11 @@ func determineWorkerImageName(language workertypes.WorkerLanguage) string {
 	return fmt.Sprintf("ghcr.io/mxschmitt/try-playwright/worker-%s:%s", language, tag)
 }
 
-func (w *Worker) Publish(code string) error {
-	msgBody, err := json.Marshal(map[string]string{
-		"code": code,
+func (w *Worker) Publish(code string, requestID string, testID string) error {
+	msgBody, err := json.Marshal(&workertypes.WorkerRequestPayload{
+		Code:      code,
+		RequestID: requestID,
+		TestID:    testID,
 	})
 	if err != nil {
 		return fmt.Errorf("could not marshal json: %v", err)
diff --git a/e2e/tests/api.spec.ts b/e2e/tests/api.spec.ts
index 517ea8f..0c5842d 100644
--- a/e2e/tests/api.spec.ts
+++ b/e2e/tests/api.spec.ts
@@ -1,5 +1,24 @@
 import { expect, test as base, APIResponse } from '@playwright/test';
 
+async function attachAggregatorLogs(testId?: string) {
+  const testInfo = test.info();
+  const effectiveTestId = testId || testInfo.testId;
+  const base = (process.env.LOG_AGGREGATOR_URL || '').replace(/\/$/, '');
+  if (!base) return;
+  try {
+    const res = await fetch(`${base}/logs/${encodeURIComponent(effectiveTestId)}`);
+    if (!res.ok) return;
+    const body = await res.text();
+    if (body.trim().length === 0) return;
+    await testInfo.attach(`logs-${effectiveTestId}`, {
+      body,
+      contentType: 'text/plain',
+    });
+  } catch {
+    // best-effort; ignore
+  }
+}
+
 type TestFixtures = {
   executeCode: (code: string, language: string) => Promise<APIResponse>
 };
@@ -7,13 +26,23 @@ type TestFixtures = {
 const test = base.extend<TestFixtures>({
   executeCode: async ({ request }, use) => {
     await use(async (code: string, language: string) => {
-      return await request.post('/service/control/run', {
+      const requestId = test.info().testId; // align requestId with Playwright testId for log correlation
+      const testId = test.info().testId;
+      const resp = await request.post('/service/control/run', {
+        headers: {
+          'X-Request-ID': requestId,
+          'X-Test-ID': testId,
+        },
         data: {
           code,
-          language
+          language,
+          requestId,
+          testId,
         },
         timeout: 30 * 1000,
-      })
+      });
+      await attachAggregatorLogs(testId);
+      return resp;
     });
   },
 });
diff --git a/e2e/tests/visual.spec.ts b/e2e/tests/visual.spec.ts
index e455046..51fd114 100644
--- a/e2e/tests/visual.spec.ts
+++ b/e2e/tests/visual.spec.ts
@@ -1,14 +1,41 @@
 import { expect, test as base, Page } from '@playwright/test';
 
+async function attachAggregatorLogs(testId?: string) {
+  const testInfo = test.info();
+  const effectiveTestId = testId || testInfo.testId;
+  const base = (process.env.LOG_AGGREGATOR_URL || '').replace(/\/$/, '');
+  if (!base) return;
+  try {
+    const res = await fetch(`${base}/logs/${encodeURIComponent(effectiveTestId)}`);
+    if (!res.ok) return;
+    const body = await res.text();
+    if (body.trim().length === 0) return;
+    await testInfo.attach(`logs-${effectiveTestId}`, {
+      body,
+      contentType: 'text/plain',
+    });
+  } catch {
+    // best-effort; ignore
+  }
+}
+
 class TryPlaywrightPage {
   constructor(private readonly page: Page) { }
   async executeExample(nth: number): Promise<void> {
     await this.page.goto('/?l=javascript');
     await this.page.locator(`.rs-panel-group > .rs-panel:nth-child(${nth})`).click();
+    const responsePromise = this.page.waitForResponse("**/service/control/run");
     await Promise.all([
-      this.page.waitForResponse("**/service/control/run"),
+      responsePromise,
       this.page.getByRole('button', { name: 'Run' }).click(),
-    ])
+    ]);
+    const resp = await responsePromise;
+    try {
+      const payload = await resp.json();
+      await attachAggregatorLogs(payload?.testId);
+    } catch (_) {
+      // ignore: best-effort log attachment
+    }
   }
   async getConsoleLines(): Promise<string[]> {
     let consoleLines = this.page.locator(".rs-panel-body code")
diff --git a/file-service/Dockerfile b/file-service/Dockerfile
index 919cef6..ed28c27 100644
--- a/file-service/Dockerfile
+++ b/file-service/Dockerfile
@@ -1,11 +1,11 @@
-FROM golang:1.25-alpine as builder
+FROM golang:1.25-alpine AS builder
 WORKDIR /root
 COPY go.mod /root/
 COPY go.sum /root/
 RUN go mod download
 
 COPY file-service/* /root/
-COPY internal/echoutils /root/internal/echoutils
+COPY internal/ /root/internal/
 RUN CGO_ENABLED=0 GOOS=linux go build -o /app main.go
 
 FROM alpine:latest
diff --git a/file-service/main.go b/file-service/main.go
index 5f833ef..6e1e4d9 100644
--- a/file-service/main.go
+++ b/file-service/main.go
@@ -15,6 +15,7 @@ import (
 
 	"github.com/h2non/filetype"
 	"github.com/mxschmitt/try-playwright/internal/echoutils"
+	"github.com/mxschmitt/try-playwright/internal/logagg"
 	log "github.com/sirupsen/logrus"
 
 	"github.com/getsentry/sentry-go"
@@ -93,6 +94,32 @@ type publicFile struct {
 }
 
 func (s *server) handleUploadImage(c echo.Context) error {
+	requestID := c.Request().Header.Get("X-Request-ID")
+	if requestID == "" {
+		requestID = uuid.New().String()
+	}
+	testID := c.Request().Header.Get("X-Test-ID")
+	if testID == "" {
+		testID = requestID
+	}
+	logBuffer := &bytes.Buffer{}
+	defer logagg.DeferPost("file-service", &testID, &requestID, logBuffer)()
+	requestScopedLogger := log.New()
+	requestScopedLogger.SetFormatter(&log.JSONFormatter{
+		TimestampFormat: time.RFC3339Nano,
+		FieldMap: log.FieldMap{
+			log.FieldKeyMsg: "message",
+		},
+	})
+	requestScopedLogger.SetLevel(log.GetLevel())
+	requestScopedLogger.SetOutput(io.MultiWriter(os.Stdout, logBuffer))
+	logger := requestScopedLogger.WithFields(log.Fields{
+		"request-id": requestID,
+		"testId":     testID,
+		"service":    "file-service",
+	})
+	logger.Logger.AddHook(logagg.NewHook())
+
 	// Maximum of 10MB
 	if err := c.Request().ParseMultipartForm(10 << 20); err != nil {
 		return fmt.Errorf("could not parse form: %w", err)
@@ -127,6 +154,7 @@ func (s *server) handleUploadImage(c echo.Context) error {
 		if err != nil {
 			return fmt.Errorf("could not generate public URL: %w", err)
 		}
+		logger.Infof("stored file %s (mime: %s)", files[i].Filename, mimeType.MIME.Value)
 		outFiles = append(outFiles, publicFile{
 			Extension: fileExtension,
 			FileName:  files[i].Filename,
@@ -134,6 +162,8 @@ func (s *server) handleUploadImage(c echo.Context) error {
 			})
 		}
 	}
+	c.Response().Header().Set("X-Request-ID", requestID)
+	c.Response().Header().Set("X-Test-ID", testID)
 	return c.JSON(http.StatusCreated, outFiles)
 }
diff --git a/internal/logagg/hook.go b/internal/logagg/hook.go
new file mode 100644
index 0000000..cc12de4
--- /dev/null
+++ b/internal/logagg/hook.go
@@ -0,0 +1,43 @@
+package logagg
+
+import (
+	"context"
+	"strings"
+
+	log "github.com/sirupsen/logrus"
+)
+
+// hook sends each logrus entry to the log-aggregator best-effort.
+type hook struct{}
+
+func NewHook() log.Hook {
+	return &hook{}
+}
+
+func (h *hook) Levels() []log.Level {
+	return log.AllLevels
+}
+
+func (h *hook) Fire(entry *log.Entry) error {
+	if entry == nil {
+		return nil
+	}
+	testID := getString(entry.Data, "testId")
+	requestID := getString(entry.Data, "request-id")
+	service := getString(entry.Data, "service")
+	message := entry.Message
+	if message == "" || testID == "" {
+		return nil
+	}
+	Post(context.Background(), service, testID, requestID, message)
+	return nil
+}
+
+func getString(m log.Fields, key string) string {
+	if v, ok := m[key]; ok {
+		if s, ok := v.(string); ok {
+			return strings.TrimSpace(s)
+		}
+	}
+	return ""
+}
diff --git a/internal/logagg/logagg.go b/internal/logagg/logagg.go
new file mode 100644
index 0000000..d383024
--- /dev/null
+++ b/internal/logagg/logagg.go
@@ -0,0 +1,87 @@
+package logagg
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"net"
+	"net/http"
+	"os"
+	"strings"
+	"time"
+)
+
+var httpClient = &http.Client{
+	Timeout: 3 * time.Second,
+	Transport: &http.Transport{
+		DialContext: (&net.Dialer{
+			Timeout: 1 * time.Second,
+		}).DialContext,
+		TLSHandshakeTimeout:   1 * time.Second,
+		ResponseHeaderTimeout: 2 * time.Second,
+		IdleConnTimeout:       10 * time.Second,
+		MaxIdleConns:          10,
+		MaxIdleConnsPerHost:   2,
+	},
+}
+
+type payload struct {
+	TestID    string `json:"testId"`
+	RequestID string `json:"requestId,omitempty"`
+	Service   string `json:"service"`
+	Message   string `json:"message"`
+}
+
+// Post sends logs to the log-aggregator if LOG_AGGREGATOR_URL is set.
+// Best-effort: ignores errors and returns quickly.
+func Post(ctx context.Context, service, testID, requestID, message string) {
+	if os.Getenv("LOG_AGGREGATOR_ENABLED") != "true" {
+		return
+	}
+	baseURL := strings.TrimSuffix(os.Getenv("LOG_AGGREGATOR_URL"), "/")
+	if baseURL == "" || testID == "" || message == "" {
+		return
+	}
+	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
+	defer cancel()
+
+	body, err := json.Marshal(payload{
+		TestID:    testID,
+		RequestID: requestID,
+		Service:   service,
+		Message:   message,
+	})
+	if err != nil {
+		return
+	}
+	req, err := http.NewRequestWithContext(ctx, http.MethodPost, baseURL+"/logs", bytes.NewReader(body))
+	if err != nil {
+		return
+	}
+	req.Header.Set("Content-Type", "application/json")
+
+	resp, err := httpClient.Do(req)
+	if err != nil {
+		return
+	}
+	resp.Body.Close()
+}
+
+// DeferPost returns a function intended to be deferred so logs are sent at the end of a request.
+// testID/requestID may be nil; the buffer is read at call-time.
+func DeferPost(service string, testID, requestID *string, buf *bytes.Buffer) func() {
+	return func() {
+		if buf == nil {
+			return
+		}
+		tid := ""
+		rid := ""
+		if testID != nil {
+			tid = *testID
+		}
+		if requestID != nil {
+			rid = *requestID
+		}
+		Post(context.Background(), service, tid, rid, buf.String())
+	}
+}
diff --git a/internal/worker/worker.go b/internal/worker/worker.go
index 9f56785..e2e3a46 100644
--- a/internal/worker/worker.go
+++ b/internal/worker/worker.go
@@ -6,32 +6,50 @@ import (
 	"errors"
 	"fmt"
 	"io"
-	"log"
 	"mime/multipart"
 	"net/http"
 	"os"
 	"os/exec"
 	"path/filepath"
 	"strings"
+	"time"
 
+	"github.com/mxschmitt/try-playwright/internal/logagg"
 	"github.com/mxschmitt/try-playwright/internal/workertypes"
+	log "github.com/sirupsen/logrus"
 	"github.com/streadway/amqp"
 )
 
 type executionHandler func(worker *Worker, code string) error
 
 type Worker struct {
-	options *WorkerExectionOptions
-	channel *amqp.Channel
-	TmpDir  string
-	output  *bytes.Buffer
-	files   []string
-	env     []string
+	options   *WorkerExectionOptions
+	channel   *amqp.Channel
+	TmpDir    string
+	requestID string
+	testID    string
+	logger    *log.Logger
+	logBuffer *bytes.Buffer
+	output    *bytes.Buffer
+	files     []string
+	env       []string
 }
 
 var queue_name = fmt.Sprintf("rpc_queue_%s", os.Getenv("WORKER_ID"))
 
 func (w *Worker) Run() {
+	w.logBuffer = new(bytes.Buffer)
+	w.logger = log.New()
+	w.logger.SetFormatter(&log.JSONFormatter{
+		TimestampFormat: time.RFC3339Nano,
+		FieldMap: log.FieldMap{
+			log.FieldKeyMsg: "message",
+		},
+	})
+	w.logger.SetOutput(io.MultiWriter(os.Stdout, w.logBuffer))
+	w.logger.SetLevel(log.InfoLevel)
+	w.logger.AddHook(logagg.NewHook())
+
 	if w.options.ExecutionDirectory != "" {
 		w.TmpDir = w.options.ExecutionDirectory
 	} else {
@@ -125,18 +143,40 @@ func (w *Worker) consumeMessage(incomingMessages <-chan amqp.Delivery) error {
 	if err := json.Unmarshal(incomingMessage.Body, &incomingMessageParsed); err != nil {
 		return fmt.Errorf("could not parse incoming amqp message: %w", err)
 	}
-	outgoingMessage := &workertypes.WorkerResponsePayload{Version: os.Getenv("PLAYWRIGHT_VERSION")}
+	w.requestID = incomingMessageParsed.RequestID
+	w.testID = incomingMessageParsed.TestID
+	defer logagg.DeferPost("worker", &w.testID, &w.requestID, w.logBuffer)()
+	if w.requestID != "" {
+		w.AddEnv("PLAYWRIGHT_REQUEST_ID", w.requestID)
+	}
+	if w.testID != "" {
+		w.AddEnv("PLAYWRIGHT_TEST_ID", w.testID)
+	}
+	w.logger.WithFields(log.Fields{
+		"request-id": w.requestID,
+		"testId":     w.testID,
+		"service":    "worker",
+	}).Info("received execution message")
+	outgoingMessage := &workertypes.WorkerResponsePayload{
+		Version: os.Getenv("PLAYWRIGHT_VERSION"),
+		Files:   []workertypes.File{},
+	}
 	if err := w.options.Handler(w, incomingMessageParsed.Code); err != nil {
 		outgoingMessage.Success = false
 		outgoingMessage.Error = err.Error()
 	} else {
 		outgoingMessage.Success = true
-		outgoingMessage.Files, err = w.uploadFiles()
+		files, err := w.uploadFiles()
 		if err != nil {
 			return fmt.Errorf("could not upload files: %w", err)
 		}
+		if files != nil {
+			outgoingMessage.Files = files
+		}
 	}
 	outgoingMessage.Output = w.options.TransformOutput(w.output.String())
+	outgoingMessage.RequestID = w.requestID
+	outgoingMessage.TestID = w.testID
 	outgoingMessageBody, err := json.Marshal(outgoingMessage)
 	if err != nil {
 		return fmt.Errorf("could not marshal outgoing message payload: %w", err)
@@ -164,6 +204,13 @@ func (w *Worker) consumeMessage(incomingMessages <-chan amqp.Delivery) error {
 var uploadFilesEndpoint = fmt.Sprintf("%s/api/v1/file/upload", os.Getenv("FILE_SERVICE_URL"))
 
os.Getenv("FILE_SERVICE_URL")) func (w *Worker) uploadFiles() ([]workertypes.File, error) { + if len(w.files) == 0 { + return nil, nil + } + w.logger.WithFields(log.Fields{ + "request-id": w.requestID, + "testId": w.testID, + }).Infof("uploading %d file(s) to file service", len(w.files)) var b bytes.Buffer requestWriter := multipart.NewWriter(&b) for i, filePath := range w.files { @@ -189,6 +236,12 @@ func (w *Worker) uploadFiles() ([]workertypes.File, error) { return nil, fmt.Errorf("could not create new request: %w", err) } req.Header.Set("Content-Type", requestWriter.FormDataContentType()) + if w.requestID != "" { + req.Header.Set("X-Request-ID", w.requestID) + } + if w.testID != "" { + req.Header.Set("X-Test-ID", w.testID) + } res, err := http.DefaultClient.Do(req) if err != nil { diff --git a/internal/workertypes/types.go b/internal/workertypes/types.go index 59e1030..27a344d 100644 --- a/internal/workertypes/types.go +++ b/internal/workertypes/types.go @@ -7,18 +7,22 @@ type File struct { } type WorkerResponsePayload struct { - Success bool `json:"success"` - Error string `json:"error"` - Version string `json:"version"` - Duration int64 `json:"duration"` - Files []File `json:"files"` - Output string `json:"output"` + Success bool `json:"success"` + Error string `json:"error"` + Version string `json:"version"` + Duration int64 `json:"duration"` + Files []File `json:"files"` + Output string `json:"output"` + RequestID string `json:"requestId"` + TestID string `json:"testId"` } type WorkerRequestPayload struct { - Token string `json:"token"` - Code string `json:"code"` - Language WorkerLanguage `json:"language"` + Token string `json:"token"` + Code string `json:"code"` + RequestID string `json:"requestId"` + TestID string `json:"testId"` + Language WorkerLanguage `json:"language"` } type WorkerLanguage string diff --git a/k8/control-deployment.yaml.tpl b/k8/control-deployment.yaml.tpl index 8c96acf..2fa2e1e 100644 --- a/k8/control-deployment.yaml.tpl +++ b/k8/control-deployment.yaml.tpl @@ -31,6 +31,10 @@ spec: value: ${DOCKER_TAG} - name: TURNSTILE_SECRET_KEY value: "${TURNSTILE_SECRET_KEY}" + - name: LOG_AGGREGATOR_URL + value: "${LOG_AGGREGATOR_URL}" + - name: LOG_AGGREGATOR_ENABLED + value: "${LOG_AGGREGATOR_ENABLED}" image: ghcr.io/mxschmitt/try-playwright/control-service:${DOCKER_TAG} name: control imagePullPolicy: IfNotPresent diff --git a/k8/file-deployment.yaml.tpl b/k8/file-deployment.yaml.tpl index c4a5ffc..266bc27 100644 --- a/k8/file-deployment.yaml.tpl +++ b/k8/file-deployment.yaml.tpl @@ -27,6 +27,10 @@ spec: value: "${MINIO_ROOT_PASSWORD}" - name: FILE_SERVICE_SENTRY_DSN value: https://3911972a34944ec5bd8b681a252d4f1d@o359550.ingest.sentry.io/5479804 + - name: LOG_AGGREGATOR_URL + value: "${LOG_AGGREGATOR_URL}" + - name: LOG_AGGREGATOR_ENABLED + value: "${LOG_AGGREGATOR_ENABLED}" image: ghcr.io/mxschmitt/try-playwright/file-service:${DOCKER_TAG} imagePullPolicy: IfNotPresent name: file diff --git a/k8/log-aggregator-deployment.yaml.tpl b/k8/log-aggregator-deployment.yaml.tpl new file mode 100644 index 0000000..aa0db17 --- /dev/null +++ b/k8/log-aggregator-deployment.yaml.tpl @@ -0,0 +1,26 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + io.kompose.service: log-aggregator + name: log-aggregator +spec: + replicas: 1 + selector: + matchLabels: + io.kompose.service: log-aggregator + template: + metadata: + labels: + io.kompose.service: log-aggregator + spec: + containers: + - env: + - name: LOG_AGGREGATOR_PORT + value: "8080" + image: 
+        name: log-aggregator
+        ports:
+        - containerPort: 8080
+        imagePullPolicy: IfNotPresent
+      restartPolicy: Always
diff --git a/k8/log-aggregator-service.yaml b/k8/log-aggregator-service.yaml
new file mode 100644
index 0000000..3fbdd8c
--- /dev/null
+++ b/k8/log-aggregator-service.yaml
@@ -0,0 +1,13 @@
+apiVersion: v1
+kind: Service
+metadata:
+  name: log-aggregator
+spec:
+  type: NodePort
+  selector:
+    io.kompose.service: log-aggregator
+  ports:
+  - name: http
+    port: 8080
+    targetPort: 8080
+    nodePort: 30092
diff --git a/log-aggregator/Dockerfile b/log-aggregator/Dockerfile
new file mode 100644
index 0000000..1cc16fa
--- /dev/null
+++ b/log-aggregator/Dockerfile
@@ -0,0 +1,12 @@
+FROM golang:1.25-alpine AS builder
+WORKDIR /root
+COPY go.mod go.sum /root/
+RUN go mod download
+
+COPY log-aggregator /root/log-aggregator
+RUN CGO_ENABLED=0 GOOS=linux go build -o /app ./log-aggregator
+
+FROM alpine:latest
+RUN apk --no-cache add ca-certificates
+COPY --from=builder /app /app
+CMD ["/app"]
diff --git a/log-aggregator/main.go b/log-aggregator/main.go
new file mode 100644
index 0000000..f35ceba
--- /dev/null
+++ b/log-aggregator/main.go
@@ -0,0 +1,180 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"log"
+	"net/http"
+	"os"
+	"strings"
+	"sync"
+	"time"
+)
+
+const (
+	storeTTL           = 60 * time.Minute
+	cleanupInterval    = 5 * time.Minute
+	maxRequestBodySize = 1 << 20 // 1MB
+)
+
+type logEntry struct {
+	ts        time.Time
+	service   string
+	requestID string
+	message   string
+}
+
+type testLog struct {
+	entries []logEntry
+	expires time.Time
+}
+
+type store struct {
+	mu    sync.Mutex
+	items map[string]*testLog
+}
+
+func newStore() *store {
+	return &store{
+		items: make(map[string]*testLog),
+	}
+}
+
+func (s *store) add(testID, requestID, service, message string) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	now := time.Now()
+	tl, ok := s.items[testID]
+	if !ok {
+		tl = &testLog{}
+		s.items[testID] = tl
+	}
+	lines := splitLines(message)
+	for _, line := range lines {
+		if strings.TrimSpace(line) == "" {
+			continue
+		}
+		tl.entries = append(tl.entries, logEntry{
+			ts:        now,
+			service:   service,
+			requestID: requestID,
+			message:   line,
+		})
+	}
+	tl.expires = now.Add(storeTTL)
+}
+
+func (s *store) get(testID string) []logEntry {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	tl, ok := s.items[testID]
+	if !ok {
+		return nil
+	}
+	out := make([]logEntry, len(tl.entries))
+	copy(out, tl.entries)
+	return out
+}
+
+func (s *store) cleanupExpired() {
+	for {
+		time.Sleep(cleanupInterval)
+		now := time.Now()
+		s.mu.Lock()
+		for k, v := range s.items {
+			if v.expires.Before(now) {
+				delete(s.items, k)
+			}
+		}
+		s.mu.Unlock()
+	}
+}
+
+type postPayload struct {
+	TestID    string `json:"testId"`
+	RequestID string `json:"requestId"`
+	Service   string `json:"service"`
+	Message   string `json:"message"`
+}
+
+func splitLines(s string) []string {
+	// Normalize to \n then split.
+	s = strings.ReplaceAll(s, "\r\n", "\n")
+	return strings.Split(s, "\n")
+}
+
+func main() {
+	log.SetOutput(os.Stdout)
+	log.SetFlags(log.LstdFlags | log.Lmicroseconds)
+
+	mem := newStore()
+	go mem.cleanupExpired()
+
+	mux := http.NewServeMux()
+
+	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
+		w.WriteHeader(http.StatusOK)
+		_, _ = w.Write([]byte("ok"))
+	})
+
+	mux.HandleFunc("/logs", func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != http.MethodPost {
+			w.WriteHeader(http.StatusMethodNotAllowed)
+			return
+		}
+		r.Body = http.MaxBytesReader(w, r.Body, maxRequestBodySize)
+		defer r.Body.Close()
+		body, err := io.ReadAll(r.Body)
+		if err != nil {
+			http.Error(w, "could not read body", http.StatusBadRequest)
+			return
+		}
+		var p postPayload
+		if err := json.Unmarshal(body, &p); err != nil {
+			http.Error(w, "could not parse json", http.StatusBadRequest)
+			return
+		}
+		if strings.TrimSpace(p.TestID) == "" || strings.TrimSpace(p.Message) == "" {
+			http.Error(w, "missing testId or message", http.StatusBadRequest)
+			return
+		}
+		mem.add(p.TestID, p.RequestID, p.Service, p.Message)
+		w.WriteHeader(http.StatusAccepted)
+	})
+
+	mux.HandleFunc("/logs/", func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != http.MethodGet {
+			w.WriteHeader(http.StatusMethodNotAllowed)
+			return
+		}
+		testID := strings.TrimPrefix(r.URL.Path, "/logs/")
+		if testID == "" {
+			http.Error(w, "missing testId", http.StatusBadRequest)
+			return
+		}
+		entries := mem.get(testID)
+		w.Header().Set("Content-Type", "text/plain; charset=utf-8")
+		for _, e := range entries {
+			line := fmt.Sprintf("%s [%s] [request:%s] %s\n", e.ts.UTC().Format(time.RFC3339Nano), e.service, e.requestID, e.message)
+			if _, err := w.Write([]byte(line)); err != nil {
+				log.Printf("error writing log line: %v", err)
+				break
+			}
+		}
+	})
+
+	addr := ":8080"
+	if fromEnv := os.Getenv("LOG_AGGREGATOR_PORT"); fromEnv != "" {
+		addr = ":" + fromEnv
+	}
+	server := &http.Server{
+		Addr:    addr,
+		Handler: mux,
+	}
+
+	log.Printf("log-aggregator listening on %s", addr)
+	if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+		log.Fatalf("could not start server: %v", err)
+	}
+}
diff --git a/worker-csharp/Dockerfile b/worker-csharp/Dockerfile
index da728b9..43582f8 100644
--- a/worker-csharp/Dockerfile
+++ b/worker-csharp/Dockerfile
@@ -1,5 +1,5 @@
 ARG PLAYWRIGHT_VERSION=1.57.0
-FROM golang:1.25-alpine as builder
+FROM golang:1.25-alpine AS builder
 WORKDIR /root
 COPY go.mod /root/
 COPY go.sum /root/
diff --git a/worker-java/Dockerfile b/worker-java/Dockerfile
index 81bf4f1..3acd4a6 100644
--- a/worker-java/Dockerfile
+++ b/worker-java/Dockerfile
@@ -1,5 +1,5 @@
 ARG PLAYWRIGHT_VERSION=1.52.0
-FROM golang:1.25-alpine as builder
+FROM golang:1.25-alpine AS builder
 WORKDIR /root
 COPY go.mod /root/
 COPY go.sum /root/
diff --git a/worker-javascript/Dockerfile b/worker-javascript/Dockerfile
index 3b935f6..38de258 100644
--- a/worker-javascript/Dockerfile
+++ b/worker-javascript/Dockerfile
@@ -1,5 +1,5 @@
 ARG PLAYWRIGHT_VERSION=1.57.0
-FROM golang:1.25-alpine as builder
+FROM golang:1.25-alpine AS builder
 WORKDIR /root
 COPY go.mod /root/
 COPY go.sum /root/
diff --git a/worker-python/Dockerfile b/worker-python/Dockerfile
index 13118a7..479f268 100644
--- a/worker-python/Dockerfile
+++ b/worker-python/Dockerfile
@@ -1,5 +1,5 @@
 ARG PLAYWRIGHT_VERSION=1.57.0
-FROM golang:1.25-alpine as builder
+FROM golang:1.25-alpine AS builder
 WORKDIR /root
 COPY go.mod /root/
 COPY go.sum /root/
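
Review sketch (not part of the diff above): a minimal way to exercise the two endpoints that log-aggregator/main.go adds, assuming the service is reachable on 127.0.0.1:8080, e.g. via a port-forward or the NodePort wired up in the CI workflow; the testId, requestId, and message values below are illustrative. The payload fields, the 202 Accepted reply for POST /logs, and the plain-text body of GET /logs/{testId} follow from the handlers introduced above.

// exercise_logagg.go - hypothetical standalone sketch, not included in the change set.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

func main() {
	base := "http://127.0.0.1:8080" // assumption: aggregator reachable locally

	// POST /logs stores newline-separated log lines under a testId.
	body, _ := json.Marshal(map[string]string{
		"testId":    "example-test-id",
		"requestId": "example-request-id",
		"service":   "control",
		"message":   "Validating turnstile\nPublished message",
	})
	resp, err := http.Post(base+"/logs", "application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	resp.Body.Close()
	fmt.Println("POST /logs:", resp.StatusCode) // the handler replies 202 Accepted

	// GET /logs/{testId} returns the stored entries as plain text, one line each.
	resp, err = http.Get(base + "/logs/example-test-id")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	out, _ := io.ReadAll(resp.Body)
	fmt.Print(string(out))
}

This mirrors what internal/logagg.Post sends when LOG_AGGREGATOR_ENABLED is "true", and what attachAggregatorLogs in the e2e specs fetches in order to attach per-test logs to the Playwright report.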