Skip to content
This repository was archived by the owner on Sep 20, 2024. It is now read-only.

Commit d2bf590

Browse files
committed
Watch for group member subscription changes
We used to set a watch only on the group membership, so to notify group members about a subscription change, a member had to deregister and then register again. Now a watch is also set on the registration of every group member.
1 parent 1393a2f commit d2bf590

File tree

6 files changed

+102
-124
lines changed

6 files changed

+102
-124
lines changed

CHANGELOG.md

+6-1
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,14 @@
22

33
#### Version 0.14.1 (TBD)
44

5+
Implemented:
6+
* Consumer.RebalanceDelay was removed, so rebalancing is triggered as soon
7+
as membership status of a consumer group or subscription of a consumer group
8+
member changes.
9+
510
Fixed:
611
* [#120](https://github.com/mailgun/kafka-pixy/pull/120) Consumption from a
7-
topic stopped a group
12+
topic stopped for a group.
813

914
#### Version 0.14.0 (2017-09-11)
1015

config/config.go

-3
Original file line numberDiff line numberDiff line change
@@ -376,8 +376,6 @@ func (p *Proxy) validate() error {
376376
return errors.New("consumer.offsets_commit_interval must be > 0")
377377
case p.Consumer.OffsetsCommitTimeout <= 0:
378378
return errors.New("consumer.offsets_commit_timeout must be > 0")
379-
case p.Consumer.RebalanceDelay <= 0:
380-
return errors.New("consumer.rebalance_delay must be > 0")
381379
case p.Consumer.SubscriptionTimeout <= 0:
382380
return errors.New("consumer.subscription_timeout must be > 0")
383381
case p.Consumer.RetryBackoff <= 0:
@@ -428,7 +426,6 @@ func defaultProxyWithClientID(clientID string) *Proxy {
428426
c.Consumer.MaxRetries = -1
429427
c.Consumer.OffsetsCommitInterval = 500 * time.Millisecond
430428
c.Consumer.OffsetsCommitTimeout = 1500 * time.Millisecond
431-
c.Consumer.RebalanceDelay = 250 * time.Millisecond
432429
c.Consumer.SubscriptionTimeout = 15 * time.Second
433430
c.Consumer.RetryBackoff = 500 * time.Millisecond
434431
return c

consumer/subscriber/subscriber.go

+69-72
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package subscriber
22

33
import (
4+
"context"
45
"sort"
56
"sync"
67
"time"
@@ -23,13 +24,16 @@ const safeClaimRetriesCount = 10
2324
// leave and update their subscriptions, and generates notifications of such
2425
// changes. It also provides an API for a partition consumer to claim and
2526
// release a group-topic-partition.
27+
//
28+
// FIXME: It is assumed that all members of the group are registered with the
29+
// FIXME: `static` pattern. If a member whose pattern is either `white_list` or
30+
// FIXME: `black_list` joins the group the result will be unpredictable.
2631
type T struct {
2732
actDesc *actor.Descriptor
2833
cfg *config.Proxy
2934
group string
3035
groupZNode *kazoo.Consumergroup
3136
groupMemberZNode *kazoo.ConsumergroupInstance
32-
topics []string
3337
topicsCh chan []string
3438
subscriptionsCh chan map[string][]string
3539
stopCh chan none.T
@@ -149,133 +153,126 @@ func (ss *T) run() {
149153

150154
var (
151155
nilOrSubscriptionsCh chan<- map[string][]string
152-
nilOrGroupUpdatedCh <-chan zk.Event
156+
nilOrWatchCh <-chan none.T
153157
nilOrTimeoutCh <-chan time.Time
154-
pendingTopics []string
155-
pendingSubscriptions map[string][]string
158+
cancelWatch context.CancelFunc
156159
shouldSubmitTopics = false
157-
shouldFetchMembers = false
158160
shouldFetchSubscriptions = false
159-
members []*kazoo.ConsumergroupInstance
161+
topics []string
162+
subscriptions map[string][]string
160163
)
161164
for {
162165
select {
163-
case topics := <-ss.topicsCh:
164-
pendingTopics = normalizeTopics(topics)
165-
shouldSubmitTopics = !topicsEqual(pendingTopics, ss.topics)
166-
case nilOrSubscriptionsCh <- pendingSubscriptions:
166+
case topics = <-ss.topicsCh:
167+
sort.Strings(topics)
168+
shouldSubmitTopics = true
169+
case nilOrSubscriptionsCh <- subscriptions:
167170
nilOrSubscriptionsCh = nil
168-
case <-nilOrGroupUpdatedCh:
169-
nilOrGroupUpdatedCh = nil
170-
shouldFetchMembers = true
171+
case <-nilOrWatchCh:
172+
nilOrWatchCh = nil
173+
cancelWatch()
174+
shouldFetchSubscriptions = true
171175
case <-nilOrTimeoutCh:
176+
nilOrTimeoutCh = nil
172177
case <-ss.stopCh:
178+
if cancelWatch != nil {
179+
cancelWatch()
180+
}
173181
return
174182
}
175183

176184
if shouldSubmitTopics {
177-
if err = ss.submitTopics(pendingTopics); err != nil {
185+
if err = ss.submitTopics(topics); err != nil {
178186
ss.actDesc.Log().WithError(err).Error("Failed to submit topics")
179187
nilOrTimeoutCh = time.After(ss.cfg.Consumer.RetryBackoff)
180188
continue
181189
}
182-
ss.actDesc.Log().Infof("Submitted: topics=%v", pendingTopics)
190+
ss.actDesc.Log().Infof("Submitted: topics=%v", topics)
183191
shouldSubmitTopics = false
184-
shouldFetchMembers = true
185-
}
186-
187-
if shouldFetchMembers {
188-
members, nilOrGroupUpdatedCh, err = ss.groupZNode.WatchInstances()
189-
if err != nil {
190-
ss.actDesc.Log().WithError(err).Error("Failed to watch members")
191-
nilOrTimeoutCh = time.After(ss.cfg.Consumer.RetryBackoff)
192-
continue
192+
if cancelWatch != nil {
193+
cancelWatch()
193194
}
194-
shouldFetchMembers = false
195195
shouldFetchSubscriptions = true
196-
// To avoid unnecessary rebalancing in case of a deregister/register
197-
// sequences that happen when a member updates its topic subscriptions,
198-
// we delay subscription fetching.
199-
nilOrTimeoutCh = time.After(ss.cfg.Consumer.RebalanceDelay)
200-
continue
201196
}
202197

203198
if shouldFetchSubscriptions {
204-
pendingSubscriptions, err = ss.fetchSubscriptions(members)
199+
subscriptions, nilOrWatchCh, cancelWatch, err = ss.fetchSubscriptions()
205200
if err != nil {
206201
ss.actDesc.Log().WithError(err).Error("Failed to fetch subscriptions")
207202
nilOrTimeoutCh = time.After(ss.cfg.Consumer.RetryBackoff)
208203
continue
209204
}
210205
shouldFetchSubscriptions = false
211-
ss.actDesc.Log().Infof("Fetched subscriptions: %v", pendingSubscriptions)
206+
ss.actDesc.Log().Infof("Fetched subscriptions: %v", subscriptions)
212207
nilOrSubscriptionsCh = ss.subscriptionsCh
213208
}
214209
}
215210
}
216211

217-
// fetchSubscriptions retrieves registration records for the specified members
218-
// from ZooKeeper.
219-
//
220-
// FIXME: It is assumed that all members of the group are registered with the
221-
// FIXME: `static` pattern. If a member whose pattern is either `white_list` or
222-
// FIXME: `black_list` joins the group the result will be unpredictable.
223-
func (ss *T) fetchSubscriptions(members []*kazoo.ConsumergroupInstance) (map[string][]string, error) {
212+
// fetchSubscriptions retrieves subscription topics for all group members and
213+
// returns a channel signaled when group membership or a member subscription changes.
214+
func (ss *T) fetchSubscriptions() (map[string][]string, <-chan none.T, context.CancelFunc, error) {
215+
members, groupUpdateWatchCh, err := ss.groupZNode.WatchInstances()
216+
if err != nil {
217+
return nil, nil, nil, errors.Wrapf(err, "failed to watch members")
218+
}
219+
220+
memberUpdateWatchChs := make([]<-chan zk.Event, len(members))
224221
subscriptions := make(map[string][]string, len(members))
225-
for _, member := range members {
222+
for i, member := range members {
226223
var registration *kazoo.Registration
227-
registration, err := member.Registration()
224+
registration, memberUpdateWatchCh, err := member.WatchRegistration()
228225
for err != nil {
229-
return nil, errors.Wrapf(err, "Failed to fetch registration, member=%s", member.ID)
226+
return nil, nil, nil, errors.Wrapf(err, "failed to watch registration, member=%s", member.ID)
230227
}
231-
// Sort topics to ensure deterministic output.
228+
memberUpdateWatchChs[i] = memberUpdateWatchCh
229+
232230
topics := make([]string, 0, len(registration.Subscription))
233231
for topic := range registration.Subscription {
234232
topics = append(topics, topic)
235233
}
236-
subscriptions[member.ID] = normalizeTopics(topics)
234+
// Sort topics to ensure deterministic output.
235+
sort.Strings(topics)
236+
subscriptions[member.ID] = topics
237237
}
238-
return subscriptions, nil
238+
239+
aggregateWatchCh := make(chan none.T)
240+
ctx, cancel := context.WithCancel(context.Background())
241+
242+
go forwardWatch(ctx, groupUpdateWatchCh, aggregateWatchCh)
243+
for _, memberUpdateWatchCh := range memberUpdateWatchChs {
244+
go forwardWatch(ctx, memberUpdateWatchCh, aggregateWatchCh)
245+
}
246+
247+
return subscriptions, aggregateWatchCh, cancel, nil
239248
}
240249

241250
func (ss *T) submitTopics(topics []string) error {
242-
if len(ss.topics) != 0 {
251+
if len(topics) == 0 {
243252
err := ss.groupMemberZNode.Deregister()
244253
if err != nil && err != kazoo.ErrInstanceNotRegistered {
245-
return errors.Wrap(err, "Failed to deregister")
254+
return errors.Wrap(err, "failed to deregister")
246255
}
256+
return nil
247257
}
248-
ss.topics = nil
249-
if len(topics) != 0 {
250-
err := ss.groupMemberZNode.Register(topics)
251-
for err != nil {
252-
return errors.Wrap(err, "Failed to register")
253-
}
258+
err := ss.groupMemberZNode.Register(topics)
259+
for err != nil {
260+
return errors.Wrap(err, "failed to register")
254261
}
255-
ss.topics = topics
256262
return nil
257263
}
258264

259-
func normalizeTopics(s []string) []string {
260-
if len(s) == 0 {
261-
return nil
262-
}
263-
sort.Strings(s)
264-
return s
265+
func millisSince(t time.Time) time.Duration {
266+
return time.Now().Sub(t) / time.Millisecond * time.Millisecond
265267
}
266268

267-
func topicsEqual(lhs, rhs []string) bool {
268-
if len(lhs) != len(rhs) {
269-
return false
270-
}
271-
for i := range lhs {
272-
if lhs[i] != rhs[i] {
273-
return false
269+
func forwardWatch(ctx context.Context, watchCh <-chan zk.Event, downstreamCh chan<- none.T) {
270+
select {
271+
case <-watchCh:
272+
select {
273+
case downstreamCh <- none.V:
274+
case <-ctx.Done():
274275
}
276+
case <-ctx.Done():
275277
}
276-
return true
277-
}
278-
279-
func millisSince(t time.Time) time.Duration {
280-
return time.Now().Sub(t) / time.Millisecond * time.Millisecond
281278
}

consumer/subscriber/subscriber_test.go

+27-43
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package subscriber
22

33
import (
4+
"reflect"
45
"sync"
56
"testing"
67
"time"
@@ -35,25 +36,6 @@ func (s *GroupMemberSuite) SetUpTest(c *C) {
3536
s.ns = actor.Root().NewChild("T")
3637
}
3738

38-
func (s *GroupMemberSuite) TestNormalizeTopics(c *C) {
39-
c.Assert(normalizeTopics(nil), DeepEquals, []string(nil))
40-
c.Assert(normalizeTopics([]string{}), DeepEquals, []string(nil))
41-
c.Assert(normalizeTopics([]string{"c", "a", "b"}), DeepEquals, []string{"a", "b", "c"})
42-
43-
c.Assert(normalizeTopics([]string{"c", "a", "b"}), Not(DeepEquals), []string{"a", "b"})
44-
}
45-
46-
func (s *GroupMemberSuite) TestTopicsEqual(c *C) {
47-
c.Assert(topicsEqual([]string{}, nil), Equals, true)
48-
c.Assert(topicsEqual(nil, []string{}), Equals, true)
49-
c.Assert(topicsEqual([]string{}, []string{}), Equals, true)
50-
c.Assert(topicsEqual([]string{"a"}, []string{"a"}), Equals, true)
51-
c.Assert(topicsEqual([]string{"a", "b", "c"}, []string{"a", "b", "c"}), Equals, true)
52-
53-
c.Assert(topicsEqual([]string{"a", "b", "c"}, []string{"a", "b"}), Equals, false)
54-
c.Assert(topicsEqual([]string{"a", "b"}, []string{"b", "a"}), Equals, false)
55-
}
56-
5739
// When a list of topics is sent to the `topics()` channel, a membership change
5840
// is received with the same list of topics for the registrator name.
5941
func (s *GroupMemberSuite) TestSimpleSubscribe(c *C) {
@@ -86,8 +68,8 @@ func (s *GroupMemberSuite) TestSubscribeSequence(c *C) {
8668
map[string][]string{"m1": {"bazz", "blah"}})
8769
}
8870

89-
// If a group member resubscribes to the same list of topics, then nothing is
90-
// updated.
71+
// If a group member resubscribes to the same list of topics, then the same
72+
// member subscriptions are returned.
9173
func (s *GroupMemberSuite) TestReSubscribe(c *C) {
9274
cfg1 := newConfig("m1")
9375
ss1 := Spawn(s.ns.NewChild("m1"), "g1", cfg1, s.kazooClt)
@@ -103,20 +85,15 @@ func (s *GroupMemberSuite) TestReSubscribe(c *C) {
10385
"m1": {"bar", "foo"},
10486
"m2": {"bar", "bazz"},
10587
}
106-
c.Assert(<-ss1.Subscriptions(), DeepEquals, membership)
107-
c.Assert(<-ss2.Subscriptions(), DeepEquals, membership)
88+
wait4Subscription(c, ss1.Subscriptions(), membership, 3*time.Second)
89+
wait4Subscription(c, ss2.Subscriptions(), membership, 3*time.Second)
10890

10991
// When
11092
ss1.Topics() <- []string{"foo", "bar"}
11193

11294
// Then
113-
select {
114-
case update := <-ss1.Subscriptions():
115-
c.Errorf("Unexpected update: %v", update)
116-
case update := <-ss2.Subscriptions():
117-
c.Errorf("Unexpected update: %v", update)
118-
case <-time.After(300 * time.Millisecond):
119-
}
95+
wait4Subscription(c, ss1.Subscriptions(), membership, 3*time.Second)
96+
wait4Subscription(c, ss2.Subscriptions(), membership, 3*time.Second)
12097
}
12198

12299
// To unsubscribe from all topics an empty topic list can be sent.
@@ -177,11 +154,9 @@ func (s *GroupMemberSuite) TestSomethingAfterNothingBug(c *C) {
177154
defer ss1.Stop()
178155

179156
ss1.Topics() <- []string{"foo"}
180-
c.Assert(<-ss1.Subscriptions(), DeepEquals,
181-
map[string][]string{"m1": {"foo"}})
157+
c.Assert(<-ss1.Subscriptions(), DeepEquals, map[string][]string{"m1": {"foo"}})
182158
ss1.Topics() <- []string{}
183-
c.Assert(<-ss1.Subscriptions(), DeepEquals,
184-
map[string][]string{})
159+
c.Assert(<-ss1.Subscriptions(), DeepEquals, map[string][]string{})
185160

186161
// When
187162
ss1.Topics() <- []string{"foo"}
@@ -237,20 +212,17 @@ func (s *GroupMemberSuite) TestRedundantUpdateBug(c *C) {
237212
ss1.Topics() <- []string{"foo", "bar"}
238213
ss2.Topics() <- []string{"foo", "bazz", "blah"}
239214

240-
c.Assert(<-ss1.Subscriptions(), DeepEquals,
241-
map[string][]string{
242-
"m1": {"bar", "foo"},
243-
"m2": {"bazz", "blah", "foo"}})
215+
membership := map[string][]string{
216+
"m1": {"bar", "foo"},
217+
"m2": {"bazz", "blah", "foo"}}
218+
c.Assert(<-ss1.Subscriptions(), DeepEquals, membership)
244219

245220
// When
246221
ss2.Topics() <- []string{"bar"}
247222
ss2.Topics() <- []string{"foo", "bazz", "blah"}
248223

249224
// Then
250-
c.Assert(<-ss1.Subscriptions(), DeepEquals,
251-
map[string][]string{
252-
"m1": {"bar", "foo"},
253-
"m2": {"bazz", "blah", "foo"}})
225+
wait4Subscription(c, ss1.Subscriptions(), membership, 3*time.Second)
254226
}
255227

256228
// When a group registrator claims a topic partition it becomes its owner.
@@ -426,6 +398,18 @@ func partitionOwner(gm *T, topic string, partition int32) (string, error) {
426398
func newConfig(clientId string) *config.Proxy {
427399
cfg := config.DefaultProxy()
428400
cfg.ClientID = clientId
429-
cfg.Consumer.RebalanceDelay = 100 * time.Millisecond
430401
return cfg
431402
}
403+
404+
func wait4Subscription(c *C, ch <-chan map[string][]string, want map[string][]string, timeout time.Duration) {
405+
for {
406+
select {
407+
case got := <-ch:
408+
if reflect.DeepEqual(got, want) {
409+
return
410+
}
411+
case <-time.After(timeout):
412+
c.Error("Timeout waiting for %v")
413+
}
414+
}
415+
}

default.yaml

-4
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,6 @@ proxies:
126126
# How frequently to commit offsets to Kafka.
127127
offsets_commit_interval: 500ms
128128

129-
# Consumer should wait this long after it gets notification that a
130-
# consumer joined/left its consumer group before starting rebalancing.
131-
rebalance_delay: 250ms
132-
133129
# If a request to a Kafka-Pixy fails for any reason, then it should wait this
134130
# long before retrying.
135131
retry_backoff: 500ms

0 commit comments

Comments
 (0)