Skip to content

Commit

Permalink
Add recover for watcher
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangxu19830126 committed Oct 28, 2020
1 parent 0c0aa24 commit a9b74a6
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 13 deletions.
2 changes: 1 addition & 1 deletion prophet_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (p *defaultProphet) enableLeader() {
p.coordinator.start()

p.wn = newWatcherNotifier(p.rt)
go p.wn.start()
p.wn.start()

// now, we are leader
atomic.StoreInt64(&p.leaderFlag, 1)
Expand Down
36 changes: 24 additions & 12 deletions watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,20 +275,32 @@ func (wn *watcherNotifier) stop() {
}

func (wn *watcherNotifier) start() {
for {
evt, ok := <-wn.eventC
if !ok {
log.Infof("watcher notifer exited")
return
go func() {
defer func() {
if err := recover(); err == nil {
log.Errorf("watcher notify failed with %+v, restart later", err)
wn.start()
}
}()

for {
evt, ok := <-wn.eventC
if !ok {
log.Infof("watcher notifer exited")
return
}

log.Debugf("new event: %+v", evt)
wn.RLock()
wn.watchers.Range(func(key, value interface{}) bool {
wt := value.(*watcher)
wt.notify(evt)
return true
})
wn.RUnlock()
}
}()

log.Debugf("new event: %+v", evt)
wn.watchers.Range(func(key, value interface{}) bool {
wt := value.(*watcher)
wt.notify(evt)
return true
})
}
}

type snap struct {
Expand Down

0 comments on commit a9b74a6

Please sign in to comment.