Skip to content

Commit 99a62aa

Browse files
committed
multi: endpoints for onion messages
This commit creates the necessary endpoints for onion messages. Specifically, it adds the following: - `SendOnionMessage` endpoint to send onion messages. - `SubscribeOnionMessages` endpoint to subscribe to incoming onion messages. It uses the `msgmux` package to handle the onion messages.
1 parent 86db77c commit 99a62aa

File tree

9 files changed

+376
-0
lines changed

9 files changed

+376
-0
lines changed

itest/list_on_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,10 @@ var allTestCases = []*lntest.TestCase{
531531
Name: "custom message",
532532
TestFunc: testCustomMessage,
533533
},
534+
{
535+
Name: "onion message",
536+
TestFunc: testOnionMessage,
537+
},
534538
{
535539
Name: "sign verify message with addr",
536540
TestFunc: testSignVerifyMessageWithAddr,

itest/lnd_onion_message_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package itest
2+
3+
import (
4+
"time"
5+
6+
"github.com/btcsuite/btcd/btcec/v2"
7+
"github.com/lightningnetwork/lnd/lnrpc"
8+
"github.com/lightningnetwork/lnd/lntest"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
// testOnionMessage tests sending and receiving of the onion message type.
13+
func testOnionMessage(ht *lntest.HarnessTest) {
14+
alice := ht.NewNode("Alice", nil)
15+
bob := ht.NewNode("Bob", nil)
16+
17+
// Subscribe Alice to onion messages before we send any, so that we
18+
// don't miss any.
19+
msgClient, cancel := alice.RPC.SubscribeOnionMessages()
20+
defer cancel()
21+
22+
// Create a channel to receive onion messages on.
23+
messages := make(chan *lnrpc.OnionMessage)
24+
go func() {
25+
for {
26+
// If we fail to receive, just exit. The test should
27+
// fail elsewhere if it doesn't get a message that it
28+
// was expecting.
29+
msg, err := msgClient.Recv()
30+
if err != nil {
31+
return
32+
}
33+
34+
// Deliver the message into our channel or exit if the
35+
// test is shutting down.
36+
select {
37+
case messages <- msg:
38+
case <-ht.Context().Done():
39+
return
40+
}
41+
}
42+
}()
43+
44+
// Connect alice and bob so that they can exchange messages.
45+
ht.EnsureConnected(alice, bob)
46+
47+
// Create a random onion message.
48+
randomPriv, err := btcec.NewPrivateKey()
49+
require.NoError(ht.T, err)
50+
randomPub := randomPriv.PubKey()
51+
msgBlindingPoint := randomPub.SerializeCompressed()
52+
msgOnion := []byte{1, 2, 3}
53+
54+
// Send it from Bob to Alice.
55+
bobMsg := &lnrpc.SendOnionMessageRequest{
56+
Peer: alice.PubKey[:],
57+
BlindingPoint: msgBlindingPoint,
58+
Onion: msgOnion,
59+
}
60+
bob.RPC.SendOnionMessage(bobMsg)
61+
62+
// Wait for Alice to receive the message.
63+
select {
64+
case msg := <-messages:
65+
// Check our type and data and (sanity) check the peer we got
66+
// it from.
67+
require.Equal(ht, msgOnion, msg.Onion, "msg data wrong")
68+
require.Equal(ht, msgBlindingPoint, msg.BlindingPoint, "msg "+
69+
"blinding point wrong")
70+
require.Equal(ht, bob.PubKey[:], msg.Peer, "msg peer wrong")
71+
72+
case <-time.After(lntest.DefaultTimeout):
73+
ht.Fatalf("alice did not receive onion message: %v", bobMsg)
74+
}
75+
}

lntest/rpc/lnd.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -726,6 +726,8 @@ func (h *HarnessRPC) SubscribeChannelEvents() ChannelEventsClient {
726726

727727
type CustomMessageClient lnrpc.Lightning_SubscribeCustomMessagesClient
728728

729+
type OnionMessageClient lnrpc.Lightning_SubscribeOnionMessagesClient
730+
729731
// SubscribeCustomMessages creates a subscription client for custom messages.
730732
func (h *HarnessRPC) SubscribeCustomMessages() (CustomMessageClient,
731733
context.CancelFunc) {
@@ -758,6 +760,38 @@ func (h *HarnessRPC) SendCustomMessage(
758760
return resp
759761
}
760762

763+
// SendOnionMessage makes a RPC call to the node's SendOnionMessage and
764+
// returns the response.
765+
func (h *HarnessRPC) SendOnionMessage(
766+
req *lnrpc.SendOnionMessageRequest) *lnrpc.SendOnionMessageResponse {
767+
768+
ctxt, cancel := context.WithTimeout(h.runCtx, DefaultTimeout)
769+
defer cancel()
770+
771+
resp, err := h.LN.SendOnionMessage(ctxt, req)
772+
h.NoError(err, "SendOnionMessage")
773+
774+
return resp
775+
}
776+
777+
// SubscribeOnionMessages creates a subscription client for onion messages.
778+
func (h *HarnessRPC) SubscribeOnionMessages() (OnionMessageClient,
779+
context.CancelFunc) {
780+
781+
ctxt, cancel := context.WithCancel(h.runCtx)
782+
783+
req := &lnrpc.SubscribeOnionMessagesRequest{}
784+
785+
// SubscribeCustomMessages needs to have the context alive for the
786+
// entire test case as the returned client will be used for send and
787+
// receive events stream. Thus we use runCtx here instead of a timeout
788+
// context.
789+
stream, err := h.LN.SubscribeOnionMessages(ctxt, req)
790+
h.NoError(err, "SubscribeOnionMessages")
791+
792+
return stream, cancel
793+
}
794+
761795
// GetChanInfo makes a RPC call to the node's GetChanInfo and returns the
762796
// response.
763797
func (h *HarnessRPC) GetChanInfo(

log.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"github.com/lightningnetwork/lnd/monitoring"
4747
"github.com/lightningnetwork/lnd/msgmux"
4848
"github.com/lightningnetwork/lnd/netann"
49+
"github.com/lightningnetwork/lnd/onion_message"
4950
paymentsdb "github.com/lightningnetwork/lnd/payments/db"
5051
"github.com/lightningnetwork/lnd/peer"
5152
"github.com/lightningnetwork/lnd/peernotifier"
@@ -212,6 +213,7 @@ func SetupLoggers(root *build.SubLoggerManager, interceptor signal.Interceptor)
212213
root, paymentsdb.Subsystem, interceptor, paymentsdb.UseLogger,
213214
)
214215

216+
AddSubLogger(root, onion_message.Subsystem, interceptor, onion_message.UseLogger)
215217
}
216218

217219
// AddSubLogger is a helper method to conveniently create and register the

onion_message/log.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package onion_message
2+
3+
import (
4+
"github.com/btcsuite/btclog/v2"
5+
"github.com/lightningnetwork/lnd/build"
6+
)
7+
8+
// Subsystem defines the logging code for this subsystem.
9+
const Subsystem = "OMSG"
10+
11+
// log is a logger that is initialized with no output filters. This
12+
// means the package will not perform any logging by default until the caller
13+
// requests it.
14+
var log btclog.Logger
15+
16+
// The default amount of logging is none.
17+
func init() {
18+
UseLogger(build.NewSubLogger(Subsystem, nil))
19+
}
20+
21+
// DisableLog disables all library log output. Logging output is disabled
22+
// by default until UseLogger is called.
23+
func DisableLog() {
24+
UseLogger(btclog.Disabled)
25+
}
26+
27+
// UseLogger uses a specified Logger to output package logging info.
28+
// This should be used in preference to SetLogWriter if the caller is also
29+
// using btclog.
30+
func UseLogger(logger btclog.Logger) {
31+
log = logger
32+
}

onion_message/onion_endpoint.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package onion_message
2+
3+
import (
4+
"context"
5+
6+
"github.com/lightningnetwork/lnd/lnwire"
7+
"github.com/lightningnetwork/lnd/msgmux"
8+
"github.com/lightningnetwork/lnd/subscribe"
9+
)
10+
11+
// OnionMessageUpdate is onion message update dispatched to any potential
12+
// subscriber.
13+
type OnionMessageUpdate struct {
14+
// Peer is the peer pubkey
15+
Peer [33]byte
16+
17+
// BlindingPoint is the route blinding ephemeral pubkey to be used for
18+
// the onion message.
19+
BlindingPoint []byte
20+
21+
// OnionBlob is the raw serialized mix header used to relay messages in
22+
// a privacy-preserving manner. This blob should be handled in the same
23+
// manner as onions used to route HTLCs, with the exception that it uses
24+
// blinded routes by default.
25+
OnionBlob []byte
26+
}
27+
28+
// OnionEndpoint handles incoming onion messages.
29+
type OnionEndpoint struct {
30+
// subscribe.Server is used for subscriptions to onion messages.
31+
onionMessageServer *subscribe.Server
32+
}
33+
34+
// A compile-time check to ensure OnionEndpoint implements the Endpoint
35+
// interface.
36+
var _ msgmux.Endpoint = (*OnionEndpoint)(nil)
37+
38+
// NewOnionEndpoint creates a new OnionEndpoint.
39+
func NewOnionEndpoint(messageServer *subscribe.Server) *OnionEndpoint {
40+
return &OnionEndpoint{
41+
onionMessageServer: messageServer,
42+
}
43+
}
44+
45+
// Name returns the unique name of the endpoint.
46+
func (o *OnionEndpoint) Name() string {
47+
return "OnionMessageHandler"
48+
}
49+
50+
// CanHandle checks if the endpoint can handle the incoming message.
51+
// It returns true if the message is an lnwire.OnionMessage.
52+
func (o *OnionEndpoint) CanHandle(msg msgmux.PeerMsg) bool {
53+
_, ok := msg.Message.(*lnwire.OnionMessage)
54+
return ok
55+
}
56+
57+
// SendMessage processes the incoming onion message.
58+
// It returns true if the message was successfully processed.
59+
func (o *OnionEndpoint) SendMessage(ctx context.Context,
60+
msg msgmux.PeerMsg) bool {
61+
62+
onionMsg, ok := msg.Message.(*lnwire.OnionMessage)
63+
if !ok {
64+
return false
65+
}
66+
67+
peer := msg.PeerPub.SerializeCompressed()
68+
log.Debugf("OnionEndpoint received OnionMessage from peer %s: "+
69+
"BlindingPoint=%v, OnionPacket[:10]=%x...", peer,
70+
onionMsg.BlindingPoint, onionMsg.OnionBlob)
71+
72+
var peerArr [33]byte
73+
copy(peerArr[:], peer)
74+
err := o.onionMessageServer.SendUpdate(&OnionMessageUpdate{
75+
Peer: peerArr,
76+
BlindingPoint: onionMsg.BlindingPoint.SerializeCompressed(),
77+
OnionBlob: onionMsg.OnionBlob,
78+
})
79+
if err != nil {
80+
log.Errorf("Failed to send onion message update: %v", err)
81+
return false
82+
}
83+
84+
return true
85+
}

peer/brontide.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import (
4545
"github.com/lightningnetwork/lnd/lnwire"
4646
"github.com/lightningnetwork/lnd/msgmux"
4747
"github.com/lightningnetwork/lnd/netann"
48+
"github.com/lightningnetwork/lnd/onion_message"
4849
"github.com/lightningnetwork/lnd/pool"
4950
"github.com/lightningnetwork/lnd/protofsm"
5051
"github.com/lightningnetwork/lnd/queue"
@@ -456,6 +457,10 @@ type Config struct {
456457
// used to modify the way the co-op close transaction is constructed.
457458
AuxChanCloser fn.Option[chancloser.AuxChanCloser]
458459

460+
// OnionMessageServer is an instance of a message server that dispatches
461+
// onion messages to subscribers.
462+
OnionMessageServer *subscribe.Server
463+
459464
// ShouldFwdExpEndorsement is a closure that indicates whether
460465
// experimental endorsement signals should be set.
461466
ShouldFwdExpEndorsement func() bool
@@ -891,6 +896,21 @@ func (p *Brontide) Start() error {
891896
return fmt.Errorf("unable to load channels: %w", err)
892897
}
893898

899+
onionMessageEndpoint := onion_message.NewOnionEndpoint(
900+
p.cfg.OnionMessageServer,
901+
)
902+
903+
// We register the onion message endpoint with the message router.
904+
err = fn.MapOptionZ(p.msgRouter, func(r msgmux.Router) error {
905+
_ = r.UnregisterEndpoint(onionMessageEndpoint.Name())
906+
907+
return r.RegisterEndpoint(onionMessageEndpoint)
908+
})
909+
if err != nil {
910+
return fmt.Errorf("unable to register endpoint for onion "+
911+
"messaging: %w", err)
912+
}
913+
894914
p.startTime = time.Now()
895915

896916
// Before launching the writeHandler goroutine, we send any channel

0 commit comments

Comments
 (0)