Skip to content

Commit

Permalink
#37 NeverEnding reader postpones last message
Browse files Browse the repository at this point in the history
  • Loading branch information
ABMC831 committed Jan 27, 2025
1 parent 2fd43f4 commit 20aafb5
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ package za.co.absa.kafkacase.examples.reader
import com.typesafe.config.Config
import io.circe.Decoder
import za.co.absa.kafkacase.models.utils.ResourceHandler.withResource
import za.co.absa.kafkacase.reader.ReaderImpl
import za.co.absa.kafkacase.reader.Reader

object ReaderCustomResourceHandling {
def apply[T: Decoder](readerConf: Config, topicName: String): Unit = {
withResource(ReaderImpl[T](readerConf, topicName))(reader => {
withResource(Reader[T](readerConf, topicName))(reader => {
for (item <- reader)
println(item)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ package za.co.absa.kafkacase.examples.reader

import com.typesafe.config.Config
import io.circe.Decoder
import za.co.absa.kafkacase.reader.ReaderImpl
import za.co.absa.kafkacase.reader.Reader

object ReaderManualResourceHandling {
def apply[T: Decoder](readerConf: Config, topicName: String): Unit = {
val reader = ReaderImpl[T](readerConf, topicName)
val reader = Reader[T](readerConf, topicName)
try {
for (item <- reader)
println(item)
Expand Down
62 changes: 53 additions & 9 deletions reader/src/main/scala/za/co/absa/kafkacase/reader/Reader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ package za.co.absa.kafkacase.reader
import com.typesafe.config.Config
import io.circe.Decoder

import java.time.Duration
import java.util.Properties

trait Reader[TType] extends Iterator[(String, Either[String, TType])] with AutoCloseable

object Reader {
def readOnce[T: Decoder](readerProps: Properties, topicName: String, work: ((String, Either[String, T])) => Unit): Unit = {
val reader = ReaderImpl[T](readerProps, topicName)
private val DEFAULT_TIMEOUT = Duration.ofSeconds(6)
private val DEFAULT_NEVER_ENDING = false

def readOnce[T: Decoder](reader: Reader[T], work: ((String, Either[String, T])) => Unit): Unit = {
try {
for (item <- reader)
work(item)
Expand All @@ -34,13 +37,54 @@ object Reader {
}
}

def readOnce[T: Decoder](readerConf: Config, topicName: String, work: ((String, Either[String, T])) => Unit): Unit = {
val reader = ReaderImpl[T](readerConf, topicName)
try {
for (item <- reader)
work(item)
} finally {
reader.close()
// note: scala can't handle default parameters together with overloading.... hence slightly exponential number of auxiliary apply methods
// Primary method that contains default arguments
def readOnce[T: Decoder](readerProps: Properties, topicName: String, work: ((String, Either[String, T])) => Unit, neverEnding: Boolean = DEFAULT_NEVER_ENDING): Unit =
readOnce(apply[T](readerProps, topicName, neverEnding = neverEnding), work)

// With never ending
def readOnce[T: Decoder](readerConf: Config, topicName: String, work: ((String, Either[String, T])) => Unit, neverEnding: Boolean): Unit =
readOnce(apply[T](readerConf, topicName, neverEnding = neverEnding), work)

// Without never ending
def readOnce[T: Decoder](readerConf: Config, topicName: String, work: ((String, Either[String, T])) => Unit): Unit =
readOnce(apply[T](readerConf, topicName, neverEnding = DEFAULT_NEVER_ENDING), work)

// note: scala can't handle default parameters together with overloading.... hence slightly exponential number of auxiliary apply methods
// Primary method that contains default arguments
def apply[TType: Decoder](props: Properties, topic: String, timeout: Duration = DEFAULT_TIMEOUT, neverEnding: Boolean = DEFAULT_NEVER_ENDING): Reader[TType] = {
if (neverEnding)
new ReaderNeverEnding[TType](props, topic, timeout)
else
new ReaderEnding[TType](props, topic, timeout)
}

// Overloaded method with Config and all optional arguments
def apply[TType: Decoder](config: Config, topic: String, timeout: Duration, neverEnding: Boolean): Reader[TType] = {
val props = convertConfigToProperties(config)
apply[TType](props, topic, timeout, neverEnding)
}

// Overloaded method with Config and neverEnding optional argument
def apply[TType: Decoder](config: Config, topic: String, neverEnding: Boolean): Reader[TType] = {
apply[TType](config, topic, DEFAULT_TIMEOUT, neverEnding)
}

// Overloaded method with Config and timeout optional argument
def apply[TType: Decoder](config: Config, topic: String, timeout: Duration): Reader[TType] = {
apply[TType](config, topic, timeout, DEFAULT_NEVER_ENDING)
}

// Overloaded method with Config and none of optional arguments
def apply[TType: Decoder](config: Config, topic: String): Reader[TType] = {
apply[TType](config, topic, DEFAULT_TIMEOUT, DEFAULT_NEVER_ENDING)
}

private def convertConfigToProperties(config: Config): Properties = {
val properties = new Properties()
config.entrySet().forEach { entry =>
properties.put(entry.getKey, config.getString(entry.getKey))
}
properties
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.kafkacase.reader

import io.circe.Decoder
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.slf4j.LoggerFactory
import za.co.absa.kafkacase.reader.ReaderEnding.log
import za.co.absa.kafkacase.reader.ReaderTools.parseRecord

import java.time.Duration
import java.util
import java.util.Properties

class ReaderEnding[TType: Decoder](props: Properties, topic: String, timeout: Duration) extends Reader[TType] {
private val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(util.Arrays.asList(topic))
log.info("Fetching initial batch")
private var singlePollIterator = consumer.poll(timeout).iterator()

override def hasNext: Boolean = singlePollIterator.hasNext

override def next(): (String, Either[String, TType]) = {
log.info("Fetching next item")
val nextItem = singlePollIterator.next()
// Fetch next batch before return, so hasNext can answer correctly
if (!singlePollIterator.hasNext) {
log.info("Fetching next batch")
singlePollIterator = consumer.poll(timeout).iterator()
}
parseRecord(nextItem)
}

def close(): Unit = consumer.close()
}

object ReaderEnding {
private val log = LoggerFactory.getLogger(this.getClass)
}
101 changes: 0 additions & 101 deletions reader/src/main/scala/za/co/absa/kafkacase/reader/ReaderImpl.scala

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.kafkacase.reader

import io.circe.Decoder
import org.slf4j.LoggerFactory
import org.apache.kafka.clients.consumer.KafkaConsumer
import za.co.absa.kafkacase.reader.ReaderNeverEnding.log
import za.co.absa.kafkacase.reader.ReaderTools.parseRecord

import java.time.Duration
import java.util
import java.util.Properties

class ReaderNeverEnding[TType: Decoder](props: Properties, topic: String, timeout: Duration) extends Reader[TType] {
private val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(util.Arrays.asList(topic))
log.info("Fetching initial batch")
private var singlePollIterator = consumer.poll(timeout).iterator()

override def hasNext: Boolean = true

override def next(): (String, Either[String, TType]) = {
while(!singlePollIterator.hasNext) {
log.info("(Re)Fetching next batch")
singlePollIterator = consumer.poll(timeout).iterator()
}
log.info("Fetching next item")
val nextItem = singlePollIterator.next()
parseRecord(nextItem)
}

def close(): Unit = consumer.close()
}

object ReaderNeverEnding {
private val log = LoggerFactory.getLogger(this.getClass)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.kafkacase.reader

import io.circe.Decoder
import io.circe.jawn.decode
import org.apache.kafka.clients.consumer.ConsumerRecord

object ReaderTools {
def parseRecord[TType: Decoder](record: ConsumerRecord[String, String]): (String, Either[String, TType]) = {
val maybeTyped = decode[TType](record.value()) match {
case Left(_) => Left(s"Cannot parse ${record.value()}")
case Right(item) => Right(item)
}
record.key() -> maybeTyped
}
}

0 comments on commit 20aafb5

Please sign in to comment.