Skip to content

Commit

Permalink
fix: kafka metadata interface
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed May 16, 2023
1 parent 4f29924 commit cf7d757
Showing 1 changed file with 6 additions and 9 deletions.
15 changes: 6 additions & 9 deletions kafka/metadata/kafka_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strconv"
"sync"

"github.com/Trendyol/go-dcp-client/wrapper"

"github.com/Trendyol/go-dcp-client/metadata"
"github.com/Trendyol/go-dcp-client/models"

Expand Down Expand Up @@ -49,7 +51,7 @@ func (s *kafkaMetadata) Save(state map[uint16]*models.CheckpointDocument, dirtyO
func (s *kafkaMetadata) Load( //nolint:funlen
vbIDs []uint16,
bucketUUID string,
) (map[uint16]*models.CheckpointDocument, bool, error) {
) (*wrapper.SyncMap[uint16, *models.CheckpointDocument], bool, error) {
partitions, err := s.kafkaClient.GetPartitions(s.topic)
if err != nil {
return nil, false, err
Expand Down Expand Up @@ -93,8 +95,7 @@ func (s *kafkaMetadata) Load( //nolint:funlen
}(consumer, endOffset.LastOffset)
}

state := map[uint16]*models.CheckpointDocument{}
stateLock := &sync.Mutex{}
state := &wrapper.SyncMap[uint16, *models.CheckpointDocument]{}
exist := false

go func() {
Expand All @@ -110,9 +111,7 @@ func (s *kafkaMetadata) Load( //nolint:funlen

vbID, err := strconv.ParseUint(string(m.Key), 0, 16)
if err == nil {
stateLock.Lock()
state[uint16(vbID)] = doc
stateLock.Unlock()
state.Store(uint16(vbID), doc)
} else {
s.logger.Printf("cannot load checkpoint, vbID: %d %v", vbID, err)
panic(err)
Expand All @@ -123,9 +122,7 @@ func (s *kafkaMetadata) Load( //nolint:funlen
wg.Wait()

for _, vbID := range vbIDs {
if _, ok := state[vbID]; !ok {
state[vbID] = models.NewEmptyCheckpointDocument(bucketUUID)
}
state.LoadOrStore(vbID, models.NewEmptyCheckpointDocument(bucketUUID))
}

return state, exist, nil
Expand Down

0 comments on commit cf7d757

Please sign in to comment.