Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@ config.yaml

# Coverage
coverage.*

# OS
.DS_Store
20 changes: 20 additions & 0 deletions cmd/parse/bridge/cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package bridge

import (
parsecmdtypes "github.com/forbole/juno/v6/cmd/parse/types"
"github.com/spf13/cobra"
)

// NewBridgeCmd returns the Cobra command that allows to fix all the things related to the bridge module
func NewBridgeCmd(parseConfig *parsecmdtypes.Config) *cobra.Command {
cmd := &cobra.Command{
Use: "bridge",
Short: "Fix things related to the bridge module",
}

cmd.AddCommand(
txsCmd(parseConfig),
)

return cmd
}
201 changes: 201 additions & 0 deletions cmd/parse/bridge/txs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package bridge

import (
"encoding/hex"
"fmt"
"sort"
"time"

tmctypes "github.com/cometbft/cometbft/rpc/core/types"
"github.com/forbole/callisto/v4/database"
"github.com/forbole/callisto/v4/modules/bridge"
modulestypes "github.com/forbole/callisto/v4/modules/types"
"github.com/forbole/callisto/v4/utils"
parsecmdtypes "github.com/forbole/juno/v6/cmd/parse/types"
"github.com/forbole/juno/v6/types/config"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
)

// txsCmd returns the Cobra command allowing re-scan bridge transactions
func txsCmd(parseConfig *parsecmdtypes.Config) *cobra.Command {
cmd := &cobra.Command{
Use: "txs",
Short: "Parse all the bridge transactions and overwrite the existing ones",
RunE: func(cmd *cobra.Command, args []string) error {
parseCtx, err := parsecmdtypes.GetParserContext(config.Cfg, parseConfig)
if err != nil {
return err
}

startHeight, err := cmd.Flags().GetInt64("start-height")
if err != nil {
return err
}

endHeight, err := cmd.Flags().GetInt64("end-height")
if err != nil {
return err
}

checkSize, err := cmd.Flags().GetInt64("check-size")
if err != nil {
return err
}

if checkSize <= 0 {
checkSize = 1000 // Default check size
}

// Log processing summary header
log.Info().Int64("start_height", startHeight).Int64("end_height", endHeight).Int64("check_size", checkSize).Msg("starting bridge transaction processing")

bz, err := config.Cfg.GetBytes()
if err != nil {
return err
}
bridgeCfg, err := bridge.ParseConfig(bz)
if err != nil {
return err
}

sources, err := modulestypes.BuildSources(config.Cfg.Node, utils.GetCodec())
if err != nil {
return err
}

// Get the database
db := database.Cast(parseCtx.Database)

// Build bridge module
bridgeModule := bridge.NewModule(config.Cfg, sources.BridgeSource, utils.GetCodec(), db)

// Track overall statistics
totalTxsProcessed := 0
totalErrorsEncountered := 0
totalBlocksInserted := 0

// Split the height range into chunks
for rangeStart := startHeight; rangeStart < endHeight; rangeStart += checkSize {
rangeEnd := rangeStart + checkSize
if rangeEnd > endHeight {
rangeEnd = endHeight
}

rangeStartTime := time.Now()

log.Info().Int64("range_start", rangeStart).Int64("range_end", rangeEnd).
Msg("processing height range")

// Track statistics for this range
rangeErrorCount := 0
rangeBlocksInserted := 0
rangeMessagesProcessed := 0

// Collect all the transactions for this range
var txs []*tmctypes.ResultTx

// Get all the MsgSendToXrpl txs
query := fmt.Sprintf(
"tx.height >= %d AND tx.height < %d AND wasm._contract_address='%s' AND wasm.action='send_to_xrpl'",
rangeStart,
rangeEnd,
bridgeCfg.ContractAddress,
)
sendToXrplTxs, err := utils.QueryTxs(parseCtx.Node, query)
if err != nil {
return err
}
txs = append(txs, sendToXrplTxs...)

// Get all the MsgSaveEvidence txs
query = fmt.Sprintf(
"tx.height >= %d AND tx.height < %d AND wasm._contract_address='%s' AND wasm.action='save_evidence'",
rangeStart,
rangeEnd,
bridgeCfg.ContractAddress,
)
saveEvidenceTxs, err := utils.QueryTxs(parseCtx.Node, query)
if err != nil {
return err
}
txs = append(txs, saveEvidenceTxs...)

log.Info().Int64("range_start", rangeStart).Int64("range_end", rangeEnd).
Int("tx_count", len(txs)).Msg("found transactions in range")

// Sort the txs based on their ascending height
sort.Slice(txs, func(i, j int) bool {
return txs[i].Height < txs[j].Height
})

for _, tx := range txs {
log.Debug().Int64("height", tx.Height).Msg("parsing transaction")

transaction, err := parseCtx.Node.Tx(hex.EncodeToString(tx.Tx.Hash()))
if err != nil {
return err
}

for index := range transaction.Body.Messages {
rangeMessagesProcessed++
err = bridgeModule.HandleMsg(index, transaction.Body.Messages[index], transaction)
if err != nil {
rangeErrorCount++
txHash := hex.EncodeToString(tx.Tx.Hash())

log.Error().Int64("height", tx.Height).Int("msg_index", index).
Str("tx_hash", txHash).
Err(err).Msg("error while handling bridge module message")
continue
}
}
}

// Calculate range duration
rangeDuration := time.Since(rangeStartTime)

// Update overall statistics
totalTxsProcessed += len(txs)
totalErrorsEncountered += rangeErrorCount
totalBlocksInserted += rangeBlocksInserted

// Log summary for this range
if rangeMessagesProcessed > 0 {
successRate := float64(rangeMessagesProcessed-rangeErrorCount) / float64(rangeMessagesProcessed) * 100
log.Info().Int64("range_start", rangeStart).Int64("range_end", rangeEnd).
Int("txs_found", len(txs)).Int("messages_processed", rangeMessagesProcessed).
Int("blocks_inserted", rangeBlocksInserted).Int("errors", rangeErrorCount).
Float64("success_rate", successRate).
Str("duration", rangeDuration.String()).
Msg("finished processing height range")
} else {
log.Info().Int64("range_start", rangeStart).Int64("range_end", rangeEnd).
Int("txs_found", len(txs)).Int("messages_processed", rangeMessagesProcessed).
Int("blocks_inserted", rangeBlocksInserted).Int("errors", rangeErrorCount).
Str("success_rate", "N/A").
Str("duration", rangeDuration.String()).
Msg("finished processing height range")
}
}

// Log final summary
rangeCount := (endHeight - startHeight + checkSize - 1) / checkSize

log.Info().
Int64("total_ranges_processed", rangeCount).
Int("total_txs_processed", totalTxsProcessed).
Int("total_blocks_inserted", totalBlocksInserted).
Int("total_errors_encountered", totalErrorsEncountered).
Msg("processing complete")

return nil
},
}

cmd.Flags().Int64("start-height", 0, "Start height for filtering transactions (inclusive, 0 means no lower limit)")
cmd.Flags().Int64("end-height", 0, "End height for filtering transactions (exclusive, 0 means no upper limit)")
cmd.Flags().Int64("check-size", 100000, "Size of the height range to check (default: 100000)")

return cmd
}
2 changes: 2 additions & 0 deletions cmd/parse/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package parse
import (
parseauth "github.com/forbole/callisto/v4/cmd/parse/auth"
parsebank "github.com/forbole/callisto/v4/cmd/parse/bank"
parsebridge "github.com/forbole/callisto/v4/cmd/parse/bridge"
parsedex "github.com/forbole/callisto/v4/cmd/parse/dex"
parsedistribution "github.com/forbole/callisto/v4/cmd/parse/distribution"
parsefeegrant "github.com/forbole/callisto/v4/cmd/parse/feegrant"
Expand Down Expand Up @@ -39,6 +40,7 @@ func NewParseCmd(parseCfg *parse.Config, parser messages.MessageAddressesParser)
parsestaking.NewStakingCmd(parseCfg),
parsetransaction.NewTransactionsCmd(parseCfg),
parsedex.NewDexCmd(parseCfg),
parsebridge.NewBridgeCmd(parseCfg),
)

