Skip to content

Commit

Permalink
fix: unittest
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh committed Dec 27, 2024
1 parent be3fa87 commit 4a4cd7a
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
7 changes: 6 additions & 1 deletion internal/datanode/channel/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,14 @@ func (s *ChannelManagerSuite) TestSubmitSkip() {
func (s *ChannelManagerSuite) TestSubmitWatchAndRelease() {
channel := "by-dev-rootcoord-dml-0"

stream, err := s.pipelineParams.MsgStreamFactory.NewTtMsgStream(context.Background())
s.NoError(err)
s.NotNil(stream)
stream.AsProducer(context.Background(), []string{channel})

// watch
info := GetWatchInfoByOpID(100, channel, datapb.ChannelWatchState_ToWatch)
err := s.manager.Submit(info)
err = s.manager.Submit(info)
s.NoError(err)

// wait for result
Expand Down
5 changes: 5 additions & 0 deletions pkg/mq/mqimpl/rocksmq/client/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ func (c *consumer) Close() {
err := c.client.server.DestroyConsumerGroup(c.topic, c.consumerName)
if err != nil {
log.Warn("Consumer close failed", zap.String("topicName", c.topic), zap.String("groupName", c.consumerName), zap.Error(err))
// TODO: current rocksmq does't promise the msgmutex will be closed in some unittest,
// make the consuming goroutine leak.
// Here add a dirty way to close it.
close(c.msgMutex)
return
}
<-c.stopCh
}
Expand Down

0 comments on commit 4a4cd7a

Please sign in to comment.