diff --git a/examples/src/main/scala/za/co/absa/KafkaCase/Examples/KafkaCase.scala b/examples/src/main/scala/za/co/absa/KafkaCase/Examples/KafkaCase.scala index 77e7294..e75bab2 100644 --- a/examples/src/main/scala/za/co/absa/KafkaCase/Examples/KafkaCase.scala +++ b/examples/src/main/scala/za/co/absa/KafkaCase/Examples/KafkaCase.scala @@ -69,7 +69,7 @@ object KafkaCase { readerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") // 2 -> MAKE READER (should be in using block for newer versions of scala) - val reader = new ReaderImpl[EdlaChangeTopic](readerProps, "KillMePleaseTopic") // must be assigned outside using, otherwise exception from constructor is swallowed by using block + val reader = new ReaderImpl[EdlaChangeTopic](readerProps, "KillMePleaseTopic") try { for (item <- reader) println(item) diff --git a/reader/src/main/scala/za/co/absa/KafkaCase/Reader/ReaderImpl.scala b/reader/src/main/scala/za/co/absa/KafkaCase/Reader/ReaderImpl.scala index 127cc6b..0158106 100644 --- a/reader/src/main/scala/za/co/absa/KafkaCase/Reader/ReaderImpl.scala +++ b/reader/src/main/scala/za/co/absa/KafkaCase/Reader/ReaderImpl.scala @@ -47,9 +47,10 @@ class ReaderImpl[TType: Decoder](props: Properties, topic: String, timeout: Dura private def fetchNextBatch(): util.Iterator[ConsumerRecord[String, String]] = { log.info("Fetching next batch") var nextIterator = consumer.poll(timeout).iterator() - while(neverEnding && !nextIterator.hasNext) + while(neverEnding && !nextIterator.hasNext) { log.info("Re-Fetching next batch") nextIterator = consumer.poll(timeout).iterator() + } nextIterator } }