Skip to content

Commit

Permalink
chore: alerts on jailed
Browse files Browse the repository at this point in the history
refactoring rpc system
  • Loading branch information
albttx committed Dec 12, 2022
1 parent 224f930 commit 804787c
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 77 deletions.
163 changes: 86 additions & 77 deletions cmd/cosmos-notifyer/cli_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,94 +28,69 @@ func (s *service) Start(cctx *cli.Context) error {
go func(chain Chain) {
defer wg.Done()

s.startWatcher(chain)
activeRPC := true

for {
rpcs := cosmosblocks.CheckRPCs(chain.RPC)
rpc := rpcs.GetValidRPCURL()

if rpc == nil {
if activeRPC {
s.notify.Alert(notifyer.AlertMsg{
Msg: fmt.Sprintf("[%s] No valid RPC (0/%d)", chain.Name, len(rpcs)),
})
}
activeRPC = false
time.Sleep(time.Second * 5)
continue
} else if !activeRPC && rpc != nil {
s.notify.Recover(notifyer.RecoverMsg{
Msg: fmt.Sprintf("[%s] RPCs are back up ! ", chain.Name),
})
}

// Start watching
if err := s.startWatcher(chain, *rpc); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"chain": chain.Name,
"rpc": *rpc,
}).Error()
}
}
}(chain)
}
wg.Wait()
return nil
}

