Skip to content
This repository was archived by the owner on Sep 20, 2024. It is now read-only.

Commit 5af45bb

Browse files
committed
Drop dependency on wvanbergen/kazoo-go
1 parent 007c803 commit 5af45bb

File tree

20 files changed

+857
-1827
lines changed

20 files changed

+857
-1827
lines changed

Gopkg.lock

+1-9
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Gopkg.toml

-4
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,6 @@
4141
name = "github.com/sirupsen/logrus"
4242
version = "1.0.1"
4343

44-
[[constraint]]
45-
branch = "master"
46-
name = "github.com/wvanbergen/kazoo-go"
47-
4844
[[constraint]]
4945
name = "google.golang.org/grpc"
5046
version = "1.15.0"

config/config.go

-13
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111

1212
"github.com/Shopify/sarama"
1313
"github.com/pkg/errors"
14-
"github.com/wvanbergen/kazoo-go"
1514
"gopkg.in/yaml.v2"
1615
)
1716

@@ -271,18 +270,6 @@ func (pc PartitionerConstructor) ToPartitionerConstructor() (sarama.PartitionerC
271270
return v, nil
272271
}
273272

274-
func (p *Proxy) KazooCfg() *kazoo.Config {
275-
kazooCfg := kazoo.NewConfig()
276-
kazooCfg.Chroot = p.ZooKeeper.Chroot
277-
// ZooKeeper documentation says following about the session timeout: "The
278-
// current (ZooKeeper) implementation requires that the timeout be a
279-
// minimum of 2 times the tickTime (as set in the server configuration) and
280-
// a maximum of 20 times the tickTime". The default tickTime is 2 seconds.
281-
// See http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#ch_zkSessions
282-
kazooCfg.Timeout = 15 * time.Second
283-
return kazooCfg
284-
}
285-
286273
// SaramaProducerCfg returns a config for sarama producer.
287274
func (p *Proxy) SaramaProducerCfg() *sarama.Config {
288275
saramaCfg := sarama.NewConfig()

consumer/consumerimpl/consumerimpl.go

+14-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package consumerimpl
22

33
import (
4+
"time"
5+
46
"github.com/Shopify/sarama"
57
"github.com/mailgun/kafka-pixy/actor"
68
"github.com/mailgun/kafka-pixy/config"
@@ -9,7 +11,7 @@ import (
911
"github.com/mailgun/kafka-pixy/consumer/groupcsm"
1012
"github.com/mailgun/kafka-pixy/offsetmgr"
1113
"github.com/pkg/errors"
12-
"github.com/wvanbergen/kazoo-go"
14+
"github.com/samuel/go-zookeeper/zk"
1315
)
1416

1517
// T is a Kafka consumer implementation that automatically maintains consumer
@@ -29,7 +31,7 @@ type t struct {
2931
cfg *config.Proxy
3032
dispatcher *dispatcher.T
3133
kafkaClt sarama.Client
32-
kazooClt *kazoo.Kazoo
34+
zkConn *zk.Conn
3335
offsetMgrF offsetmgr.Factory
3436
}
3537

@@ -41,7 +43,13 @@ func Spawn(parentActDesc *actor.Descriptor, cfg *config.Proxy, offsetMgrF offset
4143
return nil, errors.Wrap(err, "failed to create Kafka client for message streams")
4244
}
4345

44-
kazooClt, err := kazoo.NewKazoo(cfg.ZooKeeper.SeedPeers, cfg.KazooCfg())
46+
// ZooKeeper documentation says following about the session timeout: "The
47+
// current (ZooKeeper) implementation requires that the timeout be a
48+
// minimum of 2 times the tickTime (as set in the server configuration) and
49+
// a maximum of 20 times the tickTime". The default tickTime is 2 seconds.
50+
// See http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#ch_zkSessions
51+
sessionTimeout := 15 * time.Second
52+
zkConn, _, err := zk.Connect(cfg.ZooKeeper.SeedPeers, sessionTimeout)
4553
if err != nil {
4654
return nil, errors.Wrap(err, "failed to create kazoo.Kazoo")
4755
}
@@ -51,7 +59,7 @@ func Spawn(parentActDesc *actor.Descriptor, cfg *config.Proxy, offsetMgrF offset
5159
cfg: cfg,
5260
kafkaClt: kafkaClt,
5361
offsetMgrF: offsetMgrF,
54-
kazooClt: kazooClt,
62+
zkConn: zkConn,
5563
}
5664
c.dispatcher = dispatcher.Spawn(c.actDesc, c, c.cfg)
5765
return c, nil
@@ -73,7 +81,7 @@ func (c *t) AsyncConsume(group, topic string) <-chan consumer.Response {
7381
// implements `consumer.T`
7482
func (c *t) Stop() {
7583
c.dispatcher.Stop()
76-
c.kazooClt.Close()
84+
c.zkConn.Close()
7785
c.kafkaClt.Close()
7886
}
7987

@@ -84,7 +92,7 @@ func (c *t) KeyOf(rq consumer.Request) dispatcher.Key {
8492

8593
// implements `dispatcher.Factory`.
8694
func (c *t) SpawnChild(childSpec dispatcher.ChildSpec) {
87-
groupcsm.Spawn(c.actDesc, childSpec, c.cfg, c.kafkaClt, c.kazooClt, c.offsetMgrF)
95+
groupcsm.Spawn(c.actDesc, childSpec, c.cfg, c.kafkaClt, c.zkConn, c.offsetMgrF)
8896
}
8997

9098
// String returns a string ID of this instance to be used in logs.

consumer/groupcsm/groupcsm.go

+5-6
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919
"github.com/mailgun/kafka-pixy/offsetmgr"
2020
"github.com/mailgun/kafka-pixy/prettyfmt"
2121
"github.com/pkg/errors"
22-
"github.com/wvanbergen/kazoo-go"
22+
"github.com/samuel/go-zookeeper/zk"
2323
)
2424

2525
// groupConsumer manages a fleet of topic consumers and disposes of those that
@@ -33,7 +33,7 @@ type T struct {
3333
group string
3434
dispatcher *dispatcher.T
3535
kafkaClt sarama.Client
36-
kazooClt *kazoo.Kazoo
36+
zkConn *zk.Conn
3737
msgFetcherF msgfetcher.Factory
3838
offsetMgrF offsetmgr.Factory
3939
subscriber *subscriber.T
@@ -45,8 +45,7 @@ type T struct {
4545
}
4646

4747
func Spawn(parentActDesc *actor.Descriptor, childSpec dispatcher.ChildSpec,
48-
cfg *config.Proxy, kafkaClt sarama.Client, kazooClt *kazoo.Kazoo,
49-
offsetMgrF offsetmgr.Factory,
48+
cfg *config.Proxy, kafkaClt sarama.Client, zkConn *zk.Conn, offsetMgrF offsetmgr.Factory,
5049
) *T {
5150
group := string(childSpec.Key())
5251
actDesc := parentActDesc.NewChild(fmt.Sprintf("%s", group))
@@ -56,13 +55,13 @@ func Spawn(parentActDesc *actor.Descriptor, childSpec dispatcher.ChildSpec,
5655
cfg: cfg,
5756
group: group,
5857
kafkaClt: kafkaClt,
59-
kazooClt: kazooClt,
58+
zkConn: zkConn,
6059
offsetMgrF: offsetMgrF,
6160
multiplexers: make(map[string]*multiplexer.T),
6261
topicCsmCh: make(chan *topiccsm.T, cfg.Consumer.ChannelBufferSize),
6362
}
6463

65-
gc.subscriber = subscriber.Spawn(gc.actDesc, gc.group, gc.cfg, gc.kazooClt)
64+
gc.subscriber = subscriber.Spawn(gc.actDesc, gc.group, gc.cfg, gc.zkConn)
6665
gc.msgFetcherF = msgfetcher.SpawnFactory(gc.actDesc, gc.cfg, gc.kafkaClt)
6766
actor.Spawn(gc.actDesc, &gc.wg, gc.run)
6867

consumer/partitioncsm/partitioncsm_test.go

+1-5
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"github.com/mailgun/kafka-pixy/testhelpers"
1616
"github.com/mailgun/kafka-pixy/testhelpers/kafkahelper"
1717
log "github.com/sirupsen/logrus"
18-
"github.com/wvanbergen/kazoo-go"
1918
. "gopkg.in/check.v1"
2019
)
2120

@@ -58,10 +57,7 @@ func (s *PartitionCsmSuite) SetUpTest(c *C) {
5857

5958
s.ns = actor.Root().NewChild("T")
6059

61-
kazooClt, err := kazoo.NewKazoo(testhelpers.ZookeeperPeers, kazoo.NewConfig())
62-
c.Assert(err, IsNil)
63-
64-
s.groupMember = subscriber.Spawn(s.ns, group, s.cfg, kazooClt)
60+
s.groupMember = subscriber.Spawn(s.ns, group, s.cfg, s.kh.ZKConn())
6561
s.msgFetcherF = msgfetcher.SpawnFactory(s.ns, s.cfg, s.kh.KafkaClt())
6662
s.offsetMgrF = offsetmgr.SpawnFactory(s.ns, s.cfg, s.kh.KafkaClt())
6763

0 commit comments

Comments
 (0)