From 6a859e89d7243adeea444095cd2ec793f3943fb9 Mon Sep 17 00:00:00 2001
From: Leslie Yan
Date: Fri, 7 Aug 2020 09:55:49 -0400
Subject: [PATCH 1/6] feat: Add ability to bundle all records from one
 micro-batch into PutRecords

---
 README.md                                    |  1 +
 .../sql/kinesis/KinesisSourceProvider.scala  | 29 ++++----
 .../spark/sql/kinesis/KinesisWriteTask.scala | 69 +++++++++++++++++--
 3 files changed, 79 insertions(+), 20 deletions(-)

diff --git a/README.md b/README.md
index 3a58c45..3383f05 100644
--- a/README.md
+++ b/README.md
@@ -150,6 +150,7 @@ Refering $SPARK_HOME to the Spark installation directory.
 | kinesis.executor.maxConnections | 1 | Specify the maximum connections to Kinesis |
 | kinesis.executor.aggregationEnabled | true | Specify if records should be aggregated before sending them to Kinesis |
 | kniesis.executor.flushwaittimemillis | 100 | Wait time while flushing records to Kinesis on Task End |
+| kinesis.executor.sink.bundle.records | false | Bundle all records from one micro-batch into PutRecords |
 
 ## Roadmap
 * We need to migrate to DataSource V2 APIs for MicroBatchExecution.
diff --git a/src/main/scala/org/apache/spark/sql/kinesis/KinesisSourceProvider.scala b/src/main/scala/org/apache/spark/sql/kinesis/KinesisSourceProvider.scala
index cad76e9..c4955b9 100644
--- a/src/main/scala/org/apache/spark/sql/kinesis/KinesisSourceProvider.scala
+++ b/src/main/scala/org/apache/spark/sql/kinesis/KinesisSourceProvider.scala
@@ -51,10 +51,10 @@ private[kinesis] class KinesisSourceProvider extends DataSourceRegister
    */
   override def sourceSchema(
-      sqlContext: SQLContext,
-      schema: Option[StructType],
-      providerName: String,
-      parameters: Map[String, String]): (String, StructType) = {
+    sqlContext: SQLContext,
+    schema: Option[StructType],
+    providerName: String,
+    parameters: Map[String, String]): (String, StructType) = {
     val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
     validateStreamOptions(caseInsensitiveParams)
     require(schema.isEmpty, "Kinesis source has a fixed schema and cannot be set with a custom one")
@@ -62,11 +62,11 @@ private[kinesis] class KinesisSourceProvider extends DataSourceRegister
   }
 
   override def createSource(
-      sqlContext: SQLContext,
-      metadataPath: String,
-      schema: Option[StructType],
-      providerName: String,
-      parameters: Map[String, String]): Source = {
+    sqlContext: SQLContext,
+    metadataPath: String,
+    schema: Option[StructType],
+    providerName: String,
+    parameters: Map[String, String]): Source = {
     val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
 
@@ -138,7 +138,7 @@ private[kinesis] class KinesisSourceProvider extends DataSourceRegister
         "Sink endpoint url is a required field")
     }
     if (caseInsensitiveParams.contains(SINK_AGGREGATION_ENABLED) && (
-      caseInsensitiveParams(SINK_AGGREGATION_ENABLED).trim != "true" &&
+        caseInsensitiveParams(SINK_AGGREGATION_ENABLED).trim != "true" &&
         caseInsensitiveParams(SINK_AGGREGATION_ENABLED).trim != "false" )) {
       throw new IllegalArgumentException(
@@ -238,11 +238,12 @@ private[kinesis] object KinesisSourceProvider extends Logging {
   private[kinesis] val SINK_RECORD_MAX_BUFFERED_TIME = "kinesis.executor.recordmaxbufferedtime"
   private[kinesis] val SINK_MAX_CONNECTIONS = "kinesis.executor.maxconnections"
   private[kinesis] val SINK_AGGREGATION_ENABLED = "kinesis.executor.aggregationenabled"
-  private[kinesis] val SINK_FLUSH_WAIT_TIME_MILLIS = "kniesis.executor.flushwaittimemillis"
+  private[kinesis] val SINK_FLUSH_WAIT_TIME_MILLIS = "kinesis.executor.flushwaittimemillis"
+  private[kinesis] val SINK_SINK_BUNDLE_RECORDS = "kinesis.executor.sink.bundle.records"
 
   private[kinesis] def getKinesisPosition(
-      params: Map[String, String]): InitialKinesisPosition = {
+    params: Map[String, String]): InitialKinesisPosition = {
     val CURRENT_TIMESTAMP = System.currentTimeMillis
     params.get(STARTING_POSITION_KEY).map(_.trim) match {
       case Some(position) if position.toLowerCase(Locale.ROOT) == "latest" =>
@@ -269,7 +270,7 @@ private[kinesis] object KinesisSourceProvider extends Logging {
 
   private[kinesis] val DEFAULT_SINK_AGGREGATION: String = "true"
 
   private[kinesis] val DEFAULT_FLUSH_WAIT_TIME_MILLIS: String = "100"
-}
-
+  private[kinesis] val DEFAULT_SINK_BUNDLE_RECORDS: String = "false"
+}
diff --git a/src/main/scala/org/apache/spark/sql/kinesis/KinesisWriteTask.scala b/src/main/scala/org/apache/spark/sql/kinesis/KinesisWriteTask.scala
index c4dcbfe..0e3959d 100644
--- a/src/main/scala/org/apache/spark/sql/kinesis/KinesisWriteTask.scala
+++ b/src/main/scala/org/apache/spark/sql/kinesis/KinesisWriteTask.scala
@@ -43,11 +43,66 @@ private[kinesis] class KinesisWriteTask(producerConfiguration: Map[String, Strin
       s"${KinesisSourceProvider.SINK_FLUSH_WAIT_TIME_MILLIS} has to be a positive integer")
   }
 
+  private val sinKBundleRecords = Try(producerConfiguration.getOrElse(
+    KinesisSourceProvider.SINK_SINK_BUNDLE_RECORDS,
+    KinesisSourceProvider.DEFAULT_SINK_BUNDLE_RECORDS).toBoolean).getOrElse {
+    throw new IllegalArgumentException(
+      s"${KinesisSourceProvider.SINK_SINK_BUNDLE_RECORDS} has to be a boolean value")
+  }
+
   private var failedWrite: Throwable = _
 
   def execute(iterator: Iterator[InternalRow]): Unit = {
+
+    if (sinKBundleRecords) {
+      bundleExecute(iterator)
+    } else {
+      singleExecute(iterator)
+    }
+
+  }
+
+  def bundleExecute(iterator: Iterator[InternalRow]): Unit = {
+    producer = CachedKinesisProducer.getOrCreate(producerConfiguration)
+
+    val kinesisCallBack = new FutureCallback[UserRecordResult]() {
+
+      override def onFailure(t: Throwable): Unit = {
+        if (failedWrite == null && t!= null) {
+          failedWrite = t
+          logError(s"Writing to $streamName failed due to ${t.getCause}")
+        }
+      }
+
+      override def onSuccess(result: UserRecordResult): Unit = {
+        logDebug(s"Successfully put records: \n " +
+          s"sequenceNumber=${result.getSequenceNumber}, \n" +
+          s"shardId=${result.getShardId}, \n" +
+          s"attempts=${result.getAttempts.size}")
+      }
+
+    }
+
+    while (iterator.hasNext && failedWrite == null) {
+      val currentRow = iterator.next()
+      val projectedRow = projection(currentRow)
+      val partitionKey = projectedRow.getString(0)
+      val data = projectedRow.getBinary(1)
+
+      while (producer.getOutstandingRecordsCount > 1e4) Thread.sleep(100)
+
+      val future = producer.addUserRecord(streamName, partitionKey, ByteBuffer.wrap(data))
+
+      Futures.addCallback(future, kinesisCallBack);
+    }
+
+  }
+
+
+  def singleExecute(iterator: Iterator[InternalRow]): Unit = {
     producer = CachedKinesisProducer.getOrCreate(producerConfiguration)
+
     while (iterator.hasNext && failedWrite == null) {
       val currentRow = iterator.next()
       val projectedRow = projection(currentRow)
@@ -56,11 +111,10 @@ private[kinesis] class KinesisWriteTask(producerConfiguration: Map[String, Strin
       sendData(partitionKey, data)
     }
 
-  }
-  def sendData(partitionKey: String, data: Array[Byte]): String = {
-    var sentSeqNumbers = new String
+  }
+  def sendData(partitionKey: String, data: Array[Byte]): Unit = {
     val future = producer.addUserRecord(streamName, partitionKey, ByteBuffer.wrap(data))
 
     val kinesisCallBack = new FutureCallback[UserRecordResult]() {
@@ -73,14 +127,17 @@ private[kinesis] class KinesisWriteTask(producerConfiguration: Map[String, Strin
       }
 
       override def onSuccess(result: UserRecordResult): Unit = {
-        val shardId = result.getShardId
-        sentSeqNumbers = result.getSequenceNumber
+        logDebug(s"Successfully put records: \n " +
+          s"sequenceNumber=${result.getSequenceNumber}, \n" +
+          s"shardId=${result.getShardId}, \n" +
+          s"attempts=${result.getAttempts.size}")
       }
+
     }
+
     Futures.addCallback(future, kinesisCallBack)
     producer.flushSync()
-    sentSeqNumbers
   }
 
   private def flushRecordsIfNecessary(): Unit = {

From e346bc4b180d0b42f5aad9556d441ed0e67de52a Mon Sep 17 00:00:00 2001
From: Leslie Yan
Date: Fri, 7 Aug 2020 10:03:57 -0400
Subject: [PATCH 2/6] chore: formatting

---
 .../org/apache/spark/sql/kinesis/KinesisSourceProvider.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/main/scala/org/apache/spark/sql/kinesis/KinesisSourceProvider.scala b/src/main/scala/org/apache/spark/sql/kinesis/KinesisSourceProvider.scala
index c4955b9..c68f61e 100644
--- a/src/main/scala/org/apache/spark/sql/kinesis/KinesisSourceProvider.scala
+++ b/src/main/scala/org/apache/spark/sql/kinesis/KinesisSourceProvider.scala
@@ -242,8 +242,7 @@ private[kinesis] object KinesisSourceProvider extends Logging {
   private[kinesis] val SINK_SINK_BUNDLE_RECORDS = "kinesis.executor.sink.bundle.records"
 
-  private[kinesis] def getKinesisPosition(
-      params: Map[String, String]): InitialKinesisPosition = {
+  private[kinesis] def getKinesisPosition(params: Map[String, String]): InitialKinesisPosition = {
     val CURRENT_TIMESTAMP = System.currentTimeMillis
     params.get(STARTING_POSITION_KEY).map(_.trim) match {
       case Some(position) if position.toLowerCase(Locale.ROOT) == "latest" =>

From fb4cf4078d8dd55cdb54dc8bd9d6bfd5a85f5350 Mon Sep 17 00:00:00 2001
From: Leslie Yan
Date: Fri, 7 Aug 2020 10:13:16 -0400
Subject: [PATCH 3/6] chore: fix typo in doc

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 3383f05..5384fed 100644
--- a/README.md
+++ b/README.md
@@ -149,7 +149,7 @@ Refering $SPARK_HOME to the Spark installation directory.
 | kinesis.executor.recordMaxBufferedTime | 1000 (millis) | Specify the maximum buffered time of a record |
 | kinesis.executor.maxConnections | 1 | Specify the maximum connections to Kinesis |
 | kinesis.executor.aggregationEnabled | true | Specify if records should be aggregated before sending them to Kinesis |
-| kniesis.executor.flushwaittimemillis | 100 | Wait time while flushing records to Kinesis on Task End |
+| kinesis.executor.flushwaittimemillis | 100 | Wait time while flushing records to Kinesis on Task End |
 | kinesis.executor.sink.bundle.records | false | Bundle all records from one micro-batch into PutRecords |
 
 ## Roadmap

From 3a8a515bbb10f8f848c071fab4ec8a5bf0852065 Mon Sep 17 00:00:00 2001
From: Leslie Yan
Date: Fri, 7 Aug 2020 11:26:56 -0400
Subject: [PATCH 4/6] fix: fix var name

---
 .../org/apache/spark/sql/kinesis/KinesisSourceProvider.scala | 2 +-
 .../scala/org/apache/spark/sql/kinesis/KinesisWriteTask.scala | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/main/scala/org/apache/spark/sql/kinesis/KinesisSourceProvider.scala b/src/main/scala/org/apache/spark/sql/kinesis/KinesisSourceProvider.scala
index c68f61e..9f6b0e0 100644
--- a/src/main/scala/org/apache/spark/sql/kinesis/KinesisSourceProvider.scala
+++ b/src/main/scala/org/apache/spark/sql/kinesis/KinesisSourceProvider.scala
@@ -239,7 +239,7 @@ private[kinesis] object KinesisSourceProvider extends Logging {
   private[kinesis] val SINK_MAX_CONNECTIONS = "kinesis.executor.maxconnections"
   private[kinesis] val SINK_AGGREGATION_ENABLED = "kinesis.executor.aggregationenabled"
   private[kinesis] val SINK_FLUSH_WAIT_TIME_MILLIS = "kinesis.executor.flushwaittimemillis"
-  private[kinesis] val SINK_SINK_BUNDLE_RECORDS = "kinesis.executor.sink.bundle.records"
+  private[kinesis] val SINK_BUNDLE_RECORDS = "kinesis.executor.sink.bundle.records"
 
   private[kinesis] def getKinesisPosition(params: Map[String, String]): InitialKinesisPosition = {
diff --git a/src/main/scala/org/apache/spark/sql/kinesis/KinesisWriteTask.scala b/src/main/scala/org/apache/spark/sql/kinesis/KinesisWriteTask.scala
index 0e3959d..912ddc5 100644
--- a/src/main/scala/org/apache/spark/sql/kinesis/KinesisWriteTask.scala
+++ b/src/main/scala/org/apache/spark/sql/kinesis/KinesisWriteTask.scala
@@ -44,10 +44,10 @@ private[kinesis] class KinesisWriteTask(producerConfiguration: Map[String, Strin
   }
 
   private val sinKBundleRecords = Try(producerConfiguration.getOrElse(
-    KinesisSourceProvider.SINK_SINK_BUNDLE_RECORDS,
+    KinesisSourceProvider.SINK_BUNDLE_RECORDS,
     KinesisSourceProvider.DEFAULT_SINK_BUNDLE_RECORDS).toBoolean).getOrElse {
     throw new IllegalArgumentException(
-      s"${KinesisSourceProvider.SINK_SINK_BUNDLE_RECORDS} has to be a boolean value")
+      s"${KinesisSourceProvider.SINK_BUNDLE_RECORDS} has to be a boolean value")
   }
 
   private var failedWrite: Throwable = _

From f75312d1048ea1bc8ee7f2351f0ea6d3aab51a3a Mon Sep 17 00:00:00 2001
From: Leslie Yan
Date: Thu, 13 Aug 2020 18:37:59 -0400
Subject: [PATCH 5/6] feat: enable RecordTtl setup and send less than 500
 records in each putRecords request

---
 .../sql/kinesis/CachedKinesisProducer.scala  | 13 +++++---
 .../sql/kinesis/KinesisSourceProvider.scala  |  3 ++
 .../spark/sql/kinesis/KinesisWriteTask.scala | 31 ++++++++++++-------
 3 files changed, 31 insertions(+), 16 deletions(-)

diff --git a/src/main/scala/org/apache/spark/sql/kinesis/CachedKinesisProducer.scala b/src/main/scala/org/apache/spark/sql/kinesis/CachedKinesisProducer.scala
index 9db1991..8462443 100644
--- a/src/main/scala/org/apache/spark/sql/kinesis/CachedKinesisProducer.scala
+++ b/src/main/scala/org/apache/spark/sql/kinesis/CachedKinesisProducer.scala
@@ -22,16 +22,15 @@ import java.util.concurrent.{ExecutionException, TimeUnit}
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
-import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+
 import com.amazonaws.regions.RegionUtils
 import com.amazonaws.services.kinesis.AmazonKinesis
 import com.amazonaws.services.kinesis.producer.{KinesisProducer, KinesisProducerConfiguration}
 import com.google.common.cache._
 import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
 
-import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
-
 private[kinesis] object CachedKinesisProducer extends Logging {
 
   private type Producer = KinesisProducer
@@ -69,6 +68,11 @@ private[kinesis] object CachedKinesisProducer extends Logging {
       .map { k => k.drop(8).toString -> producerConfiguration(k) }
       .toMap
 
+    val recordTtl = kinesisParams.getOrElse(
+      KinesisSourceProvider.SINK_RECORD_TTL,
+      KinesisSourceProvider.DEFAULT_SINK_RECORD_TTL)
+      .toLong
+
     val recordMaxBufferedTime = kinesisParams.getOrElse(
       KinesisSourceProvider.SINK_RECORD_MAX_BUFFERED_TIME,
       KinesisSourceProvider.DEFAULT_SINK_RECORD_MAX_BUFFERED_TIME)
@@ -123,6 +127,7 @@ private[kinesis] object CachedKinesisProducer extends Logging {
     }
 
     val kinesisProducer = new Producer(new KinesisProducerConfiguration()
+      .setRecordTtl(recordTtl)
       .setRecordMaxBufferedTime(recordMaxBufferedTime)
       .setMaxConnections(maxConnections)
       .setAggregationEnabled(aggregation)
diff --git a/src/main/scala/org/apache/spark/sql/kinesis/KinesisSourceProvider.scala b/src/main/scala/org/apache/spark/sql/kinesis/KinesisSourceProvider.scala
index 9f6b0e0..9a34e4e 100644
--- a/src/main/scala/org/apache/spark/sql/kinesis/KinesisSourceProvider.scala
+++ b/src/main/scala/org/apache/spark/sql/kinesis/KinesisSourceProvider.scala
@@ -235,6 +235,7 @@ private[kinesis] object KinesisSourceProvider extends Logging {
   // Sink Options
   private[kinesis] val SINK_STREAM_NAME_KEY = "streamname"
   private[kinesis] val SINK_ENDPOINT_URL = "endpointurl"
+  private[kinesis] val SINK_RECORD_TTL = "kinesis.executor.recordTtl"
   private[kinesis] val SINK_RECORD_MAX_BUFFERED_TIME = "kinesis.executor.recordmaxbufferedtime"
   private[kinesis] val SINK_MAX_CONNECTIONS = "kinesis.executor.maxconnections"
   private[kinesis] val SINK_AGGREGATION_ENABLED = "kinesis.executor.aggregationenabled"
@@ -262,6 +263,8 @@ private[kinesis] object KinesisSourceProvider extends Logging {
 
   private[kinesis] val DEFAULT_KINESIS_REGION_NAME: String = "us-east-1"
 
+  private[kinesis] val DEFAULT_SINK_RECORD_TTL: String = "30000"
+
   private[kinesis] val DEFAULT_SINK_RECORD_MAX_BUFFERED_TIME: String = "1000"
 
   private[kinesis] val DEFAULT_SINK_MAX_CONNECTIONS: String = "1"
diff --git a/src/main/scala/org/apache/spark/sql/kinesis/KinesisWriteTask.scala b/src/main/scala/org/apache/spark/sql/kinesis/KinesisWriteTask.scala
index 912ddc5..2edfa8b 100644
--- a/src/main/scala/org/apache/spark/sql/kinesis/KinesisWriteTask.scala
+++ b/src/main/scala/org/apache/spark/sql/kinesis/KinesisWriteTask.scala
@@ -63,7 +63,18 @@ private[kinesis] class KinesisWriteTask(producerConfiguration: Map[String, Strin
 
   }
 
-  def bundleExecute(iterator: Iterator[InternalRow]): Unit = {
+  private def bundleExecute(iterator: Iterator[InternalRow]): Unit = {
+
+    val groupedIterator: iterator.GroupedIterator[InternalRow] = iterator.grouped(490)
+
+    while (groupedIterator.hasNext) {
+      val rowList = groupedIterator.next()
+      sendBundledData(rowList)
+    }
+
+  }
+
+  private def sendBundledData(rowList: List[InternalRow]): Unit = {
     producer = CachedKinesisProducer.getOrCreate(producerConfiguration)
 
     val kinesisCallBack = new FutureCallback[UserRecordResult]() {
@@ -81,26 +92,22 @@ private[kinesis] class KinesisWriteTask(producerConfiguration: Map[String, Strin
           s"shardId=${result.getShardId}, \n" +
           s"attempts=${result.getAttempts.size}")
       }
-
     }
 
-    while (iterator.hasNext && failedWrite == null) {
-      val currentRow = iterator.next()
-      val projectedRow = projection(currentRow)
+    for (r <- rowList) {
+
+      val projectedRow = projection(r)
       val partitionKey = projectedRow.getString(0)
       val data = projectedRow.getBinary(1)
 
-      while (producer.getOutstandingRecordsCount > 1e4) Thread.sleep(100)
-
       val future = producer.addUserRecord(streamName, partitionKey, ByteBuffer.wrap(data))
 
-      Futures.addCallback(future, kinesisCallBack);
-    }
+      Futures.addCallback(future, kinesisCallBack)
+    }
 
   }
 
-
-  def singleExecute(iterator: Iterator[InternalRow]): Unit = {
+  private def singleExecute(iterator: Iterator[InternalRow]): Unit = {
     producer = CachedKinesisProducer.getOrCreate(producerConfiguration)
 
     while (iterator.hasNext && failedWrite == null) {
@@ -114,7 +121,7 @@ private[kinesis] class KinesisWriteTask(producerConfiguration: Map[String, Strin
 
   }
 
-  def sendData(partitionKey: String, data: Array[Byte]): Unit = {
+  private def sendData(partitionKey: String, data: Array[Byte]): Unit = {
     val future = producer.addUserRecord(streamName, partitionKey, ByteBuffer.wrap(data))
 
     val kinesisCallBack = new FutureCallback[UserRecordResult]() {

From 321333fe9694bd7135024930cc5ac39aabb8d2e2 Mon Sep 17 00:00:00 2001
From: Leslie Yan
Date: Tue, 8 Sep 2020 15:27:46 -0400
Subject: [PATCH 6/6] feat: make number of records in each PutRecords request
 configurable

---
 README.md                                                | 3 ++-
 .../apache/spark/sql/kinesis/KinesisSourceProvider.scala | 5 +++++
 .../org/apache/spark/sql/kinesis/KinesisWriteTask.scala  | 9 ++++++++-
 3 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 5384fed..d4102ce 100644
--- a/README.md
+++ b/README.md
@@ -150,7 +150,8 @@ Refering $SPARK_HOME to the Spark installation directory.
 | kinesis.executor.maxConnections | 1 | Specify the maximum connections to Kinesis |
 | kinesis.executor.aggregationEnabled | true | Specify if records should be aggregated before sending them to Kinesis |
 | kinesis.executor.flushwaittimemillis | 100 | Wait time while flushing records to Kinesis on Task End |
-| kinesis.executor.sink.bundle.records | false | Bundle all records from one micro-batch into PutRecords |
+| kinesis.executor.sink.bundle.records | false | Bundle records from one micro-batch into PutRecords request|
+| kinesis.executor.sink.max.bundle.records | 500 | Max number of records in each PutRecords request |
 
 ## Roadmap
 * We need to migrate to DataSource V2 APIs for MicroBatchExecution.
diff --git a/src/main/scala/org/apache/spark/sql/kinesis/KinesisSourceProvider.scala b/src/main/scala/org/apache/spark/sql/kinesis/KinesisSourceProvider.scala
index 9a34e4e..9a91a68 100644
--- a/src/main/scala/org/apache/spark/sql/kinesis/KinesisSourceProvider.scala
+++ b/src/main/scala/org/apache/spark/sql/kinesis/KinesisSourceProvider.scala
@@ -241,6 +241,8 @@ private[kinesis] object KinesisSourceProvider extends Logging {
   private[kinesis] val SINK_AGGREGATION_ENABLED = "kinesis.executor.aggregationenabled"
   private[kinesis] val SINK_FLUSH_WAIT_TIME_MILLIS = "kinesis.executor.flushwaittimemillis"
   private[kinesis] val SINK_BUNDLE_RECORDS = "kinesis.executor.sink.bundle.records"
+  private[kinesis] val SINK_MAX_BUNDLE_RECORDS = "kinesis.executor.sink.max.bundle.records"
+
 
   private[kinesis] def getKinesisPosition(params: Map[String, String]): InitialKinesisPosition = {
@@ -275,4 +277,7 @@ private[kinesis] object KinesisSourceProvider extends Logging {
 
   private[kinesis] val DEFAULT_SINK_BUNDLE_RECORDS: String = "false"
 
+  private[kinesis] val DEFAULT_SINK_MAX_BUNDLE_RECORDS: String = "500"
+
+
 }
diff --git a/src/main/scala/org/apache/spark/sql/kinesis/KinesisWriteTask.scala b/src/main/scala/org/apache/spark/sql/kinesis/KinesisWriteTask.scala
index 2edfa8b..5c6f3b6 100644
--- a/src/main/scala/org/apache/spark/sql/kinesis/KinesisWriteTask.scala
+++ b/src/main/scala/org/apache/spark/sql/kinesis/KinesisWriteTask.scala
@@ -50,6 +50,13 @@ private[kinesis] class KinesisWriteTask(producerConfiguration: Map[String, Strin
       s"${KinesisSourceProvider.SINK_BUNDLE_RECORDS} has to be a boolean value")
   }
 
+  private val maxBundleRecords = Try(producerConfiguration.getOrElse(
+    KinesisSourceProvider.SINK_MAX_BUNDLE_RECORDS,
+    KinesisSourceProvider.DEFAULT_SINK_MAX_BUNDLE_RECORDS).toInt).getOrElse {
+    throw new IllegalArgumentException(
+      s"${KinesisSourceProvider.SINK_MAX_BUNDLE_RECORDS} has to be a integer value")
+  }
+
   private var failedWrite: Throwable = _
 
 
@@ -65,7 +72,7 @@ private[kinesis] class KinesisWriteTask(producerConfiguration: Map[String, Strin
 
   private def bundleExecute(iterator: Iterator[InternalRow]): Unit = {
 
-    val groupedIterator: iterator.GroupedIterator[InternalRow] = iterator.grouped(490)
+    val groupedIterator: iterator.GroupedIterator[InternalRow] = iterator.grouped(maxBundleRecords)
 
     while (groupedIterator.hasNext) {
       val rowList = groupedIterator.next()
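
---

Usage note (not part of the patch series): the sketch below shows how the sink options introduced above might be set from Spark Structured Streaming. The `kinesis` format name, the `streamName`/`endpointUrl` option keys, and the `partitionKey`/`data` column layout follow the project README; the stream name, endpoint, checkpoint path, and the `rate` toy source are placeholders for illustration only, and the ttl/bundle values are arbitrary examples.

```scala
import org.apache.spark.sql.SparkSession

object KinesisBundledSinkExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kinesis-bundled-sink-example").getOrCreate()

    // Toy input: the Kinesis sink expects a `partitionKey` column and a `data` column.
    val input = spark.readStream
      .format("rate")
      .load()
      .selectExpr("CAST(value AS STRING) AS partitionKey", "CAST(value AS STRING) AS data")

    input.writeStream
      .format("kinesis")
      .outputMode("append")
      .option("streamName", "my-example-stream")                      // placeholder stream
      .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
      .option("kinesis.executor.recordTtl", "30000")                  // added in PATCH 5/6
      .option("kinesis.executor.sink.bundle.records", "true")         // added in PATCH 1/6
      .option("kinesis.executor.sink.max.bundle.records", "400")      // added in PATCH 6/6
      .option("checkpointLocation", "/tmp/kinesis-sink-checkpoint")   // placeholder path
      .start()
      .awaitTermination()
  }
}
```

With `kinesis.executor.sink.bundle.records` left at its default of `false`, the writer falls back to the original per-record `singleExecute` path and `kinesis.executor.sink.max.bundle.records` is ignored. The default of 500 mirrors the Kinesis PutRecords limit of 500 records per request.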