Skip to content

Commit

Permalink
refactor: support pass custom tag when report counter (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
foreverneverer authored Jan 26, 2021
1 parent 858063a commit 507e11c
Show file tree
Hide file tree
Showing 17 changed files with 265 additions and 164 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
bin/
.idea/
pegasus-tools-*
golangci-lint-1.29.0-linux-amd64
golangci-lint-*
coverage.txt
*.tar.gz
*.log
11 changes: 6 additions & 5 deletions check_and_format.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@
PROJECT_DIR=$(dirname "${SCRIPT_DIR}")
cd "${PROJECT_DIR}" || exit 1

if [ ! -f "${PROJECT_DIR}"/golangci-lint-1.29.0-linux-amd64/golangci-lint ]; then
wget https://github.com/golangci/golangci-lint/releases/download/v1.29.0/golangci-lint-1.29.0-linux-amd64.tar.gz
tar -xzvf golangci-lint-1.29.0-linux-amd64.tar.gz
if [ ! -f "${PROJECT_DIR}"/golangci-lint-1.35.2-linux-amd64/golangci-lint ]; then
wget https://github.com/golangci/golangci-lint/releases/download/v1.35.2/golangci-lint-1.35.2-linux-amd64.tar.gz
tar -xzvf golangci-lint-1.35.2-linux-amd64.tar.gz
fi

gofmt -l -w .
gofmt -l -w -s .
goimports -w .
go mod tidy
golangci-lint-1.29.0-linux-amd64/golangci-lint run
golangci-lint-1.35.2-linux-amd64/golangci-lint run -v -E gofmt -E golint
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ type zookeeperOpts struct {

// metricsOpts used for init the perfCounter type(now support the Falcon and Prometheus) and
type metricsOpts struct {
Type string `mapstructure:"type"`
Tags map[string]string `mapstructure:"tags"`
Type string `mapstructure:"type"`
Tags []string `mapstructure:"tags"`
}

var GlobalConfig Configuration
Expand Down
7 changes: 2 additions & 5 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func TestConfig(t *testing.T) {
Init("../meta-proxy.yml")
Init("yaml/meta-proxy-example.yml")
config := Configuration{
ZookeeperOpts: zookeeperOpts{
Address: []string{"127.0.0.1:22181", "127.0.0.2:22181"},
Expand All @@ -17,10 +17,7 @@ func TestConfig(t *testing.T) {
},
MetricsOpts: metricsOpts{
Type: "falcon",
Tags: map[string]string{
"region": "c3tst_staging",
"service": "meta_proxy",
},
Tags: []string{"region=local_tst", "service=meta_proxy"},
},
}

Expand Down
9 changes: 9 additions & 0 deletions config/yaml/meta-proxy-example.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
zookeeper:
address: [127.0.0.1:22181,127.0.0.2:22181]
root: /pegasus-cluster
timeout: 1000 # ms
table_watcher_cache_capacity: 1024

metric:
type: falcon
tags: [region=local_tst,service=meta_proxy]
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ import (

func main() {
logrus.SetOutput(&lumberjack.Logger{
Filename: "meta-proxy.log",
Filename: os.Args[2],
MaxSize: 500, // MB
MaxAge: 7, // days
LocalTime: true,
})

config.Init(os.Args[1])
meta.Init()
err := rpc.Serve()
Expand Down
11 changes: 0 additions & 11 deletions meta-proxy.yml

This file was deleted.

36 changes: 20 additions & 16 deletions meta/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ import (
"github.com/sirupsen/logrus"
)

// declare perfcounters
var tableWatcherEvictCount metrics.Gauge
var zkRequestCount metrics.Meter

var globalClusterManager *ClusterManager

Expand Down Expand Up @@ -47,7 +46,7 @@ type TableInfoWatcher struct {
}

func initClusterManager() {
tableWatcherEvictCount = metrics.RegisterGauge("table_watcher_cache_evict_count")
zkRequestCount = metrics.RegisterMeterWithTags("zk_request_count", []string{"table"})

zkAddrs := config.GlobalConfig.ZookeeperOpts.Address
zkConn, _, err := zk.Connect(config.GlobalConfig.ZookeeperOpts.Address,
Expand All @@ -58,7 +57,7 @@ func initClusterManager() {

tables := gcache.New(config.GlobalConfig.ZookeeperOpts.WatcherCount).LRU().EvictedFunc(func(key interface{}, value interface{}) {
value.(*TableInfoWatcher).ctx.cancel()
tableWatcherEvictCount.Inc()
logrus.Debugf("[%s] zk watcher is evicted", key.(string))
}).Build() // TODO(jiashuo1) consider set expire time
globalClusterManager = &ClusterManager{
ZkConn: zkConn,
Expand All @@ -67,14 +66,17 @@ func initClusterManager() {
}
}

func (m *ClusterManager) getMeta(table string) (*session.MetaManager, error) {
// return (metaAddr, metaManager, error)
func (m *ClusterManager) getMeta(table string) (string, *session.MetaManager, error) {
var addrs string
var meta *session.MetaManager

tableInfo, err := m.Tables.Get(table)
if err == nil {
addrs = tableInfo.(*TableInfoWatcher).metaAddrs
meta = m.Metas[tableInfo.(*TableInfoWatcher).metaAddrs]
if meta != nil {
return meta, nil
return addrs, meta, nil
}
}

Expand All @@ -86,28 +88,28 @@ func (m *ClusterManager) getMeta(table string) (*session.MetaManager, error) {
tableInfo, err = m.newTableInfo(table)
if err != nil {
logrus.Errorf("[%s] failed to get cluster info: %s", table, err)
return nil, err
return "", nil, err
}
err = m.Tables.Set(table, tableInfo)
if err != nil {
logrus.Errorf("[%s] failed to update local cache cluster info: %s", table, err)
return nil, base.ERR_INVALID_DATA
return "", nil, base.ERR_INVALID_DATA
}
}
tableInfoW := tableInfo.(*TableInfoWatcher)
metaAddrs := tableInfoW.metaAddrs
meta = m.Metas[metaAddrs]
addrs = tableInfoW.metaAddrs
meta = m.Metas[addrs]
if meta == nil {
metaList, err := parseToMetaList(metaAddrs)
metaList, err := parseToMetaList(addrs)
if err != nil {
logrus.Errorf("[%s] cluster addr[%s] format is err: %s", table, metaAddrs, err)
return nil, base.ERR_INVALID_DATA
logrus.Errorf("[%s] cluster addr[%s] format is err: %s", table, addrs, err)
return "", nil, base.ERR_INVALID_DATA
}
meta = session.NewMetaManager(metaList, session.NewNodeSession)
m.Metas[metaAddrs] = meta
m.Metas[addrs] = meta
}

return meta, nil
return addrs, meta, nil
}

// get table cluster info and watch it based table name from zk
Expand All @@ -118,6 +120,8 @@ func (m *ClusterManager) getMeta(table string) (*session.MetaManager, error) {
// "meta_addrs" : "metaAddr1,metaAddr2,metaAddr3"
// }
func (m *ClusterManager) newTableInfo(table string) (*TableInfoWatcher, error) {
zkRequestCount.UpdateWithTags([]string{table})

path := fmt.Sprintf("%s/%s", config.GlobalConfig.ZookeeperOpts.Root, table)
value, _, watcherEvent, err := m.ZkConn.GetW(path)
zkAddrs := config.GlobalConfig.ZookeeperOpts.Address
Expand Down Expand Up @@ -212,7 +216,7 @@ func parseToMetaList(metaAddrs string) ([]string, error) {
// /<table2> => {JSON}
func parseToTableName(path string) (string, error) {
result := strings.Split(path, "/")
if len(result) != 3 {
if len(result) < 3 {
return "", fmt.Errorf("the path[%s] is invalid", path)
}

Expand Down
26 changes: 13 additions & 13 deletions meta/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func initTestLog() {
// init the zk data
func init() {
initTestLog()
config.Init("../meta-proxy.yml")
config.Init("../config/yaml/meta-proxy-example.yml")
config.GlobalConfig.ZookeeperOpts.WatcherCount = 2
initClusterManager()

Expand Down Expand Up @@ -128,7 +128,7 @@ func TestGetMetaConnector(t *testing.T) {

// first get connector which will init the cache and only store `stat` and `test` table watcher
for _, test := range tests {
_, _ = globalClusterManager.getMeta(test.table)
_, _, _ = globalClusterManager.getMeta(test.table)
cacheWatcher, _ := globalClusterManager.Tables.Get(test.table)
assert.Equal(t, test.addr, cacheWatcher.(*TableInfoWatcher).metaAddrs)
}
Expand All @@ -144,7 +144,7 @@ func TestGetMetaConnector(t *testing.T) {
} else {
assert.Equal(t, test.addr, cacheWatcher.(*TableInfoWatcher).metaAddrs)
assert.NotNil(t, globalClusterManager.Metas[test.addr])
meta, _ := globalClusterManager.getMeta(test.table)
_, meta, _ := globalClusterManager.getMeta(test.table)
assert.NotNil(t, meta)
}
}
Expand All @@ -153,7 +153,7 @@ func TestGetMetaConnector(t *testing.T) {
func TestZookeeperUpdate(t *testing.T) {
zkRoot := config.GlobalConfig.ZookeeperOpts.Root
for _, test := range tests {
_, _ = globalClusterManager.getMeta(test.table)
_, _, _ = globalClusterManager.getMeta(test.table)
// update zookeeper node data and trigger the watch event update local cache
for _, update := range updates {
_, stat, _ := globalClusterManager.ZkConn.Get(test.path)
Expand All @@ -163,7 +163,7 @@ func TestZookeeperUpdate(t *testing.T) {
}

// local cache will change to new meta addr because the zk watcher
time.Sleep(time.Duration(10000000))
time.Sleep(time.Duration(100000000))
cacheWatcher, _ := globalClusterManager.Tables.Get(test.table)
assert.Equal(t, update.addr, cacheWatcher.(*TableInfoWatcher).metaAddrs)
}
Expand All @@ -185,29 +185,29 @@ func TestParseTablePath(t *testing.T) {
}

tablePaths := []table{
{
path: zkRoot + "/table",
result: nil,
},
{
path: zkRoot,
result: fmt.Errorf("the path[%s] is invalid", zkRoot),
},
{
path: zkRoot + "//table",
result: fmt.Errorf("the path[%s] is invalid", zkRoot+"//table"),
path: zkRoot + "/name",
result: nil,
},
{
path: zkRoot + "//name",
result: nil,
},
{
path: zkRoot + "/table/name",
result: fmt.Errorf("the path[%s] is invalid", zkRoot+"/table/name"),
result: nil,
},
}

for _, tb := range tablePaths {
ret, err := parseToTableName(tb.path)
assert.Equal(t, err, tb.result)
if err == nil {
assert.Equal(t, "table", ret)
assert.Equal(t, "name", ret)
}
}
}
Expand Down
15 changes: 10 additions & 5 deletions meta/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ import (
"github.com/XiaoMi/pegasus-go-client/idl/rrdb"
"github.com/pegasus-kv/meta-proxy/metrics"
"github.com/pegasus-kv/meta-proxy/rpc"
"github.com/sirupsen/logrus"
)

var clientQueryConfigQPS metrics.Meter
var clientQueryConfigCount metrics.Meter

func Init() {
clientQueryConfigQPS = metrics.RegisterMeter("client_query_config_qps")
clientQueryConfigCount = metrics.RegisterMeterWithTags("client_query_config_count", []string{"table"})
initClusterManager()

rpc.Register("RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX", &rpc.MethodDefinition{
Expand All @@ -27,12 +28,12 @@ func Init() {
}

func queryConfig(ctx context.Context, args rpc.RequestArgs) rpc.ResponseResult {
clientQueryConfigQPS.Update()

var errorCode *base.ErrorCode
queryCfgArgs := args.(*rrdb.MetaQueryCfgArgs)
tableName := queryCfgArgs.Query.AppName
meta, err := globalClusterManager.getMeta(tableName)
clientQueryConfigCount.UpdateWithTags([]string{tableName})

addrs, meta, err := globalClusterManager.getMeta(tableName)
if err != nil {
errorCode = parseToErrorCode(err)
return &rrdb.MetaQueryCfgResult{
Expand All @@ -52,6 +53,10 @@ func queryConfig(ctx context.Context, args rpc.RequestArgs) rpc.ResponseResult {
}
}

if resp.GetErr().Errno != base.ERR_OK.String() {
logrus.Errorf("[%s] failed to query config from [%s], err = %s", tableName, addrs, resp.Err)
}

return &rrdb.MetaQueryCfgResult{
Success: resp,
}
Expand Down
Loading

0 comments on commit 507e11c

Please sign in to comment.