Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

graph+routing: refactor to remove graphsession #9513

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
5 changes: 4 additions & 1 deletion docs/release-notes/release-notes-0.19.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ config option](https://github.com/lightningnetwork/lnd/pull/9182) and introduce
a new option `channel-max-fee-exposure` which is unambiguous in its description.
The underlying functionality between those two options remain the same.

* [Abstraction of graph](https://github.com/lightningnetwork/lnd/pull/9480)
* [Abstraction of graph]
access for autopilot.

* [Golang was updated to
Expand All @@ -256,6 +256,9 @@ The underlying functionality between those two options remain the same.
[2](https://github.com/lightningnetwork/lnd/pull/9477)
[3](https://github.com/lightningnetwork/lnd/pull/9478).

* Graph abstraction work:
- [Abstract autopilot access](https://github.com/lightningnetwork/lnd/pull/9480)
- [Refactor to hide DB transactions](https://github.com/lightningnetwork/lnd/pull/9513)

## Breaking Changes
## Performance Improvements
Expand Down
118 changes: 98 additions & 20 deletions graph/db/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,16 +403,6 @@ func initChannelGraph(db kvdb.Backend) error {
return nil
}

// NewPathFindTx returns a new read transaction that can be used for a single
// path finding session. Will return nil if the graph cache is enabled.
func (c *ChannelGraph) NewPathFindTx() (kvdb.RTx, error) {
if c.graphCache != nil {
return nil, nil
}

return c.db.BeginReadTx()
}

// AddrsForNode returns all known addresses for the target node public key that
// the graph DB is aware of. The returned boolean indicates if the given node is
// unknown to the graph DB or not.
Expand Down Expand Up @@ -500,13 +490,14 @@ func (c *ChannelGraph) ForEachChannel(cb func(*models.ChannelEdgeInfo,
}, func() {})
}

// ForEachNodeDirectedChannel iterates through all channels of a given node,
// forEachNodeDirectedChannel iterates through all channels of a given node,
// executing the passed callback on the directed edge representing the channel
// and its incoming policy. If the callback returns an error, then the iteration
// is halted with the error propagated back up to the caller.
// is halted with the error propagated back up to the caller. An optional read
// transaction may be provided. If none is provided, a new one will be created.
//
// Unknown policies are passed into the callback as nil values.
func (c *ChannelGraph) ForEachNodeDirectedChannel(tx kvdb.RTx,
func (c *ChannelGraph) forEachNodeDirectedChannel(tx kvdb.RTx,
node route.Vertex, cb func(channel *DirectedChannel) error) error {

if c.graphCache != nil {
Expand All @@ -517,7 +508,7 @@ func (c *ChannelGraph) ForEachNodeDirectedChannel(tx kvdb.RTx,
toNodeCallback := func() route.Vertex {
return node
}
toNodeFeatures, err := c.FetchNodeFeatures(node)
toNodeFeatures, err := c.fetchNodeFeatures(tx, node)
if err != nil {
return err
}
Expand Down Expand Up @@ -561,17 +552,18 @@ func (c *ChannelGraph) ForEachNodeDirectedChannel(tx kvdb.RTx,
return nodeTraversal(tx, node[:], c.db, dbCallback)
}

// FetchNodeFeatures returns the features of a given node. If no features are
// known for the node, an empty feature vector is returned.
func (c *ChannelGraph) FetchNodeFeatures(
// fetchNodeFeatures returns the features of a given node. If no features are
// known for the node, an empty feature vector is returned. An optional read
// transaction may be provided. If none is provided, a new one will be created.
func (c *ChannelGraph) fetchNodeFeatures(tx kvdb.RTx,
node route.Vertex) (*lnwire.FeatureVector, error) {

if c.graphCache != nil {
return c.graphCache.GetFeatures(node), nil
}

// Fallback that uses the database.
targetNode, err := c.FetchLightningNode(node)
targetNode, err := c.FetchLightningNodeTx(tx, node)
switch err {
// If the node exists and has features, return them directly.
case nil:
Expand All @@ -588,6 +580,34 @@ func (c *ChannelGraph) FetchNodeFeatures(
}
}

// ForEachNodeDirectedChannel iterates through all channels of a given node,
// executing the passed callback on the directed edge representing the channel
// and its incoming policy. If the callback returns an error, then the iteration
// is halted with the error propagated back up to the caller. If the graphCache
// is available, then it will be used to retrieve the node's channels instead
// of the database.
//
// Unknown policies are passed into the callback as nil values.
//
// NOTE: this is part of the graphdb.CachedGraph interface.
func (c *ChannelGraph) ForEachNodeDirectedChannel(nodePub route.Vertex,
cb func(channel *DirectedChannel) error) error {

return c.forEachNodeDirectedChannel(nil, nodePub, cb)
}

// FetchNodeFeatures returns the features of the given node. If no features are
// known for the node, an empty feature vector is returned.
// If the graphCache is available, then it will be used to retrieve the node's
// features instead of the database.
//
// NOTE: this is part of the graphdb.CachedGraph interface.
func (c *ChannelGraph) FetchNodeFeatures(nodePub route.Vertex) (
*lnwire.FeatureVector, error) {

return c.fetchNodeFeatures(nil, nodePub)
}

// ForEachNodeCached is similar to forEachNode, but it utilizes the channel
// graph cache instead. Note that this doesn't return all the information the
// regular forEachNode method does.
Expand Down Expand Up @@ -617,8 +637,8 @@ func (c *ChannelGraph) ForEachNodeCached(cb func(node route.Vertex,
toNodeCallback := func() route.Vertex {
return node.PubKeyBytes
}
toNodeFeatures, err := c.FetchNodeFeatures(
node.PubKeyBytes,
toNodeFeatures, err := c.fetchNodeFeatures(
tx, node.PubKeyBytes,
)
if err != nil {
return err
Expand Down Expand Up @@ -3873,6 +3893,64 @@ func (c *ChannelGraph) IsClosedScid(scid lnwire.ShortChannelID) (bool, error) {
return isClosed, nil
}

// GraphSession will provide the call-back with access to a graphdb.CachedGraph
// instance which can be used to perform queries against the channel graph.
// If the graph cache is not enabled, then the call-back will be provided with
// access to the graph via a consistent read-only transaction.
func (c *ChannelGraph) GraphSession(cb func(graph CachedGraph) error) error {
var (
tx kvdb.RTx
err error
commit = func() {}
)
if c.graphCache == nil {
tx, err = c.db.BeginReadTx()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could instead use a c.db.View() and execute the passed in callback within the view closure. This way we could completely remove BeginReadTx etc from kvdb.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great idea!!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, the only tricky part here though is that then it wont be a pure refactor right since currently we just log if BeginReadTx fails or if Rollback fails whereas if we use View then we will fully return errors there.

so perhaps another follow up just to keep this a pure refactor.

thoughts?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sgtm!

if err != nil {
return err
}

commit = func() {
if err := tx.Rollback(); err != nil {
log.Errorf("Unable to rollback tx: %v", err)
}
}
}
defer commit()

return cb(&cachedGraphSession{
db: c,
tx: tx,
})
}

// cachedGraphSession implements the CachedGraph interface but with a backing
// read only transaction for a consistent view of the graph in the case where
// the graph Cache has not been enabled.
type cachedGraphSession struct {
tx kvdb.RTx
db *ChannelGraph
}

// ForEachNodeDirectedChannel calls the callback for every channel of the given
// node.
//
// NOTE: Part of the CachedGraph interface.
func (c *cachedGraphSession) ForEachNodeDirectedChannel(nodePub route.Vertex,
cb func(channel *DirectedChannel) error) error {

return c.db.forEachNodeDirectedChannel(c.tx, nodePub, cb)
}

// FetchNodeFeatures returns the features of the given node. If the node is
// unknown, assume no additional features are supported.
//
// NOTE: Part of the CachedGraph interface.
func (c *cachedGraphSession) FetchNodeFeatures(nodePub route.Vertex) (
*lnwire.FeatureVector, error) {

return c.db.fetchNodeFeatures(c.tx, nodePub)
}

func putLightningNode(nodeBucket kvdb.RwBucket, aliasBucket kvdb.RwBucket, // nolint:dupl
updateIndex kvdb.RwBucket, node *models.LightningNode) error {

Expand Down
4 changes: 2 additions & 2 deletions graph/db/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3915,7 +3915,7 @@ func BenchmarkForEachChannel(b *testing.B) {
}
}

// TestGraphCacheForEachNodeChannel tests that the ForEachNodeDirectedChannel
// TestGraphCacheForEachNodeChannel tests that the forEachNodeDirectedChannel
// method works as expected, and is able to handle nil self edges.
func TestGraphCacheForEachNodeChannel(t *testing.T) {
graph, err := MakeTestGraph(t)
Expand Down Expand Up @@ -3952,7 +3952,7 @@ func TestGraphCacheForEachNodeChannel(t *testing.T) {

getSingleChannel := func() *DirectedChannel {
var ch *DirectedChannel
err = graph.ForEachNodeDirectedChannel(nil, node1.PubKeyBytes,
err = graph.forEachNodeDirectedChannel(nil, node1.PubKeyBytes,
func(c *DirectedChannel) error {
require.Nil(t, ch)
ch = c
Expand Down
14 changes: 14 additions & 0 deletions graph/db/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package graphdb

import (
"github.com/lightningnetwork/lnd/graph/db/models"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
)

Expand All @@ -23,3 +24,16 @@ type NodeRTx interface {
// the same transaction.
FetchNode(node route.Vertex) (NodeRTx, error)
}

// CachedGraph is an abstract read only interface that provides information
// about nodes and their edges. The interface is about providing fast read-only
// access to the graph and so if a cache is available, it should be used.
type CachedGraph interface {
// ForEachNodeDirectedChannel calls the callback for every channel of
// the given node.
ForEachNodeDirectedChannel(nodePub route.Vertex,
cb func(channel *DirectedChannel) error) error

// FetchNodeFeatures returns the features of the given node.
FetchNodeFeatures(nodePub route.Vertex) (*lnwire.FeatureVector, error)
}
141 changes: 0 additions & 141 deletions graph/graphsession/graph_session.go

This file was deleted.

2 changes: 1 addition & 1 deletion routing/bandwidth.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func newBandwidthManager(graph Graph, sourceNode route.Vertex,

// First, we'll collect the set of outbound edges from the target
// source node and add them to our bandwidth manager's map of channels.
err := graph.ForEachNodeChannel(sourceNode,
err := graph.ForEachNodeDirectedChannel(sourceNode,
func(channel *graphdb.DirectedChannel) error {
shortID := lnwire.NewShortChanIDFromInt(
channel.ChannelID,
Expand Down
Loading
Loading