Skip to content

Commit

Permalink
fix: unittest
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh committed Jan 23, 2025
1 parent 13ec841 commit 7c9ec9a
Show file tree
Hide file tree
Showing 13 changed files with 153 additions and 116 deletions.
13 changes: 13 additions & 0 deletions internal/coordinator/coordclient/test_utility.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,16 @@ func ResetRegistration() {
rootCoordClient: syncutil.NewFuture[types.RootCoordClient](),
}
}

// ResetQueryCoordRegistration resets the query coord client to initial state.
func ResetQueryCoordRegistration() {
glocalClient.queryCoordClient = syncutil.NewFuture[types.QueryCoordClient]()
}

func ResetRootCoordRegistration() {
glocalClient.rootCoordClient = syncutil.NewFuture[types.RootCoordClient]()
}

func ResetDataCoordRegistration() {
glocalClient.dataCoordClient = syncutil.NewFuture[types.DataCoordClient]()
}
7 changes: 5 additions & 2 deletions internal/distributed/streaming/streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
Expand All @@ -33,6 +34,7 @@ func TestMain(m *testing.M) {
}

func TestStreamingBroadcast(t *testing.T) {
t.Skip("cat not running without streaming service at background")
streamingutil.SetStreamingServiceEnabled()
streaming.Init()
defer streaming.Release()
Expand Down Expand Up @@ -64,6 +66,7 @@ func TestStreamingBroadcast(t *testing.T) {
// repeated broadcast with same resource key should be rejected
resp2, err := streaming.WAL().Broadcast().Append(context.Background(), msg)
assert.Error(t, err)
assert.True(t, status.AsStreamingError(err).IsResourceAcquired())
assert.Nil(t, resp2)

// resource key should be block until ack.
Expand Down Expand Up @@ -115,7 +118,7 @@ func TestStreamingBroadcast(t *testing.T) {
}

func TestStreamingProduce(t *testing.T) {
t.Skip()
t.Skip("cat not running without streaming service at background")
streamingutil.SetStreamingServiceEnabled()
streaming.Init()
defer streaming.Release()
Expand Down Expand Up @@ -197,7 +200,7 @@ func TestStreamingProduce(t *testing.T) {
}

func TestStreamingConsume(t *testing.T) {
t.Skip()
t.Skip("cat not running without streaming service at background")
streaming.Init()
defer streaming.Release()
ch := make(message.ChanMessageHandler, 10)
Expand Down
7 changes: 3 additions & 4 deletions internal/streamingcoord/client/broadcast/watcher_resuming.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"time"

"github.com/pkg/errors"
"github.com/cockroachdb/errors"
"go.uber.org/zap"

"github.com/milvus-io/milvus/pkg/log"
Expand Down Expand Up @@ -141,10 +141,9 @@ func (evs *pendingEvents) AddPendingEvent(ev *pendingEvent) bool {
if existEv, ok := evs.evs[id]; ok {
existEv.notifier = append(existEv.notifier, ev.notifier...)
return true
} else {
evs.evs[id] = ev
return false
}
evs.evs[id] = ev
return false
}

func (evs *pendingEvents) Notify(ev *message.BroadcastEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ import (
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/streamingcoord/server/resource"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)

var errResourceKeyHeld = errors.New("resource key is held")

// newBroadcastTaskManager creates a new broadcast task manager with recovery info.
func newBroadcastTaskManager(protos []*streamingpb.BroadcastTask) (*broadcastTaskManager, []*pendingBroadcastTask) {
logger := resource.Resource().Logger().With(log.FieldComponent("broadcaster"))
Expand Down Expand Up @@ -140,7 +139,7 @@ func (bm *broadcastTaskManager) addBroadcastTask(msg message.BroadcastMutableMes
// Check if the resource key is held by other task.
for key := range header.ResourceKeys {
if _, ok := bm.resourceKeys[key]; ok {
return nil, errors.Wrapf(errResourceKeyHeld, "domain: %s, key: %s", key.Domain.String(), key.Key)
return nil, status.NewResourceAcquired(fmt.Sprintf("domain: %s, key: %s", key.Domain.String(), key.Key))
}
}
// setup the resource keys to make resource exclusive held.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"sync"

"github.com/pkg/errors"
"github.com/cockroachdb/errors"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/streamingcoord/server/resource"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingcoord/server/resource"
internaltypes "github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/idalloc"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/proto/messagespb"
"github.com/milvus-io/milvus/pkg/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
Expand Down Expand Up @@ -118,8 +119,9 @@ func TestBroadcaster(t *testing.T) {
// Test broadcast with a already exist resource key.
for {
var err error
result, err = bc.Broadcast(context.Background(), createNewBroadcastMsg([]string{"v1", "v2", "v3"}, message.NewCollectionNameResourceKey("c7")))
if errors.Is(err, errResourceKeyHeld) {
_, err = bc.Broadcast(context.Background(), createNewBroadcastMsg([]string{"v1", "v2", "v3"}, message.NewCollectionNameResourceKey("c7")))
if err != nil {
assert.True(t, status.AsStreamingError(err).IsResourceAcquired())
break
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package broadcast

import (
"context"
"errors"
"io"

"github.com/cockroachdb/errors"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster"
Expand Down
10 changes: 10 additions & 0 deletions internal/util/streamingutil/status/streaming_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ func (e *StreamingError) IsTxnUnavilable() bool {
e.Code == streamingpb.StreamingCode_STREAMING_CODE_INVALID_TRANSACTION_STATE
}

// IsResourceAcquired returns true if the resource is acquired.
func (e *StreamingError) IsResourceAcquired() bool {
return e.Code == streamingpb.StreamingCode_STREAMING_CODE_RESOURCE_ACQUIRED
}

// NewOnShutdownError creates a new StreamingError with code STREAMING_CODE_ON_SHUTDOWN.
func NewOnShutdownError(format string, args ...interface{}) *StreamingError {
return New(streamingpb.StreamingCode_STREAMING_CODE_ON_SHUTDOWN, format, args...)
Expand Down Expand Up @@ -116,6 +121,11 @@ func NewUnrecoverableError(format string, args ...interface{}) *StreamingError {
return New(streamingpb.StreamingCode_STREAMING_CODE_UNRECOVERABLE, format, args...)
}

// NewResourceAcquired creates a new StreamingError with code STREAMING_CODE_RESOURCE_ACQUIRED.
func NewResourceAcquired(format string, args ...interface{}) *StreamingError {
return New(streamingpb.StreamingCode_STREAMING_CODE_RESOURCE_ACQUIRED, format, args...)
}

// New creates a new StreamingError with the given code and cause.
func New(code streamingpb.StreamingCode, format string, args ...interface{}) *StreamingError {
if len(args) == 0 {
Expand Down
6 changes: 6 additions & 0 deletions internal/util/streamingutil/status/streaming_error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,10 @@ func TestStreamingError(t *testing.T) {
assert.False(t, streamingErr.IsWrongStreamingNode())
pbErr = streamingErr.AsPBError()
assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_ON_SHUTDOWN, pbErr.Code)

streamingErr = NewResourceAcquired("test, %d", 1)
assert.Contains(t, streamingErr.Error(), "code: STREAMING_CODE_RESOURCE_ACQUIRED, cause: test, 1")
assert.False(t, streamingErr.IsWrongStreamingNode())
pbErr = streamingErr.AsPBError()
assert.Equal(t, streamingpb.StreamingCode_STREAMING_CODE_RESOURCE_ACQUIRED, pbErr.Code)
}
2 changes: 1 addition & 1 deletion pkg/proto/messagespb/messages.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pkg/proto/streaming.proto
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ enum BroadcastTaskState {
BROADCAST_TASK_STATE_UNKNOWN = 0; // should never used.
BROADCAST_TASK_STATE_PENDING = 1; // task is pending.
BROADCAST_TASK_STATE_DONE = 2; // task has been broadcasted and acknowledged, the resource lock is released, and the persisted task can be cleared.
BROADCAST_TASK_STATE_WAIT_ACK = 3; // task has been broadcasted, waiting for ack, the resource lock is still held by some vchannels.
BROADCAST_TASK_STATE_WAIT_ACK = 3; // task has been broadcasted, waiting for ack, the resource lock is still acquired by some vchannels.
}

// BroadcastTask is the task to broadcast the messake.
Expand Down Expand Up @@ -267,6 +267,7 @@ enum StreamingCode {
STREAMING_CODE_TRANSACTION_EXPIRED = 9; // transaction expired
STREAMING_CODE_INVALID_TRANSACTION_STATE = 10; // invalid transaction state
STREAMING_CODE_UNRECOVERABLE = 11; // unrecoverable error
STREAMING_CODE_RESOURCE_ACQUIRED = 12; // resource is acquired by other operation
STREAMING_CODE_UNKNOWN = 999; // unknown error
}

Expand Down
Loading

0 comments on commit 7c9ec9a

Please sign in to comment.