return cmd
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
cosmossdk.io/x/feegrant v0.1.1
cosmossdk.io/x/nft v0.1.1
cosmossdk.io/x/upgrade v0.1.4
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20241202115740-dbc6962a4d0a
github.com/CoreumFoundation/coreum/v5 v5.0.0-20250526103302-5a3f05b11008
github.com/CoreumFoundation/coreumbridge-xrpl/relayer v0.0.0-20250526113311-5229bda4c986
github.com/CosmWasm/wasmd v0.54.0
Expand Down Expand Up @@ -51,7 +52,6 @@ require (
github.com/Antonboom/errname v0.1.9 // indirect
github.com/Antonboom/nilnil v0.1.3 // indirect
github.com/BurntSushi/toml v1.4.0 // indirect
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20241202115740-dbc6962a4d0a // indirect
github.com/CosmWasm/wasmvm/v2 v2.2.2 // indirect
github.com/DataDog/datadog-go v4.8.3+incompatible // indirect
github.com/DataDog/zstd v1.5.6 // indirect
Expand Down
26 changes: 24 additions & 2 deletions utils/node.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package utils

import (
"context"
"fmt"
"time"

coretypes "github.com/cometbft/cometbft/rpc/core/types"
"github.com/forbole/juno/v6/node"

"github.com/CoreumFoundation/coreum-tools/pkg/retry"
)

// QueryTxs queries all the transactions from the given node corresponding to the given query
Expand All @@ -14,10 +18,28 @@ func QueryTxs(node node.Node, query string) ([]*coretypes.ResultTx, error) {
page := 1
perPage := 100
stop := false

ctx := context.Background()
retryDelay := 500 * time.Millisecond
doTimeout := 60 * time.Second

for !stop {
result, err := node.TxSearch(query, &page, &perPage, "")
var result *coretypes.ResultTxSearch
var searchErr error

// Retry logic for each page
doCtx, doCtxCancel := context.WithTimeout(ctx, doTimeout)
err := retry.Do(doCtx, retryDelay, func() error {
result, searchErr = node.TxSearch(query, &page, &perPage, "")
if searchErr != nil {
return fmt.Errorf("error while running tx search: %s", searchErr)
}
return nil
})
doCtxCancel()

if err != nil {
return nil, fmt.Errorf("error while running tx search: %s", err)
return nil, err
}

page++
Expand Down