Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions zooid/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,17 @@ func (events *EventStore) queryEventsWith(ctx context.Context, runner squirrel.B
filter.Limit = maxLimit
}

observer := QueryDuration.With(prometheus.Labels{"instance": events.Config.Schema})
// WithLabelValues avoids the map allocation that With(Labels{...})
// does on every call — this is the per-query hot path.
totalObserver := QueryDuration.WithLabelValues(events.Config.Schema)
dbObserver := QueryDBDuration.WithLabelValues(events.Config.Schema)
drainObserver := QueryDrainDuration.WithLabelValues(events.Config.Schema)
queryStart := time.Now()
var drainTotal time.Duration

rows, err := events.buildSelectQuery(filter).RunWith(runner).QueryContext(ctx)
if err != nil {
observer.Observe(time.Since(queryStart).Seconds())
observeQueryTimings(totalObserver, dbObserver, drainObserver, queryStart, drainTotal)
log.Printf("QueryEvents query error: %v", err)
return
}
Expand Down Expand Up @@ -245,20 +251,35 @@ func (events *EventStore) queryEventsWith(ctx context.Context, runner squirrel.B
continue
}

if !yield(evt) {
observer.Observe(time.Since(queryStart).Seconds())
yieldStart := time.Now()
cont := yield(evt)
drainTotal += time.Since(yieldStart)
if !cont {
observeQueryTimings(totalObserver, dbObserver, drainObserver, queryStart, drainTotal)
return
}
}

observer.Observe(time.Since(queryStart).Seconds())
observeQueryTimings(totalObserver, dbObserver, drainObserver, queryStart, drainTotal)

if err := rows.Err(); err != nil {
log.Printf("QueryEvents row iteration error: %v", err)
}
}
}

// observeQueryTimings emits the three query-duration histograms in one
// place: total wall time, DB-side time (total - drain), and consumer-drain
// time. (wall - drainTotal) is non-negative because drainTotal is the sum
// of disjoint sub-intervals of the overall query wall time — yields run
// sequentially inside the iter.Seq, never concurrently.
func observeQueryTimings(total, db, drain prometheus.Observer, queryStart time.Time, drainTotal time.Duration) {
wall := time.Since(queryStart)
total.Observe(wall.Seconds())
db.Observe((wall - drainTotal).Seconds())
drain.Observe(drainTotal.Seconds())
}

func (events *EventStore) buildSelectQuery(filter nostr.Filter) squirrel.SelectBuilder {
eventsTable := events.Schema.Prefix("events")
eventTagsTable := events.Schema.Prefix("event_tags")
Expand Down
39 changes: 37 additions & 2 deletions zooid/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,43 @@ var (
Help: "Total chat messages in database",
}, []string{"instance"})

// Buckets cover up to the 30s dbOpTimeout (events.go) used by
// top-level QueryEvents callers. DefBuckets' last finite bucket is
// 10s, which clips percentiles in histogram_quantile for queries in
// the 10–30s range. Internal callers like replaceEventOnce wrap
// queryEventsWith with a 60s budget, so a small minority of reads
// can exceed 30s and land in +Inf — accepted in exchange for not
// growing the bucket count for an uncommon path.
queryDurationBuckets = []float64{
0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 15, 20, 30,
}

// QueryDuration is total wall time from query submit to last row
// yielded. Includes time blocked in `yield(evt)` waiting for the
// consumer. Kept as the historical metric so existing dashboards and
// alerts don't break; for diagnosing whether slowness is in Postgres
// or the WebSocket peer, prefer QueryDBDuration / QueryDrainDuration.
QueryDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "zooid_query_duration_seconds",
Help: "Duration of database queries (DB execution and row scanning)",
Buckets: prometheus.DefBuckets,
Help: "Total wall time of database queries (DB + row scan + parse + consumer drain)",
Buckets: queryDurationBuckets,
}, []string{"instance"})

// QueryDBDuration is QueryDuration minus the time blocked in
// `yield(evt)`. Approximates Postgres + driver + scan + parse time.
QueryDBDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "zooid_query_db_seconds",
Help: "DB-side query time (total minus consumer drain)",
Buckets: queryDurationBuckets,
}, []string{"instance"})

// QueryDrainDuration is the time spent blocked yielding events to
// the consumer. High values indicate client back-pressure, not slow
// Postgres.
QueryDrainDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "zooid_query_drain_seconds",
Help: "Duration spent blocked yielding query results to the consumer (back-pressure)",
Buckets: queryDurationBuckets,
}, []string{"instance"})
)

Expand All @@ -106,6 +139,8 @@ func init() {
eventsTotal,
messagesTotal,
QueryDuration,
QueryDBDuration,
QueryDrainDuration,
)
}

Expand Down
133 changes: 127 additions & 6 deletions zooid/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (
"fmt"
"sync"
"testing"
"time"

"fiatjaf.com/nostr"
"fiatjaf.com/nostr/khatru"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
dto "github.com/prometheus/client_model/go"
)

