Skip to content

Commit

Permalink
enhance: make runtime config into a global environment table (#38671)
Browse files Browse the repository at this point in the history
issue: #38399

---------

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored Dec 24, 2024
1 parent 69a9fd6 commit 118678b
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 73 deletions.
18 changes: 10 additions & 8 deletions cmd/roles/roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"os/signal"
"path/filepath"
"runtime/debug"
"strings"
"sync"
"syscall"
"time"
Expand Down Expand Up @@ -173,13 +172,6 @@ func NewMilvusRoles() *MilvusRoles {
return mr
}

// EnvValue not used now.
func (mr *MilvusRoles) EnvValue(env string) bool {
env = strings.ToLower(env)
env = strings.Trim(env, " ")
return env == "1" || env == "true"
}

func (mr *MilvusRoles) printLDPreLoad() {
const LDPreLoad = "LD_PRELOAD"
val, ok := os.LookupEnv(LDPreLoad)
Expand Down Expand Up @@ -449,45 +441,55 @@ func (mr *MilvusRoles) Run() {
if mr.EnableRootCoord {
rootCoord = mr.runRootCoord(ctx, local, &wg)
componentMap[typeutil.RootCoordRole] = rootCoord
paramtable.SetLocalComponentEnabled(typeutil.RootCoordRole)
}

if mr.EnableDataCoord {
dataCoord = mr.runDataCoord(ctx, local, &wg)
componentMap[typeutil.DataCoordRole] = dataCoord
paramtable.SetLocalComponentEnabled(typeutil.DataCoordRole)
}

if mr.EnableIndexCoord {
indexCoord = mr.runIndexCoord(ctx, local, &wg)
componentMap[typeutil.IndexCoordRole] = indexCoord
paramtable.SetLocalComponentEnabled(typeutil.IndexCoordRole)
}

if mr.EnableQueryCoord {
queryCoord = mr.runQueryCoord(ctx, local, &wg)
componentMap[typeutil.QueryCoordRole] = queryCoord
paramtable.SetLocalComponentEnabled(typeutil.QueryCoordRole)
}

if mr.EnableQueryNode {
queryNode = mr.runQueryNode(ctx, local, &wg)
componentMap[typeutil.QueryNodeRole] = queryNode
paramtable.SetLocalComponentEnabled(typeutil.QueryNodeRole)
}

if mr.EnableDataNode {
dataNode = mr.runDataNode(ctx, local, &wg)
componentMap[typeutil.DataNodeRole] = dataNode
paramtable.SetLocalComponentEnabled(typeutil.DataNodeRole)
}
if mr.EnableIndexNode {
indexNode = mr.runIndexNode(ctx, local, &wg)
componentMap[typeutil.IndexNodeRole] = indexNode
paramtable.SetLocalComponentEnabled(typeutil.IndexNodeRole)
}

if mr.EnableProxy {
proxy = mr.runProxy(ctx, local, &wg)
componentMap[typeutil.ProxyRole] = proxy
paramtable.SetLocalComponentEnabled(typeutil.ProxyRole)
}

if mr.EnableStreamingNode {
// Before initializing the local streaming node, make sure the local registry is ready.
streamingNode = mr.runStreamingNode(ctx, local, &wg)
componentMap[typeutil.StreamingNodeRole] = streamingNode
paramtable.SetLocalComponentEnabled(typeutil.StreamingNodeRole)
}

wg.Wait()
Expand Down
13 changes: 0 additions & 13 deletions cmd/roles/roles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,6 @@ import (
)

func TestRoles(t *testing.T) {
r := MilvusRoles{}

assert.True(t, r.EnvValue("1"))
assert.True(t, r.EnvValue(" 1 "))
assert.True(t, r.EnvValue("True"))
assert.True(t, r.EnvValue(" True "))
assert.True(t, r.EnvValue(" TRue "))
assert.False(t, r.EnvValue("0"))
assert.False(t, r.EnvValue(" 0 "))
assert.False(t, r.EnvValue(" false "))
assert.False(t, r.EnvValue(" False "))
assert.False(t, r.EnvValue(" abc "))

ss := strings.SplitN("abcdef", "=", 2)
assert.Equal(t, len(ss), 1)
ss = strings.SplitN("adb=def", "=", 2)
Expand Down
13 changes: 7 additions & 6 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/shirou/gopsutil/v3/disk"
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/milvus-io/milvus/pkg/config"
Expand Down Expand Up @@ -104,8 +105,6 @@ type ComponentParam struct {
StreamingCoordGrpcClientCfg GrpcClientConfig
StreamingNodeGrpcClientCfg GrpcClientConfig
IntegrationTestCfg integrationTestConfig

RuntimeConfig runtimeConfig
}

// Init initialize once
Expand Down Expand Up @@ -4850,11 +4849,13 @@ It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDura
p.TxnDefaultKeepaliveTimeout.Init(base.mgr)
}

// runtimeConfig is just a private environment value table.
type runtimeConfig struct {
CreateTime RuntimeParamItem
UpdateTime RuntimeParamItem
Role RuntimeParamItem
NodeID RuntimeParamItem
createTime time.Time
updateTime time.Time
role string
nodeID atomic.Int64
components map[string]struct{}
}

type integrationTestConfig struct {
Expand Down
36 changes: 0 additions & 36 deletions pkg/util/paramtable/param_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,39 +396,3 @@ func getAndConvert[T any](v string, converter func(input string) (T, error), def
}
return t
}

type RuntimeParamItem struct {
value atomic.Value
}

func (rpi *RuntimeParamItem) GetValue() any {
return rpi.value.Load()
}

func (rpi *RuntimeParamItem) GetAsString() string {
value, ok := rpi.value.Load().(string)
if !ok {
return ""
}
return value
}

func (rpi *RuntimeParamItem) GetAsTime() time.Time {
value, ok := rpi.value.Load().(time.Time)
if !ok {
return time.Time{}
}
return value
}

func (rpi *RuntimeParamItem) GetAsInt64() int64 {
value, ok := rpi.value.Load().(int64)
if !ok {
return 0
}
return value
}

func (rpi *RuntimeParamItem) SetValue(value any) {
rpi.value.Store(value)
}
32 changes: 22 additions & 10 deletions pkg/util/paramtable/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ import (
)

var (
once sync.Once
params ComponentParam
once sync.Once
params ComponentParam
runtimeParam = runtimeConfig{
components: make(map[string]struct{}, 0),
}
hookParams hookConfig
)

Expand Down Expand Up @@ -58,37 +61,46 @@ func GetHookParams() *hookConfig {
}

func SetNodeID(newID UniqueID) {
params.RuntimeConfig.NodeID.SetValue(newID)
runtimeParam.nodeID.Store(newID)
}

func GetNodeID() UniqueID {
return params.RuntimeConfig.NodeID.GetAsInt64()
return runtimeParam.nodeID.Load()
}

func GetStringNodeID() string {
return strconv.FormatInt(GetNodeID(), 10)
}

func SetRole(role string) {
params.RuntimeConfig.Role.SetValue(role)
runtimeParam.role = role
}

func GetRole() string {
return params.RuntimeConfig.Role.GetAsString()
return runtimeParam.role
}

func SetCreateTime(d time.Time) {
params.RuntimeConfig.CreateTime.SetValue(d)
runtimeParam.createTime = d
}

func GetCreateTime() time.Time {
return params.RuntimeConfig.CreateTime.GetAsTime()
return runtimeParam.createTime
}

func SetUpdateTime(d time.Time) {
params.RuntimeConfig.UpdateTime.SetValue(d)
runtimeParam.updateTime = d
}

func GetUpdateTime() time.Time {
return params.RuntimeConfig.UpdateTime.GetAsTime()
return runtimeParam.updateTime
}

func SetLocalComponentEnabled(component string) {
runtimeParam.components[component] = struct{}{}
}

func IsLocalComponentEnabled(component string) bool {
_, ok := runtimeParam.components[component]
return ok
}
12 changes: 12 additions & 0 deletions pkg/util/paramtable/service_param_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/milvus-io/milvus/pkg/config"
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

func TestServiceParam(t *testing.T) {
Expand Down Expand Up @@ -221,3 +222,14 @@ func TestServiceParam(t *testing.T) {
assert.Equal(t, 10000, Params.PaginationSize.GetAsInt())
})
}

func TestRuntimConfig(t *testing.T) {
SetRole(typeutil.StandaloneRole)
assert.Equal(t, GetRole(), typeutil.StandaloneRole)

SetLocalComponentEnabled(typeutil.QueryNodeRole)
assert.True(t, IsLocalComponentEnabled(typeutil.QueryNodeRole))

SetLocalComponentEnabled(typeutil.QueryCoordRole)
assert.True(t, IsLocalComponentEnabled(typeutil.QueryCoordRole))
}

0 comments on commit 118678b

Please sign in to comment.