diff --git a/docs/plans/everlight-supernode-compat-implementation-plan.md b/docs/plans/everlight-supernode-compat-implementation-plan.md new file mode 100644 index 00000000..5a441af4 --- /dev/null +++ b/docs/plans/everlight-supernode-compat-implementation-plan.md @@ -0,0 +1,265 @@ +# Everlight Supernode Compatibility Plan (from Lumera PR #113) + +Author: NightCrawler +Date: 2026-04-14 +Status: Proposed implementation plan (supernode-side) + +--- + +## 1) Scope and objective + +This plan defines **all required supernode-side changes** so `supernode` is fully compatible with Lumera Everlight Phase 1 behavior introduced in `lumera` PR #113. + +Primary goals: + +1. Supernode emits the **right epoch report data** for Everlight payout and storage-full state transitions. +2. Supernode audit epoch-report submission remains valid under current audit-module validation semantics. +3. Supernode/query usage is aligned with chain changes around `STORAGE_FULL` and new payout query surfaces. +4. Supernode + SDK behavior is covered by deterministic tests (unit + integration/system-level). + +--- + +## 2) Ground truth from chain (what changed and what supernode must respect) + +From `lumera` PR #113 and latest follow-up commits: + +- Everlight payout weighting uses `audit.v1 HostReport.cascade_kademlia_db_bytes`. +- `STORAGE_FULL` transition authority is now audit epoch report path (not legacy supernode metrics path). +- `SubmitEpochReport` semantics on chain currently enforce: + - if reporter is prober in epoch: peer observations for assigned targets are required, + - if reporter is not prober: peer observations are rejected. +- `GetTopSuperNodesForBlock` default selection excludes `POSTPONED` + `STORAGE_FULL` unless explicit state filters are used. +- New supernode query surfaces exist on chain: + - `pool-state`, `sn-eligibility`, `payout-history`. 
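The `SubmitEpochReport` semantics above can be illustrated with a minimal sketch. All types and names here are simplified stand-ins for illustration only, not the real `audit.v1` protos:

```go
package main

import "fmt"

// epochReport is a hypothetical, simplified shape; the real message is
// MsgSubmitEpochReport in lumera's audit.v1 protos.
type epochReport struct {
	Reporter     string
	Observations []string // peer observations, one per probed target
}

// validateObservations mirrors the chain rule described above: a prober must
// report observations for its assigned targets; a non-prober must report none.
func validateObservations(r epochReport, assignedTargets map[string][]string) error {
	targets, isProber := assignedTargets[r.Reporter]
	if !isProber {
		if len(r.Observations) > 0 {
			return fmt.Errorf("peer observations rejected: %s is not a prober this epoch", r.Reporter)
		}
		return nil
	}
	if len(r.Observations) != len(targets) {
		return fmt.Errorf("expected %d observations for assigned targets, got %d", len(targets), len(r.Observations))
	}
	return nil
}

func main() {
	assigned := map[string][]string{"sn1": {"sn2", "sn3"}}
	// Prober with one observation per assigned target: accepted.
	fmt.Println(validateObservations(epochReport{Reporter: "sn1", Observations: []string{"o2", "o3"}}, assigned))
	// Non-prober submitting observations: rejected with an error.
	fmt.Println(validateObservations(epochReport{Reporter: "sn4", Observations: []string{"o2"}}, assigned))
}
```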
+ +Implication: supernode must submit valid epoch reports (host + role-appropriate peer observations) and include cascade bytes in host report. + +--- + +## 3) Current supernode behavior and gaps (RCA) + +### 3.1 Current host reporter (`supernode/host_reporter/service.go`) + +Current behavior: +- Submits one `MsgSubmitEpochReport` per epoch. +- Queries assigned targets and builds `StorageChallengeObservations` correctly. +- Reports `disk_usage_percent` from local storage metrics. + +Gap: +- **Does not populate `HostReport.cascade_kademlia_db_bytes`**. + +Impact: +- Everlight payout eligibility/weight can evaluate as missing/zero bytes, causing payout misses or exclusion despite real stored data. + +### 3.2 Audit submit path (`pkg/lumera/modules/audit_msg/impl.go`) + +Current behavior: +- Builds and submits `MsgSubmitEpochReport`. +- Defensive copy of observations. + +Status: +- Compatible; no required API-level changes. + +### 3.3 Supernode chain query usage + +Current usage: +- Cascade selection path relies on `GetTopSupernodes` for action workflows. +- Bootstrap/routing path also uses list/top queries depending on component. + +Risk area: +- Default top query excludes `STORAGE_FULL`. This is desired for storage-action selection but must be understood by features expecting compute-eligible storage-full behavior. + +### 3.4 Legacy metrics collector (`supernode/supernode_metrics/*`) + +Current status: +- Legacy metrics tx path is superseded by audit epoch reports for this feature. + +Required posture: +- Do not rely on this path for Everlight payout bytes or storage-full transitions. + +--- + +## 4) Required implementation changes (supernode repo) + +## A) Mandatory functional changes + +### A1. Populate `cascade_kademlia_db_bytes` in host epoch report + +Target: +- `supernode/host_reporter/service.go` + +Required behavior: +- During `tick()`, set: + - `HostReport.CascadeKademliaDbBytes` = current node’s measured Cascade Kademlia DB bytes. 
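A minimal sketch of this change, under stated assumptions: the field name is the one referenced in this plan, while the two measurement inputs are hypothetical stand-ins for the real accessors (the `sqliteOnDiskSizeBytes` path or the status-subsystem metrics). It also shows the deterministic MB -> bytes fallback:

```go
package main

import "fmt"

// hostReport mirrors the relevant audit.v1 HostReport fields as referenced in
// this plan; the real proto lives in the lumera repo.
type hostReport struct {
	DiskUsagePercent       float64
	CascadeKademliaDbBytes uint64
}

// mbToBytes converts an MB figure from the status pipeline into the bytes the
// chain expects. Deterministic integer math; no floating point.
func mbToBytes(mb uint64) uint64 { return mb * 1024 * 1024 }

// buildHostReport sketches the tick() change: prefer an absolute-bytes
// measurement, fall back to converting the status pipeline's MB figure.
// dbBytes and dbMB are hypothetical inputs standing in for the real
// measurement paths listed in the source options below.
func buildHostReport(diskPct float64, dbBytes, dbMB uint64) hostReport {
	bytes := dbBytes
	if bytes == 0 && dbMB > 0 {
		bytes = mbToBytes(dbMB)
	}
	return hostReport{DiskUsagePercent: diskPct, CascadeKademliaDbBytes: bytes}
}

func main() {
	// Only an MB figure available -> converted to bytes deterministically.
	r := buildHostReport(42.0, 0, 512)
	fmt.Println(r.CascadeKademliaDbBytes) // 536870912
}
```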
+ +Accepted source options (implementation choice): +1. Reuse existing P2P SQLite size calculation path (`sqliteOnDiskSizeBytes`) via an exported accessor. +2. Reuse status subsystem database metrics if available as absolute bytes (preferred to avoid duplicate logic). + +Important: +- Keep unit consistent with chain expectation: **bytes (not MB)**. +- If only MB is available from status pipeline, convert MB -> bytes deterministically. + +### A2. Keep epoch report submission role-aware and valid + +`host_reporter` already builds observations from `GetAssignedTargets`; preserve this. + +Hard requirements: +- For prober epochs: include one observation per assigned target with required port count. +- For non-prober epochs: send no observations. + +### A3. Operational logging clarity + +Add structured log fields on successful submit: +- epoch_id +- disk_usage_percent +- cascade_kademlia_db_bytes +- observations_count +- assigned_targets_count + +Purpose: rapid production diagnosis when payout/eligibility is questioned. + +--- + +## B) Query/client integration additions (recommended) + +### B1. Add supernode query client wrappers for Everlight surfaces + +Target: +- `pkg/lumera/modules/supernode` (or equivalent query module) + +Add methods: +- `GetPoolState(ctx)` +- `GetSNEligibility(ctx, validatorAddr)` +- `GetPayoutHistory(ctx, validatorAddr, pagination)` + +Why: +- Needed for operator diagnostics and future automation. +- Enables supernode runtime health endpoints to expose payout-readiness status. + +### B2. Add optional compatibility diagnostics endpoint/command + +Expose a compact “Everlight readiness” check in supernode runtime/admin tooling: +- current epoch report bytes value +- current state (ACTIVE/STORAGE_FULL/etc) +- `sn-eligibility` result + reason + +--- + +## C) Selection/behavior policy alignment checks + +### C1. Cascade action selection assumptions + +Review call sites that depend on `GetTopSupernodes` default filtering. 
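At such call sites, the intended policy can be made explicit along these lines. This is a hedged sketch: `ACTIVE = 1` matches the chain constant referenced elsewhere in this change, but the other state values and the `candidate` type are hypothetical placeholders for the real enum and selection structures:

```go
package main

import "fmt"

// Hypothetical state constants; only ACTIVE = 1 is taken from the chain
// (SuperNodeStateActive = 1), the rest are illustrative placeholders.
const (
	stateActive      int32 = 1
	statePostponed   int32 = 2
	stateStorageFull int32 = 3
)

type candidate struct {
	Account string
	State   int32
}

// filterForStorageAction documents the C1 policy at the selection point:
// storage actions target ACTIVE nodes only. STORAGE_FULL nodes remain
// routable/readable elsewhere but are intentionally excluded from writes,
// even if a caller bypasses the top-query's default filtering.
func filterForStorageAction(cands []candidate) []candidate {
	out := make([]candidate, 0, len(cands))
	for _, c := range cands {
		if c.State == stateActive {
			out = append(out, c)
		}
	}
	return out
}

func main() {
	cands := []candidate{{"sn1", stateActive}, {"sn2", stateStorageFull}, {"sn3", statePostponed}}
	fmt.Println(len(filterForStorageAction(cands))) // 1
}
```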
+
Goal:
- Ensure storage actions still intentionally avoid `STORAGE_FULL` nodes unless explicit policy says otherwise.
- Document this intent in code comments near selection points.

### C2. P2P bootstrap path sanity

Paths using supernode list/top queries for routing/bootstrap should be reviewed for unintended exclusion impacts.

Expected:
- bootstrap should not silently collapse due to state filter assumptions.

---

## 5) Tests required on supernode side

## Unit tests

### U1. Host report bytes population

File:
- `supernode/host_reporter/service_test.go`

Verify:
- `cascade_kademlia_db_bytes` is populated and >0 when DB exists.
- Units are bytes.
- Fallback behavior when measurement is unavailable.

### U2. Epoch role submission correctness

Verify:
- prober epoch -> observations count matches assigned targets.
- non-prober epoch -> observations omitted.
- no nil observations.

### U3. Report payload compatibility

Via mocked audit msg module:
- `SubmitEpochReport` payload includes disk + cascade bytes + expected observation structure.

## Integration tests (supernode repo)

### I1. End-to-end epoch report compatibility

With local chain/devnet harness:
- ensure the report is accepted for both prober/non-prober epochs.
- ensure no `invalid peer observations` errors under normal operation.

### I2. Everlight query smoke checks (if wrappers added)

Verify wrappers decode:
- `pool-state`, `sn-eligibility`, `payout-history`.

### I3. Storage-full/payout readiness diagnostics

After a high disk report:
- state transition visible via chain query.
- `sn-eligibility` reason/value observable.

---

## 6) SDK / shared client considerations

If `sdk-go` or shared internal clients are used by supernode admin tools:
- align with latest query/CLI flags and request shapes.
- ensure audit assigned-targets and epoch-anchor calls use current flag/proto semantics.
+

No protocol-breaking SDK changes are required for `SubmitEpochReport`; this is payload completeness + query wrapper work.

---

## 7) Rollout plan (safe order)

1. Implement host bytes population (`cascade_kademlia_db_bytes`) + unit tests.
2. Add/confirm role-aware observation tests.
3. Add query wrappers + smoke tests.
4. Run the supernode integration test flow against the lumera branch with Everlight.
5. Document runtime/operator diagnostics commands.
6. Release with explicit compatibility notes.

---

## 8) Risk register

- **R1**: Wrong units (MB vs bytes) -> payout distortion.
- **R2**: Missing observations for prober epochs -> rejected reports.
- **R3**: Query wrapper drift against chain proto updates.
- **R4**: Using default top-node query where STORAGE_FULL inclusion is expected.

Mitigations are covered by the U1/U2/I1/I2/C1 checks.

---

## 9) Definition of done

Supernode-side compatibility is complete when:

- host reporter includes `cascade_kademlia_db_bytes` in submitted epoch reports,
- epoch reports are accepted in both prober/non-prober roles without manual intervention,
- query wrappers for Everlight surfaces are available and tested,
- integration run confirms no report-validation regressions,
- docs include operator guidance for readiness and troubleshooting.

---

## 10) PR breakdown recommendation (supernode)

- PR A: host reporter payload + tests (mandatory)
- PR B: query wrappers + diagnostics + tests (recommended)
- PR C: optional bootstrap/selection policy hardening docs/comments

This allows the smallest-risk merge path while unblocking Everlight compatibility quickly.
diff --git a/go.mod b/go.mod index 6ce1c3c7..96cee481 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/LumeraProtocol/supernode/v2 -go 1.25.5 +go 1.25.9 replace ( github.com/envoyproxy/protoc-gen-validate => github.com/bufbuild/protoc-gen-validate v1.3.0 @@ -12,11 +12,11 @@ require ( cosmossdk.io/math v1.5.3 github.com/AlecAivazis/survey/v2 v2.3.7 github.com/DataDog/zstd v1.5.7 - github.com/LumeraProtocol/lumera v1.11.2-0.20260413145614-4ffe74bb13dc + github.com/LumeraProtocol/lumera v1.12.0-rc github.com/LumeraProtocol/rq-go v0.2.1 github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce github.com/cenkalti/backoff/v4 v4.3.0 - github.com/cometbft/cometbft v0.38.20 + github.com/cometbft/cometbft v0.38.21 github.com/cosmos/btcutil v1.0.5 github.com/cosmos/cosmos-sdk v0.53.5 github.com/cosmos/go-bip39 v1.0.0 diff --git a/go.sum b/go.sum index e015dfd1..12319b95 100644 --- a/go.sum +++ b/go.sum @@ -111,8 +111,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.50 github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.50.0 h1:ig/FpDD2JofP/NExKQUbn7uOSZzJAQqogfqluZK4ed4= github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.50.0/go.mod h1:otE2jQekW/PqXk1Awf5lmfokJx4uwuqcj1ab5SpGeW0= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= -github.com/LumeraProtocol/lumera v1.11.2-0.20260413145614-4ffe74bb13dc h1:B43KT06s/4lE/LyVQevb0Xr5XqKy6nlel1fZh7G7w14= -github.com/LumeraProtocol/lumera v1.11.2-0.20260413145614-4ffe74bb13dc/go.mod h1:p2sZZG3bLzSBdaW883qjuU3DXXY4NJzTTwLywr8uI0w= +github.com/LumeraProtocol/lumera v1.12.0-rc h1:Mfae496LpjYhf1SvAE/bsmtjgdoOD8WAJFRCier8xsg= +github.com/LumeraProtocol/lumera v1.12.0-rc/go.mod h1:/G9LTPZB+261tHoWoj7q+1fn+O/VV0zzagwLdsThSNo= github.com/LumeraProtocol/rq-go v0.2.1 h1:8B3UzRChLsGMmvZ+UVbJsJj6JZzL9P9iYxbdUwGsQI4= 
github.com/LumeraProtocol/rq-go v0.2.1/go.mod h1:APnKCZRh1Es2Vtrd2w4kCLgAyaL5Bqrkz/BURoRJ+O8= github.com/Masterminds/semver/v3 v3.3.1 h1:QtNSWtVZ3nBfk8mAOu/B6v7FMJ+NHTIgUPi7rj+4nv4= @@ -239,8 +239,8 @@ github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1: github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= github.com/coder/websocket v1.8.7 h1:jiep6gmlfP/yq2w1gBoubJEXL9gf8x3bp6lzzX8nJxE= github.com/coder/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= -github.com/cometbft/cometbft v0.38.20 h1:i9v9rvh3Z4CZvGSWrByAOpiqNq5WLkat3r/tE/B49RU= -github.com/cometbft/cometbft v0.38.20/go.mod h1:UCu8dlHqvkAsmAFmWDRWNZJPlu6ya2fTWZlDrWsivwo= +github.com/cometbft/cometbft v0.38.21 h1:qcIJSH9LiwU5s6ZgKR5eRbsLNucbubfraDs5bzgjtOI= +github.com/cometbft/cometbft v0.38.21/go.mod h1:UCu8dlHqvkAsmAFmWDRWNZJPlu6ya2fTWZlDrWsivwo= github.com/cometbft/cometbft-db v0.14.1 h1:SxoamPghqICBAIcGpleHbmoPqy+crij/++eZz3DlerQ= github.com/cometbft/cometbft-db v0.14.1/go.mod h1:KHP1YghilyGV/xjD5DP3+2hyigWx0WTp9X+0Gnx0RxQ= github.com/containerd/continuity v0.3.0 h1:nisirsYROK15TAMVukJOUyGJjz4BNQJBVsNvAXZJ/eg= diff --git a/p2p/kademlia/bootstrap.go b/p2p/kademlia/bootstrap.go index 4ecf53aa..3d5410db 100644 --- a/p2p/kademlia/bootstrap.go +++ b/p2p/kademlia/bootstrap.go @@ -16,8 +16,23 @@ import ( ) const ( - bootstrapRefreshInterval = 10 * time.Minute - defaultSuperNodeP2PPort int = 4445 + bootstrapRefreshInterval = 10 * time.Minute + // allowlistRefreshInterval is a faster refresh cadence for just the + // chain-derived routing/store allowlists. A full bootstrap cycle + // (bootstrapRefreshInterval) also refreshes replication_info, pings + // peers and reseeds the routing table; that is expensive and we do + // not want to pay that cost every time a supernode's on-chain state + // changes. 
This lightweight refresh re-queries ListSuperNodes and + // updates the allowlists + replication_info.Active flags only. + allowlistRefreshInterval = 30 * time.Second + // allowlistOpportunisticMinInterval is the minimum wall-clock gap + // between opportunistic allowlist refreshes from hot RPC paths + // (IterateBatchStore). Debounces a burst of concurrent uploads so + // they don't each spawn their own ListSuperNodes chain query. A + // successful refresh from the 30s background ticker counts, so in + // steady state the opportunistic path is always a no-op. + allowlistOpportunisticMinInterval = 10 * time.Second + defaultSuperNodeP2PPort int = 4445 ) // seed a couple of obviously bad addrs (unless in integration tests) @@ -108,16 +123,22 @@ func (s *DHT) setBootstrapNodesFromConfigVar(ctx context.Context, bootstrapNodes return nil } -// loadBootstrapCandidatesFromChain returns active supernodes (by latest state) -// mapped by "ip:port". No pings here. -func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress string) (map[string]*Node, map[[32]byte]struct{}, error) { +// loadBootstrapCandidatesFromChain returns routing candidates (by latest state) mapped by "ip:port", +// plus two allowlists: +// - routingIDs: Active + Postponed +// - storeIDs: Active only +// +// No pings here. 
+func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress string) (map[string]*Node, map[[32]byte]struct{}, map[[32]byte]struct{}, error) { resp, err := s.options.LumeraClient.SuperNode().ListSuperNodes(ctx) if err != nil { - return nil, nil, fmt.Errorf("failed to list supernodes: %w", err) + return nil, nil, nil, fmt.Errorf("failed to list supernodes: %w", err) } - activeIDs := make(map[[32]byte]struct{}, len(resp.Supernodes)) + routingIDs := make(map[[32]byte]struct{}, len(resp.Supernodes)) + storeIDs := make(map[[32]byte]struct{}, len(resp.Supernodes)) mapNodes := make(map[string]*Node, len(resp.Supernodes)) + selfID := strings.TrimSpace(string(s.options.ID)) for _, sn := range resp.Supernodes { if len(sn.States) == 0 { continue @@ -130,7 +151,16 @@ func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress latestState = int32(st.State) } } - if latestState != 1 { // SuperNodeStateActive = 1 + // Record self-state regardless of routing eligibility; the STORE + // RPC self-guard needs the authoritative value even when self is + // DISABLED/STOPPED/PENALIZED (in which case all states are + // non-store-eligible anyway). + if selfID != "" && strings.TrimSpace(sn.SupernodeAccount) == selfID { + s.setSelfState(latestState) + } + // Routing/read eligibility: {ACTIVE, POSTPONED, STORAGE_FULL}. + // Store/write eligibility: {ACTIVE} only. 
+ if !isRoutingEligibleState(latestState) { continue } @@ -148,7 +178,10 @@ func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress } else if len(h) == 32 { var key [32]byte copy(key[:], h) - activeIDs[key] = struct{}{} + routingIDs[key] = struct{}{} + if isStoreEligibleState(latestState) { + storeIDs[key] = struct{}{} + } } // latest IP by height @@ -190,7 +223,7 @@ func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress node.ID = []byte(id) mapNodes[full] = node } - return mapNodes, activeIDs, nil + return mapNodes, routingIDs, storeIDs, nil } // upsertBootstrapNode inserts/updates replication_info for the discovered node (Active=false). @@ -245,6 +278,24 @@ func (s *DHT) SyncBootstrapOnce(ctx context.Context, bootstrapNodes string) erro if err := s.setBootstrapNodesFromConfigVar(ctx, bootstrapNodes); err != nil { return err } + allow := make(map[[32]byte]struct{}, len(s.options.BootstrapNodes)) + for _, n := range s.options.BootstrapNodes { + if n == nil || len(n.ID) == 0 { + continue + } + h, err := utils.Blake3Hash(n.ID) + if err != nil || len(h) != 32 { + continue + } + var key [32]byte + copy(key[:], h) + allow[key] = struct{}{} + } + // Config bootstrap has no chain states; treat provided peers as both routing/store-eligible. 
+ s.setRoutingAllowlist(ctx, allow) + s.setStoreAllowlist(ctx, allow) + s.pruneIneligibleRoutingPeers(ctx) + for _, n := range s.options.BootstrapNodes { if err := s.upsertBootstrapNode(ctx, n); err != nil { logtrace.Warn(ctx, "bootstrap upsert failed", logtrace.Fields{ @@ -265,15 +316,21 @@ func (s *DHT) SyncBootstrapOnce(ctx context.Context, bootstrapNodes string) erro } selfAddress := fmt.Sprintf("%s:%d", parseSupernodeAddress(supernodeAddr), s.options.Port) - cands, activeIDs, err := s.loadBootstrapCandidatesFromChain(ctx, selfAddress) + cands, routingIDs, storeIDs, err := s.loadBootstrapCandidatesFromChain(ctx, selfAddress) if err != nil { return err } - // Update eligibility gate from chain Active state and prune any peers that slipped in via + // Update routing/read gate from chain state and prune any peers that slipped in via // inbound traffic before the last bootstrap refresh. - s.setRoutingAllowlist(ctx, activeIDs) + s.setRoutingAllowlist(ctx, routingIDs) + // Write/replication targets are Active-only. + s.setStoreAllowlist(ctx, storeIDs) s.pruneIneligibleRoutingPeers(ctx) + // Eagerly flip replication_info.Active=false for peers that are no + // longer store-eligible (STORAGE_FULL/POSTPONED/evicted). Closes the + // window between chain transition and next successful ping. + s.pruneIneligibleStorePeers(ctx) // Upsert candidates to replication_info seen := make(map[string]struct{}, len(cands)) @@ -303,13 +360,6 @@ func (s *DHT) SyncBootstrapOnce(ctx context.Context, bootstrapNodes string) erro // This keeps replication_info and routing table current as the validator set changes. 
func (s *DHT) StartBootstrapRefresher(ctx context.Context, bootstrapNodes string) { go func() { - // Initial sync - if err := s.SyncBootstrapOnce(ctx, bootstrapNodes); err != nil { - logtrace.Warn(ctx, "initial bootstrap sync failed", logtrace.Fields{ - logtrace.FieldModule: "p2p", - logtrace.FieldError: err.Error(), - }) - } t := time.NewTicker(bootstrapRefreshInterval) defer t.Stop() @@ -329,14 +379,127 @@ func (s *DHT) StartBootstrapRefresher(ctx context.Context, bootstrapNodes string }() } +// RefreshAllowlistsFromChain re-queries the chain and refreshes ONLY the +// routing/store allowlists, the self-state cache, and the +// replication_info.Active flags. It does not ping peers, does not reseed the +// routing table, and does not upsert bootstrap candidates into replication_info. +// +// Called on a short interval (allowlistRefreshInterval) and opportunistically +// at the top of write-path RPCs (IterateBatchStore) so that on-chain +// STORAGE_FULL / POSTPONED transitions propagate to the local write-gate +// within O(seconds) rather than O(bootstrapRefreshInterval). +// +// Safe to call concurrently with SyncBootstrapOnce: both use the same +// setRoutingAllowlist/setStoreAllowlist serialization (mutexes in DHT). +func (s *DHT) RefreshAllowlistsFromChain(ctx context.Context) error { + if s == nil { + return nil + } + if integrationTestEnabled() { + return nil + } + if s.options == nil || s.options.LumeraClient == nil { + return nil + } + supernodeAddr, err := s.getSupernodeAddress(ctx) + if err != nil { + return fmt.Errorf("get supernode address: %w", err) + } + selfAddress := fmt.Sprintf("%s:%d", parseSupernodeAddress(supernodeAddr), s.options.Port) + _, routingIDs, storeIDs, err := s.loadBootstrapCandidatesFromChain(ctx, selfAddress) + if err != nil { + return err + } + s.setRoutingAllowlist(ctx, routingIDs) + s.setStoreAllowlist(ctx, storeIDs) + // Flip replication_info.Active=false for peers no longer store-eligible. 
+ s.pruneIneligibleStorePeers(ctx) + // Stamp the successful-refresh timestamp so MaybeRefreshAllowlists can + // debounce opportunistic callers on hot RPC paths. + s.lastAllowlistRefreshUnixNano.Store(time.Now().UnixNano()) + return nil +} + +// MaybeRefreshAllowlists is a debounced variant of RefreshAllowlistsFromChain +// intended for hot RPC paths (e.g. IterateBatchStore). It skips the chain +// query if ANY refresh attempt (successful or not) happened within +// allowlistOpportunisticMinInterval. The 30-second background +// StartAllowlistRefresher is the primary convergence mechanism; this +// function exists only to shrink the worst-case staleness window for the +// very first upload after a chain state transition. +// +// The debounce stamps on attempt (not success) so a persistently-failing +// chain does not produce one RPC attempt per batch; the background ticker +// retries on its own cadence. +// +// Returns true iff a refresh was attempted, false iff the call was skipped +// due to debounce. Returned err, if any, is from the underlying +// RefreshAllowlistsFromChain. +func (s *DHT) MaybeRefreshAllowlists(ctx context.Context) (refreshed bool, err error) { + if s == nil { + return false, nil + } + if integrationTestEnabled() { + return false, nil + } + last := s.lastAllowlistRefreshUnixNano.Load() + if last != 0 && time.Since(time.Unix(0, last)) < allowlistOpportunisticMinInterval { + return false, nil + } + // Stamp BEFORE the call so subsequent in-flight callers see the + // debounce, and so a chain failure does not reset the window to the + // next caller's wall clock. The background StartAllowlistRefresher is + // responsible for eventual convergence if this attempt fails. + s.lastAllowlistRefreshUnixNano.Store(time.Now().UnixNano()) + return true, s.RefreshAllowlistsFromChain(ctx) +} + +// StartAllowlistRefresher runs RefreshAllowlistsFromChain every +// allowlistRefreshInterval. 
Short cadence ensures on-chain state transitions +// are reflected in the local p2p write/read gates before the next upload +// tries to dispatch STORE RPCs. +func (s *DHT) StartAllowlistRefresher(ctx context.Context) { + go func() { + t := time.NewTicker(allowlistRefreshInterval) + defer t.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-t.C: + if err := s.RefreshAllowlistsFromChain(ctx); err != nil { + logtrace.Debug(ctx, "periodic allowlist refresh failed", logtrace.Fields{ + logtrace.FieldModule: "p2p", + logtrace.FieldError: err.Error(), + }) + } + } + } + }() +} + // ConfigureBootstrapNodes wires to the new sync/refresher (no pings here). func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string) error { - // One-time sync; start refresher in the background + // One-time sync attempt; keep service running if it fails and rely on refresher retries. if err := s.SyncBootstrapOnce(ctx, bootstrapNodes); err != nil { - return err + logtrace.Warn(ctx, "initial bootstrap sync failed; continuing with periodic refresher", logtrace.Fields{ + logtrace.FieldModule: "p2p", + logtrace.FieldError: err.Error(), + }) } + // Always start periodic retries so transient chain/API outages can recover. s.StartBootstrapRefresher(ctx, bootstrapNodes) + // Also start a fast allowlist-only refresher so on-chain + // STORAGE_FULL / POSTPONED transitions propagate to the local p2p + // write-gate within O(seconds) rather than O(bootstrapRefreshInterval). + // Without this, a supernode that just flipped to STORAGE_FULL remains + // in the local storeAllowlist until the next full bootstrap cycle and + // the client will dispatch STORE RPCs that the peer correctly rejects, + // tanking the success rate and failing uploads. 
+ s.StartAllowlistRefresher(ctx) + return nil } diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go index 7466693f..6446797d 100644 --- a/p2p/kademlia/dht.go +++ b/p2p/kademlia/dht.go @@ -69,7 +69,8 @@ type DHT struct { metrics DHTMetrics // routingAllowlist is a fast in-memory gate of which peers are eligible to - // participate in the routing table (based on chain state: Active only). + // participate in routing/read lookup paths (based on chain state). + // Current policy: Active + Postponed are routing-eligible. // // Hot paths do only an atomic check + map lookup; updates happen on the // bootstrap refresh cadence. @@ -77,6 +78,29 @@ type DHT struct { routingAllow map[[32]byte]struct{} // blake3(peerID) -> exists routingAllowReady atomic.Bool routingAllowCount atomic.Int64 + + // storeAllowlist is a fast in-memory gate of which peers are eligible for + // write/replication targets. + // Current policy: Active only. + storeAllowMu sync.RWMutex + storeAllow map[[32]byte]struct{} // blake3(peerID) -> exists + storeAllowReady atomic.Bool + storeAllowCount atomic.Int64 + + // lastAllowlistRefreshUnixNano is the timestamp of the most recent + // successful RefreshAllowlistsFromChain call (unix nanoseconds, 0 if + // never). Used by MaybeRefreshAllowlists to debounce opportunistic + // refreshes on hot RPC paths so a burst of concurrent uploads does not + // fan out to one chain ListSuperNodes query per upload. + lastAllowlistRefreshUnixNano atomic.Int64 + + // selfState caches this node's latest chain state, refreshed on the + // bootstrap cadence. Used by STORE RPC self-guards to reject new-key + // writes when self is not ACTIVE (e.g. STORAGE_FULL, POSTPONED). + // A value of 0 (Unspecified) means "unknown" — handlers should not + // reject in that case to avoid lockout during bootstrap. 
+ selfState atomic.Int32 + selfStateReady atomic.Bool } // bootstrapIgnoreList seeds the in-memory ignore list with nodes that are @@ -144,11 +168,11 @@ func (s *DHT) setRoutingAllowlist(ctx context.Context, allow map[[32]byte]struct // Avoid accidentally locking ourselves out due to transient chain issues. if len(allow) == 0 { if !s.routingAllowReady.Load() { - logtrace.Debug(ctx, "routing allowlist from chain is empty; leaving gating disabled (bootstrap)", logtrace.Fields{ + logtrace.Debug(ctx, "routing allowlist from chain is empty; leaving routing gating disabled (bootstrap)", logtrace.Fields{ logtrace.FieldModule: "p2p", }) } else { - logtrace.Warn(ctx, "routing allowlist update skipped: chain returned zero active supernodes; retaining previous allowlist", logtrace.Fields{ + logtrace.Warn(ctx, "routing allowlist update skipped: chain returned zero routing-eligible supernodes; retaining previous allowlist", logtrace.Fields{ logtrace.FieldModule: "p2p", }) } @@ -164,7 +188,48 @@ func (s *DHT) setRoutingAllowlist(ctx context.Context, allow map[[32]byte]struct logtrace.Debug(ctx, "routing allowlist updated", logtrace.Fields{ logtrace.FieldModule: "p2p", - "active_peers": len(allow), + "routing_peers": len(allow), + }) +} + +func (s *DHT) setStoreAllowlist(ctx context.Context, allow map[[32]byte]struct{}) { + if s == nil { + return + } + // Integration tests may use synthetic bootstrap sets; do not enforce chain-state gating. + if integrationTestEnabled() { + return + } + // Avoid accidentally blocking ALL writes network-wide due to a transient + // chain issue. If ListSuperNodes returns zero write-eligible peers we + // retain the previous allowlist rather than clamping storeAllowCount to 0 + // and failing eligibleForStore for every peer. Symmetric with the + // setRoutingAllowlist guard. 
Pre-bootstrap this is a debug log (expected + // while the chain is still warming up); post-bootstrap it is a WARN because + // it indicates either a chain outage or a genuine empty-active-set condition. + if len(allow) == 0 { + if !s.storeAllowReady.Load() { + logtrace.Debug(ctx, "store allowlist from chain is empty; leaving store gating disabled (bootstrap)", logtrace.Fields{ + logtrace.FieldModule: "p2p", + }) + } else { + logtrace.Warn(ctx, "store allowlist update skipped: chain returned zero store-eligible supernodes; retaining previous allowlist", logtrace.Fields{ + logtrace.FieldModule: "p2p", + }) + } + return + } + + s.storeAllowMu.Lock() + s.storeAllow = allow + s.storeAllowMu.Unlock() + + s.storeAllowCount.Store(int64(len(allow))) + s.storeAllowReady.Store(true) + + logtrace.Debug(ctx, "store allowlist updated", logtrace.Fields{ + logtrace.FieldModule: "p2p", + "store_peers": len(allow), }) } @@ -172,15 +237,20 @@ func (s *DHT) eligibleForRouting(n *Node) bool { if s == nil { return false } + if n == nil || len(n.ID) == 0 { + return false + } // In integration tests allow everything; chain state gating is not stable/available there. if integrationTestEnabled() { return true } - // If allowlist isn't ready (or was never populated), do not gate to avoid blocking bootstrap. - if !s.routingAllowReady.Load() || s.routingAllowCount.Load() == 0 { + // Bootstrap-safe behavior: until first non-empty chain allowlist arrives, + // keep routing/read gating disabled to avoid accidental lockout. + if !s.routingAllowReady.Load() { return true } - if n == nil || len(n.ID) == 0 { + // Once initialized, an empty active set means no routing-eligible peers. 
+ if s.routingAllowCount.Load() == 0 { return false } @@ -197,14 +267,63 @@ func (s *DHT) eligibleForRouting(n *Node) bool { return ok } +func (s *DHT) eligibleForStore(n *Node) bool { + if s == nil { + return false + } + if n == nil || len(n.ID) == 0 { + return false + } + // In integration tests allow everything; chain state gating is not stable/available there. + if integrationTestEnabled() { + return true + } + // If the store allowlist isn't ready yet, avoid blocking writes during bootstrap. + if !s.storeAllowReady.Load() { + return true + } + // Once initialized, an empty active set means no write-eligible peers. + if s.storeAllowCount.Load() == 0 { + return false + } + + n.SetHashedID() + if len(n.HashedID) != 32 { + return false + } + var key [32]byte + copy(key[:], n.HashedID) + + s.storeAllowMu.RLock() + _, ok := s.storeAllow[key] + s.storeAllowMu.RUnlock() + return ok +} + func (s *DHT) filterEligibleNodes(nodes []*Node) []*Node { if s == nil || len(nodes) == 0 { return nodes } - // Fast path: not enforcing (integration tests / not ready / empty list) - if integrationTestEnabled() || !s.routingAllowReady.Load() || s.routingAllowCount.Load() == 0 { + // Fast path for integration tests only. + if integrationTestEnabled() { return nodes } + // If the routing allowlist has not been initialized yet, keep gating disabled + // but still sanitize malformed node entries. + if !s.routingAllowReady.Load() { + out := nodes[:0] + for _, n := range nodes { + if n == nil || len(n.ID) == 0 { + continue + } + out = append(out, n) + } + return out + } + // Once initialized, empty means no routing-eligible peers. + if s.routingAllowCount.Load() == 0 { + return nil + } out := nodes[:0] for _, n := range nodes { @@ -930,6 +1049,11 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, } top6 := s.ht.closestContactsWithIncludingNodeWithIgnoredSet(Alpha, hashes[i], ignoredSet, nil) + // Defensive: ensure we only ask routing-eligible peers. 
In steady + // state the hashtable is already filtered on admission; this closes + // the door against any future regression where a non-eligible peer + // enters via a new path. + top6.Nodes = s.filterEligibleNodes(top6.Nodes) closestMu.Lock() globalClosestContacts[keys[i]] = top6 closestMu.Unlock() @@ -1283,6 +1407,8 @@ func (s *DHT) BatchRetrieveStream( continue } topK := s.ht.closestContactsWithIncludingNodeWithIgnoredSet(Alpha, hashes[i], ignoredSet, nil) + // Defensive: filter to routing-eligible peers (see BatchRetrieve). + topK.Nodes = s.filterEligibleNodes(topK.Nodes) closestMu.Lock() globalClosestContacts[keys[i]] = topK closestMu.Unlock() @@ -2013,8 +2139,8 @@ func (s *DHT) addNode(ctx context.Context, node *Node) *Node { } node.SetHashedID() - // Chain-state gating: only allow Active supernodes into the routing table. - // This prevents postponed/disabled/stopped nodes from being admitted via inbound traffic. + // Chain-state routing gate (enabled after allowlist initialization): + // only chain-allowlisted peers may enter the routing table. 
if !s.eligibleForRouting(node) { logtrace.Debug(ctx, "Rejecting node: not eligible for routing", logtrace.Fields{ logtrace.FieldModule: "p2p", @@ -2105,6 +2231,9 @@ func (s *DHT) storeToAlphaNodes(ctx context.Context, nl *NodeList, data []byte, launched := 0 for i := 0; i < Alpha && i < nl.Len(); i++ { n := nl.Nodes[i] + if !s.eligibleForStore(n) { + continue + } if s.ignorelist.Banned(n) { continue } @@ -2146,6 +2275,9 @@ func (s *DHT) storeToAlphaNodes(ctx context.Context, nl *NodeList, data []byte, finalStoreCount := atomic.LoadInt32(&storeCount) for i := Alpha; i < nl.Len() && finalStoreCount < int32(Alpha); i++ { n := nl.Nodes[i] + if !s.eligibleForStore(n) { + continue + } if s.ignorelist.Banned(n) { logtrace.Debug(ctx, "Ignore banned node during sequential store", logtrace.Fields{ logtrace.FieldModule: "p2p", @@ -2278,15 +2410,47 @@ func (s *DHT) addKnownNodes(ctx context.Context, nodes []*Node, knownNodes map[s // during this run; success rate is successful responses divided by this count. // If the success rate is below `minimumDataStoreSuccessRate`, an error is // returned alongside the measured rate and request count. +// +// Authoritative peer rejections of the form "self not store-eligible" / +// "store rejected: self not store-eligible" are NOT counted as failures: +// the peer is telling us its on-chain state has changed (STORAGE_FULL or +// otherwise write-ineligible) and our local allowlist is stale. Counting +// those responses as failures would let a single recently-transitioned peer +// tank the success rate and fail the entire upload. Instead we decrement the +// request count (so the peer is treated as though it had never been in the +// candidate set) and prune it from the in-memory store allowlist so the +// same run does not re-pick it. A background refresh +// (StartAllowlistRefresher) brings the authoritative allowlist in sync with +// the chain within O(seconds). 
func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, id string) error { + // Best-effort debounced allowlist refresh on the write hot path. This + // is the first line of defense against a stale storeAllowlist on the + // very first upload after a chain state transition. The peer-rejection + // handling below is the second line of defense for races we miss. The + // debounce (allowlistOpportunisticMinInterval) ensures a burst of + // concurrent uploads does not fan out to one chain query per batch; + // the 30-second background StartAllowlistRefresher handles steady state. + if _, err := s.MaybeRefreshAllowlists(ctx); err != nil { + logtrace.Debug(ctx, "dht: opportunistic allowlist refresh failed", logtrace.Fields{ + logtrace.FieldModule: "dht", + "task_id": id, + logtrace.FieldError: err.Error(), + }) + } globalClosestContacts := make(map[string]*NodeList) knownNodes := make(map[string]*Node) hashes := make([][]byte, len(values)) + routingNodeCount := len(s.ht.nodes()) + candidateLimit := routingNodeCount + if candidateLimit < Alpha { + candidateLimit = Alpha + } ignoreList := s.ignorelist.ToNodeList() ignoredSet := hashedIDSetFromNodes(ignoreList) + keysWithoutCandidates := 0 { - f := logtrace.Fields{logtrace.FieldModule: "dht", "task_id": id, "keys": len(values), "len_nodes": len(s.ht.nodes()), logtrace.FieldRole: "client"} + f := logtrace.Fields{logtrace.FieldModule: "dht", "task_id": id, "keys": len(values), "len_nodes": routingNodeCount, logtrace.FieldRole: "client"} if o := logtrace.OriginFromContext(ctx); o != "" { f[logtrace.FieldOrigin] = o } @@ -2295,11 +2459,39 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i for i := 0; i < len(values); i++ { target, _ := utils.Blake3Hash(values[i]) hashes[i] = target - top6 := s.ht.closestContactsWithIncludingNodeWithIgnoredSet(Alpha, target, ignoredSet, nil) + candidates := s.ht.closestContactsWithIncludingNodeWithIgnoredSet(candidateLimit, target, 
ignoredSet, nil) - globalClosestContacts[base58.Encode(target)] = top6 - // log.WithContext(ctx).WithField("top 6", top6).Info("iterate batch store begin") - s.addKnownNodes(ctx, top6.Nodes, knownNodes) + writeEligible := make([]*Node, 0, Alpha) + for _, n := range candidates.Nodes { + if s.eligibleForStore(n) { + writeEligible = append(writeEligible, n) + if len(writeEligible) >= Alpha { + break + } + } + } + if len(writeEligible) == 0 { + keysWithoutCandidates++ + } + globalClosestContacts[base58.Encode(target)] = &NodeList{Nodes: writeEligible} + // log.WithContext(ctx).WithField("top 6", candidates).Info("iterate batch store begin") + s.addKnownNodes(ctx, writeEligible, knownNodes) + } + + if keysWithoutCandidates > 0 { + logtrace.Error(ctx, "dht: batch store skipped (keys without eligible store nodes)", logtrace.Fields{ + logtrace.FieldModule: "dht", + "task_id": id, + "keys": len(values), + "keys_without_nodes": keysWithoutCandidates, + "len_nodes": routingNodeCount, + "banned_nodes": len(ignoreList), + "routing_allow_ready": s.routingAllowReady.Load(), + "routing_allow_count": s.routingAllowCount.Load(), + "store_allow_ready": s.storeAllowReady.Load(), + "store_allow_count": s.storeAllowCount.Load(), + }) + return fmt.Errorf("no eligible store peers for %d/%d keys", keysWithoutCandidates, len(values)) } storageMap := make(map[string][]int) // This will store the index of the data in the values array that needs to be stored to the node @@ -2325,10 +2517,12 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i logtrace.FieldModule: "dht", "task_id": id, "keys": len(values), - "len_nodes": len(s.ht.nodes()), + "len_nodes": routingNodeCount, "banned_nodes": len(ignoreList), "routing_allow_ready": s.routingAllowReady.Load(), "routing_allow_count": s.routingAllowCount.Load(), + "store_allow_ready": s.storeAllowReady.Load(), + "store_allow_count": s.storeAllowCount.Load(), }) return fmt.Errorf("no candidate nodes for batch store") } @@ 
-2360,12 +2554,34 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i if v.Status.ErrMsg != "" { errMsg = v.Status.ErrMsg } - logtrace.Error(ctx, "Batch store to node failed", logtrace.Fields{ - logtrace.FieldModule: "dht", - "err": errMsg, - "task_id": id, - "node": nodeAddr, - }) + // Authoritative "peer is not store-eligible" rejection. + // The peer's on-chain state changed (typically to + // STORAGE_FULL) and the caller's local storeAllowlist is + // stale. Do not count this against success rate; instead + // treat the candidate as if it had never been eligible + // and prune it from the in-memory allowlist so the same + // run does not re-pick it. + if isPeerStoreIneligibleError(errMsg) { + requests-- + if response.Receiver != nil { + s.pruneStoreAllowEntry(response.Receiver) + } else if response.Message != nil && response.Message.Sender != nil { + s.pruneStoreAllowEntry(response.Message.Sender) + } + logtrace.Info(ctx, "dht: peer reported not store-eligible; excluded from success-rate denominator", logtrace.Fields{ + logtrace.FieldModule: "dht", + "task_id": id, + "node": nodeAddr, + "err": errMsg, + }) + } else { + logtrace.Error(ctx, "Batch store to node failed", logtrace.Fields{ + logtrace.FieldModule: "dht", + "err": errMsg, + "task_id": id, + "node": nodeAddr, + }) + } } } } @@ -2414,6 +2630,9 @@ func (s *DHT) batchStoreNetwork(ctx context.Context, values [][]byte, nodes map[ for key, node := range nodes { logtrace.Debug(ctx, "Preparing batch store to node", logtrace.Fields{logtrace.FieldModule: "dht", "node": node.String()}) + if !s.eligibleForStore(node) { + continue + } if s.ignorelist.Banned(node) { logtrace.Debug(ctx, "Ignoring banned node in batch store network call", logtrace.Fields{ logtrace.FieldModule: "dht", diff --git a/p2p/kademlia/dht_batch_store_test.go b/p2p/kademlia/dht_batch_store_test.go index 25965b3c..72ff9e86 100644 --- a/p2p/kademlia/dht_batch_store_test.go +++ b/p2p/kademlia/dht_batch_store_test.go 
@@ -28,7 +28,8 @@ func TestIterateBatchStore_NoCandidateNodes_ReturnsError(t *testing.T) { if err == nil { t.Fatalf("expected error, got nil") } - if !strings.Contains(err.Error(), "no candidate nodes") { + if !strings.Contains(err.Error(), "no eligible store peers") && + !strings.Contains(err.Error(), "no candidate nodes") { t.Fatalf("unexpected error: %v", err) } } diff --git a/p2p/kademlia/network.go b/p2p/kademlia/network.go index 48f8ffc9..58aec7d3 100644 --- a/p2p/kademlia/network.go +++ b/p2p/kademlia/network.go @@ -258,6 +258,18 @@ func (s *Network) handleStoreData(ctx context.Context, message *Message) (res [] value, err := s.dht.store.Retrieve(ctx, key) if err != nil || len(value) == 0 { + // Self-state gate: non-ACTIVE supernodes (STORAGE_FULL, POSTPONED, + // etc.) must not accept new-key writes. Replication of an + // already-held key is still permitted (the Retrieve above returned + // a non-empty value, short-circuiting this branch — newKeys=0). + if s.dht.shouldRejectStore(1) { + logtrace.Warn(ctx, "rejecting STORE: self is not store-eligible", logtrace.Fields{ + logtrace.FieldModule: "p2p", + "sender": message.Sender.String(), + "self_state": s.dht.selfState.Load(), + }) + return s.generateResponseMessage(ctx, StoreData, message.Sender, ResultFailed, "store rejected: self not store-eligible") + } // store the data to queries storage if err := s.dht.store.Store(ctx, key, request.Data, request.Type, false); err != nil { err = errors.Errorf("store the data: %w", err) @@ -1228,6 +1240,22 @@ func (s *Network) handleBatchStoreData(ctx context.Context, message *Message) (r // add the sender to queries hash table s.dht.addNode(ctx, message.Sender) + // Self-state gate: non-ACTIVE supernodes (STORAGE_FULL, POSTPONED, etc.) + // must not accept new-key batch writes. 
Delegates the decision to + // shouldRejectBatchStore so the STORE and BatchStore handlers share a + // single source of truth; any future refinement to shouldRejectStore + // (logging, metrics, grace periods) flows into both paths uniformly. + if reject, newKeys := s.dht.shouldRejectBatchStore(ctx, request.Data); reject { + logtrace.Warn(ctx, "rejecting BatchStore: self is not store-eligible", logtrace.Fields{ + logtrace.FieldModule: "p2p", + "sender": message.Sender.String(), + "self_state": s.dht.selfState.Load(), + "new_keys": newKeys, + "total_keys": len(request.Data), + }) + return s.generateResponseMessage(ctx, BatchStoreData, message.Sender, ResultFailed, "batch store rejected: self not store-eligible") + } + if err := s.dht.store.StoreBatch(ctx, request.Data, 1, false); err != nil { err = errors.Errorf("batch store the data: %w", err) return s.generateResponseMessage(ctx, BatchStoreData, message.Sender, ResultFailed, err.Error()) diff --git a/p2p/kademlia/node_activity.go b/p2p/kademlia/node_activity.go index 969321b9..e8b1329f 100644 --- a/p2p/kademlia/node_activity.go +++ b/p2p/kademlia/node_activity.go @@ -51,10 +51,10 @@ func (s *DHT) checkNodeActivity(ctx context.Context) { node := s.makeNode([]byte(info.ID), info.IP, info.Port) - // Chain-state gating: do not spend cycles pinging or promoting peers that - // are not eligible to participate in routing (e.g., postponed). + // Chain-state routing gate: do not spend cycles on peers that are not + // eligible to participate in routing/read paths (e.g., disabled/stopped). // Note: eligibility changes asynchronously on the bootstrap refresh cadence; - // replication_info.Active is therefore eventually consistent with chain state. + // replication_info.Active is eventually consistent with store eligibility. 
if !s.eligibleForRouting(node) { if info.Active { s.removeNode(ctx, node) @@ -137,7 +137,7 @@ func (s *DHT) handlePingFailure(ctx context.Context, wasActive bool, n *Node, er } func (s *DHT) handlePingSuccess(ctx context.Context, wasActive bool, n *Node) { - // Never promote an ineligible node into routing/active replication set. + // Never keep a non-routing-eligible node in routing/replication sets. if !s.eligibleForRouting(n) { if wasActive { s.removeNode(ctx, n) @@ -155,14 +155,26 @@ func (s *DHT) handlePingSuccess(ctx context.Context, wasActive bool, n *Node) { // clear from ignorelist and ensure presence in routing s.ignorelist.Delete(n) + s.addNode(ctx, n) - if !wasActive { + // Store/replication eligibility is stricter than routing/read eligibility. + if !s.eligibleForStore(n) { + if wasActive { + if uerr := s.store.UpdateIsActive(ctx, string(n.ID), false, false); uerr != nil { + logtrace.Error(ctx, "failed to update replication info, node is inactive (store-ineligible)", logtrace.Fields{ + logtrace.FieldModule: "p2p", + logtrace.FieldError: uerr.Error(), + "ip": n.IP, + "node_id": string(n.ID), + }) + } + } + } else if !wasActive { logtrace.Debug(ctx, "node found to be active again", logtrace.Fields{ logtrace.FieldModule: "p2p", "ip": n.IP, "node_id": string(n.ID), }) - s.addNode(ctx, n) if uerr := s.store.UpdateIsActive(ctx, string(n.ID), true, false); uerr != nil { logtrace.Error(ctx, "failed to update replication info, node is active", logtrace.Fields{ logtrace.FieldModule: "p2p", diff --git a/p2p/kademlia/peer_store_rejection_test.go b/p2p/kademlia/peer_store_rejection_test.go new file mode 100644 index 00000000..a6382b0c --- /dev/null +++ b/p2p/kademlia/peer_store_rejection_test.go @@ -0,0 +1,133 @@ +package kademlia + +import ( + "context" + "testing" +) + +// Regression test for production-gate finding: +// The STORE / BatchStore peer self-guard correctly rejects writes when the +// peer's on-chain state is not store-eligible (e.g. STORAGE_FULL). 
But the +// CLIENT was counting those authoritative rejections as failures, dropping +// its measured success rate below the 75% threshold and failing entire +// cascade uploads whenever a peer had just transitioned to STORAGE_FULL and +// the caller's local storeAllowlist was still stale. +// +// Invariants locked in by this test: +// +// (a) isPeerStoreIneligibleError must recognize both the STORE and +// BatchStoreData emitter phrasings. The marker is shared via the +// package-level constant peerStoreIneligibleMarker. +// (b) isPeerStoreIneligibleError must NOT match generic store errors +// (timeout, network failure, ErrFailed without the marker). +// (c) pruneStoreAllowEntry must remove the given peer from the in-memory +// allowlist and decrement the atomic count so subsequent hot-path +// lookups in the same run do not re-select the peer. +// (d) pruneStoreAllowEntry is a no-op when the DHT is nil, the node is +// nil / has an empty ID, or the allowlist is unset. + +func TestIsPeerStoreIneligibleError(t *testing.T) { + cases := []struct { + name string + msg string + want bool + }{ + {"empty_false", "", false}, + {"generic_store_failure_false", "context deadline exceeded", false}, + {"rpc_internal_false", "rpc error: code = Internal", false}, + // Server-side emitter strings (see network.go:handleStoreData + + // handleBatchStoreData). We match on the marker substring so wrapping + // with additional context (gRPC framing, SDK wrapping) still counts. + {"batch_store_phrase_true", "batch store rejected: self not store-eligible", true}, + {"single_store_phrase_true", "store rejected: self not store-eligible", true}, + {"wrapped_true", "rpc error: code = Internal desc = batch store rejected: self not store-eligible", true}, + // Defensive: accidental typo variants must not match (forces exact marker). 
+ {"near_miss_false", "self not store_eligible", false}, + {"case_sensitive_false", "SELF NOT STORE-ELIGIBLE", false}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if got := isPeerStoreIneligibleError(tc.msg); got != tc.want { + t.Fatalf("isPeerStoreIneligibleError(%q) = %v, want %v", tc.msg, got, tc.want) + } + }) + } +} + +// Cross-check: the client-side recognizer never drifts from the server-side +// emitter wording. If a future refactor changes one without the other, this +// test fails loudly. +func TestPeerStoreIneligibleMarker_PresentInServerEmitters(t *testing.T) { + // These are the exact strings emitted by handleStoreData and + // handleBatchStoreData in network.go. Keeping them here makes this test + // the single contract surface between client and server. + serverEmitted := []string{ + "store rejected: self not store-eligible", + "batch store rejected: self not store-eligible", + } + for _, s := range serverEmitted { + if !isPeerStoreIneligibleError(s) { + t.Fatalf("server-emitted string %q not recognized by client-side isPeerStoreIneligibleError", s) + } + } +} + +func TestPruneStoreAllowEntry_RemovesPeerAndDecrementsCount(t *testing.T) { + ctx := context.Background() + d := &DHT{} + + // Seed 3 peers in the store allowlist via setStoreAllowlist so the + // ready flag and count are consistent with normal code paths. + peers := []*Node{ + {ID: []byte("peer-A")}, + {ID: []byte("peer-B")}, + {ID: []byte("peer-C")}, + } + allow := make(map[[32]byte]struct{}, len(peers)) + for _, p := range peers { + p.SetHashedID() + var k [32]byte + copy(k[:], p.HashedID) + allow[k] = struct{}{} + } + d.setStoreAllowlist(ctx, allow) + if got, want := d.storeAllowCount.Load(), int64(3); got != want { + t.Fatalf("seed count: got %d, want %d", got, want) + } + + // Prune peer-B. 
+ d.pruneStoreAllowEntry(peers[1]) + if got, want := d.storeAllowCount.Load(), int64(2); got != want { + t.Fatalf("count after prune: got %d, want %d", got, want) + } + // Verify eligibleForStore honors the prune. + if d.eligibleForStore(peers[1]) { + t.Fatalf("pruned peer must NOT be store-eligible") + } + if !d.eligibleForStore(peers[0]) || !d.eligibleForStore(peers[2]) { + t.Fatalf("non-pruned peers must remain store-eligible") + } + + // Idempotent: pruning the same peer again is a no-op. + d.pruneStoreAllowEntry(peers[1]) + if got, want := d.storeAllowCount.Load(), int64(2); got != want { + t.Fatalf("count after double-prune: got %d, want %d", got, want) + } +} + +func TestPruneStoreAllowEntry_NoOpOnEdgeCases(t *testing.T) { + // Nil receiver. + var d *DHT + d.pruneStoreAllowEntry(&Node{ID: []byte("x")}) // must not panic + + d = &DHT{} + // Nil node. + d.pruneStoreAllowEntry(nil) // must not panic + // Empty ID. + d.pruneStoreAllowEntry(&Node{ID: nil}) + d.pruneStoreAllowEntry(&Node{ID: []byte{}}) + // Unset allowlist: no panic, no side-effect. + if got := d.storeAllowCount.Load(); got != 0 { + t.Fatalf("unexpected count change on no-op: %d", got) + } +} diff --git a/p2p/kademlia/prune_store_peers_test.go b/p2p/kademlia/prune_store_peers_test.go new file mode 100644 index 00000000..fc1f7baf --- /dev/null +++ b/p2p/kademlia/prune_store_peers_test.go @@ -0,0 +1,153 @@ +package kademlia + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/LumeraProtocol/supernode/v2/p2p/kademlia/domain" + "github.com/LumeraProtocol/supernode/v2/pkg/utils" +) + +// fakeStore is a minimal in-memory Store focused on replication_info +// operations used by pruneIneligibleStorePeers. All other methods are no-ops. 
+type fakeStore struct { + mu sync.Mutex + reps map[string]*domain.NodeReplicationInfo +} + +func newFakeStore() *fakeStore { return &fakeStore{reps: map[string]*domain.NodeReplicationInfo{}} } + +func (f *fakeStore) Store(ctx context.Context, key, data []byte, typ int, isOriginal bool) error { + return nil +} +func (f *fakeStore) Retrieve(ctx context.Context, key []byte) ([]byte, error) { return nil, nil } +func (f *fakeStore) Delete(ctx context.Context, key []byte) {} +func (f *fakeStore) GetKeysForReplication(ctx context.Context, from, to time.Time, maxKeys int) domain.KeysWithTimestamp { + return nil +} +func (f *fakeStore) Stats(ctx context.Context) (DatabaseStats, error) { return DatabaseStats{}, nil } +func (f *fakeStore) Close(ctx context.Context) {} +func (f *fakeStore) Count(ctx context.Context) (int, error) { return 0, nil } +func (f *fakeStore) DeleteAll(ctx context.Context) error { return nil } +func (f *fakeStore) UpdateKeyReplication(ctx context.Context, key []byte) error { + return nil +} +func (f *fakeStore) StoreBatch(ctx context.Context, values [][]byte, typ int, isOriginal bool) error { + return nil +} +func (f *fakeStore) GetAllReplicationInfo(ctx context.Context) ([]domain.NodeReplicationInfo, error) { + f.mu.Lock() + defer f.mu.Unlock() + out := make([]domain.NodeReplicationInfo, 0, len(f.reps)) + for _, r := range f.reps { + out = append(out, *r) + } + return out, nil +} +func (f *fakeStore) UpdateReplicationInfo(ctx context.Context, rep domain.NodeReplicationInfo) error { + f.mu.Lock() + defer f.mu.Unlock() + cp := rep + f.reps[string(rep.ID)] = &cp + return nil +} +func (f *fakeStore) AddReplicationInfo(ctx context.Context, rep domain.NodeReplicationInfo) error { + return f.UpdateReplicationInfo(ctx, rep) +} +func (f *fakeStore) GetOwnCreatedAt(ctx context.Context) (time.Time, error) { + return time.Time{}, nil +} +func (f *fakeStore) StoreBatchRepKeys(values []string, id, ip string, port uint16) error { return nil } +func (f *fakeStore) 
GetAllToDoRepKeys(minA, maxA int) (domain.ToRepKeys, error) { return nil, nil } +func (f *fakeStore) DeleteRepKey(key string) error { return nil } +func (f *fakeStore) UpdateLastSeen(ctx context.Context, id string) error { return nil } +func (f *fakeStore) RetrieveBatchNotExist(ctx context.Context, keys []string, batchSize int) ([]string, error) { + return nil, nil +} +func (f *fakeStore) RetrieveBatchValues(ctx context.Context, keys []string, getFromCloud bool) ([][]byte, int, error) { + return nil, 0, nil +} +func (f *fakeStore) BatchDeleteRepKeys(keys []string) error { return nil } +func (f *fakeStore) IncrementAttempts(keys []string) error { return nil } +func (f *fakeStore) UpdateIsActive(ctx context.Context, id string, isActive, isAdjusted bool) error { + f.mu.Lock() + defer f.mu.Unlock() + if r, ok := f.reps[id]; ok { + r.Active = isActive + r.IsAdjusted = isAdjusted + } + return nil +} +func (f *fakeStore) UpdateIsAdjusted(ctx context.Context, id string, isAdjusted bool) error { + return nil +} +func (f *fakeStore) UpdateLastReplicated(ctx context.Context, id string, t time.Time) error { + return nil +} +func (f *fakeStore) RecordExists(nodeID string) (bool, error) { return false, nil } +func (f *fakeStore) GetLocalKeys(from, to time.Time) ([]string, error) { return nil, nil } +func (f *fakeStore) BatchDeleteRecords(keys []string) error { return nil } + +// compile-time check +var _ Store = (*fakeStore)(nil) + +// I5: eager prune flips replication_info.Active=false for peers that are +// routing-eligible (e.g. STORAGE_FULL) but not store-eligible. 
+func TestPruneIneligibleStorePeers_ClearsNonStorePeers(t *testing.T) { + ctx := context.Background() + store := newFakeStore() + + activeID := []byte("active-peer-id") + storageFullID := []byte("storage-full-peer-id") + now := time.Now() + for _, id := range [][]byte{activeID, storageFullID} { + _ = store.AddReplicationInfo(ctx, domain.NodeReplicationInfo{ + ID: id, IP: "127.0.0.1", Port: 4445, Active: true, + UpdatedAt: now, CreatedAt: now, + }) + } + + activeHash, _ := utils.Blake3Hash(activeID) + var ak [32]byte + copy(ak[:], activeHash) + + d := &DHT{store: store} + d.setStoreAllowlist(ctx, map[[32]byte]struct{}{ak: {}}) + + d.pruneIneligibleStorePeers(ctx) + + infos, _ := store.GetAllReplicationInfo(ctx) + byID := map[string]domain.NodeReplicationInfo{} + for _, i := range infos { + byID[string(i.ID)] = i + } + if got := byID[string(activeID)]; !got.Active { + t.Errorf("ACTIVE peer: Active=false; want true") + } + if got := byID[string(storageFullID)]; got.Active { + t.Errorf("STORAGE_FULL peer: Active=true; want false (eagerly pruned)") + } +} + +// I5 boundary: not-ready store allowlist must not flip anything. +func TestPruneIneligibleStorePeers_SkipsWhenNotReady(t *testing.T) { + ctx := context.Background() + store := newFakeStore() + id := []byte("some-peer") + now := time.Now() + _ = store.AddReplicationInfo(ctx, domain.NodeReplicationInfo{ + ID: id, IP: "127.0.0.1", Port: 4445, Active: true, + UpdatedAt: now, CreatedAt: now, + }) + + d := &DHT{store: store} + // No setStoreAllowlist => not ready. 
+ d.pruneIneligibleStorePeers(ctx) + + infos, _ := store.GetAllReplicationInfo(ctx) + if len(infos) != 1 || !infos[0].Active { + t.Fatalf("expected no change when not-ready; got %+v", infos) + } +} diff --git a/p2p/kademlia/review_fixes_test.go b/p2p/kademlia/review_fixes_test.go new file mode 100644 index 00000000..252bd146 --- /dev/null +++ b/p2p/kademlia/review_fixes_test.go @@ -0,0 +1,194 @@ +package kademlia + +import ( + "context" + "testing" + "time" +) + +// Regression tests for three issues raised on PR #284 post-merge review: +// +// (1) setStoreAllowlist must retain the previous allowlist when given an +// empty map post-ready (symmetric with setRoutingAllowlist). +// (2) eligibleForStore must early-return false on nil node before +// consulting storeAllowReady (defense-in-depth consistency with +// eligibleForRouting). +// (3) MaybeRefreshAllowlists must debounce opportunistic refreshes from +// the IterateBatchStore hot path so concurrent uploads cannot fan +// out to one chain query per batch. +// +// shouldRejectBatchStore is covered indirectly via shouldRejectStore + the +// existing STORE-path tests; here we add a direct test for the short-circuit +// and the new-key counting path. + +// --- (1) setStoreAllowlist empty-map guard --- + +func TestSetStoreAllowlist_EmptyMapPostReady_RetainsPrevious(t *testing.T) { + ctx := context.Background() + d := &DHT{} + + // Seed a non-empty allowlist so ready=true, count=2. 
+ seed := func(ids ...string) map[[32]byte]struct{} { + m := make(map[[32]byte]struct{}, len(ids)) + for _, id := range ids { + n := &Node{ID: []byte(id)} + n.SetHashedID() + var k [32]byte + copy(k[:], n.HashedID) + m[k] = struct{}{} + } + return m + } + d.setStoreAllowlist(ctx, seed("peer-A", "peer-B")) + if got, want := d.storeAllowCount.Load(), int64(2); got != want { + t.Fatalf("seed count: got %d, want %d", got, want) + } + if !d.storeAllowReady.Load() { + t.Fatalf("seed should mark ready") + } + + // Simulate a transient chain hiccup: empty map. Post-ready this MUST + // be rejected (retaining the previous allowlist) to avoid blocking + // all writes network-wide. + d.setStoreAllowlist(ctx, map[[32]byte]struct{}{}) + + if got, want := d.storeAllowCount.Load(), int64(2); got != want { + t.Fatalf("after empty update post-ready: count got %d, want %d (previous allowlist must be retained)", got, want) + } + // And the seeded peers must still be eligible. + a := &Node{ID: []byte("peer-A")} + if !d.eligibleForStore(a) { + t.Fatalf("peer-A must remain store-eligible after empty-map update was rejected") + } +} + +func TestSetStoreAllowlist_EmptyMapPreReady_NoOp(t *testing.T) { + ctx := context.Background() + d := &DHT{} + // Pre-ready empty update must not flip ready=true with count=0; that + // would immediately fail eligibleForStore for every peer during bootstrap. + d.setStoreAllowlist(ctx, map[[32]byte]struct{}{}) + if d.storeAllowReady.Load() { + t.Fatalf("pre-ready empty update must not flip ready=true") + } + if d.storeAllowCount.Load() != 0 { + t.Fatalf("pre-ready empty update must not set a non-zero count") + } +} + +// --- (2) eligibleForStore nil-check ordering --- + +func TestEligibleForStore_NilNode_ReturnsFalseEvenPreReady(t *testing.T) { + // Pre-ready the old code returned true for ANY input (including nil), + // matching the "don't lock out during bootstrap" intent but creating a + // nil-pointer risk downstream. 
The fix moves the nil check to the top + // so a nil *Node is always rejected regardless of ready state. Mirrors + // eligibleForRouting's ordering. + d := &DHT{} + if d.eligibleForStore(nil) { + t.Fatalf("eligibleForStore(nil) must be false pre-ready") + } + if d.eligibleForStore(&Node{ID: nil}) { + t.Fatalf("eligibleForStore(empty-ID) must be false pre-ready") + } + if d.eligibleForStore(&Node{ID: []byte{}}) { + t.Fatalf("eligibleForStore(empty-slice-ID) must be false pre-ready") + } + + // Non-nil real node pre-ready must still be eligible (bootstrap permissiveness). + n := &Node{ID: []byte("peer-real")} + if !d.eligibleForStore(n) { + t.Fatalf("eligibleForStore(valid node) must be true pre-ready (bootstrap permissive)") + } +} + +// --- (3) MaybeRefreshAllowlists debounce --- + +func TestMaybeRefreshAllowlists_Debounces(t *testing.T) { + ctx := context.Background() + d := &DHT{} + // integrationTestEnabled() returns true under INTEGRATION_TEST; assume + // it is not set. If your local env sets it, skip. + if integrationTestEnabled() { + t.Skip("INTEGRATION_TEST set; MaybeRefreshAllowlists is a no-op in that mode") + } + + // Simulate a very recent successful refresh. + d.lastAllowlistRefreshUnixNano.Store(time.Now().UnixNano()) + + // Within the debounce window this MUST skip (no chain RPC). + // Because d.options.LumeraClient is nil, RefreshAllowlistsFromChain + // would return nil (guard at the top); but the debounce is supposed + // to short-circuit BEFORE reaching any of that logic. We assert by + // checking the return value (refreshed=false => skipped). 
+ refreshed, err := d.MaybeRefreshAllowlists(ctx) + if err != nil { + t.Fatalf("unexpected err from skipped call: %v", err) + } + if refreshed { + t.Fatalf("MaybeRefreshAllowlists must skip within the debounce window") + } +} + +func TestMaybeRefreshAllowlists_FiresWhenStale(t *testing.T) { + ctx := context.Background() + d := &DHT{} + if integrationTestEnabled() { + t.Skip("INTEGRATION_TEST set; MaybeRefreshAllowlists is a no-op in that mode") + } + + // No prior refresh (last == 0). MaybeRefreshAllowlists must attempt a + // refresh. Because LumeraClient is nil, RefreshAllowlistsFromChain + // returns nil silently and stamps the timestamp — so refreshed=true. + refreshed, err := d.MaybeRefreshAllowlists(ctx) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if !refreshed { + t.Fatalf("MaybeRefreshAllowlists must fire when no prior refresh has been recorded") + } + if d.lastAllowlistRefreshUnixNano.Load() == 0 { + t.Fatalf("lastAllowlistRefreshUnixNano must be stamped after a successful refresh attempt") + } + + // Now stamp the prior refresh outside the debounce window (e.g. 60s ago). + d.lastAllowlistRefreshUnixNano.Store(time.Now().Add(-60 * time.Second).UnixNano()) + refreshed, err = d.MaybeRefreshAllowlists(ctx) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if !refreshed { + t.Fatalf("MaybeRefreshAllowlists must fire when the last refresh is older than allowlistOpportunisticMinInterval") + } +} + +// --- (helper) shouldRejectBatchStore contract --- +// +// We can't easily unit-test the full shouldRejectBatchStore because it touches +// a storage backend. Instead we verify the short-circuit: when self is +// store-eligible, the function returns (false, 0) WITHOUT touching storage. +// We assert "without touching storage" by passing a nil store and confirming +// no panic. 
+ +func TestShouldRejectBatchStore_ShortCircuitsOnSelfEligible(t *testing.T) { + d := &DHT{} + // No selfState set => pre-init => selfStoreEligible()==true => short-circuit. + // Pass arbitrary payload; if the function reached storage, we'd panic + // on nil d.store. The short-circuit must bypass the loop entirely. + reject, newKeys := d.shouldRejectBatchStore(context.Background(), [][]byte{[]byte("x"), []byte("y")}) + if reject { + t.Fatalf("must not reject when self is store-eligible") + } + if newKeys != 0 { + t.Fatalf("short-circuit must return newKeys=0 without counting; got %d", newKeys) + } +} + +// Sanity: defensive nil receiver. +func TestShouldRejectBatchStore_NilReceiver(t *testing.T) { + var d *DHT + reject, n := d.shouldRejectBatchStore(context.Background(), [][]byte{[]byte("x")}) + if reject || n != 0 { + t.Fatalf("nil receiver must return (false, 0); got (%v, %d)", reject, n) + } +} diff --git a/p2p/kademlia/supernode_state.go b/p2p/kademlia/supernode_state.go new file mode 100644 index 00000000..c31ab85f --- /dev/null +++ b/p2p/kademlia/supernode_state.go @@ -0,0 +1,237 @@ +package kademlia + +import ( + "context" + "strings" + + sntypes "github.com/LumeraProtocol/lumera/x/supernode/v1/types" + "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" + "github.com/LumeraProtocol/supernode/v2/pkg/utils" +) + +// Supernode p2p eligibility policy. +// +// Chain is the single source of truth for state values; we re-use +// sntypes.SuperNodeState ordinals to avoid drift. +// +// Two orthogonal gates govern p2p behavior: +// +// - Routing / read eligibility: peer may sit in the routing table and +// answer FIND_NODE, FIND_VALUE, BatchRetrieve. +// - Store / write eligibility: peer may receive STORE / batch-store +// and be targeted by replication pushes. 
+// +// Post-LEP-5 / Everlight (lumera #113) policy: +// +// routing = {ACTIVE, POSTPONED, STORAGE_FULL} +// store = {ACTIVE} +// +// POSTPONED: probation; still holds data, must keep serving reads, no writes. +// STORAGE_FULL: disk-full; still holds data, still payout-eligible, no writes. +// +// All other states (UNSPECIFIED, DISABLED, STOPPED, PENALIZED) are excluded +// from both gates. + +// snStateInt normalizes a chain SuperNodeState to int32 for comparison. +func snStateInt(s sntypes.SuperNodeState) int32 { return int32(s) } + +// isRoutingEligibleState reports whether the given chain supernode state is +// eligible for participation in routing/read paths. +func isRoutingEligibleState(s int32) bool { + return s == snStateInt(sntypes.SuperNodeStateActive) || + s == snStateInt(sntypes.SuperNodeStatePostponed) || + s == snStateInt(sntypes.SuperNodeStateStorageFull) +} + +// isStoreEligibleState reports whether the given chain supernode state is +// eligible for write/replication targeting. +func isStoreEligibleState(s int32) bool { + return s == snStateInt(sntypes.SuperNodeStateActive) +} + +// shouldRejectStore is the single source of truth for the STORE / STORE_BATCH +// self-guard decision. Keeping the logic here rather than inlined at RPC call +// sites means future refinements (logging, metrics, grace periods) propagate +// to every write-path handler uniformly. +// +// newKeys > 0 && !selfStoreEligible() => reject +// +// When newKeys == 0 (all keys already held) replication is always allowed. +// When self is store-eligible (ACTIVE, or pre-bootstrap), always allow. +func (s *DHT) shouldRejectStore(newKeys int) bool { + if s == nil { + return false + } + if newKeys <= 0 { + return false + } + return !s.selfStoreEligible() +} + +// shouldRejectBatchStore is the batch-sized companion to shouldRejectStore. 
+// It implements the exact same semantic (reject iff any genuinely-new key is
+// present AND self is not store-eligible) but preserves the short-circuit on
+// the happy path: when self is store-eligible we skip the O(len(payloads))
+// storage retrievals entirely.
+//
+// When `reject` is false the caller must accept the batch; when `reject` is
+// true the caller must respond with ResultFailed ("batch store rejected: self
+// not store-eligible"). The returned newKeys is the genuinely-new key count
+// (0 if we short-circuited), useful for structured logging at the call site.
+//
+// This helper exists so that handleBatchStoreData does not reimplement the
+// shouldRejectStore contract inline. A future change to shouldRejectStore
+// (e.g. adding a grace period or metrics) flows into this function via the
+// shared semantics and both RPC handlers stay aligned.
+func (s *DHT) shouldRejectBatchStore(ctx context.Context, payloads [][]byte) (reject bool, newKeys int) {
+	if s == nil {
+		return false, 0
+	}
+	// Happy path: self is store-eligible => accept without touching storage.
+	// Preserves the prior behavior's performance on the 99% case.
+	if s.selfStoreEligible() {
+		return false, 0
+	}
+	// Self is not store-eligible. Count genuinely new keys; an all-replication
+	// batch (newKeys == 0) is still permitted so availability is preserved
+	// while self is in STORAGE_FULL / POSTPONED.
+	for _, data := range payloads {
+		k, _ := utils.Blake3Hash(data)
+		existing, rErr := s.store.Retrieve(ctx, k)
+		if rErr != nil || len(existing) == 0 {
+			newKeys++
+		}
+	}
+	return s.shouldRejectStore(newKeys), newKeys
+}
+
+// setSelfState caches the latest known chain state for this node. Safe for
+// concurrent callers. Called by the bootstrap refresher.
+func (s *DHT) setSelfState(state int32) { + if s == nil { + return + } + s.selfState.Store(state) + s.selfStateReady.Store(true) +} + +// selfStoreEligible reports whether this node is currently permitted to accept +// new-key STORE writes. Returns true when self-state is unknown (pre-bootstrap) +// to avoid lockout; returns true in integration-test mode. +func (s *DHT) selfStoreEligible() bool { + if s == nil { + return false + } + if integrationTestEnabled() { + return true + } + if !s.selfStateReady.Load() { + return true + } + return isStoreEligibleState(s.selfState.Load()) +} + +// pruneIneligibleStorePeers clears replication_info.Active for peers no longer +// in the store allowlist. Keeps the replication worker from pushing writes to +// STORAGE_FULL / POSTPONED / evicted peers between ping cycles. +// +// Integration tests bypass this via integrationTestEnabled() check in the +// allowlist setter (empty allowlist => ready=false => no-op here). +func (s *DHT) pruneIneligibleStorePeers(ctx context.Context) { + if s == nil || s.store == nil { + return + } + if !s.storeAllowReady.Load() { + return + } + infos, err := s.store.GetAllReplicationInfo(ctx) + if err != nil { + logtrace.Warn(ctx, "pruneIneligibleStorePeers: list replication info failed", logtrace.Fields{ + logtrace.FieldModule: "p2p", + logtrace.FieldError: err.Error(), + }) + return + } + cleared := 0 + for _, info := range infos { + if !info.Active { + continue + } + // Derive the same key used by the allowlist. 
+ node := &Node{ID: info.ID} + node.SetHashedID() + if len(node.HashedID) != 32 { + continue + } + var key [32]byte + copy(key[:], node.HashedID) + + s.storeAllowMu.RLock() + _, ok := s.storeAllow[key] + s.storeAllowMu.RUnlock() + if ok { + continue + } + if uerr := s.store.UpdateIsActive(ctx, string(info.ID), false, false); uerr != nil { + logtrace.Warn(ctx, "pruneIneligibleStorePeers: UpdateIsActive failed", logtrace.Fields{ + logtrace.FieldModule: "p2p", + logtrace.FieldError: uerr.Error(), + "node_id": string(info.ID), + }) + continue + } + cleared++ + } + if cleared > 0 { + logtrace.Info(ctx, "pruneIneligibleStorePeers: cleared replication_info.Active for ineligible peers", logtrace.Fields{ + logtrace.FieldModule: "p2p", + "cleared": cleared, + }) + } +} + +// peerStoreIneligibleMarker is the unique substring emitted by both +// handleStoreData and handleBatchStoreData when the receiving node rejects a +// STORE / BatchStore RPC because its own on-chain state is not store-eligible. +// Kept as a constant so the client-side recognizer cannot drift from the +// server-side emitter without breaking the invariant test +// TestIsPeerStoreIneligibleError. +const peerStoreIneligibleMarker = "self not store-eligible" + +// isPeerStoreIneligibleError reports whether the given error message came +// from a peer's STORE / BatchStore self-guard. Used on the client side to +// distinguish authoritative "peer is not write-eligible" signals from real +// failures; the former must not be counted against the store success rate. +func isPeerStoreIneligibleError(msg string) bool { + if msg == "" { + return false + } + return strings.Contains(msg, peerStoreIneligibleMarker) +} + +// pruneStoreAllowEntry removes the given peer from the in-memory store +// allowlist. 
Called when a peer's response indicates its on-chain state has +// changed and the local allowlist is stale, so that subsequent hot-path +// lookups in the same run do not re-select the peer while the next +// RefreshAllowlistsFromChain converges. +func (s *DHT) pruneStoreAllowEntry(n *Node) { + if s == nil || n == nil || len(n.ID) == 0 { + return + } + n.SetHashedID() + if len(n.HashedID) != 32 { + return + } + var key [32]byte + copy(key[:], n.HashedID) + + s.storeAllowMu.Lock() + defer s.storeAllowMu.Unlock() + if s.storeAllow == nil { + return + } + if _, ok := s.storeAllow[key]; ok { + delete(s.storeAllow, key) + s.storeAllowCount.Store(int64(len(s.storeAllow))) + } +} diff --git a/p2p/kademlia/supernode_state_test.go b/p2p/kademlia/supernode_state_test.go new file mode 100644 index 00000000..672bf5f3 --- /dev/null +++ b/p2p/kademlia/supernode_state_test.go @@ -0,0 +1,164 @@ +package kademlia + +import ( + "context" + "testing" + + sntypes "github.com/LumeraProtocol/lumera/x/supernode/v1/types" +) + +// I1 + I2 + I6: state classification SSoT. +func TestStateClassification_Table(t *testing.T) { + cases := []struct { + state sntypes.SuperNodeState + routingEligible bool + storeEligible bool + }{ + {sntypes.SuperNodeStateUnspecified, false, false}, + {sntypes.SuperNodeStateActive, true, true}, + {sntypes.SuperNodeStateDisabled, false, false}, + {sntypes.SuperNodeStateStopped, false, false}, + {sntypes.SuperNodeStatePenalized, false, false}, + {sntypes.SuperNodeStatePostponed, true, false}, + {sntypes.SuperNodeStateStorageFull, true, false}, + } + for _, tc := range cases { + s := snStateInt(tc.state) + if got := isRoutingEligibleState(s); got != tc.routingEligible { + t.Errorf("isRoutingEligibleState(%s) = %v, want %v", tc.state, got, tc.routingEligible) + } + if got := isStoreEligibleState(s); got != tc.storeEligible { + t.Errorf("isStoreEligibleState(%s) = %v, want %v", tc.state, got, tc.storeEligible) + } + } +} + +// I9: self-store-eligibility gate. 
Verify allow-during-bootstrap, integration +// test bypass, and post-init gate behavior. +func TestSelfStoreEligible(t *testing.T) { + t.Run("pre-init_allows_to_avoid_lockout", func(t *testing.T) { + d := &DHT{} + if !d.selfStoreEligible() { + t.Fatalf("pre-init should be permissive") + } + }) + + t.Run("post-init_active_allows", func(t *testing.T) { + d := &DHT{} + d.setSelfState(snStateInt(sntypes.SuperNodeStateActive)) + if !d.selfStoreEligible() { + t.Fatalf("ACTIVE must be store-eligible") + } + }) + + t.Run("post-init_storage_full_rejects", func(t *testing.T) { + d := &DHT{} + d.setSelfState(snStateInt(sntypes.SuperNodeStateStorageFull)) + if d.selfStoreEligible() { + t.Fatalf("STORAGE_FULL must NOT be store-eligible") + } + }) + + t.Run("post-init_postponed_rejects", func(t *testing.T) { + d := &DHT{} + d.setSelfState(snStateInt(sntypes.SuperNodeStatePostponed)) + if d.selfStoreEligible() { + t.Fatalf("POSTPONED must NOT be store-eligible") + } + }) +} + +// I3/I9: shouldRejectStore contract. 
+func TestShouldRejectStore(t *testing.T) { + cases := []struct { + name string + selfState *sntypes.SuperNodeState // nil = pre-init + newKeys int + wantReject bool + }{ + {"pre-init_any_allows", nil, 10, false}, + {"active_with_new_keys_allows", ptrState(sntypes.SuperNodeStateActive), 5, false}, + {"active_zero_new_keys_allows", ptrState(sntypes.SuperNodeStateActive), 0, false}, + {"storage_full_zero_new_keys_allows_replication", ptrState(sntypes.SuperNodeStateStorageFull), 0, false}, + {"storage_full_one_new_key_rejects", ptrState(sntypes.SuperNodeStateStorageFull), 1, true}, + {"postponed_new_keys_rejects", ptrState(sntypes.SuperNodeStatePostponed), 7, true}, + {"disabled_new_keys_rejects", ptrState(sntypes.SuperNodeStateDisabled), 1, true}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + d := &DHT{} + if tc.selfState != nil { + d.setSelfState(snStateInt(*tc.selfState)) + } + if got := d.shouldRejectStore(tc.newKeys); got != tc.wantReject { + t.Fatalf("shouldRejectStore = %v, want %v", got, tc.wantReject) + } + }) + } +} + +func ptrState(s sntypes.SuperNodeState) *sntypes.SuperNodeState { return &s } + +// I9 server-ingress semantic: STORAGE_FULL/POSTPONED STILL serves reads — +// verified by the fact that read-path gates only consult eligibleForRouting +// (see TestEligibleForRouting_PreInit_AndPopulated + the stricter-containment +// test), and shouldRejectStore only fires on newKeys>0. +func TestEligibleForRouting_PreInit_AndPopulated(t *testing.T) { + ctx := context.Background() + + // Pre-init: routing gate disabled, everything eligible to avoid lockout. + d := &DHT{} + n := &Node{ID: []byte("peerA")} + if !d.eligibleForRouting(n) { + t.Fatalf("pre-init eligibleForRouting should return true") + } + + // Populate an empty allowlist post-init: no-op (ready stays false). 
+ d.setRoutingAllowlist(ctx, map[[32]byte]struct{}{}) + if !d.eligibleForRouting(n) { + t.Fatalf("empty allowlist pre-ready should still allow") + } + + // Populate with one peer's hash. + n.SetHashedID() + var key [32]byte + copy(key[:], n.HashedID) + d.setRoutingAllowlist(ctx, map[[32]byte]struct{}{key: {}}) + if !d.eligibleForRouting(n) { + t.Fatalf("peer in allowlist must be eligible") + } + + // Peer not in allowlist is rejected. + other := &Node{ID: []byte("peerB")} + if d.eligibleForRouting(other) { + t.Fatalf("peer not in allowlist must be rejected") + } +} + +// I2 admission: store allowlist is stricter than routing. +func TestEligibleForStore_StrictlyContainedInRouting(t *testing.T) { + ctx := context.Background() + d := &DHT{} + + active := &Node{ID: []byte("active")} + active.SetHashedID() + storageFull := &Node{ID: []byte("storage_full")} + storageFull.SetHashedID() + + var ak, sk [32]byte + copy(ak[:], active.HashedID) + copy(sk[:], storageFull.HashedID) + + d.setRoutingAllowlist(ctx, map[[32]byte]struct{}{ak: {}, sk: {}}) + d.setStoreAllowlist(ctx, map[[32]byte]struct{}{ak: {}}) // STORAGE_FULL omitted + + if !d.eligibleForRouting(active) || !d.eligibleForRouting(storageFull) { + t.Fatalf("both peers must be routing-eligible") + } + if !d.eligibleForStore(active) { + t.Fatalf("ACTIVE peer must be store-eligible") + } + if d.eligibleForStore(storageFull) { + t.Fatalf("STORAGE_FULL peer must NOT be store-eligible") + } +} diff --git a/pkg/lumera/modules/supernode/impl.go b/pkg/lumera/modules/supernode/impl.go index 93e2d7e0..8bd88408 100644 --- a/pkg/lumera/modules/supernode/impl.go +++ b/pkg/lumera/modules/supernode/impl.go @@ -6,6 +6,7 @@ import ( "github.com/LumeraProtocol/lumera/x/supernode/v1/types" "github.com/LumeraProtocol/supernode/v2/pkg/errors" + "github.com/cosmos/cosmos-sdk/types/query" "google.golang.org/grpc" ) @@ -145,3 +146,30 @@ func (m *module) ListSuperNodes(ctx context.Context) (*types.QueryListSuperNodes } return resp, nil } + +// 
GetPoolState returns supernode reward pool state. +func (m *module) GetPoolState(ctx context.Context) (*types.QueryPoolStateResponse, error) { + resp, err := m.client.PoolState(ctx, &types.QueryPoolStateRequest{}) + if err != nil { + return nil, fmt.Errorf("failed to get pool state: %w", err) + } + return resp, nil +} + +// GetSNEligibility returns payout eligibility for validator. +func (m *module) GetSNEligibility(ctx context.Context, validatorAddress string) (*types.QuerySNEligibilityResponse, error) { + resp, err := m.client.SNEligibility(ctx, &types.QuerySNEligibilityRequest{ValidatorAddress: validatorAddress}) + if err != nil { + return nil, fmt.Errorf("failed to get supernode eligibility: %w", err) + } + return resp, nil +} + +// GetPayoutHistory returns payout history for validator. +func (m *module) GetPayoutHistory(ctx context.Context, validatorAddress string, pagination *query.PageRequest) (*types.QueryPayoutHistoryResponse, error) { + resp, err := m.client.PayoutHistory(ctx, &types.QueryPayoutHistoryRequest{ValidatorAddress: validatorAddress, Pagination: pagination}) + if err != nil { + return nil, fmt.Errorf("failed to get payout history: %w", err) + } + return resp, nil +} diff --git a/pkg/lumera/modules/supernode/interface.go b/pkg/lumera/modules/supernode/interface.go index acdfea0b..e195f906 100644 --- a/pkg/lumera/modules/supernode/interface.go +++ b/pkg/lumera/modules/supernode/interface.go @@ -5,6 +5,7 @@ import ( "context" "github.com/LumeraProtocol/lumera/x/supernode/v1/types" + "github.com/cosmos/cosmos-sdk/types/query" "google.golang.org/grpc" ) @@ -25,6 +26,9 @@ type Module interface { GetSupernodeWithLatestAddress(ctx context.Context, address string) (*SuperNodeInfo, error) GetParams(ctx context.Context) (*types.QueryParamsResponse, error) ListSuperNodes(ctx context.Context) (*types.QueryListSuperNodesResponse, error) + GetPoolState(ctx context.Context) (*types.QueryPoolStateResponse, error) + GetSNEligibility(ctx context.Context, 
validatorAddress string) (*types.QuerySNEligibilityResponse, error) + GetPayoutHistory(ctx context.Context, validatorAddress string, pagination *query.PageRequest) (*types.QueryPayoutHistoryResponse, error) } // NewModule creates a new SuperNode module client diff --git a/pkg/lumera/modules/supernode/supernode_mock.go b/pkg/lumera/modules/supernode/supernode_mock.go index 9b3ea06e..cb55c4ed 100644 --- a/pkg/lumera/modules/supernode/supernode_mock.go +++ b/pkg/lumera/modules/supernode/supernode_mock.go @@ -14,6 +14,7 @@ import ( reflect "reflect" types "github.com/LumeraProtocol/lumera/x/supernode/v1/types" + query "github.com/cosmos/cosmos-sdk/types/query" gomock "go.uber.org/mock/gomock" ) @@ -56,6 +57,51 @@ func (mr *MockModuleMockRecorder) GetParams(ctx any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetParams", reflect.TypeOf((*MockModule)(nil).GetParams), ctx) } +// GetPayoutHistory mocks base method. +func (m *MockModule) GetPayoutHistory(ctx context.Context, validatorAddress string, pagination *query.PageRequest) (*types.QueryPayoutHistoryResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPayoutHistory", ctx, validatorAddress, pagination) + ret0, _ := ret[0].(*types.QueryPayoutHistoryResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetPayoutHistory indicates an expected call of GetPayoutHistory. +func (mr *MockModuleMockRecorder) GetPayoutHistory(ctx, validatorAddress, pagination any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPayoutHistory", reflect.TypeOf((*MockModule)(nil).GetPayoutHistory), ctx, validatorAddress, pagination) +} + +// GetPoolState mocks base method. 
+func (m *MockModule) GetPoolState(ctx context.Context) (*types.QueryPoolStateResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPoolState", ctx) + ret0, _ := ret[0].(*types.QueryPoolStateResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetPoolState indicates an expected call of GetPoolState. +func (mr *MockModuleMockRecorder) GetPoolState(ctx any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPoolState", reflect.TypeOf((*MockModule)(nil).GetPoolState), ctx) +} + +// GetSNEligibility mocks base method. +func (m *MockModule) GetSNEligibility(ctx context.Context, validatorAddress string) (*types.QuerySNEligibilityResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetSNEligibility", ctx, validatorAddress) + ret0, _ := ret[0].(*types.QuerySNEligibilityResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetSNEligibility indicates an expected call of GetSNEligibility. +func (mr *MockModuleMockRecorder) GetSNEligibility(ctx, validatorAddress any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSNEligibility", reflect.TypeOf((*MockModule)(nil).GetSNEligibility), ctx, validatorAddress) +} + // GetSuperNode mocks base method. 
func (m *MockModule) GetSuperNode(ctx context.Context, address string) (*types.QueryGetSuperNodeResponse, error) { m.ctrl.T.Helper() diff --git a/pkg/testutil/lumera.go b/pkg/testutil/lumera.go index c6eecb6b..e6825c7f 100644 --- a/pkg/testutil/lumera.go +++ b/pkg/testutil/lumera.go @@ -22,6 +22,7 @@ import ( cmtservice "github.com/cosmos/cosmos-sdk/client/grpc/cmtservice" "github.com/cosmos/cosmos-sdk/crypto/keyring" sdktypes "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/types/query" sdktx "github.com/cosmos/cosmos-sdk/types/tx" authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" banktypes "github.com/cosmos/cosmos-sdk/x/bank/types" @@ -267,6 +268,18 @@ func (m *MockSupernodeModule) ListSuperNodes(ctx context.Context) (*supernodeTyp return &supernodeTypes.QueryListSuperNodesResponse{}, nil } +func (m *MockSupernodeModule) GetPoolState(ctx context.Context) (*supernodeTypes.QueryPoolStateResponse, error) { + return &supernodeTypes.QueryPoolStateResponse{}, nil +} + +func (m *MockSupernodeModule) GetSNEligibility(ctx context.Context, validatorAddress string) (*supernodeTypes.QuerySNEligibilityResponse, error) { + return &supernodeTypes.QuerySNEligibilityResponse{Eligible: true, Reason: "mock"}, nil +} + +func (m *MockSupernodeModule) GetPayoutHistory(ctx context.Context, validatorAddress string, pagination *query.PageRequest) (*supernodeTypes.QueryPayoutHistoryResponse, error) { + return &supernodeTypes.QueryPayoutHistoryResponse{}, nil +} + // ReportMetrics mocks broadcasting a metrics report transaction. 
func (m *MockSupernodeMsgModule) ReportMetrics(ctx context.Context, identity string, metrics supernodeTypes.SupernodeMetrics) (*sdktx.BroadcastTxResponse, error) { return &sdktx.BroadcastTxResponse{}, nil diff --git a/sdk/adapters/lumera/types.go b/sdk/adapters/lumera/types.go index c560d3fc..a631b7db 100644 --- a/sdk/adapters/lumera/types.go +++ b/sdk/adapters/lumera/types.go @@ -16,12 +16,13 @@ const ( type SUPERNODE_STATE string const ( - SUPERNODE_STATE_UNSPECIFIED SUPERNODE_STATE = "SUPERNODE_STATE_UNSPECIFIED" - SUPERNODE_STATE_ACTIVE SUPERNODE_STATE = "SUPERNODE_STATE_ACTIVE" - SUPERNODE_STATE_DISABLED SUPERNODE_STATE = "SUPERNODE_STATE_DISABLED" - SUPERNODE_STATE_STOPPED SUPERNODE_STATE = "SUPERNODE_STATE_STOPPED" - SUPERNODE_STATE_PENALIZED SUPERNODE_STATE = "SUPERNODE_STATE_PENALIZED" - SUPERNODE_STATE_POSTPONED SUPERNODE_STATE = "SUPERNODE_STATE_POSTPONED" + SUPERNODE_STATE_UNSPECIFIED SUPERNODE_STATE = "SUPERNODE_STATE_UNSPECIFIED" + SUPERNODE_STATE_ACTIVE SUPERNODE_STATE = "SUPERNODE_STATE_ACTIVE" + SUPERNODE_STATE_DISABLED SUPERNODE_STATE = "SUPERNODE_STATE_DISABLED" + SUPERNODE_STATE_STOPPED SUPERNODE_STATE = "SUPERNODE_STATE_STOPPED" + SUPERNODE_STATE_PENALIZED SUPERNODE_STATE = "SUPERNODE_STATE_PENALIZED" + SUPERNODE_STATE_POSTPONED SUPERNODE_STATE = "SUPERNODE_STATE_POSTPONED" + SUPERNODE_STATE_STORAGE_FULL SUPERNODE_STATE = "SUPERNODE_STATE_STORAGE_FULL" ) // Action represents an action registered on the Lumera blockchain @@ -65,7 +66,8 @@ func ParseSupernodeState(state string) SUPERNODE_STATE { SUPERNODE_STATE_DISABLED, SUPERNODE_STATE_STOPPED, SUPERNODE_STATE_PENALIZED, - SUPERNODE_STATE_POSTPONED: + SUPERNODE_STATE_POSTPONED, + SUPERNODE_STATE_STORAGE_FULL: return SUPERNODE_STATE(state) default: return SUPERNODE_STATE_UNSPECIFIED diff --git a/supernode/cmd/start.go b/supernode/cmd/start.go index 77ed5da2..8e1e2a88 100644 --- a/supernode/cmd/start.go +++ b/supernode/cmd/start.go @@ -168,6 +168,7 @@ The supernode will connect to the Lumera 
network and begin participating in the kr, appConfig.SupernodeConfig.KeyName, appConfig.BaseDir, + appConfig.GetP2PDataDir(), ) if err != nil { logtrace.Fatal(ctx, "Failed to initialize host reporter", logtrace.Fields{"error": err.Error()}) diff --git a/supernode/host_reporter/service.go b/supernode/host_reporter/service.go index 3dc2198d..e0c1c16e 100644 --- a/supernode/host_reporter/service.go +++ b/supernode/host_reporter/service.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "net" + "os" + "path/filepath" "strconv" "strings" "time" @@ -40,9 +42,10 @@ type Service struct { metrics *statussvc.MetricsCollector storagePaths []string + p2pDataDir string } -func NewService(identity string, lumeraClient lumera.Client, kr keyring.Keyring, keyName string, baseDir string) (*Service, error) { +func NewService(identity string, lumeraClient lumera.Client, kr keyring.Keyring, keyName string, baseDir string, p2pDataDir string) (*Service, error) { identity = strings.TrimSpace(identity) if identity == "" { return nil, fmt.Errorf("identity is empty") @@ -72,8 +75,12 @@ func NewService(identity string, lumeraClient lumera.Client, kr keyring.Keyring, } storagePaths := []string{} - if baseDir = strings.TrimSpace(baseDir); baseDir != "" { - // Match legacy disk reporting behavior: measure the volume where the supernode stores its data. + p2pDataDir = strings.TrimSpace(p2pDataDir) + if p2pDataDir != "" { + // Everlight requirement: disk usage must reflect the mount/volume where p2p data is stored. + storagePaths = []string{p2pDataDir} + } else if baseDir = strings.TrimSpace(baseDir); baseDir != "" { + // Fallback for legacy setups where p2p data dir isn't configured. 
storagePaths = []string{baseDir} } @@ -86,6 +93,7 @@ func NewService(identity string, lumeraClient lumera.Client, kr keyring.Keyring, dialTimeout: defaultDialTimeout, metrics: statussvc.NewMetricsCollector(), storagePaths: storagePaths, + p2pDataDir: strings.TrimSpace(p2pDataDir), }, nil } @@ -144,6 +152,9 @@ func (s *Service) tick(ctx context.Context) { if diskUsagePercent, ok := s.diskUsagePercent(tickCtx); ok { hostReport.DiskUsagePercent = diskUsagePercent } + if cascadeBytes, ok := s.cascadeKademliaDBBytes(tickCtx); ok { + hostReport.CascadeKademliaDbBytes = float64(cascadeBytes) + } if _, err := s.lumera.AuditMsg().SubmitEpochReport(tickCtx, epochID, hostReport, storageChallengeObservations); err != nil { logtrace.Warn(tickCtx, "epoch report submit failed", logtrace.Fields{ @@ -170,6 +181,30 @@ func (s *Service) diskUsagePercent(ctx context.Context) (float64, bool) { return infos[0].UsagePercent, true } +func (s *Service) cascadeKademliaDBBytes(_ context.Context) (uint64, bool) { + dir := strings.TrimSpace(s.p2pDataDir) + if dir == "" { + return 0, false + } + // Kademlia SQLite store uses data*.sqlite3 files (+ WAL/SHM sidecars). 
+ matches, err := filepath.Glob(filepath.Join(dir, "data*.sqlite3*")) + if err != nil || len(matches) == 0 { + return 0, false + } + var total uint64 + for _, p := range matches { + st, err := os.Stat(p) + if err != nil || st == nil || st.IsDir() { + continue + } + total += uint64(st.Size()) + } + if total == 0 { + return 0, false + } + return total, true +} + func (s *Service) buildStorageChallengeObservations(ctx context.Context, epochID uint64, requiredOpenPorts []uint32, targets []string) []*audittypes.StorageChallengeObservation { if len(targets) == 0 { return nil diff --git a/supernode/host_reporter/service_test.go b/supernode/host_reporter/service_test.go index 93de4c3c..fe8ed21a 100644 --- a/supernode/host_reporter/service_test.go +++ b/supernode/host_reporter/service_test.go @@ -1,6 +1,11 @@ package host_reporter -import "testing" +import ( + "context" + "os" + "path/filepath" + "testing" +) func TestNormalizeProbeHost(t *testing.T) { t.Parallel() @@ -27,3 +32,39 @@ func TestNormalizeProbeHost(t *testing.T) { }) } } + +func TestCascadeKademliaDBBytes(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + mustWrite := func(name string, size int) { + p := filepath.Join(dir, name) + b := make([]byte, size) + if err := os.WriteFile(p, b, 0o600); err != nil { + t.Fatalf("write %s: %v", name, err) + } + } + + mustWrite("data001.sqlite3", 100) + mustWrite("data001.sqlite3-wal", 50) + mustWrite("data001.sqlite3-shm", 25) + mustWrite("unrelated.txt", 999) + + s := &Service{p2pDataDir: dir} + got, ok := s.cascadeKademliaDBBytes(context.Background()) + if !ok { + t.Fatalf("expected ok=true") + } + if want := uint64(175); got != want { + t.Fatalf("cascadeKademliaDBBytes=%d want %d", got, want) + } +} + +func TestCascadeKademliaDBBytes_NoMatches(t *testing.T) { + t.Parallel() + s := &Service{p2pDataDir: t.TempDir()} + _, ok := s.cascadeKademliaDBBytes(context.Background()) + if ok { + t.Fatalf("expected ok=false when no sqlite db files exist") + } +} diff --git 
a/supernode/host_reporter/tick_behavior_test.go b/supernode/host_reporter/tick_behavior_test.go new file mode 100644 index 00000000..27927c72 --- /dev/null +++ b/supernode/host_reporter/tick_behavior_test.go @@ -0,0 +1,226 @@ +package host_reporter + +import ( + "context" + "errors" + "testing" + "time" + + audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types" + lumeraMock "github.com/LumeraProtocol/supernode/v2/pkg/lumera" + auditmsgmod "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/audit_msg" + nodemod "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/node" + supernodemod "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/supernode" + "github.com/cosmos/cosmos-sdk/codec" + codectypes "github.com/cosmos/cosmos-sdk/codec/types" + cryptocodec "github.com/cosmos/cosmos-sdk/crypto/codec" + "github.com/cosmos/cosmos-sdk/crypto/hd" + "github.com/cosmos/cosmos-sdk/crypto/keyring" + sdktx "github.com/cosmos/cosmos-sdk/types/tx" + "github.com/cosmos/go-bip39" + "go.uber.org/mock/gomock" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type stubAuditModule struct { + currentEpoch *audittypes.QueryCurrentEpochResponse + anchor *audittypes.QueryEpochAnchorResponse + epochReportErr error + assigned *audittypes.QueryAssignedTargetsResponse +} + +func (s *stubAuditModule) GetParams(ctx context.Context) (*audittypes.QueryParamsResponse, error) { + return &audittypes.QueryParamsResponse{}, nil +} +func (s *stubAuditModule) GetEpochAnchor(ctx context.Context, epochID uint64) (*audittypes.QueryEpochAnchorResponse, error) { + return s.anchor, nil +} +func (s *stubAuditModule) GetCurrentEpoch(ctx context.Context) (*audittypes.QueryCurrentEpochResponse, error) { + return s.currentEpoch, nil +} +func (s *stubAuditModule) GetCurrentEpochAnchor(ctx context.Context) (*audittypes.QueryCurrentEpochAnchorResponse, error) { + return &audittypes.QueryCurrentEpochAnchorResponse{}, nil +} +func (s *stubAuditModule) 
GetAssignedTargets(ctx context.Context, supernodeAccount string, epochID uint64) (*audittypes.QueryAssignedTargetsResponse, error) { + return s.assigned, nil +} +func (s *stubAuditModule) GetEpochReport(ctx context.Context, epochID uint64, supernodeAccount string) (*audittypes.QueryEpochReportResponse, error) { + if s.epochReportErr != nil { + return nil, s.epochReportErr + } + return &audittypes.QueryEpochReportResponse{}, nil +} + +func testKeyringAndIdentity(t *testing.T) (keyring.Keyring, string, string) { + t.Helper() + interfaceRegistry := codectypes.NewInterfaceRegistry() + cryptocodec.RegisterInterfaces(interfaceRegistry) + cdc := codec.NewProtoCodec(interfaceRegistry) + kr := keyring.NewInMemory(cdc) + + entropy, err := bip39.NewEntropy(128) + if err != nil { + t.Fatalf("entropy: %v", err) + } + mnemonic, err := bip39.NewMnemonic(entropy) + if err != nil { + t.Fatalf("mnemonic: %v", err) + } + algoList, _ := kr.SupportedAlgorithms() + signingAlgo, err := keyring.NewSigningAlgoFromString("secp256k1", algoList) + if err != nil { + t.Fatalf("signing algo: %v", err) + } + hdPath := hd.CreateHDPath(118, 0, 0).String() + rec, err := kr.NewAccount("test", mnemonic, "", hdPath, signingAlgo) + if err != nil { + t.Fatalf("new account: %v", err) + } + addr, err := rec.GetAddress() + if err != nil { + t.Fatalf("get addr: %v", err) + } + return kr, "test", addr.String() +} + +func TestTick_ProberSubmitsObservationsForAssignedTargets(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + kr, keyName, identity := testKeyringAndIdentity(t) + auditMod := &stubAuditModule{ + currentEpoch: &audittypes.QueryCurrentEpochResponse{EpochId: 7}, + anchor: &audittypes.QueryEpochAnchorResponse{Anchor: audittypes.EpochAnchor{EpochId: 7}}, + epochReportErr: status.Error(codes.NotFound, "not found"), + assigned: &audittypes.QueryAssignedTargetsResponse{ + TargetSupernodeAccounts: []string{"snA", "snB"}, + RequiredOpenPorts: []uint32{4444}, + }, + } + auditMsg := 
auditmsgmod.NewMockModule(ctrl) + node := nodemod.NewMockModule(ctrl) + sn := supernodemod.NewMockModule(ctrl) + client := lumeraMock.NewMockClient(ctrl) + client.EXPECT().Audit().AnyTimes().Return(auditMod) + client.EXPECT().AuditMsg().AnyTimes().Return(auditMsg) + client.EXPECT().SuperNode().AnyTimes().Return(sn) + client.EXPECT().Node().AnyTimes().Return(node) + + sn.EXPECT().GetSupernodeWithLatestAddress(gomock.Any(), "snA").Return(&supernodemod.SuperNodeInfo{LatestAddress: "127.0.0.1:4444"}, nil) + sn.EXPECT().GetSupernodeWithLatestAddress(gomock.Any(), "snB").Return(&supernodemod.SuperNodeInfo{LatestAddress: "127.0.0.1:4444"}, nil) + auditMsg.EXPECT().SubmitEpochReport(gomock.Any(), uint64(7), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, _ uint64, _ audittypes.HostReport, obs []*audittypes.StorageChallengeObservation) (*sdktx.BroadcastTxResponse, error) { + if len(obs) != 2 { + t.Fatalf("expected 2 observations, got %d", len(obs)) + } + for _, o := range obs { + if o == nil || o.TargetSupernodeAccount == "" || len(o.PortStates) != 1 { + t.Fatalf("invalid observation: %+v", o) + } + } + return &sdktx.BroadcastTxResponse{}, nil + }, + ) + + svc, err := NewService(identity, client, kr, keyName, "", "") + if err != nil { + t.Fatalf("new service: %v", err) + } + svc.dialTimeout = 10 * time.Millisecond + svc.tick(context.Background()) +} + +func TestTick_NonProberSubmitsHostOnly(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + kr, keyName, identity := testKeyringAndIdentity(t) + auditMod := &stubAuditModule{ + currentEpoch: &audittypes.QueryCurrentEpochResponse{EpochId: 8}, + anchor: &audittypes.QueryEpochAnchorResponse{Anchor: audittypes.EpochAnchor{EpochId: 8}}, + epochReportErr: status.Error(codes.NotFound, "not found"), + assigned: &audittypes.QueryAssignedTargetsResponse{ + TargetSupernodeAccounts: nil, + RequiredOpenPorts: []uint32{4444, 4445}, + }, + } + auditMsg := auditmsgmod.NewMockModule(ctrl) + node := 
nodemod.NewMockModule(ctrl) + sn := supernodemod.NewMockModule(ctrl) + client := lumeraMock.NewMockClient(ctrl) + client.EXPECT().Audit().AnyTimes().Return(auditMod) + client.EXPECT().AuditMsg().AnyTimes().Return(auditMsg) + client.EXPECT().SuperNode().AnyTimes().Return(sn) + client.EXPECT().Node().AnyTimes().Return(node) + auditMsg.EXPECT().SubmitEpochReport(gomock.Any(), uint64(8), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, _ uint64, _ audittypes.HostReport, obs []*audittypes.StorageChallengeObservation) (*sdktx.BroadcastTxResponse, error) { + if len(obs) != 0 { + t.Fatalf("expected 0 observations for non-prober, got %d", len(obs)) + } + return &sdktx.BroadcastTxResponse{}, nil + }, + ) + + svc, err := NewService(identity, client, kr, keyName, "", "") + if err != nil { + t.Fatalf("new service: %v", err) + } + svc.tick(context.Background()) +} + +func TestTick_SkipsWhenEpochAlreadyReported(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + kr, keyName, identity := testKeyringAndIdentity(t) + auditMod := &stubAuditModule{ + currentEpoch: &audittypes.QueryCurrentEpochResponse{EpochId: 9}, + anchor: &audittypes.QueryEpochAnchorResponse{Anchor: audittypes.EpochAnchor{EpochId: 9}}, + epochReportErr: nil, + assigned: &audittypes.QueryAssignedTargetsResponse{}, + } + auditMsg := auditmsgmod.NewMockModule(ctrl) + node := nodemod.NewMockModule(ctrl) + sn := supernodemod.NewMockModule(ctrl) + client := lumeraMock.NewMockClient(ctrl) + client.EXPECT().Audit().AnyTimes().Return(auditMod) + client.EXPECT().AuditMsg().AnyTimes().Return(auditMsg) + client.EXPECT().SuperNode().AnyTimes().Return(sn) + client.EXPECT().Node().AnyTimes().Return(node) + auditMsg.EXPECT().SubmitEpochReport(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(0) + + svc, err := NewService(identity, client, kr, keyName, "", "") + if err != nil { + t.Fatalf("new service: %v", err) + } + svc.tick(context.Background()) +} + +func 
TestTick_SkipsOnEpochReportLookupError(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + kr, keyName, identity := testKeyringAndIdentity(t) + auditMod := &stubAuditModule{ + currentEpoch: &audittypes.QueryCurrentEpochResponse{EpochId: 10}, + anchor: &audittypes.QueryEpochAnchorResponse{Anchor: audittypes.EpochAnchor{EpochId: 10}}, + epochReportErr: errors.New("rpc unavailable"), + assigned: &audittypes.QueryAssignedTargetsResponse{}, + } + auditMsg := auditmsgmod.NewMockModule(ctrl) + node := nodemod.NewMockModule(ctrl) + sn := supernodemod.NewMockModule(ctrl) + client := lumeraMock.NewMockClient(ctrl) + client.EXPECT().Audit().AnyTimes().Return(auditMod) + client.EXPECT().AuditMsg().AnyTimes().Return(auditMsg) + client.EXPECT().SuperNode().AnyTimes().Return(sn) + client.EXPECT().Node().AnyTimes().Return(node) + auditMsg.EXPECT().SubmitEpochReport(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(0) + + svc, err := NewService(identity, client, kr, keyName, "", "") + if err != nil { + t.Fatalf("new service: %v", err) + } + svc.tick(context.Background()) +} diff --git a/supernode/supernode_metrics/active_probing.go b/supernode/supernode_metrics/active_probing.go index 3545d979..51e2e04e 100644 --- a/supernode/supernode_metrics/active_probing.go +++ b/supernode/supernode_metrics/active_probing.go @@ -346,7 +346,7 @@ func buildProbeCandidates(supernodes []*sntypes.SuperNode) (senders []probeTarge if state == sntypes.SuperNodeStateStopped { continue } - if state != sntypes.SuperNodeStateActive && state != sntypes.SuperNodeStatePostponed { + if state != sntypes.SuperNodeStateActive && state != sntypes.SuperNodeStateStorageFull && state != sntypes.SuperNodeStatePostponed { continue } @@ -383,7 +383,7 @@ func buildProbeCandidates(supernodes []*sntypes.SuperNode) (senders []probeTarge } peersByID[peerID] = t receivers = append(receivers, t) - if state == sntypes.SuperNodeStateActive { + if state == sntypes.SuperNodeStateActive || state 
== sntypes.SuperNodeStateStorageFull { senders = append(senders, t) } } diff --git a/supernode/supernode_metrics/reachability_active_probing_test.go b/supernode/supernode_metrics/reachability_active_probing_test.go index 5f138fe9..857c8da5 100644 --- a/supernode/supernode_metrics/reachability_active_probing_test.go +++ b/supernode/supernode_metrics/reachability_active_probing_test.go @@ -24,6 +24,7 @@ import ( "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/supernode_msg" "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/tx" "github.com/LumeraProtocol/supernode/v2/pkg/reachability" + "github.com/cosmos/cosmos-sdk/types/query" ) func TestBuildProbeCandidatesFilters(t *testing.T) { @@ -59,6 +60,15 @@ func TestBuildProbeCandidatesFilters(t *testing.T) { {Height: height, Address: "203.0.113.3:4444"}, }, }, + { + SupernodeAccount: "storage_full", + States: []*sntypes.SuperNodeStateRecord{ + {Height: height, State: sntypes.SuperNodeStateStorageFull}, + }, + PrevIpAddresses: []*sntypes.IPAddressHistory{ + {Height: height, Address: "203.0.113.6:4444"}, + }, + }, { SupernodeAccount: "ipv6", States: []*sntypes.SuperNodeStateRecord{ @@ -89,11 +99,11 @@ func TestBuildProbeCandidatesFilters(t *testing.T) { } senders, receivers, peersByID := buildProbeCandidates(active) - if len(senders) != 4 { - t.Fatalf("expected 4 senders (ACTIVE only), got %d", len(senders)) + if len(senders) != 5 { + t.Fatalf("expected 5 senders (ACTIVE+STORAGE_FULL), got %d", len(senders)) } - if len(receivers) != 5 { - t.Fatalf("expected 5 receivers (ACTIVE+POSTPONED), got %d", len(receivers)) + if len(receivers) != 6 { + t.Fatalf("expected 6 receivers (ACTIVE+STORAGE_FULL+POSTPONED), got %d", len(receivers)) } if peersByID["a"].grpcPort != 4444 || peersByID["a"].p2pPort != 5555 || peersByID["a"].metricsHeight != height { t.Fatalf("unexpected peer a: %+v", peersByID["a"]) @@ -101,6 +111,9 @@ func TestBuildProbeCandidatesFilters(t *testing.T) { if peersByID["postponed"].identity == "" { 
t.Fatalf("expected postponed peer present") } + if peersByID["storage_full"].identity == "" { + t.Fatalf("expected storage_full peer present") + } } func TestOpenPortsClosedWhenQuorumSatisfiedAndNoEvidence(t *testing.T) { @@ -470,3 +483,42 @@ func (m *fakeSupernodeModule) GetParams(context.Context) (*sntypes.QueryParamsRe func (m *fakeSupernodeModule) ListSuperNodes(context.Context) (*sntypes.QueryListSuperNodesResponse, error) { return &sntypes.QueryListSuperNodesResponse{Supernodes: m.supernodes}, nil } +func (m *fakeSupernodeModule) GetPoolState(context.Context) (*sntypes.QueryPoolStateResponse, error) { + return &sntypes.QueryPoolStateResponse{}, nil +} +func (m *fakeSupernodeModule) GetSNEligibility(context.Context, string) (*sntypes.QuerySNEligibilityResponse, error) { + return &sntypes.QuerySNEligibilityResponse{Eligible: true, Reason: "mock"}, nil +} +func (m *fakeSupernodeModule) GetPayoutHistory(context.Context, string, *query.PageRequest) (*sntypes.QueryPayoutHistoryResponse, error) { + return &sntypes.QueryPayoutHistoryResponse{}, nil +} + +func TestBuildProbeCandidates_PostponedReceiveOnly_StorageFullCanSend(t *testing.T) { + height := int64(100) + supernodes := []*sntypes.SuperNode{ + mkSNState("active", "203.0.113.10", height, height, sntypes.SuperNodeStateActive), + mkSNState("storage", "203.0.113.11", height, height, sntypes.SuperNodeStateStorageFull), + mkSNState("postponed", "203.0.113.12", height, height, sntypes.SuperNodeStatePostponed), + } + + senders, receivers, _ := buildProbeCandidates(supernodes) + + senderSet := map[string]bool{} + for _, s := range senders { + senderSet[s.identity] = true + } + receiverSet := map[string]bool{} + for _, r := range receivers { + receiverSet[r.identity] = true + } + + if !senderSet["active"] || !senderSet["storage"] { + t.Fatalf("expected ACTIVE and STORAGE_FULL in senders, got=%v", senderSet) + } + if senderSet["postponed"] { + t.Fatalf("expected POSTPONED to be receive-only, got senders=%v", senderSet) + 
} + if !receiverSet["active"] || !receiverSet["storage"] || !receiverSet["postponed"] { + t.Fatalf("expected ACTIVE/STORAGE_FULL/POSTPONED in receivers, got=%v", receiverSet) + } +} diff --git a/supernode/verifier/verifier.go b/supernode/verifier/verifier.go index 7367c089..8fe24836 100644 --- a/supernode/verifier/verifier.go +++ b/supernode/verifier/verifier.go @@ -131,9 +131,9 @@ func (cv *ConfigVerifier) checkSupernodeExists(ctx context.Context, result *Veri func (cv *ConfigVerifier) checkSupernodeState(result *VerificationResult, supernodeInfo *snmodule.SuperNodeInfo) { state := adapterlumera.ParseSupernodeState(supernodeInfo.CurrentState) - allowedStates := fmt.Sprintf("%s or %s", adapterlumera.SUPERNODE_STATE_ACTIVE, adapterlumera.SUPERNODE_STATE_POSTPONED) + allowedStates := fmt.Sprintf("%s or %s or %s", adapterlumera.SUPERNODE_STATE_ACTIVE, adapterlumera.SUPERNODE_STATE_STORAGE_FULL, adapterlumera.SUPERNODE_STATE_POSTPONED) - if supernodeInfo.CurrentState == "" || state == adapterlumera.SUPERNODE_STATE_ACTIVE { + if supernodeInfo.CurrentState == "" || state == adapterlumera.SUPERNODE_STATE_ACTIVE || state == adapterlumera.SUPERNODE_STATE_STORAGE_FULL { return } diff --git a/supernode/verifier/verifier_test.go b/supernode/verifier/verifier_test.go new file mode 100644 index 00000000..e09ccefe --- /dev/null +++ b/supernode/verifier/verifier_test.go @@ -0,0 +1,38 @@ +package verifier + +import ( + "testing" + + snmodule "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/supernode" +) + +func TestCheckSupernodeState_AllowsStorageFull(t *testing.T) { + cv := &ConfigVerifier{} + result := &VerificationResult{Valid: true} + + cv.checkSupernodeState(result, &snmodule.SuperNodeInfo{CurrentState: "SUPERNODE_STATE_STORAGE_FULL"}) + + if !result.Valid { + t.Fatalf("expected STORAGE_FULL to be allowed, got invalid result: %+v", result.Errors) + } + if len(result.Errors) != 0 { + t.Fatalf("expected no errors, got: %+v", result.Errors) + } +} + +func 
TestCheckSupernodeState_PostponedWarnsNotErrors(t *testing.T) { + cv := &ConfigVerifier{} + result := &VerificationResult{Valid: true} + + cv.checkSupernodeState(result, &snmodule.SuperNodeInfo{CurrentState: "SUPERNODE_STATE_POSTPONED"}) + + if !result.Valid { + t.Fatalf("expected POSTPONED to remain valid with warning") + } + if len(result.Errors) != 0 { + t.Fatalf("expected no errors, got: %+v", result.Errors) + } + if len(result.Warnings) == 0 { + t.Fatalf("expected warning for POSTPONED state") + } +} diff --git a/tests/system/e2e_cascade_test.go b/tests/system/e2e_cascade_test.go index 4760c7e8..bd92561c 100644 --- a/tests/system/e2e_cascade_test.go +++ b/tests/system/e2e_cascade_test.go @@ -15,6 +15,7 @@ import ( "testing" "time" + "cosmossdk.io/math" actiontypes "github.com/LumeraProtocol/lumera/x/action/v1/types" "github.com/LumeraProtocol/supernode/v2/pkg/keyring" "github.com/LumeraProtocol/supernode/v2/pkg/lumera" @@ -487,9 +488,36 @@ waitLoop: var toAddress string var amount string + // Validate fee-distribution invariant (value conservation). + // + // Pre-Everlight the finalize tx produced a single coin_spent event with + // amount == autoPrice (the action module paying the supernode). Under + // Everlight Phase 1 (lumera PR #113) the action module splits the fee + // across multiple bank sends — reward-pool share, optional foundation + // share, and the supernode payout — so no single event equals autoPrice. + // + // The real product invariant is value conservation: for this finalize + // message, total tokens leaving the action module equals autoPrice, and + // total tokens received across recipients equals autoPrice. That is + // independent of the split ratio (which is a governance-tunable parameter + // `RegistrationFeeShareBps`), so this assertion also survives future + // parameter changes. 
+ // + // Discriminating message-triggered bank events from the tx gas-fee + // deduction: the gas-fee coin_spent/coin_received pair carries no + // `msg_index` attribute; events emitted from inside MsgFinalizeAction + // execution carry `msg_index=0`. We sum only the msg_index-tagged events. + autoPriceCoin, err := sdk.ParseCoinNormalized(autoPrice) + require.NoError(t, err, "autoPrice must parse as a coin") + + spenderTotals := make(map[string]sdk.Coins) + recvTotals := make(map[string]sdk.Coins) + for _, event := range events { + etype := event.Get("type").String() + // Check for action finalized event - if event.Get("type").String() == "action_finalized" { + if etype == "action_finalized" { actionFinalized = true attrs := event.Get("attributes").Array() for _, attr := range attrs { @@ -500,44 +528,78 @@ waitLoop: require.Equal(t, actionID, attr.Get("value").String(), "Action ID should match") } } + continue } - // Check for fee spent event - if event.Get("type").String() == "coin_spent" { - attrs := event.Get("attributes").Array() - for i, attr := range attrs { - if attr.Get("key").String() == "amount" && attr.Get("value").String() == autoPrice { - feeSpent = true - // Get the spender address from the same event group - for j, addrAttr := range attrs { - if j < i && addrAttr.Get("key").String() == "spender" { - fromAddress = addrAttr.Get("value").String() - break - } - } - } + if etype != "coin_spent" && etype != "coin_received" { + continue + } + + attrs := event.Get("attributes").Array() + var addr, amtStr string + var hasMsgIndex bool + addrKey := "spender" + if etype == "coin_received" { + addrKey = "receiver" + } + for _, a := range attrs { + switch a.Get("key").String() { + case addrKey: + addr = a.Get("value").String() + case "amount": + amtStr = a.Get("value").String() + case "msg_index": + hasMsgIndex = true } } + // Exclude tx-level gas-fee events (no msg_index) — they belong to + // the finalizer, not the action-module payout flow. 
+ if !hasMsgIndex || addr == "" || amtStr == "" { + continue + } + coin, perr := sdk.ParseCoinNormalized(amtStr) + if perr != nil || coin.Denom != autoPriceCoin.Denom { + continue + } + if etype == "coin_spent" { + spenderTotals[addr] = spenderTotals[addr].Add(coin) + } else { + recvTotals[addr] = recvTotals[addr].Add(coin) + } + } - // Check for fee received event - if event.Get("type").String() == "coin_received" { - attrs := event.Get("attributes").Array() - for i, attr := range attrs { - if attr.Get("key").String() == "amount" && attr.Get("value").String() == autoPrice { - feeReceived = true - // Get the receiver address from the same event group - for j, addrAttr := range attrs { - if j < i && addrAttr.Get("key").String() == "receiver" { - toAddress = addrAttr.Get("value").String() - break - } - } - amount = attr.Get("value").String() - } + // feeSpent: exactly one spender (the action module) paid out autoPrice + // in total across all message-triggered coin_spent events. + for addr, coins := range spenderTotals { + if coins.AmountOf(autoPriceCoin.Denom).Equal(autoPriceCoin.Amount) { + feeSpent = true + fromAddress = addr + amount = autoPriceCoin.String() + break + } + } + + // feeReceived: recipients received autoPrice in total. + totalReceived := sdk.NewCoins() + for _, coins := range recvTotals { + totalReceived = totalReceived.Add(coins...) + } + if totalReceived.AmountOf(autoPriceCoin.Denom).Equal(autoPriceCoin.Amount) { + feeReceived = true + // Pick the recipient of the largest single share for the payment-flow + // log (this is the supernode payout under the default split ratio). 
+ var maxAmt = math.ZeroInt() + for addr, coins := range recvTotals { + amt := coins.AmountOf(autoPriceCoin.Denom) + if amt.GT(maxAmt) { + maxAmt = amt + toAddress = addr } } } + t.Logf("Fee distribution: spender_totals=%v receiver_totals=%v", spenderTotals, recvTotals) + // Validate events require.True(t, actionFinalized, "Action finalized event should be emitted") require.True(t, feeSpent, "Fee spent event should be emitted") diff --git a/tests/system/go.mod b/tests/system/go.mod index 15b3244e..aeb95b7a 100644 --- a/tests/system/go.mod +++ b/tests/system/go.mod @@ -1,6 +1,6 @@ module github.com/LumeraProtocol/supernode/v2/tests/systemtests -go 1.25.5 +go 1.25.9 replace ( github.com/LumeraProtocol/supernode/v2 => ../../ @@ -11,9 +11,9 @@ replace ( require ( cosmossdk.io/math v1.5.3 - github.com/LumeraProtocol/lumera v1.11.2-0.20260413145614-4ffe74bb13dc + github.com/LumeraProtocol/lumera v1.12.0-rc github.com/LumeraProtocol/supernode/v2 v2.0.0-00010101000000-000000000000 - github.com/cometbft/cometbft v0.38.20 + github.com/cometbft/cometbft v0.38.21 github.com/cosmos/ibc-go/v10 v10.5.0 github.com/tidwall/gjson v1.14.2 github.com/tidwall/sjson v1.2.5 diff --git a/tests/system/go.sum b/tests/system/go.sum index 062d8096..815a8d71 100644 --- a/tests/system/go.sum +++ b/tests/system/go.sum @@ -107,8 +107,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.50 github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.50.0 h1:ig/FpDD2JofP/NExKQUbn7uOSZzJAQqogfqluZK4ed4= github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.50.0/go.mod h1:otE2jQekW/PqXk1Awf5lmfokJx4uwuqcj1ab5SpGeW0= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= -github.com/LumeraProtocol/lumera v1.11.2-0.20260413145614-4ffe74bb13dc h1:B43KT06s/4lE/LyVQevb0Xr5XqKy6nlel1fZh7G7w14= -github.com/LumeraProtocol/lumera 
v1.11.2-0.20260413145614-4ffe74bb13dc/go.mod h1:p2sZZG3bLzSBdaW883qjuU3DXXY4NJzTTwLywr8uI0w= +github.com/LumeraProtocol/lumera v1.12.0-rc h1:Mfae496LpjYhf1SvAE/bsmtjgdoOD8WAJFRCier8xsg= +github.com/LumeraProtocol/lumera v1.12.0-rc/go.mod h1:/G9LTPZB+261tHoWoj7q+1fn+O/VV0zzagwLdsThSNo= github.com/LumeraProtocol/rq-go v0.2.1 h1:8B3UzRChLsGMmvZ+UVbJsJj6JZzL9P9iYxbdUwGsQI4= github.com/LumeraProtocol/rq-go v0.2.1/go.mod h1:APnKCZRh1Es2Vtrd2w4kCLgAyaL5Bqrkz/BURoRJ+O8= github.com/Masterminds/semver/v3 v3.3.1 h1:QtNSWtVZ3nBfk8mAOu/B6v7FMJ+NHTIgUPi7rj+4nv4= @@ -221,8 +221,8 @@ github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1: github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= github.com/coder/websocket v1.8.7 h1:jiep6gmlfP/yq2w1gBoubJEXL9gf8x3bp6lzzX8nJxE= github.com/coder/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= -github.com/cometbft/cometbft v0.38.20 h1:i9v9rvh3Z4CZvGSWrByAOpiqNq5WLkat3r/tE/B49RU= -github.com/cometbft/cometbft v0.38.20/go.mod h1:UCu8dlHqvkAsmAFmWDRWNZJPlu6ya2fTWZlDrWsivwo= +github.com/cometbft/cometbft v0.38.21 h1:qcIJSH9LiwU5s6ZgKR5eRbsLNucbubfraDs5bzgjtOI= +github.com/cometbft/cometbft v0.38.21/go.mod h1:UCu8dlHqvkAsmAFmWDRWNZJPlu6ya2fTWZlDrWsivwo= github.com/cometbft/cometbft-db v0.14.1 h1:SxoamPghqICBAIcGpleHbmoPqy+crij/++eZz3DlerQ= github.com/cometbft/cometbft-db v0.14.1/go.mod h1:KHP1YghilyGV/xjD5DP3+2hyigWx0WTp9X+0Gnx0RxQ= github.com/containerd/continuity v0.3.0 h1:nisirsYROK15TAMVukJOUyGJjz4BNQJBVsNvAXZJ/eg=