Skip to content

Commit ff117ef

Browse files
committed
Some minor refactoring
1 parent 45a39f2 commit ff117ef

File tree

2 files changed

+2
-35
lines changed
  • code/kafka-streams-module-demo/src/main/kotlin/com/isel/kafkastreamsmoduledemo

2 files changed

+2
-35
lines changed

code/kafka-streams-module-demo/src/main/kotlin/com/isel/kafkastreamsmoduledemo/kafkaStreamsExperimentations/UseCase.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ class UseCase(
6363
}
6464

6565
/**
66-
* Creates or accesses the GlobalKTable responsible for storing the keys and topics to know to which gateway topic it goes.
66+
* Creates and/or accesses the GlobalKTable responsible for storing the keys and topics to know to which gateway topic it goes.
6767
*
6868
*/
6969
final fun startStore(): ReadOnlyKeyValueStore<String, ValueAndTimestamp<Array<TopicKeys>>> {
@@ -111,9 +111,7 @@ class UseCase(
111111
val inputStream: KStream<Long, String> = builder.stream(inputTopic, Consumed.with(Serdes.Long(), Serdes.String()))
112112

113113
for (gatewayEntry in keyStorage.all().iterator()) {
114-
println("${KafkaStreamsUtils.YELLOW_TEXT}************************************************************************************************************************${KafkaStreamsUtils.RESET_TEXT_COLOR}")
115114
KafkaStreamsUtils.printlnBetweenColoredLines("gateway entry key[${gatewayEntry.key}]", KafkaStreamsUtils.PURPLE_TEXT)
116-
println("${KafkaStreamsUtils.YELLOW_TEXT}************************************************************************************************************************${KafkaStreamsUtils.RESET_TEXT_COLOR}")
117115

118116
inputStream.filter {key, value -> isKeyForGateway(key, gatewayEntry.key, inputTopic)}.to(gatewayEntry.key)
119117
}

code/kafka-streams-module-demo/src/main/kotlin/com/isel/kafkastreamsmoduledemo/utils/Utils.kt

Lines changed: 1 addition & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ class KafkaStreamsUtils(
107107
thread {
108108
while (true) {
109109
consumer.poll(Duration.ofSeconds(5)).forEach { record ->
110-
logWithColor("[${System.currentTimeMillis()}] - Consumer key: [${record.key()}] and value[${record.value()}] from topic:[${record.topic()}] and timestamp [${record.timestamp()}]")
110+
printlnWithColor("[${System.currentTimeMillis()}] - Consumer key: [${record.key()}] and value[${record.value()}] from topic:[${record.topic()}] and timestamp [${record.timestamp()}]")
111111
}
112112
}
113113
}
@@ -120,38 +120,7 @@ data class TopicKeys(
120120
) {
121121
}
122122

123-
class TopicKeysArrayDeserializerTentative: Deserializer<Array<TopicKeys>> {
124-
private val objectMapper = ObjectMapper()
125-
override fun deserialize(topic: String?, data: ByteArray?): Array<TopicKeys>? {
126-
return try {
127-
if (data == null) {
128-
println("Null received at deserializing")
129-
return null
130-
}
131-
println("Deserializing...")
132-
objectMapper.readValue(String(data, charset("UTF-8")), Array<TopicKeys>::class.java)
133-
} catch (e: Exception) {
134-
throw SerializationException("Error when deserializing byte[] to TopicKeys[]")
135-
}
136-
}
137-
138-
}
139123

140-
class TopicKeysArraySerializerTentative : Serializer<Array<TopicKeys>> {
141-
private val objectMapper = ObjectMapper()
142-
override fun serialize(topic: String?, data: Array<TopicKeys>?): ByteArray? {
143-
return try {
144-
if (data == null) {
145-
println("Null received at serializing")
146-
return null
147-
}
148-
println("Serializing...")
149-
objectMapper.writeValueAsBytes(data)
150-
} catch (e: java.lang.Exception) {
151-
throw SerializationException("Error when serializing MessageDto to byte[]")
152-
}
153-
}
154-
}
155124

156125
class TopicKeysArrayDeserializer: Deserializer<Array<TopicKeys>> {
157126
override fun deserialize(topic: String?, data: ByteArray?): Array<TopicKeys>? {

0 commit comments

Comments
 (0)