Skip to content

Commit 772b40e

Browse files
authored
Merge pull request #11 from hyle-team/dev/subscriber
Dev/subscriber-submodule
2 parents 33224d5 + a8bc804 commit 772b40e

File tree

10 files changed

+299
-8
lines changed

10 files changed

+299
-8
lines changed

cmd/root.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
package cmd
22

33
import (
4-
"os"
5-
64
"github.com/hyle-team/tss-svc/cmd/helpers"
75
"github.com/hyle-team/tss-svc/cmd/service"
6+
"os"
87

98
"github.com/spf13/cobra"
109
)

cmd/service/run/sign.go

+12
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/hyle-team/tss-svc/internal/bridge/withdrawal"
1717
"github.com/hyle-team/tss-svc/internal/config"
1818
core "github.com/hyle-team/tss-svc/internal/core/connector"
19+
"github.com/hyle-team/tss-svc/internal/core/subscriber"
1920
pg "github.com/hyle-team/tss-svc/internal/db/postgres"
2021
"github.com/hyle-team/tss-svc/internal/p2p"
2122
"github.com/hyle-team/tss-svc/internal/secrets/vault"
@@ -67,6 +68,7 @@ func runSigningService(ctx context.Context, cfg config.Config, wg *sync.WaitGrou
6768

6869
db := pg.NewDepositsQ(cfg.DB())
6970
connector := core.NewConnector(*account, cfg.CoreConnectorConfig().Connection, cfg.CoreConnectorConfig().Settings)
71+
sub := subscriber.NewSubmitSubscriber(db, cfg.TendermintHttpClient(), logger)
7072
fetcher := bridge.NewDepositFetcher(clientsRepo, connector)
7173
srv := api.NewServer(
7274
cfg.ApiGrpcListener(),
@@ -174,6 +176,16 @@ func runSigningService(ctx context.Context, cfg config.Config, wg *sync.WaitGrou
174176
depositAcceptorSession.Run(ctx)
175177
}()
176178

179+
// Core deposit subscriber spin-up
180+
wg.Add(1)
181+
go func() {
182+
defer wg.Done()
183+
184+
if err := sub.Run(ctx); err != nil {
185+
logger.WithError(err).Error("failed to run Core event subscriber")
186+
}
187+
}()
188+
177189
// p2p server spin-up
178190
wg.Add(1)
179191
go func() {

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ require (
3131
github.com/pkg/errors v0.9.1
3232
github.com/rubenv/sql-migrate v1.7.0
3333
github.com/spf13/cobra v1.8.1
34+
github.com/tendermint/tendermint v0.34.28
3435
gitlab.com/distributed_lab/ape v1.7.2
3536
gitlab.com/distributed_lab/figure/v3 v3.1.4
3637
gitlab.com/distributed_lab/kit v1.11.3
@@ -159,7 +160,6 @@ require (
159160
github.com/subosito/gotenv v1.6.0 // indirect
160161
github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect
161162
github.com/tendermint/go-amino v0.16.0 // indirect
162-
github.com/tendermint/tendermint v0.34.28 // indirect
163163
github.com/tendermint/tm-db v0.6.7 // indirect
164164
github.com/tidwall/btree v1.6.0 // indirect
165165
github.com/tidwall/gjson v1.14.4 // indirect

go.sum

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
1+
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
22
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
33
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
44
cloud.google.com/go v0.43.0/go.mod h1:BOSR3VbTLkk6FDC/TcffxP4NF/FFBGA5ku+jvKOP7pg=

internal/config/config.example.yaml

+4-1
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,7 @@ core_connector:
5656
settings:
5757
chain_id: "00000"
5858
denom: "denom"
59-
min_gas_price: 0
59+
min_gas_price: 0
60+
61+
subscriber:
62+
addr: "tcp"

internal/config/config.go

+4
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package config
33
import (
44
"github.com/hyle-team/tss-svc/internal/bridge/chains"
55
connector "github.com/hyle-team/tss-svc/internal/core/connector/config"
6+
subscriber "github.com/hyle-team/tss-svc/internal/core/subscriber/config"
67
p2p "github.com/hyle-team/tss-svc/internal/p2p/config"
78
vaulter "github.com/hyle-team/tss-svc/internal/secrets/vault/config"
89
tss "github.com/hyle-team/tss-svc/internal/tss/config"
@@ -20,6 +21,7 @@ type Config interface {
2021
tss.SessionParamsConfigurator
2122
chains.Chainer
2223
connector.ConnectorConfigurer
24+
subscriber.SubscriberConfigurator
2325
}
2426

2527
type config struct {
@@ -33,6 +35,7 @@ type config struct {
3335
tss.SessionParamsConfigurator
3436
chains.Chainer
3537
connector.ConnectorConfigurer
38+
subscriber.SubscriberConfigurator
3639
}
3740

3841
func New(getter kv.Getter) Config {
@@ -46,5 +49,6 @@ func New(getter kv.Getter) Config {
4649
SessionParamsConfigurator: tss.NewSessionParamsConfigurator(getter),
4750
Chainer: chains.NewChainer(getter),
4851
ConnectorConfigurer: connector.NewConnectorConfigurer(getter),
52+
SubscriberConfigurator: subscriber.NewSubscriberConfigurator(getter),
4953
}
5054
}
+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package config
2+
3+
import (
4+
"github.com/tendermint/tendermint/rpc/client/http"
5+
"gitlab.com/distributed_lab/figure/v3"
6+
"gitlab.com/distributed_lab/kit/comfig"
7+
"gitlab.com/distributed_lab/kit/kv"
8+
)
9+
10+
const subscriberConfigKey = "subscriber"
11+
12+
type SubscriberConfigurator interface {
13+
TendermintHttpClient() *http.HTTP
14+
}
15+
16+
type subscriber struct {
17+
once comfig.Once
18+
getter kv.Getter
19+
}
20+
21+
func NewSubscriberConfigurator(getter kv.Getter) SubscriberConfigurator {
22+
return &subscriber{
23+
getter: getter,
24+
}
25+
}
26+
27+
func (sc *subscriber) TendermintHttpClient() *http.HTTP {
28+
return sc.once.Do(func() interface{} {
29+
var config struct {
30+
Addr string `fig:"addr"`
31+
}
32+
33+
if err := figure.Out(&config).From(kv.MustGetStringMap(sc.getter, subscriberConfigKey)).Please(); err != nil {
34+
panic(err)
35+
}
36+
37+
client, err := http.New(config.Addr, "/websocket")
38+
if err != nil {
39+
panic(err)
40+
}
41+
42+
if err = client.Start(); err != nil {
43+
panic(err)
44+
}
45+
46+
return client
47+
}).(*http.HTTP)
48+
}

internal/core/subscriber/main.go

+171
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
package subscriber
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strconv"
7+
"strings"
8+
9+
bridgeTypes "github.com/hyle-team/bridgeless-core/v12/x/bridge/types"
10+
database "github.com/hyle-team/tss-svc/internal/db"
11+
"github.com/hyle-team/tss-svc/internal/types"
12+
"github.com/pkg/errors"
13+
"github.com/tendermint/tendermint/rpc/client/http"
14+
coretypes "github.com/tendermint/tendermint/rpc/core/types"
15+
"gitlab.com/distributed_lab/logan/v3"
16+
)
17+
18+
const (
19+
OpServiceName = "op-subscriber"
20+
OpPoolSize = 50
21+
22+
OpQuerySubmit = "tm.event='Tx' AND message.action='/core.bridge.MsgSubmitTransactions'"
23+
)
24+
25+
type Subscriber struct {
26+
db database.DepositsQ
27+
client *http.HTTP
28+
query string
29+
log *logan.Entry
30+
}
31+
32+
func NewSubmitSubscriber(db database.DepositsQ, client *http.HTTP, logger *logan.Entry) *Subscriber {
33+
return &Subscriber{
34+
db: db,
35+
client: client,
36+
query: OpQuerySubmit,
37+
log: logger,
38+
}
39+
}
40+
41+
func (s *Subscriber) Run(ctx context.Context) error {
42+
out, err := s.client.Subscribe(ctx, OpServiceName, s.query, OpPoolSize)
43+
if err != nil {
44+
return errors.Wrap(err, "subscriber init failed")
45+
}
46+
47+
go s.run(ctx, out)
48+
49+
return nil
50+
}
51+
52+
func (s *Subscriber) run(ctx context.Context, out <-chan coretypes.ResultEvent) {
53+
for {
54+
select {
55+
case <-ctx.Done():
56+
if err := s.client.Unsubscribe(ctx, OpServiceName, s.query); err != nil {
57+
s.log.WithError(err).Error("failed to unsubscribe from new operations")
58+
}
59+
60+
s.log.Info("context finished")
61+
return
62+
case c, ok := <-out:
63+
if !ok {
64+
s.log.Warn("chanel closed, stopping receiving messages")
65+
return
66+
}
67+
68+
deposit, err := parseSubmittedDeposit(c.Events)
69+
if err != nil {
70+
s.log.WithError(err).Error("failed to parse submitted deposit")
71+
continue
72+
}
73+
74+
tx, err := s.db.Get(deposit.DepositIdentifier)
75+
if err != nil {
76+
s.log.WithError(err).Error("failed to get deposit")
77+
continue
78+
}
79+
80+
// if deposit does not exist in db insert it
81+
if tx == nil {
82+
s.log.Info("found new submitted deposit")
83+
if _, err = s.db.InsertProcessedDeposit(*deposit); err != nil {
84+
s.log.WithError(err).Error("failed to insert new deposit")
85+
}
86+
continue
87+
}
88+
89+
// if deposit exists and pending or processing update signature,withdrawal tx hash and status
90+
switch tx.WithdrawalStatus {
91+
case types.WithdrawalStatus_WITHDRAWAL_STATUS_PROCESSED:
92+
s.log.Info("skipping processed deposit")
93+
case types.WithdrawalStatus_WITHDRAWAL_STATUS_PROCESSING:
94+
s.log.Info("found existing deposit submitted to core")
95+
if err = s.db.UpdateWithdrawalDetails(tx.DepositIdentifier, deposit.WithdrawalTxHash, deposit.Signature); err != nil {
96+
s.log.WithError(err).Error("failed to update deposit withdrawal details")
97+
}
98+
case types.WithdrawalStatus_WITHDRAWAL_STATUS_PENDING:
99+
s.log.Info("found submitted pending deposit")
100+
if err = s.db.UpdateWithdrawalDetails(tx.DepositIdentifier, deposit.WithdrawalTxHash, deposit.Signature); err != nil {
101+
s.log.WithError(err).Error("failed to update deposit withdrawal details")
102+
}
103+
default:
104+
s.log.Infof("nothing to do with deposit status %s", tx.WithdrawalStatus)
105+
}
106+
}
107+
}
108+
}
109+
110+
func parseSubmittedDeposit(attributes map[string][]string) (*database.Deposit, error) {
111+
deposit := &database.Deposit{}
112+
for keys, attribute := range attributes {
113+
114+
parts := strings.SplitN(keys, ".", 2)
115+
if parts[0] != bridgeTypes.EventType_DEPOSIT_SUBMITTED.String() {
116+
continue
117+
}
118+
119+
switch parts[1] {
120+
case bridgeTypes.AttributeKeyDepositTxHash:
121+
deposit.TxHash = attribute[0]
122+
case bridgeTypes.AttributeKeyDepositNonce:
123+
n, err := strconv.Atoi(attribute[0])
124+
if err != nil {
125+
return nil, errors.Wrap(errors.New(fmt.Sprintf("got invalid nonce, got %s", attribute)), "invalid nonce")
126+
}
127+
deposit.TxNonce = n
128+
case bridgeTypes.AttributeKeyDepositChainId:
129+
deposit.ChainId = attribute[0]
130+
case bridgeTypes.AttributeKeyDepositAmount:
131+
deposit.DepositAmount = &attribute[0]
132+
case bridgeTypes.AttributeKeyDepositToken:
133+
deposit.DepositToken = &attribute[0]
134+
case bridgeTypes.AttributeKeyDepositBlock:
135+
b, err := strconv.ParseInt(attribute[0], 10, 64)
136+
if err != nil {
137+
return nil, errors.Wrap(err, "failed to parse deposit block")
138+
}
139+
deposit.DepositBlock = &b
140+
case bridgeTypes.AttributeKeyWithdrawalAmount:
141+
deposit.WithdrawalAmount = &attribute[0]
142+
case bridgeTypes.AttributeKeyDepositor:
143+
deposit.Depositor = &attribute[0]
144+
case bridgeTypes.AttributeKeyReceiver:
145+
deposit.Receiver = &attribute[0]
146+
case bridgeTypes.AttributeKeyWithdrawalChainID:
147+
deposit.WithdrawalChainId = &attribute[0]
148+
case bridgeTypes.AttributeKeyWithdrawalTxHash:
149+
if attribute[0] != "" {
150+
deposit.WithdrawalTxHash = &attribute[0]
151+
}
152+
case bridgeTypes.AttributeKeyWithdrawalToken:
153+
deposit.WithdrawalToken = &attribute[0]
154+
case bridgeTypes.AttributeKeySignature:
155+
if attribute[0] != "" {
156+
deposit.Signature = &attribute[0]
157+
}
158+
case bridgeTypes.AttributeKeyIsWrapped:
159+
isWrapped, err := strconv.ParseBool(attribute[0])
160+
if err != nil {
161+
return nil, errors.Wrap(err, "failed to parse isWrapped attribute")
162+
}
163+
deposit.IsWrappedToken = &isWrapped
164+
default:
165+
166+
return nil, errors.Wrap(errors.New(fmt.Sprintf("unknown attribute key: %s", parts[1])), "failed to parse attribute")
167+
}
168+
}
169+
170+
return deposit, nil
171+
}

internal/db/deposits.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,11 @@ type DepositsQ interface {
2929
GetWithSelector(selector DepositsSelector) (*Deposit, error)
3030

3131
Exists(check DepositExistenceCheck) (bool, error)
32-
32+
UpdateWithdrawalDetails(identifier DepositIdentifier, hash *string, signature *string) error
3333
UpdateWithdrawalTx(DepositIdentifier, string) error
3434
UpdateSignature(DepositIdentifier, string) error
3535
UpdateStatus(DepositIdentifier, types.WithdrawalStatus) error
36+
InsertProcessedDeposit(deposit Deposit) (int64, error)
3637

3738
Transaction(f func() error) error
3839
}

0 commit comments

Comments
 (0)