Skip to content

Commit

Permalink
fix: some watcher bug
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangxu19830126 committed Jul 17, 2018
1 parent edbb007 commit c463202
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 56 deletions.
6 changes: 3 additions & 3 deletions prophet_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (p *Prophet) handleResourceHeartbeat(msg *ResourceHeartbeatReq) (*resourceH
return nil, errReq
}

return p.coordinator.dispatch(p.rt.GetResource(value.meta.ID())), nil
return p.coordinator.dispatch(p.rt.Resource(value.meta.ID())), nil
}

func (p *Prophet) handleContainerHeartbeat(msg *ContainerHeartbeatReq) error {
Expand All @@ -49,7 +49,7 @@ func (p *Prophet) handleContainerHeartbeat(msg *ContainerHeartbeatReq) error {
p.Lock()
defer p.Unlock()

container := p.rt.GetContainer(meta.ID())
container := p.rt.Container(meta.ID())
if container == nil {
err := p.store.PutContainer(meta)
if err != nil {
Expand Down Expand Up @@ -87,7 +87,7 @@ func (p *Prophet) handleAskSplit(req *askSplitReq) *askSplitRsp {

rsp := &askSplitRsp{}

res := p.rt.GetResource(req.Resource.ID())
res := p.rt.Resource(req.Resource.ID())
if res == nil {
rsp.Err = fmt.Errorf("resource not found")
return rsp
Expand Down
24 changes: 12 additions & 12 deletions runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func (rc *Runtime) load() {
}
}

// GetContainers returns the containers, using clone
func (rc *Runtime) GetContainers() []*ContainerRuntime {
// Containers returns the containers, using clone
func (rc *Runtime) Containers() []*ContainerRuntime {
rc.RLock()
defer rc.RUnlock()

Expand All @@ -67,8 +67,8 @@ func (rc *Runtime) GetContainers() []*ContainerRuntime {
return value
}

// GetResources returns the resources, using clone
func (rc *Runtime) GetResources() []*ResourceRuntime {
// Resources returns the resources, using clone
func (rc *Runtime) Resources() []*ResourceRuntime {
rc.RLock()
defer rc.RUnlock()

Expand All @@ -82,24 +82,24 @@ func (rc *Runtime) GetResources() []*ResourceRuntime {
return value
}

// GetContainer returns a cloned value of container runtime info
func (rc *Runtime) GetContainer(id uint64) *ContainerRuntime {
// Container returns a cloned value of container runtime info
func (rc *Runtime) Container(id uint64) *ContainerRuntime {
rc.RLock()
defer rc.RUnlock()

return rc.getContainerWithoutLock(id)
}

// GetResource returns a cloned value of resource runtime info
func (rc *Runtime) GetResource(id uint64) *ResourceRuntime {
// Resource returns a cloned value of resource runtime info
func (rc *Runtime) Resource(id uint64) *ResourceRuntime {
rc.RLock()
defer rc.RUnlock()

return rc.getResourceWithoutLock(id)
}

// GetResourceContainers returns resource containers
func (rc *Runtime) GetResourceContainers(target *ResourceRuntime) []*ContainerRuntime {
// ResourceContainers returns the containers that hold a peer of the resource
func (rc *Runtime) ResourceContainers(target *ResourceRuntime) []*ContainerRuntime {
rc.RLock()
defer rc.RUnlock()

Expand All @@ -112,8 +112,8 @@ func (rc *Runtime) GetResourceContainers(target *ResourceRuntime) []*ContainerRu
return containers
}

// GetResourceFollowerContainers returns all containers for peers exclude leader
func (rc *Runtime) GetResourceFollowerContainers(res *ResourceRuntime) []*ContainerRuntime {
// ResourceFollowerContainers returns the containers of all peers excluding the leader
func (rc *Runtime) ResourceFollowerContainers(res *ResourceRuntime) []*ContainerRuntime {
rc.RLock()
defer rc.RUnlock()

Expand Down
2 changes: 1 addition & 1 deletion scheduler_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func shouldBalance(source, target *ContainerRuntime, kind ResourceKind) bool {
}

func adjustBalanceLimit(rt *Runtime, kind ResourceKind) uint64 {
containers := rt.GetContainers()
containers := rt.Containers()

counts := make([]float64, 0, len(containers))
for _, c := range containers {
Expand Down
8 changes: 4 additions & 4 deletions scheduler_balancer_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func (l *balanceResourceLeaderScheduler) Schedule(rt *Runtime) Operator {
return nil
}

source := rt.GetContainer(res.leaderPeer.ContainerID)
target := rt.GetContainer(newLeader.ContainerID)
source := rt.Container(res.leaderPeer.ContainerID)
target := rt.Container(newLeader.ContainerID)
if !shouldBalance(source, target, l.ResourceKind()) {
return nil
}
Expand All @@ -57,7 +57,7 @@ func (l *balanceResourceLeaderScheduler) Schedule(rt *Runtime) Operator {

// scheduleTransferLeader schedules a resource to transfer leader to the peer.
func scheduleTransferLeader(rt *Runtime, s Selector, filters ...Filter) (*ResourceRuntime, *Peer) {
containers := rt.GetContainers()
containers := rt.Containers()
if len(containers) == 0 {
return nil, nil
}
Expand Down Expand Up @@ -89,7 +89,7 @@ func scheduleTransferLeader(rt *Runtime, s Selector, filters ...Filter) (*Resour
return nil, nil
}

targetContainers := rt.GetResourceFollowerContainers(res)
targetContainers := rt.ResourceFollowerContainers(res)
target := s.SelectTarget(targetContainers)
if target == nil {
return nil, nil
Expand Down
8 changes: 4 additions & 4 deletions scheduler_balancer_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func (s *balanceReplicaScheduler) Schedule(rt *Runtime) Operator {

func (s *balanceReplicaScheduler) transferPeer(rt *Runtime, res *ResourceRuntime, oldPeer *Peer) Operator {
// scoreGuard guarantees that the distinct score will not decrease.
containers := rt.GetResourceContainers(res)
source := rt.GetContainer(oldPeer.ContainerID)
containers := rt.ResourceContainers(res)
source := rt.Container(oldPeer.ContainerID)
scoreGuard := NewDistinctScoreFilter(s.cfg, containers, source)

checker := newReplicaChecker(s.cfg, rt)
Expand All @@ -76,7 +76,7 @@ func (s *balanceReplicaScheduler) transferPeer(rt *Runtime, res *ResourceRuntime
return nil
}

target := rt.GetContainer(newPeer.ContainerID)
target := rt.Container(newPeer.ContainerID)
if !shouldBalance(source, target, s.ResourceKind()) {
return nil
}
Expand All @@ -87,7 +87,7 @@ func (s *balanceReplicaScheduler) transferPeer(rt *Runtime, res *ResourceRuntime

// scheduleRemovePeer schedules a resource to remove the peer.
func scheduleRemovePeer(rt *Runtime, s Selector, filters ...Filter) (*ResourceRuntime, *Peer) {
containers := rt.GetContainers()
containers := rt.Containers()

source := s.SelectSource(containers, filters...)
if source == nil {
Expand Down
10 changes: 5 additions & 5 deletions scheduler_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (r *replicaChecker) Check(target *ResourceRuntime) Operator {
func (r *replicaChecker) checkDownPeer(target *ResourceRuntime) Operator {
for _, stats := range target.downPeers {
peer := stats.Peer
container := r.rt.GetContainer(peer.ContainerID)
container := r.rt.Container(peer.ContainerID)

if nil != container && container.Downtime() < r.cfg.MaxAllowContainerDownDuration {
continue
Expand All @@ -70,7 +70,7 @@ func (r *replicaChecker) checkDownPeer(target *ResourceRuntime) Operator {

func (r *replicaChecker) checkOfflinePeer(target *ResourceRuntime) Operator {
for _, peer := range target.meta.Peers() {
container := r.rt.GetContainer(peer.ContainerID)
container := r.rt.Container(peer.ContainerID)

if container != nil && container.IsUp() {
continue
Expand All @@ -96,7 +96,7 @@ func (r *replicaChecker) selectWorstPeer(target *ResourceRuntime, filters ...Fil

// Select the container with lowest distinct score.
// If the scores are the same, select the container with maximal resource score.
containers := r.rt.GetResourceContainers(target)
containers := r.rt.ResourceContainers(target)
for _, container := range containers {
if filterSource(container, filters) {
continue
Expand Down Expand Up @@ -129,8 +129,8 @@ func (r *replicaChecker) selectBestPeer(target *ResourceRuntime, allocPeerID boo

// Select the container with best distinct score.
// If the scores are the same, select the container with minimal replica score.
containers := r.rt.GetResourceContainers(target)
for _, container := range r.rt.GetContainers() {
containers := r.rt.ResourceContainers(target)
for _, container := range r.rt.Containers() {
if filterTarget(container, filters) {
continue
}
Expand Down
114 changes: 88 additions & 26 deletions watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,37 @@ type EventNotify struct {
func newInitEvent(rt *Runtime) (*EventNotify, error) {
value := goetty.NewByteBuf(512)

resources := rt.GetResources()
value.WriteInt(len(resources))
snap := rt.snap()
value.WriteInt(len(snap.resources))
value.WriteInt(len(snap.containers))

for _, v := range resources {
data, err := v.meta.Marshal()
for _, v := range snap.resources {
data, err := v.Marshal()
if err != nil {
return nil, err
}
value.WriteInt(len(data) + 8)
value.Write(data)
if id, ok := snap.leaders[v.ID()]; ok {
value.WriteUint64(id)
} else {
value.WriteUint64(0)
}
}

for _, v := range snap.containers {
data, err := v.Marshal()
if err != nil {
return nil, err
}
value.WriteInt(len(data))
value.Write(data)
}

_, data, _ := value.ReadAll()
nt := &EventNotify{
Event: EventInit,
Value: value.RawBuf()[0:value.Readable()],
Value: data,
}
value.Release()
return nt, nil
Expand Down Expand Up @@ -99,21 +115,31 @@ func newLeaderChangerEvent(target, leader uint64) *EventNotify {
}

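// ReadInitEventValues decodes the init event value, calling resourceF with each
// resource's data and leader ID, and containerF with each container's data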
func (evt *EventNotify) ReadInitEventValues() [][]byte {
func (evt *EventNotify) ReadInitEventValues(resourceF func([]byte, uint64), containerF func([]byte)) {
if len(evt.Value) == 0 {
return nil
return
}

buf := goetty.NewByteBuf(len(evt.Value))
buf.Write(evt.Value)
n, _ := buf.ReadInt()
value := make([][]byte, n, n)
for i := 0; i < n; i++ {
rn, _ := buf.ReadInt()
cn, _ := buf.ReadInt()

for i := 0; i < rn; i++ {
size, _ := buf.ReadInt()
_, data, _ := buf.ReadBytes(size)
leader, _ := buf.ReadUInt64()
resourceF(data, leader)
}

for i := 0; i < cn; i++ {
size, _ := buf.ReadInt()
_, value[i], _ = buf.ReadBytes(size)
_, data, _ := buf.ReadBytes(size)
containerF(data)
}

buf.Release()
return value
return
}

// ReadLeaderChangerValue returns the target resource and the new leader
Expand Down Expand Up @@ -176,20 +202,22 @@ func (wn *watcherNotifier) onInitWatcher(msg *InitWatcher, conn goetty.IOSession
log.Infof("prophet: new watcher %s added", conn.RemoteIP())

if wn.eventC != nil {
nt, err := newInitEvent(wn.rt)
if err != nil {
log.Errorf("prophet: marshal init notify failed, errors:%+v", err)
conn.Close()
return
}

err = conn.WriteAndFlush(nt)
if err != nil {
log.Errorf("prophet: notify to %s failed, errors:%+v",
conn.RemoteIP(),
err)
conn.Close()
return
if MatchEvent(EventInit, msg.Flag) {
nt, err := newInitEvent(wn.rt)
if err != nil {
log.Errorf("prophet: marshal init notify failed, errors:%+v", err)
conn.Close()
return
}

err = conn.WriteAndFlush(nt)
if err != nil {
log.Errorf("prophet: notify to %s failed, errors:%+v",
conn.RemoteIP(),
err)
conn.Close()
return
}
}

wn.watchers.Store(conn.ID(), &watcher{
Expand Down Expand Up @@ -232,3 +260,37 @@ func (wn *watcherNotifier) start() {
})
}
}

// snap is a copy of the runtime state produced by Runtime.snap; newInitEvent
// serializes it into the value of the init event.
type snap struct {
// resources holds the cloned meta of every resource in the runtime.
resources []Resource
// containers holds the cloned meta of every container in the runtime.
containers []Container
// leaders maps a resource ID to the ID of its leader peer; resources
// without a known leader peer have no entry.
leaders map[uint64]uint64
}

// snap builds a copy of the runtime state while holding the read lock.
//
// The meta of every container and resource is cloned into the result. For each
// resource whose leaderPeer is set, the leader peer ID is recorded in the
// leaders map under the resource ID; resources without a leader get no entry.
func (rc *Runtime) snap() *snap {
	rc.RLock()
	defer rc.RUnlock()

	containers := make([]Container, 0, len(rc.containers))
	for _, cr := range rc.containers {
		containers = append(containers, cr.meta.Clone())
	}

	resources := make([]Resource, 0, len(rc.resources))
	leaders := make(map[uint64]uint64)
	for _, rr := range rc.resources {
		resources = append(resources, rr.meta.Clone())
		if rr.leaderPeer == nil {
			// No leader known for this resource; leave it out of the map.
			continue
		}
		leaders[rr.meta.ID()] = rr.leaderPeer.ID
	}

	return &snap{
		resources:  resources,
		containers: containers,
		leaders:    leaders,
	}
}
1 change: 0 additions & 1 deletion watcher_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func (w *Watcher) Watch(flag int) chan *EventNotify {
w.Lock()
defer w.Unlock()

flag = flag | EventInit
go w.watchDog(flag)
w.eventC = make(chan *EventNotify)
return w.eventC
Expand Down

0 comments on commit c463202

Please sign in to comment.