Kafka partition selection based on bifrost partition method (#122)
* Kafka partition selection based on bifrost partition method

* expose kafka partition configuration

* set default

* remove breaks

* Update transport/transporters/kafka/batch/batch.go

Co-authored-by: Ruban Rengaraju <[email protected]>

---------

Co-authored-by: Ruban Rengaraju <[email protected]>
stlava and ruban-rengaraju authored Sep 15, 2023
1 parent ef2aee8 commit 0a9c5e3
Showing 7 changed files with 126 additions and 30 deletions.
9 changes: 5 additions & 4 deletions itests/tests/kafka/test_txn_partition/envfile.env
@@ -1,6 +1,7 @@
EXPECTED_COUNT=21
# Bifrost
WORKERS=1
KAFKA_PARTITION_METHOD=transaction-constant

# Verifier
KAFKA_PARTITION_COUNT=2
WORKERS=3
PARTITION_METHOD=transaction
BATCHER_ROUTING_METHOD=round-robin
EXPECTED_COUNT=21
52 changes: 41 additions & 11 deletions transport/transporters/kafka/batch/batch.go
@@ -19,8 +19,11 @@ package batch
import (
"time"

"github.com/google/uuid"

"github.com/Nextdoor/pg-bifrost.git/marshaller"
"github.com/Nextdoor/pg-bifrost.git/transport/progress"
"github.com/Nextdoor/pg-bifrost.git/transport/transporters/kafka/utils"
"github.com/Shopify/sarama"

"github.com/Nextdoor/pg-bifrost.git/transport"
@@ -29,21 +32,29 @@ import (
)

type KafkaBatch struct {
maxBatchSize int
maxMessageBytes int
kafkaMessages []*sarama.ProducerMessage
transactions *ordered_map.OrderedMap
byteSize int64
mtime int64
ctime int64
partitionKey string
topic string
maxBatchSize int
maxMessageBytes int
kafkaMessages []*sarama.ProducerMessage
transactions *ordered_map.OrderedMap
byteSize int64
mtime int64
ctime int64
partitionKey string // this is the partition key for bifrost, not kafka
topic string
kafkaPartMethod utils.KafkaPartitionMethod
kafkaPartitionKey string
}

func NewKafkaBatch(topic string, partitionKey string, maxBathSize, maxMessageBytes int) transport.Batch {
func NewKafkaBatch(topic string, partitionKey string, maxBathSize, maxMessageBytes int, kafkaPartMethod utils.KafkaPartitionMethod) transport.Batch {
messages := []*sarama.ProducerMessage{}
transactions := ordered_map.NewOrderedMap()

// random kafka partition key when using batch partitioning
kafkaPartitionKey := ""
if kafkaPartMethod == utils.KAFKA_PART_BATCH {
kafkaPartitionKey = uuid.NewString()
}

return &KafkaBatch{
maxBathSize,
maxMessageBytes,
@@ -54,6 +65,8 @@ func NewKafkaBatch(topic string, partitionKey string, maxBathSize, maxMessageByt
time.Now().UnixNano(),
partitionKey,
topic,
kafkaPartMethod,
kafkaPartitionKey,
}
}

@@ -70,7 +83,24 @@ func (b *KafkaBatch) Add(msg *marshaller.MarshalledMessage) (bool, error) {
kafkaMsg := &sarama.ProducerMessage{
Topic: b.topic,
Value: sarama.ByteEncoder(msg.Json),
Key: sarama.StringEncoder(msg.PartitionKey),
}

// Set partition key
switch b.kafkaPartMethod {
case utils.KAFKA_PART_TXN:
// Use the time-based key, which is a composite of the txn + time, because it adds more entropy
// when the sarama hasher picks a partition based on hashing this value.
kafkaMsg.Key = sarama.StringEncoder(msg.TimeBasedKey)
case utils.KAFKA_PART_TXN_CONST:
// Similar to above but low entropy so that it's consistent for testing
kafkaMsg.Key = sarama.StringEncoder(msg.Transaction)
case utils.KAFKA_PART_BATCH:
kafkaMsg.Key = sarama.StringEncoder(b.kafkaPartitionKey)
case utils.KAFKA_PART_TABLE_NAME:
kafkaMsg.Key = sarama.StringEncoder(msg.Table)
case utils.KAFKA_PART_RANDOM:
// causes Sarama to use random partitioning
kafkaMsg.Key = nil
}

// Sarama client only permits messages up to size `MaxMessageBytes`
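The key chosen above is what Sarama's default hash partitioner hashes to pick a kafka partition, so messages that share a key always land on the same partition, while a nil key falls back to random assignment. A minimal standalone sketch (not part of this commit; it assumes the Shopify/sarama client used above) of how a key resolves to a partition:

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	// The hash partitioner hashes the message key and takes it modulo the partition
	// count, so a transaction-derived key pins a whole txn to a single partition.
	p := sarama.NewHashPartitioner("example-topic")
	msg := &sarama.ProducerMessage{
		Topic: "example-topic",
		Key:   sarama.StringEncoder("txn-42-1694790000"), // e.g. a time-based txn key
		Value: sarama.StringEncoder("{}"),
	}
	partition, err := p.Partition(msg, 8) // assume the topic has 8 partitions
	if err != nil {
		panic(err)
	}
	fmt.Println("selected partition:", partition)
}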
18 changes: 10 additions & 8 deletions transport/transporters/kafka/batch/batch_test.go
@@ -3,6 +3,8 @@ package batch
import (
"testing"

"github.com/Nextdoor/pg-bifrost.git/transport/transporters/kafka/utils"

"github.com/Nextdoor/pg-bifrost.git/marshaller"
"github.com/Nextdoor/pg-bifrost.git/transport"
"github.com/Nextdoor/pg-bifrost.git/transport/progress"
@@ -12,7 +14,7 @@ import (
)

func TestAddTransaction(t *testing.T) {
b := NewKafkaBatch("test-topic", "", 1, 1000000)
b := NewKafkaBatch("test-topic", "", 1, 1000000, utils.KAFKA_PART_TXN)

begin := &marshaller.MarshalledMessage{
Operation: "BEGIN",
@@ -60,7 +62,7 @@ func TestAddTransaction(t *testing.T) {
}

func TestFullBatch(t *testing.T) {
batch := NewKafkaBatch("test-topic", "", 1, 1000000)
batch := NewKafkaBatch("test-topic", "", 1, 1000000, utils.KAFKA_PART_TXN)

insert1 := &marshaller.MarshalledMessage{
Operation: "INSERT",
@@ -89,7 +91,7 @@ func TestFullBatch(t *testing.T) {
}

func TestMessageTooBig(t *testing.T) {
batch := NewKafkaBatch("test-topic", "", 1, 0)
batch := NewKafkaBatch("test-topic", "", 1, 0, utils.KAFKA_PART_TXN)

data := make([]byte, 10)

@@ -113,7 +115,7 @@ func TestMessageTooBig(t *testing.T) {
}

func TestPartialTransaction(t *testing.T) {
b := NewKafkaBatch("test-topic", "", 2, 1000000)
b := NewKafkaBatch("test-topic", "", 2, 1000000, utils.KAFKA_PART_TXN)

begin1 := &marshaller.MarshalledMessage{
Operation: "BEGIN",
@@ -182,27 +184,27 @@ func TestPartialTransaction(t *testing.T) {
}

func TestClose(t *testing.T) {
b := NewKafkaBatch("test-topic", "", 2, 1000000)
b := NewKafkaBatch("test-topic", "", 2, 1000000, utils.KAFKA_PART_TXN)
success, err := b.Close()

assert.Equal(t, true, success)
assert.Equal(t, nil, err)
}

func TestIsFullFalse(t *testing.T) {
b := NewKafkaBatch("test-topic", "", 2, 1000000)
b := NewKafkaBatch("test-topic", "", 2, 1000000, utils.KAFKA_PART_TXN)
assert.Equal(t, false, b.IsFull())
}

func TestIsEmptyTrue(t *testing.T) {
b := NewKafkaBatch("test-topic", "", 2, 1000000)
b := NewKafkaBatch("test-topic", "", 2, 1000000, utils.KAFKA_PART_TXN)
assert.Equal(t, true, b.IsEmpty())
assert.Equal(t, 0, b.NumMessages())
assert.Equal(t, int64(0), b.GetPayloadByteSize())
}

func TestIsEmptyFalse(t *testing.T) {
b := NewKafkaBatch("test-topic", "", 2, 1000000)
b := NewKafkaBatch("test-topic", "", 2, 1000000, utils.KAFKA_PART_TXN)

insert := &marshaller.MarshalledMessage{
Operation: "INSERT",
12 changes: 12 additions & 0 deletions transport/transporters/kafka/config.go
@@ -35,6 +35,7 @@ const (
ConfVarKafkaPrivateKey = "kafka-private-key"
ConfVarKafkaPublicKey = "kafka-public-key"
ConfVarKafkaVerifyProducer = "kafka-verify-producer"
ConfVarKafkaPartitionMethod = "kafka-partition-method"
)

var Flags = []cli.Flag{
@@ -113,4 +114,15 @@ var Flags = []cli.Flag{
Usage: "Whether or not to verify producer upon startup",
EnvVar: "KAFKA_VERIFY_PRODUCER",
}),
altsrc.NewStringFlag(cli.StringFlag{
Name: ConfVarKafkaPartitionMethod,
Usage: "Controls which kafka partition a message will be written to. Options " +
"are 'batch', 'transaction', 'random', and 'tablename'. In batch mode all messages in a " +
"batch will be sent to the same partition. In transaction mode the transaction " +
"of a message dictates which partition the message will be sent to. In random " +
"mode a message is sent to a partition at random. In tablename mode the table " +
"associated with the message will be used for partition selection.",
EnvVar: "KAFKA_PARTITION_METHOD",
Value: "random",
}),
}
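To turn the new behaviour on, the flag is read from the environment like the other kafka options; a hypothetical deployment envfile (same format as the itests envfile above) would set:

# Pick kafka partitions from each message's transaction (default is 'random')
KAFKA_PARTITION_METHOD=transaction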
18 changes: 16 additions & 2 deletions transport/transporters/kafka/factory.go
@@ -20,6 +20,8 @@ import (
"fmt"
"os"

"github.com/Nextdoor/pg-bifrost.git/transport/transporters/kafka/utils"

"github.com/Nextdoor/pg-bifrost.git/shutdown"
"github.com/Nextdoor/pg-bifrost.git/stats"
"github.com/Nextdoor/pg-bifrost.git/transport"
@@ -155,6 +157,7 @@ type KafkaBatchFactory struct {
topic string
maxMessageBytes int
batchSize int
kafkaPartMethod utils.KafkaPartitionMethod
}

func verifySend(producer *sarama.SyncProducer, topic string) error {
@@ -189,9 +192,20 @@ func NewBatchFactory(transportConfig map[string]interface{}) transport.BatchFact
log.Fatalf("Expected type for %s is %s", ConfVarKafkaBatchSize, "int")
}

return KafkaBatchFactory{topic, maxMessageBytes, batchSize}
partMethodVar := transportConfig[ConfVarKafkaPartitionMethod]
partMethodStr, ok := partMethodVar.(string)
if !ok {
log.Fatalf("Expected type for %s is %s", ConfVarKafkaPartitionMethod, "string")
}

partMethod, ok := utils.NameToPartitionMethod[partMethodStr]
if !ok {
log.Fatalf("Invalid kafka partition method '%s'", partMethodStr)
}

return KafkaBatchFactory{topic, maxMessageBytes, batchSize, partMethod}
}

func (f KafkaBatchFactory) NewBatch(partitionKey string) transport.Batch {
return batch.NewKafkaBatch(f.topic, partitionKey, f.batchSize, f.maxMessageBytes)
return batch.NewKafkaBatch(f.topic, partitionKey, f.batchSize, f.maxMessageBytes, f.kafkaPartMethod)
}
10 changes: 5 additions & 5 deletions transport/transporters/kafka/transporter/transporter_test.go
@@ -48,7 +48,7 @@ func TestSendOk(t *testing.T) {
maxMessageBytes := 1000000

tp := NewTransporter(sh, in, statsChan, txns, *log, mockProducer, topic)
b := batch.NewKafkaBatch(topic, "", batchSize, maxMessageBytes)
b := batch.NewKafkaBatch(topic, "", batchSize, maxMessageBytes, 0)

marshalledMessage := marshaller.MarshalledMessage{
Operation: "INSERT",
@@ -115,7 +115,7 @@ func TestSendMultipleInBatchOk(t *testing.T) {
maxMessageBytes := 1000000

tp := NewTransporter(sh, in, statsChan, txns, *log, mockProducer, topic)
b := batch.NewKafkaBatch(topic, "", batchSize, maxMessageBytes)
b := batch.NewKafkaBatch(topic, "", batchSize, maxMessageBytes, 0)

for i := 0; i < 5; i++ {
marshalledMessage := &marshaller.MarshalledMessage{
@@ -286,7 +286,7 @@ func TestPanicHandling(t *testing.T) {
maxMessageBytes := 1000000

tp := NewTransporter(sh, in, statsChan, txns, *log, mockProducer, topic)
b := batch.NewKafkaBatch(topic, "", batchSize, maxMessageBytes)
b := batch.NewKafkaBatch(topic, "", batchSize, maxMessageBytes, 0)

marshalledMessage := marshaller.MarshalledMessage{
Operation: "INSERT",
@@ -348,7 +348,7 @@ func TestFailedSend(t *testing.T) {
maxMessageBytes := 1000000

tp := NewTransporter(sh, in, statsChan, txns, *log, mockProducer, topic)
b := batch.NewKafkaBatch(topic, "", batchSize, maxMessageBytes)
b := batch.NewKafkaBatch(topic, "", batchSize, maxMessageBytes, 0)

marshalledMessageOne := marshaller.MarshalledMessage{
Operation: "INSERT",
@@ -430,7 +430,7 @@ func TestSucceedAndFailSend(t *testing.T) {
maxMessageBytes := 1000000

tp := NewTransporter(sh, in, statsChan, txns, *log, mockProducer, topic)
b := batch.NewKafkaBatch(topic, "", batchSize, maxMessageBytes)
b := batch.NewKafkaBatch(topic, "", batchSize, maxMessageBytes, 0)

marshalledMessageOne := marshaller.MarshalledMessage{
Operation: "INSERT",
37 changes: 37 additions & 0 deletions transport/transporters/kafka/utils/kafka.go
@@ -0,0 +1,37 @@
/*
Copyright 2023 Nextdoor.com, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

type KafkaPartitionMethod int

const (
KAFKA_PART_TXN KafkaPartitionMethod = iota // Uses each msg's txn to select kafka partition
KAFKA_PART_BATCH // All messages in a batch go to the same random partition
KAFKA_PART_RANDOM // Messages are randomly assigned a partition by Sarama
KAFKA_PART_TXN_CONST // Similar to KAFKA_PART_TXN but should only be used for testing
KAFKA_PART_TABLE_NAME // Partitions by message table name
)

var (
NameToPartitionMethod = map[string]KafkaPartitionMethod{
"random": KAFKA_PART_RANDOM,
"batch": KAFKA_PART_BATCH,
"transaction": KAFKA_PART_TXN,
"transaction-constant": KAFKA_PART_TXN_CONST,
"tablename": KAFKA_PART_TABLE_NAME,
}
)
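A short usage sketch (illustrative only, not part of the commit) showing how a configured method name resolves through this map and flows into a batch, mirroring the lookup in factory.go above:

package main

import (
	"log"

	"github.com/Nextdoor/pg-bifrost.git/transport/transporters/kafka/batch"
	"github.com/Nextdoor/pg-bifrost.git/transport/transporters/kafka/utils"
)

func main() {
	// Resolve the configured name; unknown names are rejected, as in NewBatchFactory.
	method, ok := utils.NameToPartitionMethod["transaction"]
	if !ok {
		log.Fatal("invalid kafka partition method")
	}

	// Messages added to this batch get a kafka key derived from their transaction's
	// time-based key, so each transaction hashes to a single partition.
	b := batch.NewKafkaBatch("example-topic", "", 1000, 1000000, method)
	_ = b
}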
