From 804787cce300dcb95555e0ac5a54e2874cdfaf3a Mon Sep 17 00:00:00 2001
From: Albert Le Batteux
Date: Mon, 12 Dec 2022 18:44:56 +0000
Subject: [PATCH] chore: alerts on jailed refactoring rpc system

---
 cmd/cosmos-notifyer/cli_start.go | 179 ++++++++++++++++++-------------
 pkg/cosmosblocks/rpc.go          |  70 ++++++++++++
 2 files changed, 172 insertions(+), 77 deletions(-)
 create mode 100644 pkg/cosmosblocks/rpc.go

diff --git a/cmd/cosmos-notifyer/cli_start.go b/cmd/cosmos-notifyer/cli_start.go
index 7a90b2f..3075084 100644
--- a/cmd/cosmos-notifyer/cli_start.go
+++ b/cmd/cosmos-notifyer/cli_start.go
@@ -28,94 +28,85 @@ func (s *service) Start(cctx *cli.Context) error {
 
 		go func(chain Chain) {
 			defer wg.Done()
-			s.startWatcher(chain)
+			// activeRPC remembers whether the previous probe found a usable
+			// endpoint, so an outage is alerted once and recovered once.
+			activeRPC := true
+
+			for {
+				rpcs := cosmosblocks.CheckRPCs(chain.RPC)
+				rpc := rpcs.GetValidRPCURL()
+
+				if rpc == nil {
+					if activeRPC {
+						err := s.notify.Alert(notifyer.AlertMsg{
+							Msg: fmt.Sprintf("[%s] No valid RPC (0/%d)", chain.Name, len(rpcs)),
+						})
+						if err != nil {
+							logrus.WithError(err).Error("Failed to send alert message: RPC is down")
+						}
+					}
+					activeRPC = false
+					time.Sleep(time.Second * 5)
+					continue
+				}
+
+				if !activeRPC {
+					// Re-arm the flag, otherwise every later reconnect would
+					// send another recover message.
+					activeRPC = true
+					err := s.notify.Recover(notifyer.RecoverMsg{
+						Msg: fmt.Sprintf("[%s] RPCs are back up ! ", chain.Name),
+					})
+					if err != nil {
+						logrus.WithError(err).Error("Failed to send recover message: RPC is up")
+					}
+				}
+
+				// Start watching
+				if err := s.startWatcher(chain, *rpc); err != nil {
+					logrus.WithError(err).WithFields(logrus.Fields{
+						"chain": chain.Name,
+						"rpc":   *rpc,
+					}).Error()
+				}
+			}
 		}(chain)
 	}
 	wg.Wait()
 	return nil
 }
 
