diff --git a/.gitignore b/.gitignore index 9d6de9394..9f511d382 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,6 @@ config.yaml # Coverage coverage.* + +# OS +.DS_Store \ No newline at end of file diff --git a/cmd/parse/bridge/cmd.go b/cmd/parse/bridge/cmd.go new file mode 100644 index 000000000..fd1a35f6b --- /dev/null +++ b/cmd/parse/bridge/cmd.go @@ -0,0 +1,20 @@ +package bridge + +import ( + parsecmdtypes "github.com/forbole/juno/v6/cmd/parse/types" + "github.com/spf13/cobra" +) + +// NewBridgeCmd returns the Cobra command that allows to fix all the things related to the bridge module +func NewBridgeCmd(parseConfig *parsecmdtypes.Config) *cobra.Command { + cmd := &cobra.Command{ + Use: "bridge", + Short: "Fix things related to the bridge module", + } + + cmd.AddCommand( + txsCmd(parseConfig), + ) + + return cmd +} diff --git a/cmd/parse/bridge/txs.go b/cmd/parse/bridge/txs.go new file mode 100644 index 000000000..c1f0d36eb --- /dev/null +++ b/cmd/parse/bridge/txs.go @@ -0,0 +1,201 @@ +package bridge + +import ( + "encoding/hex" + "fmt" + "sort" + "time" + + tmctypes "github.com/cometbft/cometbft/rpc/core/types" + "github.com/forbole/callisto/v4/database" + "github.com/forbole/callisto/v4/modules/bridge" + modulestypes "github.com/forbole/callisto/v4/modules/types" + "github.com/forbole/callisto/v4/utils" + parsecmdtypes "github.com/forbole/juno/v6/cmd/parse/types" + "github.com/forbole/juno/v6/types/config" + "github.com/rs/zerolog/log" + "github.com/spf13/cobra" +) + +// txsCmd returns the Cobra command allowing re-scan bridge transactions +func txsCmd(parseConfig *parsecmdtypes.Config) *cobra.Command { + cmd := &cobra.Command{ + Use: "txs", + Short: "Parse all the bridge transactions and overwrite the existing ones", + RunE: func(cmd *cobra.Command, args []string) error { + parseCtx, err := parsecmdtypes.GetParserContext(config.Cfg, parseConfig) + if err != nil { + return err + } + + startHeight, err := cmd.Flags().GetInt64("start-height") + if err != nil { + return err + } + + endHeight, err := cmd.Flags().GetInt64("end-height") + if err != nil { + return err + } + + checkSize, err := cmd.Flags().GetInt64("check-size") + if err != nil { + return err + } + + if checkSize <= 0 { + checkSize = 1000 // Default check size + } + + // Log processing summary header + log.Info().Int64("start_height", startHeight).Int64("end_height", endHeight).Int64("check_size", checkSize).Msg("starting bridge transaction processing") + + bz, err := config.Cfg.GetBytes() + if err != nil { + return err + } + bridgeCfg, err := bridge.ParseConfig(bz) + if err != nil { + return err + } + + sources, err := modulestypes.BuildSources(config.Cfg.Node, utils.GetCodec()) + if err != nil { + return err + } + + // Get the database + db := database.Cast(parseCtx.Database) + + // Build bridge module + bridgeModule := bridge.NewModule(config.Cfg, sources.BridgeSource, utils.GetCodec(), db) + + // Track overall statistics + totalTxsProcessed := 0 + totalErrorsEncountered := 0 + totalBlocksInserted := 0 + + // Split the height range into chunks + for rangeStart := startHeight; rangeStart < endHeight; rangeStart += checkSize { + rangeEnd := rangeStart + checkSize + if rangeEnd > endHeight { + rangeEnd = endHeight + } + + rangeStartTime := time.Now() + + log.Info().Int64("range_start", rangeStart).Int64("range_end", rangeEnd). + Msg("processing height range") + + // Track statistics for this range + rangeErrorCount := 0 + rangeBlocksInserted := 0 + rangeMessagesProcessed := 0 + + // Collect all the transactions for this range + var txs []*tmctypes.ResultTx + + // Get all the MsgSendToXrpl txs + query := fmt.Sprintf( + "tx.height >= %d AND tx.height < %d AND wasm._contract_address='%s' AND wasm.action='send_to_xrpl'", + rangeStart, + rangeEnd, + bridgeCfg.ContractAddress, + ) + sendToXrplTxs, err := utils.QueryTxs(parseCtx.Node, query) + if err != nil { + return err + } + txs = append(txs, sendToXrplTxs...) + + // Get all the MsgSaveEvidence txs + query = fmt.Sprintf( + "tx.height >= %d AND tx.height < %d AND wasm._contract_address='%s' AND wasm.action='save_evidence'", + rangeStart, + rangeEnd, + bridgeCfg.ContractAddress, + ) + saveEvidenceTxs, err := utils.QueryTxs(parseCtx.Node, query) + if err != nil { + return err + } + txs = append(txs, saveEvidenceTxs...) + + log.Info().Int64("range_start", rangeStart).Int64("range_end", rangeEnd). + Int("tx_count", len(txs)).Msg("found transactions in range") + + // Sort the txs based on their ascending height + sort.Slice(txs, func(i, j int) bool { + return txs[i].Height < txs[j].Height + }) + + for _, tx := range txs { + log.Debug().Int64("height", tx.Height).Msg("parsing transaction") + + transaction, err := parseCtx.Node.Tx(hex.EncodeToString(tx.Tx.Hash())) + if err != nil { + return err + } + + for index := range transaction.Body.Messages { + rangeMessagesProcessed++ + err = bridgeModule.HandleMsg(index, transaction.Body.Messages[index], transaction) + if err != nil { + rangeErrorCount++ + txHash := hex.EncodeToString(tx.Tx.Hash()) + + log.Error().Int64("height", tx.Height).Int("msg_index", index). + Str("tx_hash", txHash). + Err(err).Msg("error while handling bridge module message") + continue + } + } + } + + // Calculate range duration + rangeDuration := time.Since(rangeStartTime) + + // Update overall statistics + totalTxsProcessed += len(txs) + totalErrorsEncountered += rangeErrorCount + totalBlocksInserted += rangeBlocksInserted + + // Log summary for this range + if rangeMessagesProcessed > 0 { + successRate := float64(rangeMessagesProcessed-rangeErrorCount) / float64(rangeMessagesProcessed) * 100 + log.Info().Int64("range_start", rangeStart).Int64("range_end", rangeEnd). + Int("txs_found", len(txs)).Int("messages_processed", rangeMessagesProcessed). + Int("blocks_inserted", rangeBlocksInserted).Int("errors", rangeErrorCount). + Float64("success_rate", successRate). + Str("duration", rangeDuration.String()). + Msg("finished processing height range") + } else { + log.Info().Int64("range_start", rangeStart).Int64("range_end", rangeEnd). + Int("txs_found", len(txs)).Int("messages_processed", rangeMessagesProcessed). + Int("blocks_inserted", rangeBlocksInserted).Int("errors", rangeErrorCount). + Str("success_rate", "N/A"). + Str("duration", rangeDuration.String()). + Msg("finished processing height range") + } + } + + // Log final summary + rangeCount := (endHeight - startHeight + checkSize - 1) / checkSize + + log.Info(). + Int64("total_ranges_processed", rangeCount). + Int("total_txs_processed", totalTxsProcessed). + Int("total_blocks_inserted", totalBlocksInserted). + Int("total_errors_encountered", totalErrorsEncountered). + Msg("processing complete") + + return nil + }, + } + + cmd.Flags().Int64("start-height", 0, "Start height for filtering transactions (inclusive, 0 means no lower limit)") + cmd.Flags().Int64("end-height", 0, "End height for filtering transactions (exclusive, 0 means no upper limit)") + cmd.Flags().Int64("check-size", 100000, "Size of the height range to check (default: 100000)") + + return cmd +} diff --git a/cmd/parse/parse.go b/cmd/parse/parse.go index 869ef6904..896953548 100644 --- a/cmd/parse/parse.go +++ b/cmd/parse/parse.go @@ -3,6 +3,7 @@ package parse import ( parseauth "github.com/forbole/callisto/v4/cmd/parse/auth" parsebank "github.com/forbole/callisto/v4/cmd/parse/bank" + parsebridge "github.com/forbole/callisto/v4/cmd/parse/bridge" parsedex "github.com/forbole/callisto/v4/cmd/parse/dex" parsedistribution "github.com/forbole/callisto/v4/cmd/parse/distribution" parsefeegrant "github.com/forbole/callisto/v4/cmd/parse/feegrant" @@ -39,6 +40,7 @@ func NewParseCmd(parseCfg *parse.Config, parser messages.MessageAddressesParser) parsestaking.NewStakingCmd(parseCfg), parsetransaction.NewTransactionsCmd(parseCfg), parsedex.NewDexCmd(parseCfg), + parsebridge.NewBridgeCmd(parseCfg), ) return cmd diff --git a/go.mod b/go.mod index d7ae90f44..af6f95816 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( cosmossdk.io/x/feegrant v0.1.1 cosmossdk.io/x/nft v0.1.1 cosmossdk.io/x/upgrade v0.1.4 + github.com/CoreumFoundation/coreum-tools v0.4.1-0.20241202115740-dbc6962a4d0a github.com/CoreumFoundation/coreum/v5 v5.0.0-20250526103302-5a3f05b11008 github.com/CoreumFoundation/coreumbridge-xrpl/relayer v0.0.0-20250526113311-5229bda4c986 github.com/CosmWasm/wasmd v0.54.0 @@ -51,7 +52,6 @@ require ( github.com/Antonboom/errname v0.1.9 // indirect github.com/Antonboom/nilnil v0.1.3 // indirect github.com/BurntSushi/toml v1.4.0 // indirect - github.com/CoreumFoundation/coreum-tools v0.4.1-0.20241202115740-dbc6962a4d0a // indirect github.com/CosmWasm/wasmvm/v2 v2.2.2 // indirect github.com/DataDog/datadog-go v4.8.3+incompatible // indirect github.com/DataDog/zstd v1.5.6 // indirect diff --git a/utils/node.go b/utils/node.go index 059d5ce78..83c122b44 100644 --- a/utils/node.go +++ b/utils/node.go @@ -1,10 +1,14 @@ package utils import ( + "context" "fmt" + "time" coretypes "github.com/cometbft/cometbft/rpc/core/types" "github.com/forbole/juno/v6/node" + + "github.com/CoreumFoundation/coreum-tools/pkg/retry" ) // QueryTxs queries all the transactions from the given node corresponding to the given query @@ -14,10 +18,28 @@ func QueryTxs(node node.Node, query string) ([]*coretypes.ResultTx, error) { page := 1 perPage := 100 stop := false + + ctx := context.Background() + retryDelay := 500 * time.Millisecond + doTimeout := 60 * time.Second + for !stop { - result, err := node.TxSearch(query, &page, &perPage, "") + var result *coretypes.ResultTxSearch + var searchErr error + + // Retry logic for each page + doCtx, doCtxCancel := context.WithTimeout(ctx, doTimeout) + err := retry.Do(doCtx, retryDelay, func() error { + result, searchErr = node.TxSearch(query, &page, &perPage, "") + if searchErr != nil { + return fmt.Errorf("error while running tx search: %s", searchErr) + } + return nil + }) + doCtxCancel() + if err != nil { - return nil, fmt.Errorf("error while running tx search: %s", err) + return nil, err } page++