Skip to content

Commit

Permalink
allow reset of consumer-group in dead state (fixes #217)
Browse files Browse the repository at this point in the history
  • Loading branch information
d-rk committed Feb 11, 2025
1 parent e7b6bdc commit d1a3c30
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- [#227](https://github.com/deviceinsight/kafkactl/issues/227) Incorrect handling of Base64-encoded values when producing from JSON
- [#228](https://github.com/deviceinsight/kafkactl/issues/228) Fix parsing version of gcloud kubectl.
- [#217](https://github.com/deviceinsight/kafkactl/issues/217) Allow reset of consumer-group in dead state

## 5.4.0 - 2024-11-28
### Added
Expand Down
30 changes: 11 additions & 19 deletions internal/consumergroupoffsets/consumer-group-offset-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@ import (
)

type ResetConsumerGroupOffsetFlags struct {
Topic []string
AllTopics bool
Partition int32
Offset int64
OldestOffset bool
NewestOffset bool
Execute bool
OutputFormat string
allowedGroupState string
ToDatetime string
Topic []string
AllTopics bool
Partition int32
Offset int64
OldestOffset bool
NewestOffset bool
Execute bool
OutputFormat string
ToDatetime string
}

type ConsumerGroupOffsetOperation struct {
Expand Down Expand Up @@ -91,20 +90,14 @@ func (operation *ConsumerGroupOffsetOperation) ResetConsumerGroupOffset(flags Re

output.Debugf("reset consumer-group offset for topics: %v", topics)

if flags.allowedGroupState == "" {
// a reset is only allowed if group is empty (no one in the group)
// for creation of the group state "Dead" is allowed
flags.allowedGroupState = "Empty"
}

if flags.Execute {
if descriptions, err = admin.DescribeConsumerGroups([]string{groupName}); err != nil {
return errors.Wrap(err, "failed to describe consumer group")
}

for _, description := range descriptions {
// https://stackoverflow.com/a/61745884/1115279
if description.State != flags.allowedGroupState {
// https://stackoverflow.com/a/61745884
if description.State != "Empty" && description.State != "Dead" {
return errors.Errorf("cannot reset offsets on consumer group %s. There are consumers assigned (state: %s)", groupName, description.State)
}
}
Expand Down Expand Up @@ -155,7 +148,6 @@ func (operation *ConsumerGroupOffsetOperation) ResetConsumerGroupOffset(flags Re

func (operation *ConsumerGroupOffsetOperation) CreateConsumerGroup(flags ResetConsumerGroupOffsetFlags, group string) error {

flags.allowedGroupState = "Dead"
flags.Execute = true
flags.OutputFormat = "none"

Expand Down

0 comments on commit d1a3c30

Please sign in to comment.