Skip to content

Commit 735fe42

Browse files
committed
rfq: add support for quote policy persistence and restoration
1 parent 2b3ac4f commit 735fe42

File tree

4 files changed

+188
-8
lines changed

4 files changed

+188
-8
lines changed

rfq/manager.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,9 @@ type ManagerCfg struct {
118118
// helps us communicate custom feature bits with our peer.
119119
AuxChanNegotiator *tapfeatures.AuxChannelNegotiator
120120

121+
// PolicyStore provides persistence for agreed RFQ policies.
122+
PolicyStore PolicyStore
123+
121124
// AcceptPriceDeviationPpm is the price deviation in
122125
// parts per million that is accepted by the RFQ negotiator.
123126
//
@@ -264,12 +267,18 @@ func (m *Manager) startSubsystems(ctx context.Context) error {
264267
SpecifierChecker: m.AssetMatchesSpecifier,
265268
NoOpHTLCs: m.cfg.NoOpHTLCs,
266269
AuxChanNegotiator: m.cfg.AuxChanNegotiator,
270+
PolicyStore: m.cfg.PolicyStore,
267271
})
268272
if err != nil {
269273
return fmt.Errorf("error initializing RFQ order handler: %w",
270274
err)
271275
}
272276

277+
if err := m.restorePersistedPolicies(ctx); err != nil {
278+
return fmt.Errorf("error restoring persisted RFQ "+
279+
"policies: %w", err)
280+
}
281+
273282
if err := m.orderHandler.Start(); err != nil {
274283
return fmt.Errorf("unable to start RFQ order handler: %w", err)
275284
}
@@ -313,6 +322,29 @@ func (m *Manager) startSubsystems(ctx context.Context) error {
313322
return err
314323
}
315324

325+
// restorePersistedPolicies loads persisted RFQ policies from the store
326+
// and rehydrates the order handler and local cache state.
327+
func (m *Manager) restorePersistedPolicies(ctx context.Context) error {
328+
buyAccepts, sellAccepts, err := m.cfg.PolicyStore.FetchActivePolicies(
329+
ctx,
330+
)
331+
if err != nil {
332+
return fmt.Errorf("error fetching persisted policies: %w", err)
333+
}
334+
335+
m.orderHandler.restorePersistedPolicies(buyAccepts, sellAccepts)
336+
337+
for _, accept := range buyAccepts {
338+
m.localAcceptedBuyQuotes.Store(accept.ShortChannelId(), accept)
339+
}
340+
341+
for _, accept := range sellAccepts {
342+
m.localAcceptedSellQuotes.Store(accept.ShortChannelId(), accept)
343+
}
344+
345+
return nil
346+
}
347+
316348
// handleError logs an error and sends it to the main server error channel if
317349
// it is a critical error.
318350
func (m *Manager) handleError(err error) {

rfq/manager_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@ import (
44
"context"
55
"encoding/binary"
66
"testing"
7+
"time"
78

89
"github.com/btcsuite/btcd/btcec/v2"
910
"github.com/decred/dcrd/dcrec/secp256k1/v4"
1011
"github.com/lightninglabs/lndclient"
1112
"github.com/lightninglabs/taproot-assets/address"
1213
"github.com/lightninglabs/taproot-assets/asset"
1314
"github.com/lightninglabs/taproot-assets/proof"
15+
"github.com/lightninglabs/taproot-assets/rfqmsg"
1416
tpchmsg "github.com/lightninglabs/taproot-assets/tapchannelmsg"
1517
"github.com/lightningnetwork/lnd/lnwallet"
1618
"github.com/lightningnetwork/lnd/routing/route"
@@ -59,6 +61,32 @@ var (
5961
peer2 = route.Vertex{77}
6062
)
6163

64+
type nopPolicyStore struct{}
65+
66+
func (nopPolicyStore) StoreSalePolicy(context.Context,
67+
rfqmsg.BuyAccept) error {
68+
69+
return nil
70+
}
71+
72+
func (nopPolicyStore) StorePurchasePolicy(context.Context,
73+
rfqmsg.SellAccept) error {
74+
75+
return nil
76+
}
77+
78+
func (nopPolicyStore) FetchActivePolicies(context.Context) (
79+
[]rfqmsg.BuyAccept, []rfqmsg.SellAccept, error) {
80+
81+
return nil, nil, nil
82+
}
83+
84+
func (nopPolicyStore) DeactivatePolicy(context.Context,
85+
rfqmsg.ID, time.Time) error {
86+
87+
return nil
88+
}
89+
6290
// GroupLookupMock mocks the GroupLookup interface that is required by the
6391
// rfq manager to check asset IDs against asset specifiers.
6492
type GroupLookupMock struct{}
@@ -141,6 +169,7 @@ func assertComputeChannelAssetBalance(t *testing.T,
141169
mockGroupLookup := &GroupLookupMock{}
142170
cfg := ManagerCfg{
143171
GroupLookup: mockGroupLookup,
172+
PolicyStore: nopPolicyStore{},
144173
}
145174
manager, err := NewManager(cfg)
146175
require.NoError(t, err)

rfq/order.go

Lines changed: 103 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ type Policy interface {
8888
GenerateInterceptorResponse(
8989
lndclient.InterceptedHtlc) (*lndclient.InterceptedHtlcResponse,
9090
error)
91+
92+
// QuoteID returns the RFQ ID if it exists, otherwise false.
93+
QuoteID() (rfqmsg.ID, bool)
9194
}
9295

9396
// AssetSalePolicy is a struct that holds the terms which determine whether an
@@ -325,6 +328,11 @@ func (c *AssetSalePolicy) GenerateInterceptorResponse(
325328
}, nil
326329
}
327330

331+
// QuoteID returns the RFQ identifier that originated this policy.
332+
func (c *AssetSalePolicy) QuoteID() (rfqmsg.ID, bool) {
333+
return c.AcceptedQuoteId, true
334+
}
335+
328336
// Ensure that AssetSalePolicy implements the Policy interface.
329337
var _ Policy = (*AssetSalePolicy)(nil)
330338

@@ -537,6 +545,11 @@ func (c *AssetPurchasePolicy) GenerateInterceptorResponse(
537545
}, nil
538546
}
539547

548+
// QuoteID returns the RFQ identifier that originated this policy.
549+
func (c *AssetPurchasePolicy) QuoteID() (rfqmsg.ID, bool) {
550+
return c.AcceptedQuoteId, true
551+
}
552+
540553
// Ensure that AssetPurchasePolicy implements the Policy interface.
541554
var _ Policy = (*AssetPurchasePolicy)(nil)
542555

@@ -674,6 +687,13 @@ func (a *AssetForwardPolicy) GenerateInterceptorResponse(
674687
}, nil
675688
}
676689

