Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 68 additions & 21 deletions zooid/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ type EventStore struct {
var _ eventstore.Store = (*EventStore)(nil)

func (events *EventStore) Init() error {
// Base tables and the indexes whose definitions reference only
// columns present in those CREATE TABLE statements. Indexes that
// depend on columns added by later migrations live in those
// migration files, not here, so we don't reference a column that
// doesn't exist yet on a pre-migration schema.
statements := []string{
events.Schema.Render(`
CREATE TABLE IF NOT EXISTS {{.Name}}__events (
Expand All @@ -107,6 +112,7 @@ func (events *EventStore) Init() error {
event_id TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
kind INTEGER,
FOREIGN KEY (event_id) REFERENCES {{.Name}}__events(id) ON DELETE CASCADE
)`),
events.Schema.Render(`CREATE INDEX IF NOT EXISTS {{.Name}}__idx_event_tags_event_id ON {{.Name}}__event_tags(event_id)`),
Expand Down Expand Up @@ -202,7 +208,13 @@ func (events *EventStore) queryEventsWith(ctx context.Context, runner squirrel.B
queryStart := time.Now()
var drainTotal time.Duration

rows, err := events.buildSelectQuery(filter).RunWith(runner).QueryContext(ctx)
qb, err := events.buildSelectQuery(filter)
if err != nil {
observeQueryTimings(totalObserver, dbObserver, drainObserver, queryStart, drainTotal)
log.Printf("QueryEvents buildSelectQuery error: %v", err)
return
}
rows, err := qb.RunWith(runner).QueryContext(ctx)
if err != nil {
observeQueryTimings(totalObserver, dbObserver, drainObserver, queryStart, drainTotal)
log.Printf("QueryEvents query error: %v", err)
Expand Down Expand Up @@ -280,7 +292,7 @@ func observeQueryTimings(total, db, drain prometheus.Observer, queryStart time.T
drain.Observe(drainTotal.Seconds())
}

func (events *EventStore) buildSelectQuery(filter nostr.Filter) squirrel.SelectBuilder {
func (events *EventStore) buildSelectQuery(filter nostr.Filter) (squirrel.SelectBuilder, error) {
eventsTable := events.Schema.Prefix("events")
eventTagsTable := events.Schema.Prefix("event_tags")

Expand All @@ -304,6 +316,18 @@ func (events *EventStore) buildSelectQuery(filter nostr.Filter) squirrel.SelectB
return tagFilters[i].key < tagFilters[j].key
})

// Convert filter.Kinds to a []interface{} of ints once. The result is
// used both inside the tag CTE (to push kind into the
// (key, value, kind, event_id) covering index — critical for hot groups
// whose tag rows are dominated by membership events, see issue #23)
// and on the outer events query.
var kindInts []interface{}
if len(filter.Kinds) > 0 {
kindInts = make([]interface{}, len(filter.Kinds))
for i, k := range filter.Kinds {
kindInts[i] = int(k)
}
}

// When tag filters are present, use a materialized CTE to force the
// planner to resolve tag lookups FIRST via the covering index, then
// join the small result set to events.
Expand All @@ -330,7 +354,24 @@ func (events *EventStore) buildSelectQuery(filter nostr.Filter) squirrel.SelectB
From(eventTagsTable).
Where(squirrel.Eq{"key": tf.key}).
Where(squirrel.Eq{"value": tf.values})
sql, args, _ := subQ.ToSql()
if len(kindInts) > 0 {
// `kind IS NULL` keeps reads correct for un-backfilled
// rows (event_tags.kind is added nullable; backfill is a
// separate ops step). Drop the IS NULL branch in a
// follow-up once the backfill is verified complete.
subQ = subQ.Where(squirrel.Or{
squirrel.Eq{"kind": kindInts},
squirrel.Expr("kind IS NULL"),
})
}
sql, args, err := subQ.ToSql()
if err != nil {
// SelectBuilder.ToSql only fails for malformed builder
// state, not user input. Propagate the error so callers
// in the query and count paths can log and short-circuit
// instead of crashing the process.
return squirrel.SelectBuilder{}, fmt.Errorf("buildSelectQuery: tag CTE ToSql: %w", err)
}
cteParts = append(cteParts, sql)
cteArgs = append(cteArgs, args...)
}
Expand Down Expand Up @@ -370,11 +411,7 @@ func (events *EventStore) buildSelectQuery(filter nostr.Filter) squirrel.SelectB
qb = qb.Where(squirrel.Eq{col + "pubkey": authorStrs})
}

if len(filter.Kinds) > 0 {
kindInts := make([]interface{}, len(filter.Kinds))
for i, kind := range filter.Kinds {
kindInts[i] = int(kind)
}
if len(kindInts) > 0 {
qb = qb.Where(squirrel.Eq{col + "kind": kindInts})
}

Expand All @@ -390,7 +427,7 @@ func (events *EventStore) buildSelectQuery(filter nostr.Filter) squirrel.SelectB
qb = qb.Limit(uint64(filter.Limit))
}

return qb
return qb, nil
}

// buildTagFilteredQuery constructs a raw SQL query using a materialized CTE
Expand Down Expand Up @@ -496,30 +533,37 @@ func (events *EventStore) saveEventWith(ctx context.Context, runner squirrel.Bas
}

// Insert single-letter tags into event_tags, chunked to stay below
// Postgres's 65535 extended-protocol parameter limit. With 3 columns per
// row, 15000 rows × 3 = 45000 params is well under the 65535 cap and
// cuts the inner round-trip count by ~3× vs. 5k batches — important for
// kind-39002 (NIP-29 member list) saves where the whole transaction runs
// under SERIALIZABLE isolation and contention is dominated by wall-clock
// duration of the critical section (issues #13, #16).
// Postgres's 65535 extended-protocol parameter limit. With 4 columns
// per row (event_id, key, value, kind), 15000 rows × 4 = 60000 params
// stays under the 65535 cap while keeping the number of round trips
// low. That matters most for kind-39002 (NIP-29 member list) saves:
// those run under SERIALIZABLE isolation, and contention is dominated
// by the wall-clock duration of the critical section (issues #13, #16).
// Only 5535 params of headroom remain, so if another column is ever
// added here the batch size must drop (5 columns allow at most
// 65535 / 5 = 13107 rows per batch).
const tagInsertBatchSize = 15000

eventID := evt.ID.Hex()
eventKind := int(evt.Kind)
tagsTable := events.Schema.Prefix("event_tags")
batch := sb.Insert(tagsTable).Columns("event_id", "key", "value")
// kind is denormalized here so the tag-filter CTE in buildSelectQuery
// can pre-filter by kind via the (key, value, kind, event_id) covering
// index — without it, hot groups whose tag-rows are dominated by
// membership events (kinds 9000/9021) hash-join 90k+ rows just to throw
// 95% of them away on the kind filter. See zooid issue #23.
batch := sb.Insert(tagsTable).Columns("event_id", "key", "value", "kind")
n := 0

for _, tag := range evt.Tags {
if len(tag) < 2 || len(tag[0]) != 1 {
continue
}
batch = batch.Values(eventID, tag[0], tag[1])
batch = batch.Values(eventID, tag[0], tag[1], eventKind)
n++
if n >= tagInsertBatchSize {
if _, err := batch.RunWith(runner).ExecContext(ctx); err != nil {
return fmt.Errorf("failed to save tags for event '%s': %w", evt.ID, err)
}
batch = sb.Insert(tagsTable).Columns("event_id", "key", "value")
batch = sb.Insert(tagsTable).Columns("event_id", "key", "value", "kind")
n = 0
}
}
Expand Down Expand Up @@ -648,16 +692,19 @@ func (events *EventStore) CountEvents(filter nostr.Filter) (uint32, error) {
// Strip limit for a true total count; ORDER BY in the subquery is
// optimized away by PostgreSQL's planner inside COUNT(*).
filter.Limit = 0
qb := events.buildSelectQuery(filter).RemoveLimit()
qb, err := events.buildSelectQuery(filter)
if err != nil {
return 0, fmt.Errorf("failed to build count query: %w", err)
}
qb = qb.RemoveLimit()

countQb := sb.Select("COUNT(*)").FromSelect(qb, "subquery")

ctx, cancel := context.WithTimeout(events.rootCtx, dbOpTimeout)
defer cancel()

var count uint32
err := countQb.RunWith(GetDb()).QueryRowContext(ctx).Scan(&count)
if err != nil {
if err := countQb.RunWith(GetDb()).QueryRowContext(ctx).Scan(&count); err != nil {
return 0, fmt.Errorf("failed to count events: %w", err)
}

Expand Down
Loading
Loading