Skip to content

Commit

Permalink
Add scheduler control
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangxu19830126 committed May 22, 2020
1 parent 72f948f commit da41c4d
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 9 deletions.
5 changes: 5 additions & 0 deletions meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ type Resource interface {
Marshal() ([]byte, error)
// Unmarshal returns error if unmarshal failed
Unmarshal(data []byte) error

// SupportRebalance support rebalance the resource
SupportRebalance() bool
// SupportTransferLeader support transfer leader
SupportTransferLeader() bool
}

// Pair key value pair
Expand Down
28 changes: 28 additions & 0 deletions mock_meta.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 13 additions & 5 deletions runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,19 +127,19 @@ func (rc *Runtime) ResourceFollowerContainers(res *ResourceRuntime) []*Container
}

// RandLeaderResource returns the random leader resource
func (rc *Runtime) RandLeaderResource(id uint64) *ResourceRuntime {
func (rc *Runtime) RandLeaderResource(id uint64, kind ResourceKind) *ResourceRuntime {
rc.RLock()
defer rc.RUnlock()

return randResource(rc.leaders[id])
return randResource(rc.leaders[id], kind)
}

// RandFollowerResource returns the random follower resource
func (rc *Runtime) RandFollowerResource(id uint64) *ResourceRuntime {
func (rc *Runtime) RandFollowerResource(id uint64, kind ResourceKind) *ResourceRuntime {
rc.RLock()
defer rc.RUnlock()

return randResource(rc.followers[id])
return randResource(rc.followers[id], kind)
}

func (rc *Runtime) getContainerWithoutLock(id uint64) *ContainerRuntime {
Expand All @@ -160,7 +160,7 @@ func (rc *Runtime) getResourceWithoutLock(id uint64) *ResourceRuntime {
return resource.Clone()
}

func randResource(resources map[uint64]*ResourceRuntime) *ResourceRuntime {
func randResource(resources map[uint64]*ResourceRuntime, kind ResourceKind) *ResourceRuntime {
for _, res := range resources {
if res.leaderPeer == nil {
log.Fatalf("rand resource %d without leader", res.meta.ID())
Expand All @@ -174,6 +174,14 @@ func randResource(resources map[uint64]*ResourceRuntime) *ResourceRuntime {
continue
}

if kind == LeaderKind && !res.meta.SupportTransferLeader() {
continue
}

if kind == ReplicaKind && !res.meta.SupportRebalance() {
continue
}

return res.Clone()
}

Expand Down
4 changes: 2 additions & 2 deletions scheduler_balancer_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func scheduleTransferLeader(rt *Runtime, s Selector, filters ...Filter) (*Resour

if mostLeaderDistance > leastLeaderDistance {
// Transfer a leader out of mostLeaderContainer.
res := rt.RandLeaderResource(mostLeaderContainer.meta.ID())
res := rt.RandLeaderResource(mostLeaderContainer.meta.ID(), LeaderKind)
if res == nil {
return nil, nil
}
Expand All @@ -99,7 +99,7 @@ func scheduleTransferLeader(rt *Runtime, s Selector, filters ...Filter) (*Resour
}

// Transfer a leader into leastLeaderContainer.
res := rt.RandFollowerResource(leastLeaderContainer.meta.ID())
res := rt.RandFollowerResource(leastLeaderContainer.meta.ID(), LeaderKind)
if res == nil {
return nil, nil
}
Expand Down
4 changes: 2 additions & 2 deletions scheduler_balancer_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ func scheduleRemovePeer(rt *Runtime, s Selector, filters ...Filter) (*ResourceRu
return nil, nil
}

target := rt.RandFollowerResource(source.meta.ID())
target := rt.RandFollowerResource(source.meta.ID(), ReplicaKind)
if target == nil {
target = rt.RandLeaderResource(source.meta.ID())
target = rt.RandLeaderResource(source.meta.ID(), ReplicaKind)
}
if target == nil {
return nil, nil
Expand Down
10 changes: 10 additions & 0 deletions test_infostructure.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,16 @@ func (res *testResource) Unmarshal(data []byte) error {
return json.Unmarshal(data, res)
}

// SupportRebalance support rebalance the resource
func (res *testResource) SupportRebalance() bool {
return true
}

// SupportTransferLeader support transfer leader
func (res *testResource) SupportTransferLeader() bool {
return true
}

type testContainer struct {
CShardAddr string `json:"shardAddr"`
CID uint64 `json:"cid"`
Expand Down

0 comments on commit da41c4d

Please sign in to comment.