Skip to content

Commit

Permalink
fix: the close operation of rmq consumer is not sync (#38734)
Browse files Browse the repository at this point in the history
issue: #38399

---------

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored Dec 30, 2024
1 parent ecc820e commit 56c5b66
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 2 deletions.
7 changes: 6 additions & 1 deletion internal/datanode/channel/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,14 @@ func (s *ChannelManagerSuite) TestSubmitSkip() {
func (s *ChannelManagerSuite) TestSubmitWatchAndRelease() {
channel := "by-dev-rootcoord-dml-0"

stream, err := s.pipelineParams.MsgStreamFactory.NewTtMsgStream(context.Background())
s.NoError(err)
s.NotNil(stream)
stream.AsProducer(context.Background(), []string{channel})

// watch
info := GetWatchInfoByOpID(100, channel, datapb.ChannelWatchState_ToWatch)
err := s.manager.Submit(info)
err = s.manager.Submit(info)
s.NoError(err)

// wait for result
Expand Down
4 changes: 4 additions & 0 deletions internal/datanode/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,10 @@ func (s *DataNodeServicesSuite) TestCompaction() {

func (s *DataNodeServicesSuite) TestFlushSegments() {
dmChannelName := "fake-by-dev-rootcoord-dml-channel-test-FlushSegments"
stream, err := s.node.factory.NewTtMsgStream(context.Background())
s.NoError(err)
s.NotNil(stream)
stream.AsProducer(context.Background(), []string{dmChannelName})
schema := &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
Expand Down
5 changes: 4 additions & 1 deletion pkg/mq/mqimpl/rocksmq/client/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) {
}

func (c *client) consume(consumer *consumer) {
defer c.wg.Done()
defer func() {
close(consumer.stopCh)
c.wg.Done()
}()

if err := c.blockUntilInitDone(consumer); err != nil {
log.Warn("consumer init failed", zap.Error(err))
Expand Down
8 changes: 8 additions & 0 deletions pkg/mq/mqimpl/rocksmq/client/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type consumer struct {

startOnce sync.Once

stopCh chan struct{}
msgMutex chan struct{}
initCh chan struct{}
messageCh chan common.Message
Expand Down Expand Up @@ -58,6 +59,7 @@ func newConsumer(c *client, options ConsumerOptions) (*consumer, error) {
client: c,
consumerName: options.SubscriptionName,
options: options,
stopCh: make(chan struct{}),
msgMutex: make(chan struct{}, 1),
initCh: initCh,
messageCh: messageCh,
Expand Down Expand Up @@ -133,7 +135,13 @@ func (c *consumer) Close() {
err := c.client.server.DestroyConsumerGroup(c.topic, c.consumerName)
if err != nil {
log.Warn("Consumer close failed", zap.String("topicName", c.topic), zap.String("groupName", c.consumerName), zap.Error(err))
// TODO: current rocksmq does't promise the msgmutex will be closed in some unittest,
// make the consuming goroutine leak.
// Here add a dirty way to close it.
close(c.msgMutex)
return
}
<-c.stopCh
}

func (c *consumer) GetLatestMsgID() (int64, error) {
Expand Down

0 comments on commit 56c5b66

Please sign in to comment.