Skip to content

Commit

Permalink
KAFKA-16339: [2/4 KStream#flatTransform] Remove Deprecated "transformer" methods and classes (#17245)
Browse files Browse the repository at this point in the history

Reviewers: Matthias J. Sax <[email protected]>
  • Loading branch information
fonsdant authored Nov 8, 2024
1 parent 7413a5a commit 9565043
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 539 deletions.
297 changes: 9 additions & 288 deletions streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -1209,48 +1209,6 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
builder);
}

@Override
@Deprecated
public <K1, V1> KStream<K1, V1> flatTransform(final org.apache.kafka.streams.kstream.TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
                                              final String... stateStoreNames) {
    // Convenience overload: generate a processor name and delegate to the Named variant.
    Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
    return flatTransform(
        transformerSupplier,
        Named.as(builder.newProcessorName(TRANSFORM_NAME)),
        stateStoreNames);
}

@Override
@Deprecated
public <K1, V1> KStream<K1, V1> flatTransform(final org.apache.kafka.streams.kstream.TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
                                              final Named named,
                                              final String... stateStoreNames) {
    // Validate every argument before mutating the topology graph.
    Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
    Objects.requireNonNull(named, "named can't be null");
    Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array");
    ApiUtils.checkSupplier(transformerSupplier);
    for (int i = 0; i < stateStoreNames.length; i++) {
        Objects.requireNonNull(stateStoreNames[i], "stateStoreNames can't contain `null` as store name");
    }

    final String processorName = new NamedInternal(named).name();
    final StatefulProcessorNode<? super K, ? super V> flatTransformNode = new StatefulProcessorNode<>(
        processorName,
        new ProcessorParameters<>(new KStreamFlatTransform<>(transformerSupplier), processorName),
        stateStoreNames);
    // The transformer may produce new keys, so mark the node as key-changing.
    flatTransformNode.keyChangingOperation(true);

    builder.addGraphNode(graphNode, flatTransformNode);

    // Cannot inherit key and value serde: output key/value types may differ from the input's.
    return new KStreamImpl<>(
        processorName,
        null,
        null,
        subTopologySourceNodes,
        true,
        flatTransformNode,
        builder);
}