690+
// QuoteID returns the RFQ identifier that originated this policy.
691+
//
692+
// Forward policies do not map to a single quote, hence we return false.
693+
func (a *AssetForwardPolicy) QuoteID() (rfqmsg.ID, bool) {
694+
return rfqmsg.ID{}, false
695+
}
696+
677697
// Ensure that AssetForwardPolicy implements the Policy interface.
678698
var _ Policy = (*AssetForwardPolicy)(nil)
679699

@@ -707,6 +727,9 @@ type OrderHandlerCfg struct {
707727
// that is encapsulated in the init and reestablish peer messages. This
708728
// helps us communicate custom feature bits with our peer.
709729
AuxChanNegotiator *tapfeatures.AuxChannelNegotiator
730+
731+
// PolicyStore persists agreed RFQ policies.
732+
PolicyStore PolicyStore
710733
}
711734

712735
// OrderHandler orchestrates management of accepted quote bundles. It monitors
@@ -719,6 +742,9 @@ type OrderHandler struct {
719742
// cfg holds the configuration parameters for the RFQ order handler.
720743
cfg OrderHandlerCfg
721744

745+
// policyStore provides persistence for agreed policies.
746+
policyStore PolicyStore
747+
722748
// policies is a map of serialised short channel IDs (SCIDs) to
723749
// associated asset transaction policies.
724750
policies lnutils.SyncMap[SerialisedScid, Policy]
@@ -736,8 +762,9 @@ type OrderHandler struct {
736762
// NewOrderHandler creates a new struct instance.
737763
func NewOrderHandler(cfg OrderHandlerCfg) (*OrderHandler, error) {
738764
return &OrderHandler{
739-
cfg: cfg,
740-
policies: lnutils.SyncMap[SerialisedScid, Policy]{},
765+
cfg: cfg,
766+
policyStore: cfg.PolicyStore,
767+
policies: lnutils.SyncMap[SerialisedScid, Policy]{},
741768
ContextGuard: &fn.ContextGuard{
742769
DefaultTimeout: DefaultTimeout,
743770
Quit: make(chan struct{}),
@@ -965,6 +992,11 @@ func (h *OrderHandler) RegisterAssetSalePolicy(buyAccept rfqmsg.BuyAccept) {
965992
buyAccept, h.cfg.NoOpHTLCs, h.cfg.AuxChanNegotiator,
966993
)
967994

995+
if err := h.storeSalePolicy(buyAccept); err != nil {
996+
log.Errorf("Unable to persist asset sale policy (id=%x): %v",
997+
buyAccept.ID[:], err)
998+
}
999+
9681000
h.policies.Store(policy.AcceptedQuoteId.Scid(), policy)
9691001
}
9701002

@@ -978,9 +1010,72 @@ func (h *OrderHandler) RegisterAssetPurchasePolicy(
9781010
"sell accept message: %s", sellAccept.String())
9791011

9801012
policy := NewAssetPurchasePolicy(sellAccept)
1013+
1014+
if err := h.storePurchasePolicy(sellAccept); err != nil {
1015+
log.Errorf("Unable to persist asset purchase policy "+
1016+
"(id=%x): %v", sellAccept.ID[:], err)
1017+
}
1018+
9811019
h.policies.Store(policy.scid, policy)
9821020
}
9831021

1022+
func (h *OrderHandler) storeSalePolicy(buyAccept rfqmsg.BuyAccept) error {
1023+
ctx, cancel := h.WithCtxQuit()
1024+
defer cancel()
1025+
1026+
return h.policyStore.StoreSalePolicy(ctx, buyAccept)
1027+
}
1028+
1029+
func (h *OrderHandler) storePurchasePolicy(
1030+
sellAccept rfqmsg.SellAccept) error {
1031+
1032+
ctx, cancel := h.WithCtxQuit()
1033+
defer cancel()
1034+
1035+
return h.policyStore.StorePurchasePolicy(ctx, sellAccept)
1036+
}
1037+
1038+
func (h *OrderHandler) restorePersistedPolicies(buyAccepts []rfqmsg.BuyAccept,
1039+
sellAccepts []rfqmsg.SellAccept) {
1040+
1041+
for _, accept := range buyAccepts {
1042+
policy := NewAssetSalePolicy(
1043+
accept, h.cfg.NoOpHTLCs, h.cfg.AuxChanNegotiator,
1044+
)
1045+
h.policies.Store(policy.AcceptedQuoteId.Scid(), policy)
1046+
}
1047+
1048+
for _, accept := range sellAccepts {
1049+
policy := NewAssetPurchasePolicy(accept)
1050+
h.policies.Store(policy.scid, policy)
1051+
}
1052+
}
1053+
1054+
func (h *OrderHandler) removePolicy(scid SerialisedScid,
1055+
policy Policy) {
1056+
1057+
h.policies.Delete(scid)
1058+
h.deactivatePersistedPolicy(policy)
1059+
}
1060+
1061+
func (h *OrderHandler) deactivatePersistedPolicy(policy Policy) {
1062+
id, ok := policy.QuoteID()
1063+
if !ok {
1064+
return
1065+
}
1066+
1067+
ctx, cancel := h.WithCtxQuit()
1068+
defer cancel()
1069+
1070+
err := h.policyStore.DeactivatePolicy(
1071+
ctx, id, time.Now().UTC(),
1072+
)
1073+
if err != nil {
1074+
log.Errorf("Unable to deactivate persisted policy id=%x: %v",
1075+
id[:], err)
1076+
}
1077+
}
1078+
9841079
// fetchPolicy fetches a policy which is relevant to a given HTLC. If a policy
9851080
// is not found, false is returned. Expired policies are not returned and are
9861081
// removed from the cache.
@@ -1049,12 +1144,12 @@ func (h *OrderHandler) fetchPolicy(htlc lndclient.InterceptedHtlc) (Policy,
10491144
outgoingPolicy := outPolicy
10501145

10511146
if incomingPolicy.HasExpired() {
1052-
scid := incomingPolicy.Scid()
1053-
h.policies.Delete(SerialisedScid(scid))
1147+
inScid := SerialisedScid(incomingPolicy.Scid())
1148+
h.removePolicy(inScid, incomingPolicy)
10541149
}
10551150
if outgoingPolicy.HasExpired() {
1056-
scid := outgoingPolicy.Scid()
1057-
h.policies.Delete(SerialisedScid(scid))
1151+
outScid := SerialisedScid(outgoingPolicy.Scid())
1152+
h.removePolicy(outScid, outgoingPolicy)
10581153
}
10591154

10601155
// If either the incoming or outgoing policy has expired, we
@@ -1098,7 +1193,7 @@ func (h *OrderHandler) fetchPolicy(htlc lndclient.InterceptedHtlc) (Policy,
10981193
scid := *foundScid
10991194

11001195
if policy.HasExpired() {
1101-
h.policies.Delete(scid)
1196+
h.removePolicy(scid, policy)
11021197
return nil, false, nil
11031198
}
11041199

@@ -1114,7 +1209,7 @@ func (h *OrderHandler) cleanupStalePolicies() {
11141209
func(scid SerialisedScid, policy Policy) error {
11151210
if policy.HasExpired() {
11161211
staleCounter++
1117-
h.policies.Delete(scid)
1212+
h.removePolicy(scid, policy)
11181213
}
11191214

11201215
return nil

rfq/policystore.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package rfq
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/lightninglabs/taproot-assets/rfqmsg"
8+
)
9+
10+
// PolicyStore abstracts persistence of RFQ policies.
11+
type PolicyStore interface {
12+
// StoreSalePolicy stores an asset sale policy.
13+
StoreSalePolicy(ctx context.Context, accept rfqmsg.BuyAccept) error
14+
15+
// StorePurchasePolicy stores an asset purchase policy.
16+
StorePurchasePolicy(ctx context.Context, accept rfqmsg.SellAccept) error
17+
18+
// FetchActivePolicies fetches all active asset policies.
19+
FetchActivePolicies(ctx context.Context) ([]rfqmsg.BuyAccept,
20+
[]rfqmsg.SellAccept, error)
21+
22+
// DeactivatePolicy deactivates an asset sale or purchase policy.
23+
DeactivatePolicy(ctx context.Context, id rfqmsg.ID, at time.Time) error
24+
}

0 commit comments

Comments
 (0)