Skip to content
This repository was archived by the owner on Jul 25, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
PEGGO_ENV="local"
PEGGO_LOG_LEVEL="debug"
PEGGO_SERVICE_WAIT_TIMEOUT="1m"
PEGGO_LOOP_DURATION=60s
PEGGO_BLOCKS_TO_SEARCH=2000
Comment on lines +4 to +5
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add validation for the new configuration values.

The code should validate:

  1. PEGGO_LOOP_DURATION is a valid duration string (e.g., "60s", "1m").
  2. PEGGO_BLOCKS_TO_SEARCH is a positive number.

Would you like me to generate the validation code for these configuration values?


PEGGO_COSMOS_CHAIN_ID="injective-1"
PEGGO_COSMOS_GRPC="tcp://localhost:9900"
Expand Down
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ Peggo is a Go implementation of the Peggy Orchestrator for the Injective Chain.

Important Commands:

* `peggo orchestrator` starts the orchestrator main loop.
* `peggo tx register-eth-key` is a special command to submit an Ethereum key that will be used to sign messages on behalf of your Validator
- `peggo orchestrator` starts the orchestrator main loop.
- `peggo tx register-eth-key` is a special command to submit an Ethereum key that will be used to sign messages on behalf of your Validator

## Installation

Expand Down Expand Up @@ -47,7 +47,7 @@ Commands:
tx Transactions for Peggy governance and maintenance.
version Print the version information and exit.

Run 'peggo COMMAND --help' for more information on a command.
Run 'peggo COMMAND --help' for more information on a command.
```

## Commands
Expand Down Expand Up @@ -89,6 +89,8 @@ Options:
--relay_pending_tx_wait_duration If set, relayer will broadcast pending batches/valsetupdate only after pendingTxWaitDuration has passed (env $PEGGO_RELAY_PENDING_TX_WAIT_DURATION) (default "20m")
--min_batch_fee_usd If set, batch request will create batches only if fee threshold exceeds (env $PEGGO_MIN_BATCH_FEE_USD) (default 23.3)
--coingecko_api Specify HTTP endpoint for coingecko api. (env $PEGGO_COINGECKO_API) (default "https://api.coingecko.com/api/v3")
--loop_duration Specify the main the loop duration (env $PEGGO_LOOP_DURATION) (default "60s")
--blocks_to_search Specify maximum block range for Ethereum event query (env $PEGGO_BLOCKS_TO_SEARCH) (default 2000)

```

Expand Down
25 changes: 24 additions & 1 deletion cmd/peggo/options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import cli "github.com/jawher/mow.cli"
import (
cli "github.com/jawher/mow.cli"
)

// initGlobalOptions defines some global CLI options, that are useful for most parts of the app.
// Before adding option to there, consider moving it into the actual Cmd.
Expand Down Expand Up @@ -270,6 +272,9 @@ type Config struct {
minBatchFeeUSD *float64

coingeckoApi *string

loopDuration *string
numberOfBlocksToSearch *uint64
}

func initConfig(cmd *cli.Cmd) Config {
Expand Down Expand Up @@ -473,5 +478,23 @@ func initConfig(cmd *cli.Cmd) Config {
Value: "https://api.coingecko.com/api/v3",
})

/** Loop Duration **/
cfg.loopDuration = cmd.String(cli.StringOpt{
Name: "loop_duration",
Desc: "Specify the duration of the loop",
EnvVar: "PEGGO_LOOP_DURATION",
Value: "60s",
})

/** Default block to serach **/
numberOfBlocksToSearch := uint64(*cmd.Int(cli.IntOpt{
Name: "number_of_blocks_to_search",
Desc: "Maximum block range for Ethereum event query",
EnvVar: "PEGGO_BLOCKS_TO_SEARCH",
Value: 2000,
}))

cfg.numberOfBlocksToSearch = &numberOfBlocksToSearch

