diff --git a/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReaderCustomResourceHandling.scala b/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReaderCustomResourceHandling.scala index 13183f7..b8bed18 100644 --- a/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReaderCustomResourceHandling.scala +++ b/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReaderCustomResourceHandling.scala @@ -19,11 +19,11 @@ package za.co.absa.kafkacase.examples.reader import com.typesafe.config.Config import io.circe.Decoder import za.co.absa.kafkacase.models.utils.ResourceHandler.withResource -import za.co.absa.kafkacase.reader.ReaderImpl +import za.co.absa.kafkacase.reader.Reader object ReaderCustomResourceHandling { def apply[T: Decoder](readerConf: Config, topicName: String): Unit = { - withResource(ReaderImpl[T](readerConf, topicName))(reader => { + withResource(Reader[T](readerConf, topicName))(reader => { for (item <- reader) println(item) }) diff --git a/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReaderManualResourceHandling.scala b/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReaderManualResourceHandling.scala index af381e1..2510743 100644 --- a/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReaderManualResourceHandling.scala +++ b/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReaderManualResourceHandling.scala @@ -18,11 +18,11 @@ package za.co.absa.kafkacase.examples.reader import com.typesafe.config.Config import io.circe.Decoder -import za.co.absa.kafkacase.reader.ReaderImpl +import za.co.absa.kafkacase.reader.Reader object ReaderManualResourceHandling { def apply[T: Decoder](readerConf: Config, topicName: String): Unit = { - val reader = ReaderImpl[T](readerConf, topicName) + val reader = Reader[T](readerConf, topicName) try { for (item <- reader) println(item) diff --git a/reader/src/main/scala/za/co/absa/kafkacase/reader/Reader.scala 
b/reader/src/main/scala/za/co/absa/kafkacase/reader/Reader.scala index d14f5ab..0203a16 100644 --- a/reader/src/main/scala/za/co/absa/kafkacase/reader/Reader.scala +++ b/reader/src/main/scala/za/co/absa/kafkacase/reader/Reader.scala @@ -19,13 +19,16 @@ package za.co.absa.kafkacase.reader import com.typesafe.config.Config import io.circe.Decoder +import java.time.Duration import java.util.Properties trait Reader[TType] extends Iterator[(String, Either[String, TType])] with AutoCloseable object Reader { - def readOnce[T: Decoder](readerProps: Properties, topicName: String, work: ((String, Either[String, T])) => Unit): Unit = { - val reader = ReaderImpl[T](readerProps, topicName) + private val DEFAULT_TIMEOUT = Duration.ofSeconds(6) + private val DEFAULT_NEVER_ENDING = false + + def readOnce[T: Decoder](reader: Reader[T], work: ((String, Either[String, T])) => Unit): Unit = { try { for (item <- reader) work(item) @@ -34,13 +37,54 @@ object Reader { } } - def readOnce[T: Decoder](readerConf: Config, topicName: String, work: ((String, Either[String, T])) => Unit): Unit = { - val reader = ReaderImpl[T](readerConf, topicName) - try { - for (item <- reader) - work(item) - } finally { - reader.close() + // note: scala can't handle default parameters together with overloading.... 
hence slightly exponential number of auxiliary readOnce methods + // Primary method that contains default arguments + def readOnce[T: Decoder](readerProps: Properties, topicName: String, work: ((String, Either[String, T])) => Unit, neverEnding: Boolean = DEFAULT_NEVER_ENDING): Unit = + readOnce(apply[T](readerProps, topicName, neverEnding = neverEnding), work) + + // With never ending + def readOnce[T: Decoder](readerConf: Config, topicName: String, work: ((String, Either[String, T])) => Unit, neverEnding: Boolean): Unit = + readOnce(apply[T](readerConf, topicName, neverEnding = neverEnding), work) + + // Without never ending + def readOnce[T: Decoder](readerConf: Config, topicName: String, work: ((String, Either[String, T])) => Unit): Unit = + readOnce(apply[T](readerConf, topicName, neverEnding = DEFAULT_NEVER_ENDING), work) + + // note: scala can't handle default parameters together with overloading.... hence slightly exponential number of auxiliary apply methods + // Primary method that contains default arguments + def apply[TType: Decoder](props: Properties, topic: String, timeout: Duration = DEFAULT_TIMEOUT, neverEnding: Boolean = DEFAULT_NEVER_ENDING): Reader[TType] = { + if (neverEnding) + new ReaderNeverEnding[TType](props, topic, timeout) + else + new ReaderEnding[TType](props, topic, timeout) + } + + // Overloaded method with Config and all optional arguments + def apply[TType: Decoder](config: Config, topic: String, timeout: Duration, neverEnding: Boolean): Reader[TType] = { + val props = convertConfigToProperties(config) + apply[TType](props, topic, timeout, neverEnding) + } + + // Overloaded method with Config and neverEnding optional argument + def apply[TType: Decoder](config: Config, topic: String, neverEnding: Boolean): Reader[TType] = { + apply[TType](config, topic, DEFAULT_TIMEOUT, neverEnding) + } + + // Overloaded method with Config and timeout optional argument + def apply[TType: Decoder](config: Config, topic: String, timeout: Duration): 
Reader[TType] = { + apply[TType](config, topic, timeout, DEFAULT_NEVER_ENDING) + } + + // Overloaded method with Config and none of optional arguments + def apply[TType: Decoder](config: Config, topic: String): Reader[TType] = { + apply[TType](config, topic, DEFAULT_TIMEOUT, DEFAULT_NEVER_ENDING) + } + + private def convertConfigToProperties(config: Config): Properties = { + val properties = new Properties() + config.entrySet().forEach { entry => + properties.put(entry.getKey, config.getString(entry.getKey)) } + properties } } diff --git a/reader/src/main/scala/za/co/absa/kafkacase/reader/ReaderEnding.scala b/reader/src/main/scala/za/co/absa/kafkacase/reader/ReaderEnding.scala new file mode 100644 index 0000000..bb578f2 --- /dev/null +++ b/reader/src/main/scala/za/co/absa/kafkacase/reader/ReaderEnding.scala @@ -0,0 +1,53 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.kafkacase.reader + +import io.circe.Decoder +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.slf4j.LoggerFactory +import za.co.absa.kafkacase.reader.ReaderEnding.log +import za.co.absa.kafkacase.reader.ReaderTools.parseRecord + +import java.time.Duration +import java.util +import java.util.Properties + +class ReaderEnding[TType: Decoder](props: Properties, topic: String, timeout: Duration) extends Reader[TType] { + private val consumer = new KafkaConsumer[String, String](props) + consumer.subscribe(util.Arrays.asList(topic)) + log.info("Fetching initial batch") + private var singlePollIterator = consumer.poll(timeout).iterator() + + override def hasNext: Boolean = singlePollIterator.hasNext + + override def next(): (String, Either[String, TType]) = { + log.info("Fetching next item") + val nextItem = singlePollIterator.next() + // Fetch next batch before return, so hasNext can answer correctly + if (!singlePollIterator.hasNext) { + log.info("Fetching next batch") + singlePollIterator = consumer.poll(timeout).iterator() + } + parseRecord(nextItem) + } + + def close(): Unit = consumer.close() +} + +object ReaderEnding { + private val log = LoggerFactory.getLogger(this.getClass) +} diff --git a/reader/src/main/scala/za/co/absa/kafkacase/reader/ReaderImpl.scala b/reader/src/main/scala/za/co/absa/kafkacase/reader/ReaderImpl.scala deleted file mode 100644 index 3471b9e..0000000 --- a/reader/src/main/scala/za/co/absa/kafkacase/reader/ReaderImpl.scala +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Copyright 2024 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.kafkacase.reader - -import com.typesafe.config.Config -import io.circe.Decoder -import io.circe.jawn.decode -import org.slf4j.LoggerFactory -import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer} -import za.co.absa.kafkacase.reader.ReaderImpl.{DEFAULT_TIMEOUT, convertConfigToProperties, log} - -import java.time.Duration -import java.util -import java.util.Properties - -class ReaderImpl[TType: Decoder](props: Properties, topic: String, timeout: Duration, neverEnding: Boolean) extends Reader[TType] { - private val consumer = new KafkaConsumer[String, String](props) - consumer.subscribe(util.Arrays.asList(topic)) - private var singlePollIterator = fetchNextBatch() - - override def hasNext: Boolean = singlePollIterator.hasNext - - override def next(): (String, Either[String, TType]) = { - log.info("Fetching next item") - val nextItem = singlePollIterator.next() - if (!singlePollIterator.hasNext) - singlePollIterator = fetchNextBatch() - val nextItemMaybeTyped = decode[TType](nextItem.value()) match { - case Left(_) => Left(s"Cannot parse ${nextItem.value()}") - case Right(item) => Right(item) - } - nextItem.key() -> nextItemMaybeTyped - } - - def close(): Unit = consumer.close() - - private def fetchNextBatch(): util.Iterator[ConsumerRecord[String, String]] = { - log.info("Fetching next batch") - var nextIterator = consumer.poll(timeout).iterator() - while(neverEnding && !nextIterator.hasNext) { - log.info("Re-Fetching next batch") - nextIterator = consumer.poll(timeout).iterator() - } - nextIterator - } -} 
- -object ReaderImpl { - private val DEFAULT_TIMEOUT: Duration = Duration.ofSeconds(3) - private val DEFAULT_NEVER_ENDING: Boolean = true - private val log = LoggerFactory.getLogger(this.getClass) - - // note: scala can't handle default parameters together with overloading.... hence slightly exponential number of auxiliary apply methods - // Primary method that contains default arguments - def apply[TType: Decoder](props: Properties, topic: String, timeout: Duration = DEFAULT_TIMEOUT, neverEnding: Boolean = DEFAULT_NEVER_ENDING): ReaderImpl[TType] = { - new ReaderImpl[TType](props, topic, timeout, neverEnding) - } - - // Overloaded method with Config and all optional arguments - def apply[TType: Decoder](config: Config, topic: String, timeout: Duration, neverEnding: Boolean): ReaderImpl[TType] = { - val props = convertConfigToProperties(config) - apply[TType](props, topic, timeout, neverEnding) - } - - // Overloaded method with Config and neverEnding optional argument - def apply[TType: Decoder](config: Config, topic: String, neverEnding: Boolean): ReaderImpl[TType] = { - apply[TType](config, topic, DEFAULT_TIMEOUT, neverEnding) - } - - // Overloaded method with Config and timeout optional argument - def apply[TType: Decoder](config: Config, topic: String, timeout: Duration): ReaderImpl[TType] = { - apply[TType](config, topic, timeout, DEFAULT_NEVER_ENDING) - } - - // Overloaded method with Config and none of optional arguments - def apply[TType: Decoder](config: Config, topic: String): ReaderImpl[TType] = { - apply[TType](config, topic, DEFAULT_TIMEOUT, DEFAULT_NEVER_ENDING) - } - - private def convertConfigToProperties(config: Config): Properties = { - val properties = new Properties() - config.entrySet().forEach { entry => - properties.put(entry.getKey, config.getString(entry.getKey)) - } - properties - } -} diff --git a/reader/src/main/scala/za/co/absa/kafkacase/reader/ReaderNeverEnding.scala 
b/reader/src/main/scala/za/co/absa/kafkacase/reader/ReaderNeverEnding.scala new file mode 100644 index 0000000..9e4203e --- /dev/null +++ b/reader/src/main/scala/za/co/absa/kafkacase/reader/ReaderNeverEnding.scala @@ -0,0 +1,52 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.kafkacase.reader + +import io.circe.Decoder +import org.slf4j.LoggerFactory +import org.apache.kafka.clients.consumer.KafkaConsumer +import za.co.absa.kafkacase.reader.ReaderNeverEnding.log +import za.co.absa.kafkacase.reader.ReaderTools.parseRecord + +import java.time.Duration +import java.util +import java.util.Properties + +class ReaderNeverEnding[TType: Decoder](props: Properties, topic: String, timeout: Duration) extends Reader[TType] { + private val consumer = new KafkaConsumer[String, String](props) + consumer.subscribe(util.Arrays.asList(topic)) + log.info("Fetching initial batch") + private var singlePollIterator = consumer.poll(timeout).iterator() + + override def hasNext: Boolean = true + + override def next(): (String, Either[String, TType]) = { + while(!singlePollIterator.hasNext) { + log.info("(Re)Fetching next batch") + singlePollIterator = consumer.poll(timeout).iterator() + } + log.info("Fetching next item") + val nextItem = singlePollIterator.next() + parseRecord(nextItem) + } + + def close(): Unit = consumer.close() +} + +object ReaderNeverEnding { + private val log = 
LoggerFactory.getLogger(this.getClass) +} diff --git a/reader/src/main/scala/za/co/absa/kafkacase/reader/ReaderTools.scala b/reader/src/main/scala/za/co/absa/kafkacase/reader/ReaderTools.scala new file mode 100644 index 0000000..741866f --- /dev/null +++ b/reader/src/main/scala/za/co/absa/kafkacase/reader/ReaderTools.scala @@ -0,0 +1,31 @@ +/* + * Copyright 2024 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.kafkacase.reader + +import io.circe.Decoder +import io.circe.jawn.decode +import org.apache.kafka.clients.consumer.ConsumerRecord + +object ReaderTools { + def parseRecord[TType: Decoder](record: ConsumerRecord[String, String]): (String, Either[String, TType]) = { + val maybeTyped = decode[TType](record.value()) match { + case Left(_) => Left(s"Cannot parse ${record.value()}") + case Right(item) => Right(item) + } + record.key() -> maybeTyped + } +}