
Commit 2ebb79b

HeartSaVioR authored and zsxwing committed
[SPARK-26350][FOLLOWUP] Add actual verification on new UT introduced on SPARK-26350
## What changes were proposed in this pull request?

This patch adds a check that verifies the consumer group id is applied correctly when a custom group id is provided through the Kafka parameter (kafka.group.id).

## How was this patch tested?

Modified UT.

Closes apache#23544 from HeartSaVioR/SPARK-26350-follow-up-actual-verification-on-UT.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>
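At its core, the added check asks the Kafka broker which consumer groups it knows about and asserts that the custom group id is among them. Below is a minimal, self-contained sketch of that pattern outside Spark's test harness; GroupIdCheck and bootstrapServers are illustrative names, not part of the patch:

import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

object GroupIdCheck {
  // Returns true if the broker reports a consumer group with the given id.
  def groupExists(bootstrapServers: String, expectedGroupId: String): Boolean = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    val admin = AdminClient.create(props)
    try {
      // listConsumerGroups() is asynchronous; valid().get() blocks until the
      // listings that could be fetched without error are returned.
      val groupIds = admin.listConsumerGroups().valid().get().asScala.map(_.groupId())
      groupIds.exists(_ == expectedGroupId)
    } finally {
      admin.close()
    }
  }
}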
1 parent: 954ef96

File tree: 3 files changed (+29 −4 lines)

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+12 −2

@@ -28,6 +28,7 @@ import scala.collection.JavaConverters._
 import scala.io.Source
 import scala.util.Random
 
+import org.apache.kafka.clients.admin.{AdminClient, ConsumerGroupListing}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.TopicPartition
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -638,10 +639,11 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     testUtils.sendMessages(topic, (11 to 20).map(_.toString).toArray, Some(1))
     testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2))
 
+    val customGroupId = "id-" + Random.nextInt()
     val dsKafka = spark
       .readStream
       .format("kafka")
-      .option("kafka.group.id", "id-" + Random.nextInt())
+      .option("kafka.group.id", customGroupId)
       .option("kafka.bootstrap.servers", testUtils.brokerAddress)
       .option("subscribe", topic)
       .option("startingOffsets", "earliest")
@@ -652,7 +654,15 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
 
     testStream(dsKafka)(
       makeSureGetOffsetCalled,
-      CheckAnswer(1 to 30: _*)
+      CheckAnswer(1 to 30: _*),
+      Execute { _ =>
+        val consumerGroups = testUtils.listConsumerGroups()
+        val validGroups = consumerGroups.valid().get()
+        val validGroupsId = validGroups.asScala.map(_.groupId())
+        assert(validGroupsId.exists(_ === customGroupId), "Valid consumer groups don't " +
+          s"contain the expected group id - Valid consumer groups: $validGroupsId / " +
+          s"expected group id: $customGroupId")
+      }
     )
   }
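Here Execute is the StreamTest action that runs an arbitrary function against the running query, so the consumer-group lookup fires as a step of testStream after CheckAnswer has verified all 30 records, while the query (and hence its consumer group) is still registered with the broker.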

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
+12 −1

@@ -20,6 +20,9 @@ package org.apache.spark.sql.kafka010
 import java.util.Locale
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.collection.JavaConverters._
+import scala.util.Random
+
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
 
@@ -247,8 +250,16 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
     testUtils.sendMessages(topic, (11 to 20).map(_.toString).toArray, Some(1))
     testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2))
 
-    val df = createDF(topic, withOptions = Map("kafka.group.id" -> "custom"))
+    val customGroupId = "id-" + Random.nextInt()
+    val df = createDF(topic, withOptions = Map("kafka.group.id" -> customGroupId))
     checkAnswer(df, (1 to 30).map(_.toString).toDF())
+
+    val consumerGroups = testUtils.listConsumerGroups()
+    val validGroups = consumerGroups.valid().get()
+    val validGroupsId = validGroups.asScala.map(_.groupId())
+    assert(validGroupsId.exists(_ === customGroupId), "Valid consumer groups don't " +
+      s"contain the expected group id - Valid consumer groups: $validGroupsId / " +
+      s"expected group id: $customGroupId")
   }
 
   test("read Kafka transactional messages: read_committed") {

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+5 −1

@@ -33,7 +33,7 @@ import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils.ZkUtils
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, NewPartitions}
+import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, ListConsumerGroupsResult, NewPartitions}
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.TopicPartition
@@ -311,6 +311,10 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
     offsets
   }
 
+  def listConsumerGroups(): ListConsumerGroupsResult = {
+    adminClient.listConsumerGroups()
+  }
+
   protected def brokerConfiguration: Properties = {
     val props = new Properties()
     props.put("broker.id", "0")
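For reference, a caller-side fragment mirroring how the two suites above consume the new helper; expectedGroupId is a placeholder, and ListConsumerGroupsResult.valid() returns a KafkaFuture that get() blocks on:

import scala.collection.JavaConverters._

val validGroupsId = testUtils.listConsumerGroups()
  .valid().get()      // java.util.Collection[ConsumerGroupListing] fetched without error
  .asScala
  .map(_.groupId())   // extract the plain group-id strings
assert(validGroupsId.exists(_ == expectedGroupId))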
