Skip to content

Commit

Permalink
dev: update
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangxu19830126 committed Jun 14, 2018
1 parent f82ec49 commit 6546b70
Show file tree
Hide file tree
Showing 28 changed files with 1,142 additions and 277 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# lion
# prophet
Embedded distributed coordinator
55 changes: 45 additions & 10 deletions cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@ import (
)

var (
cfg *Cfg

replicaBaseScore = float64(100)
)

// Cfg lion cfg
type emprtyHandler struct {
}

func (h *emprtyHandler) BecomeLeader() {}
func (h *emprtyHandler) BecomeFollower() {}

// Cfg prophet cfg
type Cfg struct {
// MaxScheduleRetries maximum retry times for schedule
MaxScheduleRetries int
Expand Down Expand Up @@ -39,13 +43,28 @@ type Cfg struct {
MinAvailableStorageUsedRate int
// LocationLabels the label used for location
LocationLabels []string

schedulers []Scheduler
resourceFactory func() Resource
containerFactory func() Container
// MaxRPCCons rpc conns
MaxRPCCons int
// MaxRPCConnIdle rpc conn max idle time
MaxRPCConnIdle time.Duration
// MaxRPCTimeout rpc max timeout
MaxRPCTimeout time.Duration

Namespace string
LeaseTTL int64
Schedulers []Scheduler
Handler RoleChangeHandler
}

func (c *Cfg) adujst() {
if c.LeaseTTL == 0 {
c.LeaseTTL = 5
}

if c.Namespace == "" {
c.Namespace = "/prophet"
}

if c.MaxScheduleRetries == 0 {
c.MaxScheduleRetries = 3
}
Expand Down Expand Up @@ -94,9 +113,25 @@ func (c *Cfg) adujst() {
c.MaxLimitSnapshotsCount = 3
}

if c.schedulers == nil {
c.schedulers = append(c.schedulers, newBalanceReplicaScheduler())
c.schedulers = append(c.schedulers, newBalanceResourceLeaderScheduler())
if c.MaxRPCCons == 0 {
c.MaxRPCCons = 10
}

if c.MaxRPCConnIdle == 0 {
c.MaxRPCConnIdle = time.Hour
}

if c.MaxRPCTimeout == 0 {
c.MaxRPCTimeout = time.Second * 10
}

if len(c.Schedulers) == 0 {
c.Schedulers = append(c.Schedulers, newBalanceReplicaScheduler(c))
c.Schedulers = append(c.Schedulers, newBalanceResourceLeaderScheduler(c))
}

if c.Handler == nil {
c.Handler = &emprtyHandler{}
}
}

Expand Down
1 change: 1 addition & 0 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,5 @@ func init() {
// SetLogger set the log for prophet
func SetLogger(l Logger) {
log = l
log.Infof("prophet: logger set")
}
20 changes: 14 additions & 6 deletions meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ const (

// Serializable serializable
type Serializable interface {
Marshal() ([]byte, error)
Unmarshal(data []byte) error
}

type codecSerializable interface {
Serializable
Prepare() error
Init(adapter Adapter) error
}

// Peer is the resource peer
Expand Down Expand Up @@ -63,11 +67,12 @@ type Resource interface {
ID() uint64
Peers() []*Peer
SetPeers(peers []*Peer)
IsStale(other Resource) bool
PeerChanged(other Resource) bool
RangeChanged(other Resource) bool

Stale(other Resource) bool
Changed(other Resource) bool
Clone() Resource

Marshal() ([]byte, error)
Unmarshal(data []byte) error
}

// Pair key value pair
Expand All @@ -85,4 +90,7 @@ type Container interface {
State() State

Clone() Container

Marshal() ([]byte, error)
Unmarshal(data []byte) error
}
51 changes: 51 additions & 0 deletions meta_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package prophet

import (
"testing"
)

func TestPeerClone(t *testing.T) {
old := newTestPeer(1, 10)
cloned := old.Clone()

if cloned.ID != old.ID {
t.Error("peer clone failed with ID")
}
if cloned.ContainerID != old.ContainerID {
t.Error("peer clone failed with ContainerID")
}

old.ID = 2
old.ContainerID = 20
if cloned.ID == old.ID {
t.Error("peer clone failed with ID changed")
}
if cloned.ContainerID == old.ContainerID {
t.Error("peer clone failed with ContainerID changed")
}
}

func TestPeerStatusClone(t *testing.T) {
old := &PeerStats{
Peer: newTestPeer(1, 10),
DownSeconds: 10,
}
cloned := old.Clone()

if cloned.DownSeconds != old.DownSeconds {
t.Error("peer status clone failed with DownSeconds")
}

old.DownSeconds = 20

if cloned.DownSeconds == old.DownSeconds {
t.Error("peer status clone failed with DownSeconds changed")
}
}

func newTestPeer(id, containerID uint64) *Peer {
return &Peer{
ID: id,
ContainerID: containerID,
}
}
76 changes: 57 additions & 19 deletions prophet.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,54 +2,82 @@ package prophet

import (
"sync"
"time"

"github.com/fagongzi/goetty"
)

// RoleChangeHandler prophet role change handler
type RoleChangeHandler interface {
BecomeLeader()
BecomeFollower()
}

// Adapter prophet adapter
type Adapter interface {
// NewResource return a new resource
NewResource() Resource
// NewContainer return a new container
NewContainer() Container
// FetchResourceHB fetch resource HB
FetchAllResourceHB() []*ResourceHeartbeatReq
// FetchResourceHB fetch resource HB
FetchResourceHB(id uint64) *ResourceHeartbeatReq
// FetchContainerHB fetch container HB
FetchContainerHB() *ContainerHeartbeatReq
// ResourceHBInterval fetch resource HB interface
ResourceHBInterval() time.Duration
// ContainerHBInterval fetch container HB interface
ContainerHBInterval() time.Duration
// HBHandler HB hander
HBHandler() HeartbeatHandler
}

// Prophet is the distributed scheduler and coordinator
type Prophet struct {
sync.Mutex

opts *options

adapter Adapter
cfg *Cfg
store Store
rt *Runtime
coordinator *Coordinator

node *Node
leader *Node
leaderFlag int64
signature string

tcpL *goetty.Server
runner *Runner
completeC chan struct{}
node *Node
leader *Node
leaderFlag int64
signature string
tcpL *goetty.Server
runner *Runner
completeC chan struct{}
rpc *simpleRPC
bizCodec *codec
}

// NewProphet returns a prophet instance
func NewProphet(name string, addr string, opts ...Option) *Prophet {
func NewProphet(name string, addr string, adapter Adapter, opts ...Option) *Prophet {
value := &options{cfg: &Cfg{}}
for _, opt := range opts {
opt(value)
}
value.adjust()

p := new(Prophet)
p.opts = value
p.cfg = value.cfg
p.adapter = adapter
p.bizCodec = &codec{adapter: adapter}
p.leaderFlag = 0
p.node = &Node{
ID: p.opts.id,
Name: name,
Addr: addr,
}
p.signature = p.node.marshal()
p.store = newEtcdStore(p.opts.client, p.opts.namespace)
p.store = newEtcdStore(value.client, p.cfg.Namespace, adapter, p.signature)
p.runner = NewRunner()
p.coordinator = newCoordinator(p.runner, p.rt)
p.coordinator = newCoordinator(value.cfg, p.runner, p.rt)
p.tcpL = goetty.NewServer(addr,
goetty.WithServerDecoder(goetty.NewIntLengthFieldBasedDecoder(bizCodec)),
goetty.WithServerEncoder(goetty.NewIntLengthFieldBasedEncoder(bizCodec)))
goetty.WithServerDecoder(goetty.NewIntLengthFieldBasedDecoder(p.bizCodec)),
goetty.WithServerEncoder(goetty.NewIntLengthFieldBasedEncoder(p.bizCodec)))
p.completeC = make(chan struct{})
p.rpc = newSimpleRPC(p)

return p
}
Expand All @@ -61,3 +89,13 @@ func (p *Prophet) Start() {
p.startResourceHeartbeatLoop()
p.startContainerHeartbeatLoop()
}

// GetStore returns the store
func (p *Prophet) GetStore() Store {
return p.store
}

// GetRPC returns the rpc interface
func (p *Prophet) GetRPC() RPC {
return p.rpc
}
44 changes: 36 additions & 8 deletions prophet_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,58 @@ import (
type Coordinator struct {
sync.RWMutex

cfg *Cfg
rt *Runtime
checker *replicaChecker
limiter *scheduleLimiter
schedulers map[string]*scheduleController
opts map[uint64]Operator

runner *Runner
runner *Runner
tasks []uint64
running bool
}

func newCoordinator(runner *Runner, rt *Runtime) *Coordinator {
func newCoordinator(cfg *Cfg, runner *Runner, rt *Runtime) *Coordinator {
c := new(Coordinator)
c.limiter = newScheduleLimiter()
c.checker = newReplicaChecker(rt)
c.checker = newReplicaChecker(cfg, rt)
c.opts = make(map[uint64]Operator)
c.schedulers = make(map[string]*scheduleController)
c.runner = runner
c.rt = rt
c.cfg = cfg
return c
}

func (c *Coordinator) start() {
for _, s := range cfg.schedulers {
if c.running {
log.Warnf("prophet: coordinator is already started.")
return
}

for _, s := range c.cfg.Schedulers {
c.addScheduler(s)
}
c.running = true
}

func (c *Coordinator) stop() {
c.runner.Stop()
c.Lock()
defer c.Unlock()

c.running = false
for _, id := range c.tasks {
c.runner.StopCancelableTask(id)
}
}

func (c *Coordinator) isRunning() bool {
c.RLock()
value := c.running
c.RUnlock()

return value
}

func (c *Coordinator) addScheduler(scheduler Scheduler) error {
Expand All @@ -53,10 +77,14 @@ func (c *Coordinator) addScheduler(scheduler Scheduler) error {
return err
}

c.runner.RunCancelableTask(func(ctx context.Context) {
id, err := c.runner.RunCancelableTask(func(ctx context.Context) {
c.runScheduler(ctx, s)
})
if err != nil {
return err
}

c.tasks = append(c.tasks, id)
c.schedulers[s.Name()] = s
return nil
}
Expand Down Expand Up @@ -111,7 +139,7 @@ func (c *Coordinator) runScheduler(ctx context.Context, s *scheduleController) {
continue
}

for i := 0; i < cfg.MaxScheduleRetries; i++ {
for i := 0; i < c.cfg.MaxScheduleRetries; i++ {
op := s.Schedule(c.rt)
if op == nil {
continue
Expand All @@ -138,7 +166,7 @@ func (c *Coordinator) dispatch(target *ResourceRuntime) *resourceHeartbeatRsp {
}

// Check replica operator.
if c.limiter.operatorCount(ReplicaKind) >= cfg.MaxScheduleReplica {
if c.limiter.operatorCount(ReplicaKind) >= c.cfg.MaxScheduleReplica {
return nil
}

Expand Down
Loading

0 comments on commit 6546b70

Please sign in to comment.