-func (s *service) startWatcher(chain Chain) {
-	var (
-		startTime           time.Time
-		lastRPCAlertFiredAt time.Time
-	)
+// startWatcher builds a client for rpc, launches blockHandler in a goroutine
+// and then calls c.Start. Client-creation and c.Start errors are returned
+// traced; the context handed to both is cancelled when startWatcher returns.
+func (s *service) startWatcher(chain Chain, rpc string) error {
+	ctx, cancelFunc := context.WithCancel(context.Background())
+	defer cancelFunc()
 
-	for {
-		startTime = time.Now()
+	ctx = ctxlogger.WithValue(ctx, "chain", chain.Name)
+	ctx = ctxlogger.WithValue(ctx, "rpc", rpc)
 
-		// Loop over all RPCs
-		for _, rpc := range chain.RPC {
-			ctx, cancelFunc := context.WithCancel(context.Background())
-
-			ctx = ctxlogger.WithValue(ctx, "chain", chain.Name)
-			ctx = ctxlogger.WithValue(ctx, "rpc", rpc)
-
-			l := ctxlogger.Logger(ctx)
-			l.Logger.SetLevel(s.cfg.GetLogLevel())
-
-			c, err := cosmosblocks.NewClient(cosmosblocks.Config{
-				RPCEndpoint: rpc,
-				Logger:      l,
-			})
-			if err != nil {
-				l.WithError(err).Error()
-				cancelFunc()
-				continue
-			}
-
-			status, err := c.Status(ctx)
-			if err != nil {
-				l.WithError(err).Error()
-				cancelFunc()
-				continue
-			} else if status.SyncInfo.CatchingUp {
-				l.Error("node is catching-up...")
-				cancelFunc()
-				continue
-			}
+	l := ctxlogger.Logger(ctx)
+	l.Logger.SetLevel(s.cfg.GetLogLevel())
 
-			go func() {
-				if err := s.blockHandler(ctx, c, chain); err != nil {
-					logrus.WithError(err).Error()
-				}
-			}()
+	c, err := cosmosblocks.NewClient(cosmosblocks.Config{
+		RPCEndpoint: rpc,
+		Logger:      l,
+	})
+	if err != nil {
+		return errors.Trace(err)
+	}
 
-			if err := c.Start(ctx); err != nil {
-				l.WithError(err).Error()
-			}
-			cancelFunc()
+	go func() {
+		if err := s.blockHandler(ctx, c, chain); err != nil {
+			logrus.WithError(err).Error()
 		}
+	}()
 
-		// If we're here, a RPC has failed
-		if time.Since(startTime) < time.Second*60 && time.Since(lastRPCAlertFiredAt) > time.Hour*4 {
-			lastRPCAlertFiredAt = time.Now()
-
-			err := s.notify.Alert(notifyer.AlertMsg{
-				Msg: fmt.Sprintf("[%s] RPCs are down", chain.Name),
-			})
-			if err != nil {
-				logrus.WithError(err).Error("Failed to send alert message: RPC is down")
-			}
-
-			go func() {
-				for {
-					if time.Since(startTime) > time.Second*60*5 {
-						err := s.notify.Recover(notifyer.RecoverMsg{
-							Msg: fmt.Sprintf("[%s] RPCs are back up!", chain.Name),
-						})
-						if err != nil {
-							logrus.WithError(err).Error("Failed to send recover message: RPC is up")
-						}
-						return
-					}
-					time.Sleep(time.Second * 5)
-				}
-
-			}()
-		}
+	if err := c.Start(ctx); err != nil {
+		return errors.Trace(err)
 	}
-
+	return nil
 }
 
 func (s *service) blockHandler(ctx context.Context, c *cosmosblocks.Client, chain Chain) error {
@@ -126,16 +117,50 @@ func (s *service) blockHandler(ctx context.Context, c *cosmosblocks.Client, chai
 		latestBlockHeight int64 = 0
 		missedBlocks      int64 = 0
 		missedBlocksAlert int64 = 10
+
+		isJailed bool = false
+		isBonded bool = true
 	)
 
+START:
+
 	validator, err := c.QueryValidator(chain.ValidatorAddr)
 	if err != nil {
 		return errors.Errorf("failed to get validator: %s", chain.ValidatorAddr)
 	}
 
 	if validator.Validator.IsJailed() {
-		s.notify.Alert(notifyer.AlertMsg{
-			Msg: fmt.Sprintf("[%s] %s is jailed",
+		if !isJailed {
+			isJailed = true
+			s.notify.Alert(notifyer.AlertMsg{
+				Msg: fmt.Sprintf("[%s] %s is jailed",
+					chain.Name, validator.Validator.GetMoniker()),
+			})
+		}
+		time.Sleep(time.Second * 30)
+		goto START
+	} else if !validator.Validator.IsJailed() && isJailed {
+		isJailed = false
+		s.notify.Recover(notifyer.RecoverMsg{
+			Msg: fmt.Sprintf("[%s] %s is un-jailed",
+				chain.Name, validator.Validator.GetMoniker()),
+		})
+	}
+
+	if !validator.Validator.IsBonded() {
+		if isBonded {
+			isBonded = false
+			s.notify.Alert(notifyer.AlertMsg{
+				Msg: fmt.Sprintf("[%s] validator: %s is not in the active set",
+					chain.Name, validator.Validator.GetMoniker()),
+			})
+		}
+		time.Sleep(time.Second * 30)
+		goto START
+	} else if validator.Validator.IsBonded() && !isBonded {
+		isBonded = true
+		s.notify.Recover(notifyer.RecoverMsg{
+			Msg: fmt.Sprintf("[%s] validator: %s is back in the active set",
 				chain.Name, validator.Validator.GetMoniker()),
 		})
 	}
diff --git a/pkg/cosmosblocks/rpc.go b/pkg/cosmosblocks/rpc.go
new file mode 100644
index 0000000..0495cd4
--- /dev/null
+++ b/pkg/cosmosblocks/rpc.go
@@ -0,0 +1,70 @@
+package cosmosblocks
+
+import (
+	"context"
+)
+
+// RPCStatus is the outcome of probing a single RPC endpoint.
+type RPCStatus struct {
+	// URL is the endpoint address that was probed.
+	URL string
+
+	// CatchingUp mirrors SyncInfo.CatchingUp from the endpoint's status reply.
+	CatchingUp bool
+
+	// Err is the error from creating the client or querying its status, or
+	// nil when both succeeded.
+	Err error
+}
+
+// RPCStatuses is the list of probe results returned by CheckRPCs.
+type RPCStatuses []RPCStatus
+
+// GetValidRPCURL returns the URL of the first endpoint that answered without
+// error and is not catching up, or nil when there is none.
+func (rpcs RPCStatuses) GetValidRPCURL() *string {
+	for i := range rpcs {
+		if rpcs[i].Err == nil && !rpcs[i].CatchingUp {
+			return &rpcs[i].URL
+		}
+	}
+	return nil
+}
+
+// CheckRPCs probes every address in rpcAddrs and returns one RPCStatus per
+// address, in the same order. Endpoints that fail are kept in the result with
+// Err set so callers can count and report them.
+func CheckRPCs(rpcAddrs []string) RPCStatuses {
+	statuses := make(RPCStatuses, 0, len(rpcAddrs))
+
+	for _, rpc := range rpcAddrs {
+		statuses = append(statuses, checkRPC(rpc))
+	}
+
+	return statuses
+}
+
+// checkRPC probes a single endpoint and records the first failure it hits.
+func checkRPC(rpc string) RPCStatus {
+	status := RPCStatus{
+		URL: rpc,
+	}
+
+	c, err := NewClient(Config{
+		RPCEndpoint: rpc,
+	})
+	if err != nil {
+		status.Err = err
+		return status
+	}
+	// TODO: Check chain-id is correct
+
+	rpcStatus, err := c.Status(context.Background())
+	if err != nil {
+		status.Err = err
+		return status
+	}
+	status.CatchingUp = rpcStatus.SyncInfo.CatchingUp
+
+	return status
+}