func (s *service) startWatcher(chain Chain) {
var (
startTime time.Time
lastRPCAlertFiredAt time.Time
)
func (s *service) startWatcher(chain Chain, rpc string) error {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()

for {
startTime = time.Now()
ctx = ctxlogger.WithValue(ctx, "chain", chain.Name)
ctx = ctxlogger.WithValue(ctx, "rpc", rpc)

// Loop over all RPCs
for _, rpc := range chain.RPC {
ctx, cancelFunc := context.WithCancel(context.Background())

ctx = ctxlogger.WithValue(ctx, "chain", chain.Name)
ctx = ctxlogger.WithValue(ctx, "rpc", rpc)

l := ctxlogger.Logger(ctx)
l.Logger.SetLevel(s.cfg.GetLogLevel())

c, err := cosmosblocks.NewClient(cosmosblocks.Config{
RPCEndpoint: rpc,
Logger: l,
})
if err != nil {
l.WithError(err).Error()
cancelFunc()
continue
}

status, err := c.Status(ctx)
if err != nil {
l.WithError(err).Error()
cancelFunc()
continue
} else if status.SyncInfo.CatchingUp {
l.Error("node is catching-up...")
cancelFunc()
continue
}
l := ctxlogger.Logger(ctx)
l.Logger.SetLevel(s.cfg.GetLogLevel())

go func() {
if err := s.blockHandler(ctx, c, chain); err != nil {
logrus.WithError(err).Error()
}
}()
c, err := cosmosblocks.NewClient(cosmosblocks.Config{
RPCEndpoint: rpc,
Logger: l,
})
if err != nil {
return errors.Trace(err)
}

if err := c.Start(ctx); err != nil {
l.WithError(err).Error()
}
cancelFunc()
go func() {
if err := s.blockHandler(ctx, c, chain); err != nil {
logrus.WithError(err).Error()
}
}()

// If we're here, a RPC has failed
if time.Since(startTime) < time.Second*60 && time.Since(lastRPCAlertFiredAt) > time.Hour*4 {
lastRPCAlertFiredAt = time.Now()

err := s.notify.Alert(notifyer.AlertMsg{
Msg: fmt.Sprintf("[%s] RPCs are down", chain.Name),
})
if err != nil {
logrus.WithError(err).Error("Failed to send alert message: RPC is down")
}

go func() {
for {
if time.Since(startTime) > time.Second*60*5 {
err := s.notify.Recover(notifyer.RecoverMsg{
Msg: fmt.Sprintf("[%s] RPCs are back up!", chain.Name),
})
if err != nil {
logrus.WithError(err).Error("Failed to send recover message: RPC is up")
}
return
}
time.Sleep(time.Second * 5)
}

}()
}
if err := c.Start(ctx); err != nil {
return errors.Trace(err)
}

return nil
}

func (s *service) blockHandler(ctx context.Context, c *cosmosblocks.Client, chain Chain) error {
Expand All @@ -126,16 +101,50 @@ func (s *service) blockHandler(ctx context.Context, c *cosmosblocks.Client, chai
latestBlockHeight int64 = 0
missedBlocks int64 = 0
missedBlocksAlert int64 = 10

isJailed bool = false
isBonded bool = true
)

START:

validator, err := c.QueryValidator(chain.ValidatorAddr)
if err != nil {
return errors.Errorf("failed to get validator: %s", chain.ValidatorAddr)
}

if validator.Validator.IsJailed() {
s.notify.Alert(notifyer.AlertMsg{
Msg: fmt.Sprintf("[%s] %s is jailed",
if !isJailed {
isJailed = true
s.notify.Alert(notifyer.AlertMsg{
Msg: fmt.Sprintf("[%s] %s is jailed",
chain.Name, validator.Validator.GetMoniker()),
})
}
time.Sleep(time.Second * 30)
goto START
} else if !validator.Validator.IsJailed() && isJailed {
isJailed = false
s.notify.Recover(notifyer.RecoverMsg{
Msg: fmt.Sprintf("[%s] %s is un-jailed",
chain.Name, validator.Validator.GetMoniker()),
})
}

if !validator.Validator.IsBonded() {
if isBonded {
isBonded = false
s.notify.Alert(notifyer.AlertMsg{
Msg: fmt.Sprintf("[%s] validator: %s is not in the active set",
chain.Name, validator.Validator.GetMoniker()),
})
}
time.Sleep(time.Second * 30)
goto START
} else if validator.Validator.IsBonded() && !isBonded {
isBonded = true
s.notify.Recover(notifyer.RecoverMsg{
Msg: fmt.Sprintf("[%s] validator: %s is back in the active set",
chain.Name, validator.Validator.GetMoniker()),
})
}
Expand Down
55 changes: 55 additions & 0 deletions pkg/cosmosblocks/rpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package cosmosblocks

import (
"context"
)

type (
	// RPCStatus is the outcome of probing a single RPC endpoint.
	RPCStatus struct {
		// URL is the address of the endpoint that was probed.
		URL string

		// CatchingUp marks an endpoint whose node is still syncing.
		CatchingUp bool

		// Err is the error met while probing the endpoint, nil on success.
		Err error
	}

	// RPCStatuses is an ordered list of endpoint probe results.
	RPCStatuses []RPCStatus
)

// GetValidRPCURL returns a pointer to the URL of the first endpoint in the
// list that has no error and is not catching up, or nil when no endpoint
// qualifies. The pointer refers to a copy of the URL, not to the list entry.
func (rpcs RPCStatuses) GetValidRPCURL() *string {
	for i := range rpcs {
		candidate := rpcs[i]
		if candidate.Err != nil || candidate.CatchingUp {
			continue
		}

		url := candidate.URL
		return &url
	}

	return nil
}

// CheckRPCs probes every address in rpcAddrs and returns one RPCStatus per
// address, in input order.
//
// For each address a client is built with NewClient and its status is queried
// with a background context. An error from either step is stored in the
// entry's Err field; otherwise CatchingUp mirrors the SyncInfo.CatchingUp
// value reported by the node.
//
// Failed endpoints are kept in the result rather than dropped, so the length
// of the result always equals len(rpcAddrs) and callers can see which
// endpoint failed and why.
func CheckRPCs(rpcAddrs []string) RPCStatuses {
	statuses := make(RPCStatuses, 0, len(rpcAddrs))

	for _, rpc := range rpcAddrs {
		status := RPCStatus{
			URL: rpc,
		}

		c, err := NewClient(Config{
			RPCEndpoint: rpc,
		})
		if err != nil {
			// Record the failure instead of silently skipping the endpoint.
			status.Err = err
			statuses = append(statuses, status)
			continue
		}
		// TODO: Check chain-id is correct

		rpcStatus, err := c.Status(context.Background())
		if err != nil {
			// Record the failure instead of silently skipping the endpoint.
			status.Err = err
			statuses = append(statuses, status)
			continue
		}
		status.CatchingUp = rpcStatus.SyncInfo.CatchingUp

		statuses = append(statuses, status)
	}

	return statuses
}

0 comments on commit 804787c

Please sign in to comment.