func createMetricsTestInstance(t *testing.T) *Instance {
Expand Down Expand Up @@ -340,16 +343,134 @@ func TestMetrics_QueryDurationHistogram(t *testing.T) {
inst := createMetricsTestInstance(t)

// Count observations before
before := testutil.CollectAndCount(QueryDuration)
beforeTotal := testutil.CollectAndCount(QueryDuration)
beforeDB := testutil.CollectAndCount(QueryDBDuration)
beforeDrain := testutil.CollectAndCount(QueryDrainDuration)

// Run a query to trigger the histogram
// Run a query to trigger the histograms
for range inst.Events.QueryEvents(nostr.Filter{Kinds: []nostr.Kind{1}}, 10) {
}

// The histogram should still be collectible (no panics, no errors)
after := testutil.CollectAndCount(QueryDuration)
if after < before {
t.Errorf("histogram metric count decreased: before=%d after=%d", before, after)
// All three histograms should still be collectible (no panics, no errors)
if afterTotal := testutil.CollectAndCount(QueryDuration); afterTotal < beforeTotal {
t.Errorf("QueryDuration metric count decreased: before=%d after=%d", beforeTotal, afterTotal)
}
if afterDB := testutil.CollectAndCount(QueryDBDuration); afterDB < beforeDB {
t.Errorf("QueryDBDuration metric count decreased: before=%d after=%d", beforeDB, afterDB)
}
if afterDrain := testutil.CollectAndCount(QueryDrainDuration); afterDrain < beforeDrain {
t.Errorf("QueryDrainDuration metric count decreased: before=%d after=%d", beforeDrain, afterDrain)
}
}

// readHistogram returns the current sample count and sum for a labeled
// histogram child.
func readHistogram(t *testing.T, vec *prometheus.HistogramVec, label string) (count uint64, sum float64) {
t.Helper()
m, err := vec.GetMetricWithLabelValues(label)
if err != nil {
t.Fatalf("GetMetricWithLabelValues: %v", err)
}
var pb dto.Metric
if err := m.(prometheus.Metric).Write(&pb); err != nil {
t.Fatalf("histogram.Write: %v", err)
}
h := pb.GetHistogram()
return h.GetSampleCount(), h.GetSampleSum()
}

// TestMetrics_QueryDrainAccounting proves that time blocked inside
// `yield(evt)` is recorded against QueryDrainDuration and excluded from
// QueryDBDuration. Without this split, a back-pressured WebSocket peer
// is indistinguishable from slow Postgres in the DB metric.
func TestMetrics_QueryDrainAccounting(t *testing.T) {
inst := createMetricsTestInstance(t)
label := inst.Config.Schema

// Insert a handful of plain text-note events under a fresh author so
// the query returns a known number of rows.
const nEvents = 5
sec := nostr.Generate()
for i := 0; i < nEvents; i++ {
evt := nostr.Event{
Kind: 1,
CreatedAt: nostr.Now(),
PubKey: sec.Public(),
Tags: nostr.Tags{},
Content: fmt.Sprintf("drain-test-%d", i),
}
evt.Sign(sec)
if err := inst.Events.SaveEvent(evt); err != nil {
t.Fatalf("SaveEvent[%d]: %v", i, err)
}
}

// Snapshot the histogram counts/sums before the slow-consumer query.
totalCountBefore, totalSumBefore := readHistogram(t, QueryDuration, label)
dbCountBefore, dbSumBefore := readHistogram(t, QueryDBDuration, label)
drainCountBefore, drainSumBefore := readHistogram(t, QueryDrainDuration, label)

// Run the query with a consumer that sleeps per event. The sleep time
// should accumulate in QueryDrainDuration and be excluded from
// QueryDBDuration.
const sleepPer = 50 * time.Millisecond
seen := 0
queryStart := time.Now()
for range inst.Events.QueryEvents(nostr.Filter{Kinds: []nostr.Kind{1}, Authors: []nostr.PubKey{sec.Public()}}, 100) {
seen++
time.Sleep(sleepPer)
}
wall := time.Since(queryStart)

if seen != nEvents {
t.Fatalf("expected %d events from QueryEvents, got %d", nEvents, seen)
}

totalCountAfter, totalSumAfter := readHistogram(t, QueryDuration, label)
dbCountAfter, dbSumAfter := readHistogram(t, QueryDBDuration, label)
drainCountAfter, drainSumAfter := readHistogram(t, QueryDrainDuration, label)

// Exactly one observation per query on each histogram.
if delta := totalCountAfter - totalCountBefore; delta != 1 {
t.Errorf("QueryDuration sample-count delta = %d, want 1", delta)
}
if delta := dbCountAfter - dbCountBefore; delta != 1 {
t.Errorf("QueryDBDuration sample-count delta = %d, want 1", delta)
}
if delta := drainCountAfter - drainCountBefore; delta != 1 {
t.Errorf("QueryDrainDuration sample-count delta = %d, want 1", delta)
}

totalDelta := totalSumAfter - totalSumBefore
dbDelta := dbSumAfter - dbSumBefore
drainDelta := drainSumAfter - drainSumBefore

// Drain should account for at least 90% of the cumulative sleep —
// scheduler jitter can shave a little off but shouldn't cut deeper.
expectedDrain := float64(nEvents) * sleepPer.Seconds()
if drainDelta < expectedDrain*0.9 {
t.Errorf("drain delta = %.3fs, want >= %.3fs (~ %d × %s)",
drainDelta, expectedDrain*0.9, nEvents, sleepPer)
}

// DB time must be strictly less than the drain time — the query
// itself runs against an in-process testcontainer Postgres and
// returns 5 rows; it should be milliseconds, far below the 250ms of
// induced sleep.
if dbDelta >= drainDelta {
t.Errorf("dbDelta (%.3fs) should be < drainDelta (%.3fs); drain accounting may be broken",
dbDelta, drainDelta)
}

// Total = DB + drain, modulo float arithmetic. Allow 1ms tolerance.
if got := dbDelta + drainDelta; got < totalDelta-0.001 || got > totalDelta+0.001 {
t.Errorf("db (%.3fs) + drain (%.3fs) = %.3fs, want ≈ total (%.3fs)",
dbDelta, drainDelta, got, totalDelta)
}

// Sanity: total must not exceed measured wall time of the loop.
if totalDelta > wall.Seconds()+0.05 {
t.Errorf("total observation %.3fs exceeds measured wall time %.3fs", totalDelta, wall.Seconds())
}
}

Expand Down
Loading