Skip to content

Commit

Permalink
Fix the watcher maybe lost the notify
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangxu19830126 committed Nov 4, 2019
1 parent ab67f13 commit 542cde0
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 1 deletion.
4 changes: 4 additions & 0 deletions watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package prophet

import (
"sync"
"sync/atomic"

"github.com/fagongzi/goetty"
)
Expand Down Expand Up @@ -50,6 +51,7 @@ type InitWatcher struct {

// EventNotify event notify
type EventNotify struct {
Seq uint64 `json:"seq"`
Event int `json:"event"`
Value []byte `json:"value"`
}
Expand Down Expand Up @@ -175,12 +177,14 @@ func (evt *EventNotify) ReadLeaderChangerValue() (uint64, uint64) {
}

type watcher struct {
seq uint64
info *InitWatcher
conn goetty.IOSession
}

func (wt *watcher) notify(evt *EventNotify) {
if MatchEvent(evt.Event, wt.info.Flag) {
evt.Seq = atomic.AddUint64(&wt.seq, 1)
err := wt.conn.WriteAndFlush(evt)
if err != nil {
wt.conn.Close()
Expand Down
15 changes: 14 additions & 1 deletion watcher_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ func (w *Watcher) resetConnWithProphet(flag int) {
Flag: flag,
})
if err != nil {
time.Sleep(time.Millisecond * 200)
conn.Close()
time.Sleep(time.Millisecond * 200)
continue
}

Expand All @@ -125,6 +125,8 @@ func (w *Watcher) watchDog(flag int) {
}

func (w *Watcher) startReadLoop() {
expectSeq := uint64(0)

for {
msg, err := w.conn.Read()
if err != nil {
Expand All @@ -133,6 +135,17 @@ func (w *Watcher) startReadLoop() {
}

if evt, ok := msg.(*EventNotify); ok {
log.Infof("%d, %d", expectSeq, evt.Seq)
// we lost some event notify, close the conection, and retry
if expectSeq != evt.Seq {
log.Warnf("prophet: watch lost some event notify, expect seq %d, but %d, close and retry",
expectSeq,
evt.Seq)
w.conn.Close()
return
}

expectSeq = evt.Seq + 1
w.eventC <- evt
} else {
w.conn.Close()
Expand Down
49 changes: 49 additions & 0 deletions watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package prophet

import (
"testing"
"time"

"github.com/fagongzi/goetty"
"github.com/stretchr/testify/assert"
)

func TestNotify(t *testing.T) {
addr := "127.0.0.1:12345"
bizCodec := &codec{}
c := make(chan *EventNotify, 10)
s := goetty.NewServer(addr,
goetty.WithServerDecoder(goetty.NewIntLengthFieldBasedDecoder(bizCodec)),
goetty.WithServerEncoder(goetty.NewIntLengthFieldBasedEncoder(bizCodec)))

conns := 0
go s.Start(func(conn goetty.IOSession) error {
conns++
for evt := range c {
log.Infof("write %d", evt.Seq)
conn.WriteAndFlush(evt)
}

log.Infof("exit")
return nil
})
<-s.Started()

defer s.Stop()

w := NewWatcher("127.0.0.1:12345")
ch := w.Watch(EventResourceChaned)
go func() {
for range ch {

}
}()

c <- &EventNotify{Event: EventInit, Seq: 0}
time.Sleep(time.Millisecond * 100)
assert.Equal(t, 1, conns, "test watcher failed")

c <- &EventNotify{Event: EventResourceChaned, Seq: 2}
time.Sleep(time.Millisecond * 200)
assert.Equal(t, 2, conns, "test watcher failed")
}

0 comments on commit 542cde0

Please sign in to comment.