Skip to content

Commit

Permalink
add feature for olap systems
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangxu19830126 committed Aug 23, 2019
1 parent fc94dfb commit 021b4b1
Show file tree
Hide file tree
Showing 272 changed files with 26,162 additions and 18,487 deletions.
6 changes: 4 additions & 2 deletions cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type Cfg struct {
LeaseTTL int64
Schedulers []Scheduler
Handler RoleChangeHandler

EnableScaleOnNewStore bool
}

func (c *Cfg) adujst() {
Expand Down Expand Up @@ -157,5 +159,5 @@ func (c *Cfg) getDistinctScore(containers []*ContainerRuntime, other *ContainerR
type emprtyHandler struct {
}

func (h *emprtyHandler) BecomeLeader() {}
func (h *emprtyHandler) BecomeFollower() {}
func (h *emprtyHandler) ProphetBecomeLeader() {}
func (h *emprtyHandler) ProphetBecomeFollower() {}
33 changes: 15 additions & 18 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,37 @@ go 1.12

require (
github.com/coreos/bbolt v1.3.3 // indirect
github.com/coreos/etcd v3.3.13+incompatible
github.com/coreos/etcd v3.3.15+incompatible
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20190620071333-e64a0ec8b42a // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/fagongzi/goetty v1.3.1
github.com/fagongzi/util v0.0.0-20181102105153-fd38e0f42a4f
github.com/gogo/protobuf v1.2.1 // indirect
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef // indirect
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
github.com/google/btree v1.0.0 // indirect
github.com/google/uuid v1.1.1 // indirect
github.com/gorilla/websocket v1.4.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.9.2 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.9.6 // indirect
github.com/jonboulle/clockwork v0.1.0 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/pkg/errors v0.8.1 // indirect
github.com/prometheus/common v0.6.0 // indirect
github.com/prometheus/procfs v0.0.3 // indirect
github.com/sirupsen/logrus v1.4.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/prometheus/client_golang v1.1.0 // indirect
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 // indirect
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.etcd.io/bbolt v1.3.3 // indirect
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0 // indirect
golang.org/x/crypto v0.0.0-20190621222207-cc06ce4a13d4 // indirect
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 // indirect
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb // indirect
golang.org/x/text v0.3.2 // indirect
golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586 // indirect
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7 // indirect
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect
google.golang.org/appengine v1.4.0 // indirect
google.golang.org/genproto v0.0.0-20190626174449-989357319d63 // indirect
google.golang.org/grpc v1.21.1 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 // indirect
google.golang.org/grpc v1.23.0 // indirect
sigs.k8s.io/yaml v1.1.0 // indirect
)
84 changes: 46 additions & 38 deletions go.sum

Large diffs are not rendered by default.

65 changes: 39 additions & 26 deletions local.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ var (
)

func getResourceKey(id uint64) []byte {
buf := make([]byte, 0, len(resourcesPrefix)+8)
n := len(resourcesPrefix) + 8
buf := make([]byte, n, n)
copy(buf[0:len(resourcesPrefix)], resourcesPrefix)
binary.BigEndian.PutUint64(buf[len(resourcesPrefix):], id)
return buf
Expand All @@ -32,21 +33,21 @@ type LocalStorage interface {
// Get returns the key value
Get(key []byte) ([]byte, error)
// Set sets the key value to the local storage
Set(key, value []byte) error
Set(pairs ...[]byte) error
// Remove remove the key from the local storage
Remove(key []byte) error
Remove(keys ...[]byte) error
// Range visit all values that start with prefix, set limit to 0 for no limit
Range(prefix []byte, limit uint64, fn func(key, value []byte) bool) error
}

type localDB interface {
get(key []byte) ([]byte, error)
set(key, value []byte) error
get([]byte) ([]byte, error)
set(...[]byte) error

countResources() (int, error)
loadResources(handleFunc func(value []byte) (uint64, error)) error
putResource(res Resource) error
removeResource(resID uint64) error
loadResources(func(value []byte) (uint64, error)) error
putResource(...Resource) error
removeResource(...uint64) error
}

type defaultLocalDB struct {
Expand Down Expand Up @@ -79,20 +80,32 @@ func (db *defaultLocalDB) countResources() (int, error) {
return c, nil
}

func (db *defaultLocalDB) putResource(res Resource) error {
data, err := res.Marshal()
if err != nil {
return err
func (db *defaultLocalDB) putResource(reses ...Resource) error {
var pairs [][]byte

for _, res := range reses {
data, err := res.Marshal()
if err != nil {
return err
}

pairs = append(pairs, getResourceKey(res.ID()), data)
}
return db.set(getResourceKey(res.ID()), data)

return db.set(pairs...)
}

func (db *defaultLocalDB) removeResource(resID uint64) error {
return db.storage.Remove(getResourceKey(resID))
func (db *defaultLocalDB) removeResource(resIDs ...uint64) error {
var keys [][]byte
for _, resID := range resIDs {
keys = append(keys, getResourceKey(resID))
}

return db.storage.Remove(keys...)
}

func (db *defaultLocalDB) set(key, value []byte) error {
return db.storage.Set(key, value)
func (db *defaultLocalDB) set(pairs ...[]byte) error {
return db.storage.Set(pairs...)
}

func (db *defaultLocalDB) get(key []byte) ([]byte, error) {
Expand All @@ -105,10 +118,10 @@ type LocalStore interface {
BootstrapCluster(initResources ...Resource)

// MustPutResource put the resource to local
MustPutResource(Resource)
MustPutResource(...Resource)

// MustRemoveResource remove the res from the local
MustRemoveResource(uint64)
MustRemoveResource(...uint64)

// MustAllocID returns the new id by pd
MustAllocID() uint64
Expand Down Expand Up @@ -137,7 +150,7 @@ type defaultLocalStore struct {

func (ls *defaultLocalStore) BootstrapCluster(initResources ...Resource) {
if len(initResources) == 0 {
log.Fatalf("init resources can not empty")
log.Warnf("init with empty resources")
}

data, err := ls.db.get(containerKey)
Expand Down Expand Up @@ -166,7 +179,7 @@ func (ls *defaultLocalStore) BootstrapCluster(initResources ...Resource) {
log.Fatal("local container is not empty and has already had data")
}

data = make([]byte, 0, 8)
data = make([]byte, 8, 8)
binary.BigEndian.PutUint64(data, id)
err = ls.db.set(containerKey, data)
if err != nil {
Expand Down Expand Up @@ -199,20 +212,20 @@ func (ls *defaultLocalStore) BootstrapCluster(initResources ...Resource) {
ls.pd.GetRPC().TiggerContainerHeartbeat()
}

func (ls *defaultLocalStore) MustPutResource(res Resource) {
err := ls.db.putResource(res)
func (ls *defaultLocalStore) MustPutResource(res ...Resource) {
err := ls.db.putResource(res...)
if err != nil {
log.Fatalf("save resource %+v failed with %+v",
res,
err)
}
}

func (ls *defaultLocalStore) MustRemoveResource(resID uint64) {
err := ls.db.removeResource(resID)
func (ls *defaultLocalStore) MustRemoveResource(resIDs ...uint64) {
err := ls.db.removeResource(resIDs...)
if err != nil {
log.Fatalf("remove resource %d failed with %+v",
resID,
resIDs,
err)
}
}
Expand Down
45 changes: 40 additions & 5 deletions meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ type ResourceKind int
const (
// LeaderKind leader
LeaderKind ResourceKind = iota
// ReplicaKind replica of resource
// ReplicaKind replication of resource
ReplicaKind
)

Expand Down Expand Up @@ -60,20 +60,34 @@ func (ps *PeerStats) Clone() *PeerStats {
}
}

// Resource resource
// Resource is an abstraction of data shard in a distributed system.
// Each Resource has multiple replication and is distributed on different nodes.
type Resource interface {
Serializable

// SetID update the resource id
SetID(id uint64)
// ID returns the resource id
ID() uint64
// Peers returns the repication peers
Peers() []*Peer
// SetPeers update the repication peers
SetPeers(peers []*Peer)
// Stale returns true if the other resource is older than current resource
Stale(other Resource) bool
// Changed returns true if the other resource is newer than current resource
Changed(other Resource) bool
Clone() Resource
// Labels returns the label pairs that determine which the resources will be scheduled to which nodes
Labels() []Pair
// Clone returns the cloned value
Clone() Resource

// ScaleCompleted returns true if the current resource has been successfully scaled according to the specified container
ScaleCompleted(uint64) bool

// Marshal returns error if marshal failed
Marshal() ([]byte, error)
// Unmarshal returns error if unmarshal failed
Unmarshal(data []byte) error
}

Expand All @@ -83,17 +97,38 @@ type Pair struct {
Value string `json:"value"`
}

// Container is the resource container, the resource is running on the container
// the container is usually a node
// Container is an abstraction of the node in a distributed system.
// Usually a container has many resoruces
type Container interface {
Serializable

// ShardAddr returns address that used for communication between the resource replications
ShardAddr() string
// SetID update the container id
SetID(id uint64)
// ID returns the container id
ID() uint64
// Labels returns the lable tag of the container
Labels() []Pair
// State returns the state of the container
State() State
// Clone returns the cloned value
Clone() Container
// ActionOnJoinCluster returns the cluster will do what when a new container join the cluster
ActionOnJoinCluster() Action

// Marshal returns error if marshal failed
Marshal() ([]byte, error)
// Unmarshal returns error if unmarshal failed
Unmarshal(data []byte) error
}

// Action the action on the cluster join the cluster
type Action int

const (
// NoneAction none action
NoneAction = Action(0)
// ScaleOutAction all resources will received a scale operation when a new container join the cluster
ScaleOutAction = Action(1)
)
Loading

0 comments on commit 021b4b1

Please sign in to comment.