Skip to content

Commit

Permalink
enhance: add rw/ro streaming query node replica management (#38677)
Browse files Browse the repository at this point in the history
issue: #38399

- Embed the query node into the streaming node so that the delegator is
available on the streaming node.
- The embedded query node has a special server label
`QUERYNODE_STREAMING-EMBEDDED`.
- Change the balance strategy so that channels are assigned to streaming
nodes whenever possible.

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored Jan 24, 2025
1 parent 8117d59 commit c84a074
Show file tree
Hide file tree
Showing 32 changed files with 1,145 additions and 123 deletions.
6 changes: 6 additions & 0 deletions cmd/milvus/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles {
role.EnableIndexNode = true
case typeutil.StreamingNodeRole:
streamingutil.MustEnableStreamingService()
streamingutil.EnableEmbededQueryNode()
role.EnableStreamingNode = true
role.EnableQueryNode = true
case typeutil.StandaloneRole, typeutil.EmbeddedRole:
role.EnableRootCoord = true
role.EnableProxy = true
Expand All @@ -175,6 +177,10 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles {
role.EnableIndexNode = enableIndexNode
role.EnableProxy = enableProxy
role.EnableStreamingNode = enableStreamingNode
if enableStreamingNode && !enableQueryNode {
role.EnableQueryNode = true
streamingutil.EnableEmbededQueryNode()
}
default:
fmt.Fprintf(os.Stderr, "Unknown server type = %s\n%s", serverType, getHelp())
os.Exit(-1)
Expand Down
106 changes: 106 additions & 0 deletions internal/coordinator/snmanager/streaming_node_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package snmanager

import (
"context"
"sync"

"github.com/cockroachdb/errors"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/streamingcoord/server/balancer"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// StaticStreamingNodeManager is the package-level StreamingNodeManager instance.
// It is created by newStreamingNodeManager when the package is initialized.
var StaticStreamingNodeManager = newStreamingNodeManager()

// newStreamingNodeManager builds a StreamingNodeManager with empty assignment
// state and launches its background execute loop in a new goroutine before
// returning the manager.
func newStreamingNodeManager() *StreamingNodeManager {
	manager := new(StreamingNodeManager)
	manager.notifier = syncutil.NewAsyncTaskNotifier[struct{}]()
	manager.balancer = syncutil.NewFuture[balancer.Balancer]()
	manager.cond = syncutil.NewContextCond(&sync.Mutex{})
	manager.latestAssignments = make(map[string]types.PChannelInfoAssigned)
	manager.streamingNodes = typeutil.NewUniqueSet()
	manager.nodeChangedNotifier = syncutil.NewVersionedNotifier()

	// The loop blocks until SetBalancerReady provides the balancer.
	go manager.execute()
	return manager
}

// StreamingNodeManager manages the query nodes that are embedded into streaming nodes.
// It keeps the latest pchannel assignments reported by the streaming coord balancer,
// together with the set of server ids that appear in those assignments.
// StreamingNodeManager is exclusive with ResourceManager.
type StreamingNodeManager struct {
notifier *syncutil.AsyncTaskNotifier[struct{}] // supplies the context used by the execute loop and is finished when that loop returns.
balancer *syncutil.Future[balancer.Balancer] // resolved by SetBalancerReady; the execute loop waits for it.
// The coord is merged after 2.6, so we don't need to make distribution safe.
cond *syncutil.ContextCond // its lock is held while latestAssignments and streamingNodes are read or replaced; broadcast on every update.
latestAssignments map[string]types.PChannelInfoAssigned // The latest assignment info received from the streaming coord balance module, keyed by pchannel name.
streamingNodes typeutil.UniqueSet // server ids of the nodes that appear in latestAssignments.
nodeChangedNotifier *syncutil.VersionedNotifier // used to notify listeners that the nodes in the streaming node manager have changed.
}

// GetWALLocated returns the server id of the node on which the WAL of the
// given vChannel is located.
//
// The vChannel is first mapped to its physical channel. The call then blocks,
// waiting on the manager's condition with a background context, until an
// assignment for that physical channel is present in the latest assignments.
func (s *StreamingNodeManager) GetWALLocated(vChannel string) int64 {
	pchannel := funcutil.ToPhysicalChannel(vChannel)

	s.cond.L.Lock()
	assignment, found := s.latestAssignments[pchannel]
	for !found {
		// Wait until the next assignment update is broadcast, then look again.
		s.cond.Wait(context.Background())
		assignment, found = s.latestAssignments[pchannel]
	}
	serverID := assignment.Node.ServerID
	s.cond.L.Unlock()

	return serverID
}

// GetStreamingQueryNodeIDs returns a copy of the set of server ids of the
// streaming query nodes, taken while holding the manager's lock.
func (s *StreamingNodeManager) GetStreamingQueryNodeIDs() typeutil.UniqueSet {
	s.cond.L.Lock()
	nodeIDs := s.streamingNodes.Clone()
	s.cond.L.Unlock()
	return nodeIDs
}

// ListenNodeChanged returns a listener for node-changed events. The listener
// is registered with the VersionedListenAtEarliest option.
func (s *StreamingNodeManager) ListenNodeChanged() *syncutil.VersionedListener {
	listener := s.nodeChangedNotifier.Listen(syncutil.VersionedListenAtEarliest)
	return listener
}

// SetBalancerReady hands the balancer to the streaming node manager by setting
// the internal balancer future, which the background execute loop waits on.
// It is meant to be called from the streamingcoord initialization.
func (s *StreamingNodeManager) SetBalancerReady(b balancer.Balancer) {
s.balancer.Set(b)
}

// execute is the background loop of the manager.
//
// It waits for the balancer to be set, then repeatedly watches the channel
// assignments. Every update replaces the cached assignments and the streaming
// node set, and notifies the node-changed listeners. The notifier is marked
// finished when the loop returns; it returns only when waiting for the
// balancer or watching the assignments fails.
func (s *StreamingNodeManager) execute() (err error) {
	defer s.notifier.Finish(struct{}{})

	b, err := s.balancer.GetWithContext(s.notifier.Context())
	if err != nil {
		return errors.Wrap(err, "failed to wait balancer ready")
	}

	// onAssignmentsChanged rebuilds the cached state from a full assignment snapshot.
	onAssignmentsChanged := func(_ typeutil.VersionInt64Pair, relations []types.PChannelInfoAssigned) error {
		// Take the lock and wake up waiters; the lock is released after the state is replaced.
		s.cond.LockAndBroadcast()

		assignments := make(map[string]types.PChannelInfoAssigned)
		nodeIDs := typeutil.NewUniqueSet()
		for _, assigned := range relations {
			assignments[assigned.Channel.Name] = assigned
			nodeIDs.Insert(assigned.Node.ServerID)
		}
		s.latestAssignments, s.streamingNodes = assignments, nodeIDs

		s.nodeChangedNotifier.NotifyAll()
		log.Info("streaming node manager updated", zap.Any("assignments", s.latestAssignments), zap.Any("streamingNodes", s.streamingNodes))
		s.cond.L.Unlock()
		return nil
	}

	// Re-establish the watch whenever it returns without an error.
	for {
		watchErr := b.WatchChannelAssignments(s.notifier.Context(), onAssignmentsChanged)
		if watchErr != nil {
			return watchErr
		}
	}
}
62 changes: 62 additions & 0 deletions internal/coordinator/snmanager/streaming_node_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package snmanager

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/milvus-io/milvus/internal/mocks/streamingcoord/server/mock_balancer"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// pChannelInfoAssigned bundles the two arguments of one WatchChannelAssignments
// callback invocation so the test can deliver them to the mocked watcher through a channel.
type pChannelInfoAssigned struct {
version typeutil.VersionInt64Pair // version passed to the callback.
pchannels []types.PChannelInfoAssigned // assignment snapshot passed to the callback.
}

// TestStreamingNodeManager checks that the manager starts with no streaming
// nodes and that, after the mocked balancer reports one pchannel assignment,
// the node-changed listener fires, the WAL location resolves to the assigned
// server id and exactly one streaming node is tracked.
func TestStreamingNodeManager(t *testing.T) {
	m := newStreamingNodeManager()
	b := mock_balancer.NewMockBalancer(t)

	// The mocked watcher forwards every snapshot pushed into ch to the
	// manager's callback until the context is done.
	ch := make(chan pChannelInfoAssigned, 1)
	b.EXPECT().WatchChannelAssignments(mock.Anything, mock.Anything).Run(
		func(ctx context.Context, cb func(typeutil.VersionInt64Pair, []types.PChannelInfoAssigned) error) {
			for {
				select {
				case <-ctx.Done():
					return
				case p := <-ch:
					cb(p.version, p.pchannels)
				}
			}
		})
	m.SetBalancerReady(b)

	streamingNodes := m.GetStreamingQueryNodeIDs()
	assert.Empty(t, streamingNodes)

	ch <- pChannelInfoAssigned{
		version: typeutil.VersionInt64Pair{
			Global: 1,
			Local:  1,
		},
		pchannels: []types.PChannelInfoAssigned{
			{
				Channel: types.PChannelInfo{Name: "a_test", Term: 1},
				Node:    types.StreamingNodeInfo{ServerID: 1, Address: "localhost:1"},
			},
		},
	}

	listener := m.ListenNodeChanged()
	err := listener.Wait(context.Background())
	assert.NoError(t, err)

	// assert.Equal takes (expected, actual); keep that order so failure
	// messages label the values correctly.
	node := m.GetWALLocated("a_test")
	assert.Equal(t, int64(1), node)
	streamingNodes = m.GetStreamingQueryNodeIDs()
	assert.Equal(t, 1, len(streamingNodes))
}
18 changes: 14 additions & 4 deletions internal/querycoordv2/balance/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

Expand Down Expand Up @@ -108,6 +109,8 @@ func (b *RoundRobinBalancer) AssignSegment(ctx context.Context, collectionID int
}

func (b *RoundRobinBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, forceAssign bool) []ChannelAssignPlan {
nodes = filterSQNIfStreamingServiceEnabled(nodes)

// skip out suspend node and stopping node during assignment, but skip this check for manual balance
if !forceAssign {
versionRangeFilter := semver.MustParseRange(">2.3.x")
Expand All @@ -122,22 +125,29 @@ func (b *RoundRobinBalancer) AssignChannel(ctx context.Context, collectionID int
if len(nodesInfo) == 0 {
return nil
}

plans := make([]ChannelAssignPlan, 0)
scoreDelta := make(map[int64]int)
if streamingutil.IsStreamingServiceEnabled() {
channels, plans, scoreDelta = assignChannelToWALLocatedFirstForNodeInfo(channels, nodesInfo)
}

sort.Slice(nodesInfo, func(i, j int) bool {
cnt1, cnt2 := nodesInfo[i].ChannelCnt(), nodesInfo[j].ChannelCnt()
id1, id2 := nodesInfo[i].ID(), nodesInfo[j].ID()
delta1, delta2 := b.scheduler.GetChannelTaskDelta(id1, -1), b.scheduler.GetChannelTaskDelta(id2, -1)
delta1, delta2 := b.scheduler.GetChannelTaskDelta(id1, -1)+scoreDelta[id1], b.scheduler.GetChannelTaskDelta(id2, -1)+scoreDelta[id2]
return cnt1+delta1 < cnt2+delta2
})
ret := make([]ChannelAssignPlan, 0, len(channels))

for i, c := range channels {
plan := ChannelAssignPlan{
Channel: c,
From: -1,
To: nodesInfo[i%len(nodesInfo)].ID(),
}
ret = append(ret, plan)
plans = append(plans, plan)
}
return ret
return plans
}

func (b *RoundRobinBalancer) BalanceReplica(ctx context.Context, replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) {
Expand Down
16 changes: 14 additions & 2 deletions internal/querycoordv2/balance/channel_level_score_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
Expand Down Expand Up @@ -67,6 +68,17 @@ func (b *ChannelLevelScoreBalancer) BalanceReplica(ctx context.Context, replica
}
}()

if streamingutil.IsStreamingServiceEnabled() {
// Make a plan to rebalance the channel first.
// The Streaming QueryNode doesn't make the channel level score, so just fallback to the ScoreBasedBalancer.
stoppingBalance := paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool()
channelPlan := b.ScoreBasedBalancer.balanceChannels(ctx, br, replica, stoppingBalance)
// If the channelPlan is not empty, do it directly, don't do the segment balance.
if len(channelPlan) > 0 {
return nil, channelPlan
}
}

exclusiveMode := true
channels := b.targetMgr.GetDmChannelsByCollection(ctx, replica.GetCollectionID(), meta.CurrentTarget)
for channelName := range channels {
Expand Down Expand Up @@ -122,15 +134,15 @@ func (b *ChannelLevelScoreBalancer) BalanceReplica(ctx context.Context, replica
zap.Any("available nodes", rwNodes),
)
// handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score
if b.permitBalanceChannel(replica.GetCollectionID()) {
if b.permitBalanceChannel(replica.GetCollectionID()) && !streamingutil.IsStreamingServiceEnabled() {
channelPlans = append(channelPlans, b.genStoppingChannelPlan(ctx, replica, channelName, rwNodes, roNodes)...)
}

if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(ctx, replica, channelName, rwNodes, roNodes)...)
}
} else {
if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() && b.permitBalanceChannel(replica.GetCollectionID()) {
if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() && b.permitBalanceChannel(replica.GetCollectionID()) && !streamingutil.IsStreamingServiceEnabled() {
channelPlans = append(channelPlans, b.genChannelPlan(ctx, replica, channelName, rwNodes)...)
}

Expand Down
66 changes: 41 additions & 25 deletions internal/querycoordv2/balance/multi_target_balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
Expand Down Expand Up @@ -485,48 +486,63 @@ func (b *MultiTargetBalancer) BalanceReplica(ctx context.Context, replica *meta.
}
}()

if replica.NodesCount() == 0 {
return nil, nil
stoppingBalance := paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool()

channelPlans = b.balanceChannels(ctx, br, replica, stoppingBalance)
if len(channelPlans) == 0 {
segmentPlans = b.balanceSegments(ctx, replica, stoppingBalance)
}
return
}

// balanceChannels generates the channel assign plans for the replica.
//
// When the streaming service is enabled, the replica's RW/RO "SQ" node lists
// are used; otherwise the regular RW/RO node lists are used. It returns nil
// if there is no RW node or channel balancing is not permitted for the
// collection. If RO nodes exist, stopping-channel plans are generated when
// stoppingBalance is true, and nil is returned otherwise. If there are no RO
// nodes, regular channel plans are generated only when AutoBalanceChannel is
// enabled.
func (b *MultiTargetBalancer) balanceChannels(ctx context.Context, br *balanceReport, replica *meta.Replica, stoppingBalance bool) []ChannelAssignPlan {
	var rwNodes, roNodes []int64
	switch {
	case streamingutil.IsStreamingServiceEnabled():
		rwNodes, roNodes = replica.GetRWSQNodes(), replica.GetROSQNodes()
	default:
		rwNodes, roNodes = replica.GetRWNodes(), replica.GetRONodes()
	}

	// Nothing to do without an available node.
	if len(rwNodes) == 0 {
		return nil
	}
	if !b.permitBalanceChannel(replica.GetCollectionID()) {
		return nil
	}

	switch {
	case len(roNodes) == 0:
		if !paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() {
			return nil
		}
		return b.genChannelPlan(ctx, br, replica, rwNodes)
	case stoppingBalance:
		return b.genStoppingChannelPlan(ctx, replica, rwNodes, roNodes)
	default:
		log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes))
		return nil
	}
}

func (b *MultiTargetBalancer) balanceSegments(ctx context.Context, replica *meta.Replica, stoppingBalance bool) []SegmentAssignPlan {
rwNodes := replica.GetRWNodes()
roNodes := replica.GetRONodes()

if len(rwNodes) == 0 {
// no available nodes to balance
return nil, nil
if len(rwNodes) == 0 || !b.permitBalanceSegment(replica.GetCollectionID()) {
return nil
}

// print current distribution before generating plans
segmentPlans, channelPlans = make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0)
if len(roNodes) != 0 {
if !paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
if !stoppingBalance {
log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes))
return nil, nil
return nil
}

log.Info("Handle stopping nodes",
zap.Any("stopping nodes", roNodes),
zap.Any("available nodes", rwNodes),
)
// handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score
if b.permitBalanceChannel(replica.GetCollectionID()) {
channelPlans = append(channelPlans, b.genStoppingChannelPlan(ctx, replica, rwNodes, roNodes)...)
}
if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(ctx, replica, rwNodes, roNodes)...)
}
} else {
if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() && b.permitBalanceChannel(replica.GetCollectionID()) {
channelPlans = append(channelPlans, b.genChannelPlan(ctx, br, replica, rwNodes)...)
}

if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
segmentPlans = b.genSegmentPlan(ctx, replica, rwNodes)
}
return b.genStoppingSegmentPlan(ctx, replica, rwNodes, roNodes)
}

return segmentPlans, channelPlans
return b.genSegmentPlan(ctx, replica, rwNodes)
}

func (b *MultiTargetBalancer) genSegmentPlan(ctx context.Context, replica *meta.Replica, rwNodes []int64) []SegmentAssignPlan {
Expand Down
Loading

0 comments on commit c84a074

Please sign in to comment.