diff --git a/zooid/events.go b/zooid/events.go index 632128c..86ce46a 100644 --- a/zooid/events.go +++ b/zooid/events.go @@ -194,11 +194,17 @@ func (events *EventStore) queryEventsWith(ctx context.Context, runner squirrel.B filter.Limit = maxLimit } - observer := QueryDuration.With(prometheus.Labels{"instance": events.Config.Schema}) + // WithLabelValues avoids the map allocation that With(Labels{...}) + // does on every call — this is the per-query hot path. + totalObserver := QueryDuration.WithLabelValues(events.Config.Schema) + dbObserver := QueryDBDuration.WithLabelValues(events.Config.Schema) + drainObserver := QueryDrainDuration.WithLabelValues(events.Config.Schema) queryStart := time.Now() + var drainTotal time.Duration + rows, err := events.buildSelectQuery(filter).RunWith(runner).QueryContext(ctx) if err != nil { - observer.Observe(time.Since(queryStart).Seconds()) + observeQueryTimings(totalObserver, dbObserver, drainObserver, queryStart, drainTotal) log.Printf("QueryEvents query error: %v", err) return } @@ -245,13 +251,16 @@ func (events *EventStore) queryEventsWith(ctx context.Context, runner squirrel.B continue } - if !yield(evt) { - observer.Observe(time.Since(queryStart).Seconds()) + yieldStart := time.Now() + cont := yield(evt) + drainTotal += time.Since(yieldStart) + if !cont { + observeQueryTimings(totalObserver, dbObserver, drainObserver, queryStart, drainTotal) return } } - observer.Observe(time.Since(queryStart).Seconds()) + observeQueryTimings(totalObserver, dbObserver, drainObserver, queryStart, drainTotal) if err := rows.Err(); err != nil { log.Printf("QueryEvents row iteration error: %v", err) @@ -259,6 +268,18 @@ func (events *EventStore) queryEventsWith(ctx context.Context, runner squirrel.B } } +// observeQueryTimings emits the three query-duration histograms in one +// place: total wall time, DB-side time (total - drain), and consumer-drain +// time. 
(wall - drainTotal) is non-negative because drainTotal is the sum +// of disjoint sub-intervals of the overall query wall time — yields run +// sequentially inside the iter.Seq, never concurrently. +func observeQueryTimings(total, db, drain prometheus.Observer, queryStart time.Time, drainTotal time.Duration) { + wall := time.Since(queryStart) + total.Observe(wall.Seconds()) + db.Observe((wall - drainTotal).Seconds()) + drain.Observe(drainTotal.Seconds()) +} + func (events *EventStore) buildSelectQuery(filter nostr.Filter) squirrel.SelectBuilder { eventsTable := events.Schema.Prefix("events") eventTagsTable := events.Schema.Prefix("event_tags") diff --git a/zooid/metrics.go b/zooid/metrics.go index 3cf0f26..60de967 100644 --- a/zooid/metrics.go +++ b/zooid/metrics.go @@ -83,10 +83,43 @@ var ( Help: "Total chat messages in database", }, []string{"instance"}) + // Buckets cover up to the 30s dbOpTimeout (events.go) used by + // top-level QueryEvents callers. DefBuckets' last finite bucket is + // 10s, which clips percentiles in histogram_quantile for queries in + // the 10–30s range. Internal callers like replaceEventOnce wrap + // queryEventsWith with a 60s budget, so a small minority of reads + // can exceed 30s and land in +Inf — accepted in exchange for not + // growing the bucket count for an uncommon path. + queryDurationBuckets = []float64{ + 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 15, 20, 30, + } + + // QueryDuration is total wall time from query submit until iteration + // ends (rows exhausted, consumer stops early, or the query fails). + // Includes time blocked in `yield(evt)` waiting for the consumer. Kept as + // the historical metric so existing dashboards and alerts don't break; to + // tell slow Postgres from a slow WebSocket peer, prefer QueryDBDuration / QueryDrainDuration. 
QueryDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Name: "zooid_query_duration_seconds", - Help: "Duration of database queries (DB execution and row scanning)", - Buckets: prometheus.DefBuckets, + Help: "Total wall time of database queries (DB + row scan + parse + consumer drain)", + Buckets: queryDurationBuckets, }, []string{"instance"}) + + // QueryDBDuration is QueryDuration minus the time blocked in + // `yield(evt)`. Approximates Postgres + driver + scan + parse time. + QueryDBDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "zooid_query_db_seconds", + Help: "DB-side query time (total minus consumer drain)", + Buckets: queryDurationBuckets, + }, []string{"instance"}) + + // QueryDrainDuration is the per-query sum of time spent blocked inside + // `yield(evt)` handing events to the consumer. High values indicate + // client back-pressure, not slow Postgres. + QueryDrainDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "zooid_query_drain_seconds", + Help: "Duration spent blocked yielding query results to the consumer (back-pressure)", + Buckets: queryDurationBuckets, + }, []string{"instance"}) ) @@ -106,6 +139,8 @@ func init() { eventsTotal, messagesTotal, QueryDuration, + QueryDBDuration, + QueryDrainDuration, ) } diff --git a/zooid/metrics_test.go b/zooid/metrics_test.go index 6515b93..463455e 100644 --- a/zooid/metrics_test.go +++ b/zooid/metrics_test.go @@ -5,10 +5,13 @@ import ( "fmt" "sync" "testing" + "time" "fiatjaf.com/nostr" "fiatjaf.com/nostr/khatru" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" + dto "github.com/prometheus/client_model/go" ) func createMetricsTestInstance(t *testing.T) *Instance { @@ -340,16 +343,134 @@ func TestMetrics_QueryDurationHistogram(t *testing.T) { inst := createMetricsTestInstance(t) // Count observations before - before := testutil.CollectAndCount(QueryDuration) + beforeTotal := testutil.CollectAndCount(QueryDuration) + beforeDB := 
testutil.CollectAndCount(QueryDBDuration) + beforeDrain := testutil.CollectAndCount(QueryDrainDuration) - // Run a query to trigger the histogram + // Run a query to trigger the histograms for range inst.Events.QueryEvents(nostr.Filter{Kinds: []nostr.Kind{1}}, 10) { } - // The histogram should still be collectible (no panics, no errors) - after := testutil.CollectAndCount(QueryDuration) - if after < before { - t.Errorf("histogram metric count decreased: before=%d after=%d", before, after) + // All three histograms should still be collectible (no panics, no errors) + if afterTotal := testutil.CollectAndCount(QueryDuration); afterTotal < beforeTotal { + t.Errorf("QueryDuration metric count decreased: before=%d after=%d", beforeTotal, afterTotal) + } + if afterDB := testutil.CollectAndCount(QueryDBDuration); afterDB < beforeDB { + t.Errorf("QueryDBDuration metric count decreased: before=%d after=%d", beforeDB, afterDB) + } + if afterDrain := testutil.CollectAndCount(QueryDrainDuration); afterDrain < beforeDrain { + t.Errorf("QueryDrainDuration metric count decreased: before=%d after=%d", beforeDrain, afterDrain) + } +} + +// readHistogram returns the current sample count and sample sum of vec's +// child for the given label value, failing the test if the child cannot be fetched or serialized. +func readHistogram(t *testing.T, vec *prometheus.HistogramVec, label string) (count uint64, sum float64) { + t.Helper() + m, err := vec.GetMetricWithLabelValues(label) + if err != nil { + t.Fatalf("GetMetricWithLabelValues: %v", err) + } + var pb dto.Metric + if err := m.(prometheus.Metric).Write(&pb); err != nil { + t.Fatalf("histogram.Write: %v", err) + } + h := pb.GetHistogram() + return h.GetSampleCount(), h.GetSampleSum() +} + +// TestMetrics_QueryDrainAccounting proves that time blocked inside +// `yield(evt)` is recorded against QueryDrainDuration and excluded from +// QueryDBDuration. Without this split, a back-pressured WebSocket peer +// is indistinguishable from slow Postgres in the DB metric. 
+func TestMetrics_QueryDrainAccounting(t *testing.T) { + inst := createMetricsTestInstance(t) + label := inst.Config.Schema + + // Insert nEvents kind-1 events under a freshly generated key so the + // author-filtered query below returns a known number of rows. + const nEvents = 5 + sec := nostr.Generate() + for i := 0; i < nEvents; i++ { + evt := nostr.Event{ + Kind: 1, + CreatedAt: nostr.Now(), + PubKey: sec.Public(), + Tags: nostr.Tags{}, + Content: fmt.Sprintf("drain-test-%d", i), + } + evt.Sign(sec) + if err := inst.Events.SaveEvent(evt); err != nil { + t.Fatalf("SaveEvent[%d]: %v", i, err) + } + } + + // Snapshot the histogram counts/sums before the slow-consumer query. + totalCountBefore, totalSumBefore := readHistogram(t, QueryDuration, label) + dbCountBefore, dbSumBefore := readHistogram(t, QueryDBDuration, label) + drainCountBefore, drainSumBefore := readHistogram(t, QueryDrainDuration, label) + + // Run the query with a consumer that sleeps per event. The sleep time + // should accumulate in QueryDrainDuration and be excluded from + // QueryDBDuration. + const sleepPer = 50 * time.Millisecond + seen := 0 + queryStart := time.Now() + for range inst.Events.QueryEvents(nostr.Filter{Kinds: []nostr.Kind{1}, Authors: []nostr.PubKey{sec.Public()}}, 100) { + seen++ + time.Sleep(sleepPer) + } + wall := time.Since(queryStart) + + if seen != nEvents { + t.Fatalf("expected %d events from QueryEvents, got %d", nEvents, seen) + } + + totalCountAfter, totalSumAfter := readHistogram(t, QueryDuration, label) + dbCountAfter, dbSumAfter := readHistogram(t, QueryDBDuration, label) + drainCountAfter, drainSumAfter := readHistogram(t, QueryDrainDuration, label) + + // Exactly one observation per query on each histogram (assumes nothing else queries this label concurrently — TODO confirm). 
+ if delta := totalCountAfter - totalCountBefore; delta != 1 { + t.Errorf("QueryDuration sample-count delta = %d, want 1", delta) + } + if delta := dbCountAfter - dbCountBefore; delta != 1 { + t.Errorf("QueryDBDuration sample-count delta = %d, want 1", delta) + } + if delta := drainCountAfter - drainCountBefore; delta != 1 { + t.Errorf("QueryDrainDuration sample-count delta = %d, want 1", delta) + } + + totalDelta := totalSumAfter - totalSumBefore + dbDelta := dbSumAfter - dbSumBefore + drainDelta := drainSumAfter - drainSumBefore + + // Drain should account for at least 90% of the cumulative sleep. + // time.Sleep pauses for at least sleepPer, so the 10% slack is margin only. + expectedDrain := float64(nEvents) * sleepPer.Seconds() + if drainDelta < expectedDrain*0.9 { + t.Errorf("drain delta = %.3fs, want >= %.3fs (~ %d × %s)", + drainDelta, expectedDrain*0.9, nEvents, sleepPer) + } + + // DB time must be strictly less than the drain time — the query + // itself runs against the test instance's Postgres and returns + // nEvents (5) rows; it should take milliseconds, far below the + // nEvents × sleepPer (250ms) of induced sleep. + if dbDelta >= drainDelta { + t.Errorf("dbDelta (%.3fs) should be < drainDelta (%.3fs); drain accounting may be broken", + dbDelta, drainDelta) + } + + // Total = DB + drain, modulo float arithmetic. Allow 1ms tolerance. + if got := dbDelta + drainDelta; got < totalDelta-0.001 || got > totalDelta+0.001 { + t.Errorf("db (%.3fs) + drain (%.3fs) = %.3fs, want ≈ total (%.3fs)", + dbDelta, drainDelta, got, totalDelta) + } + + // Sanity: total must not exceed measured wall time of the loop (50ms slack). + if totalDelta > wall.Seconds()+0.05 { + t.Errorf("total observation %.3fs exceeds measured wall time %.3fs", totalDelta, wall.Seconds()) } }