return cfg
}
26 changes: 17 additions & 9 deletions cmd/peggo/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func orchestratorCmd(cmd *cli.Cmd) {
var (
valsetDur time.Duration
batchDur time.Duration
loopDur time.Duration
)

if *cfg.relayValsets {
Expand All @@ -138,16 +139,23 @@ func orchestratorCmd(cmd *cli.Cmd) {
orShutdown(err)
}

loopDur, err = time.ParseDuration(*cfg.loopDuration)
if err != nil {
loopDur = 60 * time.Second
}

orchestratorCfg := orchestrator.Config{
CosmosAddr: cosmosKeyring.Addr,
EthereumAddr: ethKeyFromAddress,
MinBatchFeeUSD: *cfg.minBatchFeeUSD,
ERC20ContractMapping: erc20ContractMapping,
RelayValsetOffsetDur: valsetDur,
RelayBatchOffsetDur: batchDur,
RelayValsets: *cfg.relayValsets,
RelayBatches: *cfg.relayBatches,
RelayerMode: !isValidator,
CosmosAddr: cosmosKeyring.Addr,
EthereumAddr: ethKeyFromAddress,
MinBatchFeeUSD: *cfg.minBatchFeeUSD,
ERC20ContractMapping: erc20ContractMapping,
RelayValsetOffsetDur: valsetDur,
RelayBatchOffsetDur: batchDur,
RelayValsets: *cfg.relayValsets,
RelayBatches: *cfg.relayBatches,
RelayerMode: !isValidator,
LoopDuration: loopDur,
NumberOfBlocksToSearch: *cfg.numberOfBlocksToSearch,
}

// Create peggo and run it
Expand Down
4 changes: 2 additions & 2 deletions orchestrator/batch_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (

func (s *Orchestrator) runBatchCreator(ctx context.Context) (err error) {
bc := batchCreator{Orchestrator: s}
s.logger.WithField("loop_duration", defaultLoopDur.String()).Debugln("starting BatchCreator...")
s.logger.WithField("loop_duration", s.cfg.LoopDuration.String()).Debugln("starting BatchCreator...")

return loops.RunLoop(ctx, defaultLoopDur, func() error {
return loops.RunLoop(ctx, s.cfg.LoopDuration, func() error {
return bc.requestTokenBatches(ctx)
})
}
Expand Down
13 changes: 4 additions & 9 deletions orchestrator/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@ import (
const (
// Minimum number of confirmations for an Ethereum block to be considered valid
ethBlockConfirmationDelay uint64 = 12

// Maximum block range for Ethereum event query. If the orchestrator has been offline for a long time,
// the oracle loop can potentially run longer than defaultLoopDur due to a surge of events. This usually happens
// when there are more than ~50 events to claim in a single run.
defaultBlocksToSearch uint64 = 2000
)

// runOracle is responsible for making sure that Ethereum events are retrieved from the Ethereum blockchain
Expand All @@ -33,9 +28,9 @@ func (s *Orchestrator) runOracle(ctx context.Context, lastObservedBlock uint64)
lastResyncWithInjective: time.Now(),
}

s.logger.WithField("loop_duration", defaultLoopDur.String()).Debugln("starting Oracle...")
s.logger.WithField("loop_duration", s.cfg.LoopDuration.String()).Debugln("starting Oracle...")

return loops.RunLoop(ctx, defaultLoopDur, func() error {
return loops.RunLoop(ctx, s.cfg.LoopDuration, func() error {
return oracle.observeEthEvents(ctx)
})
}
Expand Down Expand Up @@ -93,8 +88,8 @@ func (l *oracle) observeEthEvents(ctx context.Context) error {
}

// ensure the block range is within defaultBlocksToSearch
if latestHeight > l.lastObservedEthHeight+defaultBlocksToSearch {
latestHeight = l.lastObservedEthHeight + defaultBlocksToSearch
if latestHeight > l.lastObservedEthHeight+l.cfg.NumberOfBlocksToSearch {
latestHeight = l.lastObservedEthHeight + l.cfg.NumberOfBlocksToSearch
}

events, err := l.getEthEvents(ctx, l.lastObservedEthHeight, latestHeight)
Expand Down
10 changes: 6 additions & 4 deletions orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ import (
"github.com/InjectiveLabs/peggo/orchestrator/loops"
)

const (
defaultLoopDur = 60 * time.Second
)

// PriceFeed provides token price for a given contract address
type PriceFeed interface {
QueryUSDPrice(address gethcommon.Address) (float64, error)
Expand All @@ -34,6 +30,12 @@ type Config struct {
RelayValsets bool
RelayBatches bool
RelayerMode bool
LoopDuration time.Duration

// Maximum block range for Ethereum event query. If the orchestrator has been offline for a long time,
// the oracle loop can potentially run longer than defaultLoopDur due to a surge of events. This usually happens
// when there are more than ~50 events to claim in a single run.
NumberOfBlocksToSearch uint64
}
Comment on lines +35 to 39
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Consider using uint64 consistently.

The NumberOfBlocksToSearch field uses uint64, which is appropriate for block numbers. However, ensure this type is used consistently across the codebase for block-related values.


🏁 Script executed:

#!/bin/bash
# Check for inconsistent block number types
rg -A 1 "block.*uint|block.*int" --type go

Length of output: 301


Ensure Consistent Block Number Types Across Files

Please update block number types for consistency. While the organizer/orchestrator.go file correctly uses uint64 for NumberOfBlocksToSearch, our search revealed that in orchestrator/relayer.go block numbers are converted to int64 (e.g., using int64(vs.Height) and int64(batch.Block)). Aligning these types across the codebase would reduce potential confusion or type-mismatch issues, unless using int64 is required by external API constraints. Please either refactor the relayer code to use uint64 where applicable or add documentation explaining the need for the current discrepancy.


type Orchestrator struct {
Expand Down
4 changes: 2 additions & 2 deletions orchestrator/signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ func (s *Orchestrator) runSigner(ctx context.Context, peggyID gethcommon.Hash) e
peggyID: peggyID,
}

s.logger.WithField("loop_duration", defaultLoopDur.String()).Debugln("starting Signer...")
s.logger.WithField("loop_duration", s.cfg.LoopDuration.String()).Debugln("starting Signer...")

return loops.RunLoop(ctx, defaultLoopDur, func() error {
return loops.RunLoop(ctx, s.cfg.LoopDuration, func() error {
return signer.sign(ctx)
})
}
Expand Down