Skip to content

Commit e4fc01b

Browse files
committed
multi: endpoints for onion messages
This commit creates the necessary endpoints for onion messages. Specifically, it adds the following: - `SendOnionMessage` endpoint to send onion messages. - `SubscribeOnionMessages` endpoint to subscribe to incoming onion messages. It uses the `msgmux` package to handle the onion messages.
1 parent b0f7e52 commit e4fc01b

File tree

9 files changed

+388
-0
lines changed

9 files changed

+388
-0
lines changed

itest/list_on_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,10 @@ var allTestCases = []*lntest.TestCase{
531531
Name: "custom message",
532532
TestFunc: testCustomMessage,
533533
},
534+
{
535+
Name: "onion message",
536+
TestFunc: testOnionMessage,
537+
},
534538
{
535539
Name: "sign verify message with addr",
536540
TestFunc: testSignVerifyMessageWithAddr,

itest/lnd_onion_message_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package itest
2+
3+
import (
4+
"time"
5+
6+
"github.com/btcsuite/btcd/btcec/v2"
7+
"github.com/lightningnetwork/lnd/lnrpc"
8+
"github.com/lightningnetwork/lnd/lntest"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
// testOnionMessage tests sending and receiving of the onion message type.
13+
func testOnionMessage(ht *lntest.HarnessTest) {
14+
alice := ht.NewNode("Alice", nil)
15+
bob := ht.NewNode("Bob", nil)
16+
17+
// Subscribe Alice to onion messages before we send any, so that we
18+
// don't miss any.
19+
msgClient, cancel := alice.RPC.SubscribeOnionMessages()
20+
defer cancel()
21+
22+
// Create a channel to receive onion messages on.
23+
messages := make(chan *lnrpc.OnionMessage)
24+
go func() {
25+
for {
26+
// If we fail to receive, just exit. The test should
27+
// fail elsewhere if it doesn't get a message that it
28+
// was expecting.
29+
msg, err := msgClient.Recv()
30+
if err != nil {
31+
return
32+
}
33+
34+
// Deliver the message into our channel or exit if the
35+
// test is shutting down.
36+
select {
37+
case messages <- msg:
38+
case <-ht.Context().Done():
39+
return
40+
}
41+
}
42+
}()
43+
44+
// Connect alice and bob so that they can exchange messages.
45+
ht.EnsureConnected(alice, bob)
46+
47+
// Create a random onion message.
48+
randomPriv, err := btcec.NewPrivateKey()
49+
require.NoError(ht.T, err)
50+
randomPub := randomPriv.PubKey()
51+
msgPathKey := randomPub.SerializeCompressed()
52+
msgOnion := []byte{1, 2, 3}
53+
54+
// Send it from Bob to Alice.
55+
bobMsg := &lnrpc.SendOnionMessageRequest{
56+
Peer: alice.PubKey[:],
57+
PathKey: msgPathKey,
58+
Onion: msgOnion,
59+
}
60+
bob.RPC.SendOnionMessage(bobMsg)
61+
62+
// Wait for Alice to receive the message.
63+
select {
64+
case msg := <-messages:
65+
// Check our type and data and (sanity) check the peer we got
66+
// it from.
67+
require.Equal(ht, msgOnion, msg.Onion, "msg data wrong")
68+
require.Equal(ht, msgPathKey, msg.PathKey, "msg "+
69+
"path key wrong")
70+
require.Equal(ht, bob.PubKey[:], msg.Peer, "msg peer wrong")
71+
72+
case <-time.After(lntest.DefaultTimeout):
73+
ht.Fatalf("alice did not receive onion message: %v", bobMsg)
74+
}
75+
}

lntest/rpc/lnd.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -726,6 +726,8 @@ func (h *HarnessRPC) SubscribeChannelEvents() ChannelEventsClient {
726726

727727
type CustomMessageClient lnrpc.Lightning_SubscribeCustomMessagesClient
728728

729+
type OnionMessageClient lnrpc.Lightning_SubscribeOnionMessagesClient
730+
729731
// SubscribeCustomMessages creates a subscription client for custom messages.
730732
func (h *HarnessRPC) SubscribeCustomMessages() (CustomMessageClient,
731733
context.CancelFunc) {
@@ -758,6 +760,38 @@ func (h *HarnessRPC) SendCustomMessage(
758760
return resp
759761
}
760762

763+
// SendOnionMessage makes a RPC call to the node's SendOnionMessage and
764+
// returns the response.
765+
func (h *HarnessRPC) SendOnionMessage(
766+
req *lnrpc.SendOnionMessageRequest) *lnrpc.SendOnionMessageResponse {
767+
768+
ctxt, cancel := context.WithTimeout(h.runCtx, DefaultTimeout)
769+
defer cancel()
770+
771+
resp, err := h.LN.SendOnionMessage(ctxt, req)
772+
h.NoError(err, "SendOnionMessage")
773+
774+
return resp
775+
}
776+
777+
// SubscribeOnionMessages creates a subscription client for onion messages.
778+
func (h *HarnessRPC) SubscribeOnionMessages() (OnionMessageClient,
779+
context.CancelFunc) {
780+
781+
ctxt, cancel := context.WithCancel(h.runCtx)
782+
783+
req := &lnrpc.SubscribeOnionMessagesRequest{}
784+
785+
// SubscribeCustomMessages needs to have the context alive for the
786+
// entire test case as the returned client will be used for send and
787+
// receive events stream. Thus we use runCtx here instead of a timeout
788+
// context.
789+
stream, err := h.LN.SubscribeOnionMessages(ctxt, req)
790+
h.NoError(err, "SubscribeOnionMessages")
791+
792+
return stream, cancel
793+
}
794+
761795
// GetChanInfo makes a RPC call to the node's GetChanInfo and returns the
762796
// response.
763797
func (h *HarnessRPC) GetChanInfo(

log.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"github.com/lightningnetwork/lnd/monitoring"
4747
"github.com/lightningnetwork/lnd/msgmux"
4848
"github.com/lightningnetwork/lnd/netann"
49+
"github.com/lightningnetwork/lnd/onionmessage"
4950
paymentsdb "github.com/lightningnetwork/lnd/payments/db"
5051
"github.com/lightningnetwork/lnd/peer"
5152
"github.com/lightningnetwork/lnd/peernotifier"
@@ -212,6 +213,7 @@ func SetupLoggers(root *build.SubLoggerManager, interceptor signal.Interceptor)
212213
root, paymentsdb.Subsystem, interceptor, paymentsdb.UseLogger,
213214
)
214215

216+
AddSubLogger(root, onionmessage.Subsystem, interceptor, onionmessage.UseLogger)
215217
}
216218

217219
// AddSubLogger is a helper method to conveniently create and register the

onionmessage/log.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package onionmessage
2+
3+
import (
4+
"github.com/btcsuite/btclog/v2"
5+
"github.com/lightningnetwork/lnd/build"
6+
)
7+
8+
// Subsystem defines the logging code for this subsystem.
9+
const Subsystem = "OMSG"
10+
11+
// log is a logger that is initialized with no output filters. This
12+
// means the package will not perform any logging by default until the caller
13+
// requests it.
14+
var log btclog.Logger
15+
16+
// The default amount of logging is none.
17+
func init() {
18+
UseLogger(build.NewSubLogger(Subsystem, nil))
19+
}
20+
21+
// DisableLog disables all library log output. Logging output is disabled
22+
// by default until UseLogger is called.
23+
func DisableLog() {
24+
UseLogger(btclog.Disabled)
25+
}
26+
27+
// UseLogger uses a specified Logger to output package logging info.
28+
// This should be used in preference to SetLogWriter if the caller is also
29+
// using btclog.
30+
func UseLogger(logger btclog.Logger) {
31+
log = logger
32+
}

onionmessage/onion_endpoint.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package onionmessage
2+
3+
import (
4+
"context"
5+
"encoding/hex"
6+
"log/slog"
7+
8+
"github.com/lightningnetwork/lnd/lnutils"
9+
"github.com/lightningnetwork/lnd/lnwire"
10+
"github.com/lightningnetwork/lnd/msgmux"
11+
"github.com/lightningnetwork/lnd/subscribe"
12+
)
13+
14+
// OnionMessageUpdate is onion message update dispatched to any potential
15+
// subscriber.
16+
type OnionMessageUpdate struct {
17+
// Peer is the peer pubkey
18+
Peer [33]byte
19+
20+
// PathKey is the route blinding ephemeral pubkey to be used for
21+
// the onion message.
22+
PathKey [33]byte
23+
24+
// OnionBlob is the raw serialized mix header used to relay messages in
25+
// a privacy-preserving manner. This blob should be handled in the same
26+
// manner as onions used to route HTLCs, with the exception that it uses
27+
// blinded routes by default.
28+
OnionBlob []byte
29+
}
30+
31+
// OnionEndpoint handles incoming onion messages.
32+
type OnionEndpoint struct {
33+
// subscribe.Server is used for subscriptions to onion messages.
34+
onionMessageServer *subscribe.Server
35+
}
36+
37+
// A compile-time check to ensure OnionEndpoint implements the Endpoint
38+
// interface.
39+
var _ msgmux.Endpoint = (*OnionEndpoint)(nil)
40+
41+
// NewOnionEndpoint creates a new OnionEndpoint.
42+
func NewOnionEndpoint(messageServer *subscribe.Server) *OnionEndpoint {
43+
return &OnionEndpoint{
44+
onionMessageServer: messageServer,
45+
}
46+
}
47+
48+
// Name returns the unique name of the endpoint.
49+
func (o *OnionEndpoint) Name() string {
50+
return "OnionMessageHandler"
51+
}
52+
53+
// CanHandle checks if the endpoint can handle the incoming message.
54+
// It returns true if the message is an lnwire.OnionMessage.
55+
func (o *OnionEndpoint) CanHandle(msg msgmux.PeerMsg) bool {
56+
_, ok := msg.Message.(*lnwire.OnionMessage)
57+
return ok
58+
}
59+
60+
// SendMessage processes the incoming onion message.
61+
// It returns true if the message was successfully processed.
62+
func (o *OnionEndpoint) SendMessage(ctx context.Context,
63+
msg msgmux.PeerMsg) bool {
64+
65+
onionMsg, ok := msg.Message.(*lnwire.OnionMessage)
66+
if !ok {
67+
return false
68+
}
69+
70+
peer := msg.PeerPub.SerializeCompressed()
71+
log.DebugS(ctx, "OnionEndpoint received OnionMessage",
72+
slog.String("peer", hex.EncodeToString(peer)),
73+
lnutils.LogPubKey("path_key", onionMsg.PathKey),
74+
lnutils.LogBytesPreview("onion_blob", onionMsg.OnionBlob),
75+
slog.Int("blob length", len(onionMsg.OnionBlob)))
76+
77+
var peerArr [33]byte
78+
copy(peerArr[:], peer)
79+
80+
// Convert blinding point []byte to [33]byte.
81+
blinding := onionMsg.PathKey.SerializeCompressed()
82+
var blindingArr [33]byte
83+
copy(blindingArr[:], blinding)
84+
85+
err := o.onionMessageServer.SendUpdate(&OnionMessageUpdate{
86+
Peer: peerArr,
87+
PathKey: blindingArr,
88+
OnionBlob: onionMsg.OnionBlob,
89+
})
90+
if err != nil {
91+
log.ErrorS(ctx, "Failed to send onion message update", err)
92+
return false
93+
}
94+
95+
return true
96+
}

peer/brontide.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import (
4747
"github.com/lightningnetwork/lnd/lnwire"
4848
"github.com/lightningnetwork/lnd/msgmux"
4949
"github.com/lightningnetwork/lnd/netann"
50+
"github.com/lightningnetwork/lnd/onionmessage"
5051
"github.com/lightningnetwork/lnd/pool"
5152
"github.com/lightningnetwork/lnd/protofsm"
5253
"github.com/lightningnetwork/lnd/queue"
@@ -463,6 +464,10 @@ type Config struct {
463464
// related wire messages.
464465
AuxChannelNegotiator fn.Option[lnwallet.AuxChannelNegotiator]
465466

467+
// OnionMessageServer is an instance of a message server that dispatches
468+
// onion messages to subscribers.
469+
OnionMessageServer *subscribe.Server
470+
466471
// ShouldFwdExpEndorsement is a closure that indicates whether
467472
// experimental endorsement signals should be set.
468473
ShouldFwdExpEndorsement func() bool
@@ -898,6 +903,21 @@ func (p *Brontide) Start() error {
898903
return fmt.Errorf("unable to load channels: %w", err)
899904
}
900905

906+
onionMessageEndpoint := onionmessage.NewOnionEndpoint(
907+
p.cfg.OnionMessageServer,
908+
)
909+
910+
// We register the onion message endpoint with the message router.
911+
err = fn.MapOptionZ(p.msgRouter, func(r msgmux.Router) error {
912+
_ = r.UnregisterEndpoint(onionMessageEndpoint.Name())
913+
914+
return r.RegisterEndpoint(onionMessageEndpoint)
915+
})
916+
if err != nil {
917+
return fmt.Errorf("unable to register endpoint for onion "+
918+
"messaging: %w", err)
919+
}
920+
901921
p.startTime = time.Now()
902922

903923
// Before launching the writeHandler goroutine, we send any channel

0 commit comments

Comments
 (0)