diff --git a/docs/release-notes/release-notes-0.20.1.md b/docs/release-notes/release-notes-0.20.1.md index 59370eaae7e..32ae26b3103 100644 --- a/docs/release-notes/release-notes-0.20.1.md +++ b/docs/release-notes/release-notes-0.20.1.md @@ -32,6 +32,10 @@ addresses](https://github.com/lightningnetwork/lnd/pull/10341) were added to the node announcement and `getinfo` output. +* [Fix potential sql tx exhaustion + issue](https://github.com/lightningnetwork/lnd/pull/10428) in LND which might + happen when running postgres with a limited number of connections configured. + # New Features ## Functional Enhancements diff --git a/graph/db/kv_store.go b/graph/db/kv_store.go index cc9a14889d3..ec55ee3be52 100644 --- a/graph/db/kv_store.go +++ b/graph/db/kv_store.go @@ -2116,6 +2116,13 @@ func (c *KVStore) fetchNextChanUpdateBatch( batch []ChannelEdge hasMore bool ) + + // Acquire read lock before starting transaction to ensure + // consistent lock ordering (cacheMu -> DB) and prevent + // deadlock with write operations. + c.cacheMu.RLock() + defer c.cacheMu.RUnlock() + err := kvdb.View(c.db, func(tx kvdb.RTx) error { edges := tx.ReadBucket(edgeBucket) if edges == nil { @@ -2195,9 +2202,7 @@ func (c *KVStore) fetchNextChanUpdateBatch( continue } - // Before we read the edge info, we'll see if this - // element is already in the cache or not. - c.cacheMu.RLock() + // Check cache (we already hold shared read lock). if channel, ok := c.chanCache.get(chanIDInt); ok { state.edgesSeen[chanIDInt] = struct{}{} @@ -2208,11 +2213,8 @@ func (c *KVStore) fetchNextChanUpdateBatch( indexKey, _ = updateCursor.Next() - c.cacheMu.RUnlock() - continue } - c.cacheMu.RUnlock() // The edge wasn't in the cache, so we'll fetch it along // w/ the edge policies and nodes. diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index f67894e4ce0..2d85971acf5 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -1126,6 +1126,11 @@ func (s *SQLStore) ChanUpdatesInHorizon(startTime, endTime time.Time, for hasMore { var batch []ChannelEdge + // Acquire read lock before starting transaction to + // ensure consistent lock ordering (cacheMu -> DB) and + // prevent deadlock with write operations. + s.cacheMu.RLock() + err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { //nolint:ll @@ -1178,11 +1183,11 @@ func (s *SQLStore) ChanUpdatesInHorizon(startTime, endTime time.Time, continue } - s.cacheMu.RLock() + // Check cache (we already hold + // shared read lock). channel, ok := s.chanCache.get( chanIDInt, ) - s.cacheMu.RUnlock() if ok { hits++ total++ @@ -1216,6 +1221,9 @@ func (s *SQLStore) ChanUpdatesInHorizon(startTime, endTime time.Time, ) }) + // Release read lock after transaction completes. + s.cacheMu.RUnlock() + if err != nil { log.Errorf("ChanUpdatesInHorizon "+ "batch error: %v", err)