Skip to content

Commit

Permalink
Add check bootstrapped
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangxu19830126 committed Feb 17, 2020
1 parent cec7351 commit 73f3317
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 20 deletions.
45 changes: 26 additions & 19 deletions local.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,32 +186,39 @@ func (ls *defaultLocalStore) BootstrapCluster(initResources ...Resource) {
log.Fatal("save local container id failed with %+v", err)
}

// prepare init resource, alloc the resource id and the first replica peer info
for _, res := range initResources {
res.SetID(ls.MustAllocID())
p := ls.newPeer()
res.SetPeers([]*Peer{&p})
ls.MustPutResource(res)
ok, err := ls.pd.GetStore().AlreadyBootstrapped()
if err != nil {
log.Fatal("get cluster already bootstrapped failed with %+v", err)
}

ok, err := ls.pd.GetStore().PutBootstrapped(ls.meta, initResources...)
if err != nil {
if !ok {
// prepare init resource, alloc the resource id and the first replica peer info
for _, res := range initResources {
ls.MustRemoveResource(res.ID())
res.SetID(ls.MustAllocID())
p := ls.newPeer()
res.SetPeers([]*Peer{&p})
ls.MustPutResource(res)
}

if err != errMaybeNotLeader {
log.Fatal("bootstrap cluster failed with %+v", err)
}
ok, err := ls.pd.GetStore().PutBootstrapped(ls.meta, initResources...)
if err != nil {
for _, res := range initResources {
ls.MustRemoveResource(res.ID())
}

log.Warningf("bootstrap cluster failed with %+v", err)
}
if !ok {
log.Info("the cluster is already bootstrapped")
for _, res := range initResources {
ls.MustRemoveResource(res.ID())
if err != errMaybeNotLeader {
log.Fatal("bootstrap cluster failed with %+v", err)
}

log.Warningf("bootstrap cluster failed with %+v", err)
}
if !ok {
log.Info("the cluster is already bootstrapped")
for _, res := range initResources {
ls.MustRemoveResource(res.ID())
}
log.Info("the init resources is already removed from container")
}
log.Info("the init resources is already removed from container")
}

ls.pd.GetRPC().TiggerContainerHeartbeat()
Expand Down
3 changes: 2 additions & 1 deletion store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ type Store interface {

// AllocID returns the alloc id
AllocID() (uint64, error)

// AlreadyBootstrapped returns the cluster was already bootstrapped
AlreadyBootstrapped() (bool, error)
// PutBootstrapped put cluster is bootstrapped
PutBootstrapped(container Container, resources ...Resource) (bool, error)

Expand Down
9 changes: 9 additions & 0 deletions store_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,15 @@ func (s *etcdStore) generate() (uint64, error) {
return max, nil
}

func (s *etcdStore) AlreadyBootstrapped() (bool, error) {
resp, err := s.get(s.clusterPath, clientv3.WithCountOnly())
if err != nil {
return false, nil
}

return resp.Count > 0, nil
}

// PutBootstrapped put cluster is bootstrapped
func (s *etcdStore) PutBootstrapped(container Container, resources ...Resource) (bool, error) {
clusterID, err := s.AllocID()
Expand Down
48 changes: 48 additions & 0 deletions store_etcd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package prophet

import (
"context"
"fmt"
"math"
"strings"
"testing"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/stretchr/testify/assert"
)

func TestAlreadyBootstrapped(t *testing.T) {
stopC, port, err := startTestSingleEtcd(t)
if err != nil {
assert.FailNowf(t, "start embed etcd failed", "error: %+v", err)
}
defer close(stopC)

client, err := clientv3.New(clientv3.Config{
Endpoints: strings.Split(fmt.Sprintf("http://127.0.0.1:%d", port), ","),
DialTimeout: DefaultTimeout,
})
assert.Nil(t, err, "create etcd client failed")
defer client.Close()

e, err := NewElector(client)
assert.Nil(t, err, "TestAlreadyBootstrapped failed")
defer e.Stop(math.MaxUint64)

go e.ElectionLoop(context.Background(), math.MaxUint64, "node1", func() {}, func() {})
time.Sleep(time.Millisecond * 200)

store := newEtcdStore(client, nil, "node1", e)
yes, err := store.AlreadyBootstrapped()
assert.NoError(t, err, "TestAlreadyBootstrapped failed")
assert.False(t, yes, "TestAlreadyBootstrapped failed")

yes, err = store.PutBootstrapped(newTestContainer(), newTestResource())
assert.NoError(t, err, "TestAlreadyBootstrapped failed")
assert.True(t, yes, "TestAlreadyBootstrapped failed")

yes, err = store.AlreadyBootstrapped()
assert.NoError(t, err, "TestAlreadyBootstrapped failed")
assert.True(t, yes, "TestAlreadyBootstrapped failed")
}

0 comments on commit 73f3317

Please sign in to comment.