From c7e29cd48c8d287a428292dac1fb494575ae49a6 Mon Sep 17 00:00:00 2001 From: "Miroslav Chomut (CZ)" Date: Fri, 23 Aug 2024 09:38:08 +0200 Subject: [PATCH] scala3to scala2 post-downgrade fixes (#11) --- .../main/scala/za/co/absa/KafkaCase/Examples/KafkaCase.scala | 2 +- .../main/scala/za/co/absa/KafkaCase/Reader/ReaderImpl.scala | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/examples/src/main/scala/za/co/absa/KafkaCase/Examples/KafkaCase.scala b/examples/src/main/scala/za/co/absa/KafkaCase/Examples/KafkaCase.scala index 77e7294..e75bab2 100644 --- a/examples/src/main/scala/za/co/absa/KafkaCase/Examples/KafkaCase.scala +++ b/examples/src/main/scala/za/co/absa/KafkaCase/Examples/KafkaCase.scala @@ -69,7 +69,7 @@ object KafkaCase { readerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") // 2 -> MAKE READER (should be in using block for newer versions of scala) - val reader = new ReaderImpl[EdlaChangeTopic](readerProps, "KillMePleaseTopic") // must be assigned outside using, otherwise exception from constructor is swallowed by using block + val reader = new ReaderImpl[EdlaChangeTopic](readerProps, "KillMePleaseTopic") try { for (item <- reader) println(item) diff --git a/reader/src/main/scala/za/co/absa/KafkaCase/Reader/ReaderImpl.scala b/reader/src/main/scala/za/co/absa/KafkaCase/Reader/ReaderImpl.scala index 127cc6b..0158106 100644 --- a/reader/src/main/scala/za/co/absa/KafkaCase/Reader/ReaderImpl.scala +++ b/reader/src/main/scala/za/co/absa/KafkaCase/Reader/ReaderImpl.scala @@ -47,9 +47,10 @@ class ReaderImpl[TType: Decoder](props: Properties, topic: String, timeout: Dura private def fetchNextBatch(): util.Iterator[ConsumerRecord[String, String]] = { log.info("Fetching next batch") var nextIterator = consumer.poll(timeout).iterator() - while(neverEnding && !nextIterator.hasNext) + while(neverEnding && !nextIterator.hasNext) { log.info("Re-Fetching next batch") nextIterator = consumer.poll(timeout).iterator() + } nextIterator } }