@Override
@Deprecated
public <VR> KStream<K, VR> transformValues(final org.apache.kafka.streams.kstream.ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@
* @see KStream#transformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, Named, String...)
* @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
* @see KStream#transformValues(ValueTransformerWithKeySupplier, Named, String...)
* @see KStream#flatTransform(org.apache.kafka.streams.kstream.TransformerSupplier, String...)
* @see KStream#flatTransform(org.apache.kafka.streams.kstream.TransformerSupplier, Named, String...)
* @see KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, String...)
* @see KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, Named, String...)
* @see KStream#flatTransformValues(ValueTransformerWithKeySupplier, String...)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,20 +114,6 @@ public class KStreamImplTest {
private final MockApiProcessorSupplier<String, String, Void, Void> processorSupplier = new MockApiProcessorSupplier<>();
private final MockApiFixedKeyProcessorSupplier<String, String, Void> fixedKeyProcessorSupplier = new MockApiFixedKeyProcessorSupplier<>();
@SuppressWarnings("deprecation")
// Identity flat-transformer fixture: wraps each (key, value) pair in a singleton Iterable,
// used by the deprecated KStream#flatTransform tests.
private final org.apache.kafka.streams.kstream.TransformerSupplier<String, String, Iterable<KeyValue<String, String>>> flatTransformerSupplier =
() -> new org.apache.kafka.streams.kstream.Transformer<String, String, Iterable<KeyValue<String, String>>>() {
@Override
public void init(final ProcessorContext context) {} // stateless: nothing to initialize

@Override
public Iterable<KeyValue<String, String>> transform(final String key, final String value) {
return Collections.singleton(new KeyValue<>(key, value));
}

@Override
public void close() {} // stateless: nothing to release
};
@SuppressWarnings("deprecation")
private final org.apache.kafka.streams.kstream.ValueTransformerSupplier<String, String> valueTransformerSupplier =
() -> new org.apache.kafka.streams.kstream.ValueTransformer<String, String>() {
@Override
Expand Down Expand Up @@ -1576,139 +1562,49 @@ public void shouldProcessFromSourcesThatMatchMultiplePattern() {
}

@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowBadTransformerSupplierOnFlatTransform() {
final org.apache.kafka.streams.kstream.Transformer<String, String, Iterable<KeyValue<String, String>>> transformer = flatTransformerSupplier.get();
public void shouldNotAllowBadProcessSupplierOnProcess() {
final org.apache.kafka.streams.processor.api.Processor<String, String, Void, Void> processor =
processorSupplier.get();
final IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
() -> testStream.flatTransform(() -> transformer)
() -> testStream.process(() -> processor)
);
assertThat(exception.getMessage(), containsString("#get() must return a new object each time it is called."));
}

@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowBadTransformerSupplierOnFlatTransformWithStores() {
final org.apache.kafka.streams.kstream.Transformer<String, String, Iterable<KeyValue<String, String>>> transformer = flatTransformerSupplier.get();
public void shouldNotAllowBadProcessSupplierOnProcessWithStores() {
final org.apache.kafka.streams.processor.api.Processor<String, String, Void, Void> processor =
processorSupplier.get();
final IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
() -> testStream.flatTransform(() -> transformer, "storeName")
() -> testStream.process(() -> processor, "storeName")
);
assertThat(exception.getMessage(), containsString("#get() must return a new object each time it is called."));
}

@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowBadTransformerSupplierOnFlatTransformWithNamed() {
final org.apache.kafka.streams.kstream.Transformer<String, String, Iterable<KeyValue<String, String>>> transformer = flatTransformerSupplier.get();
public void shouldNotAllowBadProcessSupplierOnProcessWithNamed() {
final org.apache.kafka.streams.processor.api.Processor<String, String, Void, Void> processor =
processorSupplier.get();
final IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
() -> testStream.flatTransform(() -> transformer, Named.as("flatTransformer"))
() -> testStream.process(() -> processor, Named.as("flatTransformer"))
);
assertThat(exception.getMessage(), containsString("#get() must return a new object each time it is called."));
}

@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowBadTransformerSupplierOnFlatTransformWithNamedAndStores() {
final org.apache.kafka.streams.kstream.Transformer<String, String, Iterable<KeyValue<String, String>>> transformer = flatTransformerSupplier.get();
public void shouldNotAllowBadProcessSupplierOnProcessWithNamedAndStores() {
final org.apache.kafka.streams.processor.api.Processor<String, String, Void, Void> processor =
processorSupplier.get();
final IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
() -> testStream.flatTransform(() -> transformer, Named.as("flatTransformer"), "storeName")
() -> testStream.process(() -> processor, Named.as("processor"), "storeName")
);
assertThat(exception.getMessage(), containsString("#get() must return a new object each time it is called."));
}

@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullTransformerSupplierOnFlatTransform() {
    // A null supplier must be rejected eagerly with a descriptive NPE.
    final NullPointerException npe =
        assertThrows(NullPointerException.class, () -> testStream.flatTransform(null));
    assertThat(npe.getMessage(), equalTo("transformerSupplier can't be null"));
}

@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullTransformerSupplierOnFlatTransformWithStores() {
    // Null supplier is rejected even when store names are supplied.
    final NullPointerException npe =
        assertThrows(NullPointerException.class, () -> testStream.flatTransform(null, "storeName"));
    assertThat(npe.getMessage(), equalTo("transformerSupplier can't be null"));
}

@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullTransformerSupplierOnFlatTransformWithNamed() {
    // Null supplier is rejected even when an explicit name is supplied.
    final NullPointerException npe = assertThrows(
        NullPointerException.class,
        () -> testStream.flatTransform(null, Named.as("flatTransformer")));
    assertThat(npe.getMessage(), equalTo("transformerSupplier can't be null"));
}

@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullTransformerSupplierOnFlatTransformWithNamedAndStores() {
    // Null supplier is rejected even with both a name and store names supplied.
    final NullPointerException npe = assertThrows(
        NullPointerException.class,
        () -> testStream.flatTransform(null, Named.as("flatTransformer"), "storeName"));
    assertThat(npe.getMessage(), equalTo("transformerSupplier can't be null"));
}

@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullStoreNamesOnFlatTransform() {
    // Passing a null varargs array (as opposed to a null element) has its own message.
    final NullPointerException npe = assertThrows(
        NullPointerException.class,
        () -> testStream.flatTransform(flatTransformerSupplier, (String[]) null));
    assertThat(npe.getMessage(), equalTo("stateStoreNames can't be a null array"));
}

@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullStoreNameOnFlatTransform() {
    // A null element inside the store-name varargs must also be rejected.
    final NullPointerException npe = assertThrows(
        NullPointerException.class,
        () -> testStream.flatTransform(flatTransformerSupplier, (String) null));
    assertThat(npe.getMessage(), equalTo("stateStoreNames can't contain `null` as store name"));
}

@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullStoreNamesOnFlatTransformWithNamed() {
    // Null varargs array is rejected in the Named overload as well.
    final NullPointerException npe = assertThrows(
        NullPointerException.class,
        () -> testStream.flatTransform(flatTransformerSupplier, Named.as("flatTransform"), (String[]) null));
    assertThat(npe.getMessage(), equalTo("stateStoreNames can't be a null array"));
}

@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullStoreNameOnFlatTransformWithNamed() {
    // Null store-name element is rejected in the Named overload as well.
    final NullPointerException npe = assertThrows(
        NullPointerException.class,
        () -> testStream.flatTransform(flatTransformerSupplier, Named.as("flatTransform"), (String) null));
    assertThat(npe.getMessage(), equalTo("stateStoreNames can't contain `null` as store name"));
}

@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullNamedOnFlatTransform() {
    // The Named parameter itself must not be null.
    final NullPointerException npe = assertThrows(
        NullPointerException.class,
        () -> testStream.flatTransform(flatTransformerSupplier, (Named) null));
    assertThat(npe.getMessage(), equalTo("named can't be null"));
}

@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowNullNamedOnFlatTransformWithStoreName() {
    // Null Named is rejected even when store names are supplied.
    final NullPointerException npe = assertThrows(
        NullPointerException.class,
        () -> testStream.flatTransform(flatTransformerSupplier, (Named) null, "storeName"));
    assertThat(npe.getMessage(), equalTo("named can't be null"));
}

@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowBadTransformerSupplierOnTransformValues() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@
package org.apache.kafka.streams.scala
package kstream

import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{
GlobalKTable,
JoinWindows,
KStream => KStreamJ,
Printed,
TransformerSupplier,
ValueTransformerSupplier,
ValueTransformerWithKeySupplier
}
Expand All @@ -36,7 +34,6 @@ import org.apache.kafka.streams.scala.FunctionsCompatConversions.{
KeyValueMapperFromFunction,
MapperFromFunction,
PredicateFromFunction,
TransformerSupplierAsJava,
ValueMapperFromFunction,
ValueMapperWithKeyFromFunction,
ValueTransformerSupplierAsJava,
Expand Down Expand Up @@ -495,52 +492,6 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
def toTable(named: Named, materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
new KTable(inner.toTable(named, materialized))

/**
 * Transform each record of the input stream into zero or more records in the output stream (both key and value type
 * can be altered arbitrarily).
 * A `Transformer` (provided by the given `TransformerSupplier`) is applied to each input record
 * and computes zero or more output records.
 * In order to use state, the state store must be created and added via `addStateStore` before it can be connected
 * to the `Transformer`.
 * It's not required to connect global state stores that are added via `addGlobalStore`;
 * read-only access to global state stores is available by default.
 *
 * @param transformerSupplier the `TransformerSupplier` that generates `Transformer`
 * @param stateStoreNames the names of the state stores used by the processor
 * @return a [[KStream]] that contains more or less records with new key and value (possibly of different type)
 * @see `org.apache.kafka.streams.kstream.KStream#flatTransform`
 */
@deprecated(since = "3.3", message = "Use process(ProcessorSupplier, String*) instead.")
def flatTransform[K1, V1](
  transformerSupplier: TransformerSupplier[K, V, Iterable[KeyValue[K1, V1]]],
  stateStoreNames: String*
): KStream[K1, V1] =
  new KStream(inner.flatTransform(transformerSupplier.asJava, stateStoreNames: _*))

/**
 * Transform each record of the input stream into zero or more records in the output stream (both key and value type
 * can be altered arbitrarily).
 * A `Transformer` (provided by the given `TransformerSupplier`) is applied to each input record
 * and computes zero or more output records.
 * In order to use state, the state store must be created and added via `addStateStore` before it can be connected
 * to the `Transformer`.
 * It's not required to connect global state stores that are added via `addGlobalStore`;
 * read-only access to global state stores is available by default.
 *
 * @param transformerSupplier the `TransformerSupplier` that generates `Transformer`
 * @param named a [[Named]] config used to name the processor in the topology
 * @param stateStoreNames the names of the state stores used by the processor
 * @return a [[KStream]] that contains more or less records with new key and value (possibly of different type)
 * @see `org.apache.kafka.streams.kstream.KStream#flatTransform`
 */
@deprecated(since = "3.3", message = "Use process(ProcessorSupplier, Named, String*) instead.")
def flatTransform[K1, V1](
  transformerSupplier: TransformerSupplier[K, V, Iterable[KeyValue[K1, V1]]],
  named: Named,
  stateStoreNames: String*
): KStream[K1, V1] =
  new KStream(inner.flatTransform(transformerSupplier.asJava, named, stateStoreNames: _*))

/**
* Transform the value of each input record into zero or more records (with possible new type) in the
* output stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@ package org.apache.kafka.streams.scala.kstream

import java.time.Duration.ofSeconds
import java.time.{Duration, Instant}
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{
JoinWindows,
Named,
Transformer,
ValueTransformer,
ValueTransformerSupplier,
ValueTransformerWithKey,
Expand Down Expand Up @@ -289,42 +287,6 @@ class KStreamTest extends TestDriver {
testDriver.close()
}

@nowarn
@Test
def testFlatTransformCorrectlyRecords(): Unit = {
  // Transformer that suffixes both key and value with "-transformed".
  class SuffixingTransformer extends Transformer[String, String, Iterable[KeyValue[String, String]]] {
    override def init(context: ProcessorContext): Unit = {}

    override def transform(key: String, value: String): Iterable[KeyValue[String, String]] =
      Array(new KeyValue(s"$key-transformed", s"$value-transformed"))

    override def close(): Unit = {}
  }

  val sourceTopic = "source"
  val sinkTopic = "sink"
  val builder = new StreamsBuilder()

  builder
    .stream[String, String](sourceTopic)
    .flatTransform(() => new SuffixingTransformer)
    .to(sinkTopic)

  val now = Instant.now()
  val testDriver = createTestDriver(builder, now)
  val input = testDriver.createInput[String, String](sourceTopic)
  val output = testDriver.createOutput[String, String](sinkTopic)

  input.pipeInput("1", "value", now)

  // Exactly one transformed record must come out; both key and value carry the suffix.
  val record = output.readKeyValue()
  assertEquals("1-transformed", record.key)
  assertEquals("value-transformed", record.value)
  assertTrue(output.isEmpty)

  testDriver.close()
}

@nowarn
@Test
def testCorrectlyFlatTransformValuesInRecords(): Unit = {
Expand Down

0 comments on commit 9565043

Please sign in to comment.