@@ -25,9 +25,9 @@

public class RecordHeader implements Header {
    private ByteBuffer keyBuffer;
-   private String key;
-   private ByteBuffer valueBuffer;
-   private byte[] value;
+   private volatile String key;
+   private volatile ByteBuffer valueBuffer;
+   private volatile byte[] value;

    public RecordHeader(String key, byte[] value) {
        Objects.requireNonNull(key, "Null header keys are not permitted");

@@ -42,16 +42,24 @@ public RecordHeader(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {

    public String key() {
        if (key == null) {
-           key = Utils.utf8(keyBuffer, keyBuffer.remaining());
-           keyBuffer = null;
+           synchronized (this) {
+               if (key == null) {
+                   key = Utils.utf8(keyBuffer, keyBuffer.remaining());
+                   keyBuffer = null;
+               }
+           }
        }
        return key;
    }

    public byte[] value() {
        if (value == null && valueBuffer != null) {
-           value = Utils.toArray(valueBuffer);
-           valueBuffer = null;
+           synchronized (this) {
+               if (value == null && valueBuffer != null) {
+                   value = Utils.toArray(valueBuffer);
+                   valueBuffer = null;
+               }
+           }
        }
        return value;
    }
@@ -19,10 +19,18 @@
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -265,4 +273,44 @@ static void assertHeader(String key, String value, Header actual) {
        assertArrayEquals(value.getBytes(), actual.value());
    }

    @RepeatedTest(100)
    public void testRecordHeaderIsReadThreadSafe() throws Exception {
        int threads = 8;
        RecordHeader header = new RecordHeader(
            ByteBuffer.wrap("key".getBytes(StandardCharsets.UTF_8)),
            ByteBuffer.wrap("value".getBytes(StandardCharsets.UTF_8))
        );

        ExecutorService pool = Executors.newFixedThreadPool(threads);

Member:
Or we could leverage CompletableFuture

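        // Note: as written, this suggestion also needs imports for
        // java.util.concurrent.CompletableFuture, java.util.stream.IntStream,
        // and java.util.stream.Collectors.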
        CountDownLatch startLatch = new CountDownLatch(1);
        var fs = IntStream.range(0, threads).mapToObj(__ -> CompletableFuture.supplyAsync(() -> {
            try {
                startLatch.await();
                header.key();
                header.value();
                return null;
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        })).collect(Collectors.toUnmodifiableList());
        startLatch.countDown();
        fs.forEach(CompletableFuture::join);

Member:
one more thing: could you please add a similar unit test for a null value?

        CountDownLatch startLatch = new CountDownLatch(1);
        AtomicBoolean raceDetected = new AtomicBoolean(false);

        try {
            Runnable task = () -> {
                try {
                    startLatch.await();
                    header.key();
                    header.value();
                } catch (NullPointerException e) {
                    raceDetected.set(true);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            };

            for (int i = 0; i < threads; i++) pool.submit(task);

            startLatch.countDown();

            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);

            assertFalse(raceDetected.get(), "Read race condition detected in RecordHeader!");
        } finally {
            if (!pool.isTerminated()) {
                pool.shutdownNow();
            }
        }
    }
}
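For the null-value case requested in the review thread above, a hypothetical sketch (not part of this diff; the method name and assertions are assumptions) could construct the header with a null valueBuffer, which value() already guards for, and check that concurrent reads never throw and always see a null value:

    @RepeatedTest(100)
    public void testRecordHeaderWithNullValueIsReadThreadSafe() throws Exception {
        int threads = 8;
        // Assumption: the ByteBuffer constructor accepts a null valueBuffer,
        // mirroring the null check in value().
        RecordHeader header = new RecordHeader(
            ByteBuffer.wrap("key".getBytes(StandardCharsets.UTF_8)),
            null
        );

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch startLatch = new CountDownLatch(1);
        AtomicBoolean failureDetected = new AtomicBoolean(false);

        try {
            Runnable task = () -> {
                try {
                    startLatch.await();
                    // key() must always decode to "key"; value() must stay null and never throw.
                    if (!"key".equals(header.key()) || header.value() != null) {
                        failureDetected.set(true);
                    }
                } catch (RuntimeException e) {
                    failureDetected.set(true);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            };

            for (int i = 0; i < threads; i++) pool.submit(task);

            startLatch.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);

            assertFalse(failureDetected.get(), "Read race detected for a null-value RecordHeader");
        } finally {
            if (!pool.isTerminated()) {
                pool.shutdownNow();
            }
        }
    }

Keeping the same ExecutorService and CountDownLatch structure as the test in the diff would make the two cases easy to compare; the CompletableFuture variant suggested above would work equally well.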
docs/upgrade.html (3 additions, 0 deletions)
@@ -25,6 +25,9 @@ <h5><a id="upgrade_4_2_0_from" href="#upgrade_4_2_0_from">Upgrading Servers to 4

<h5><a id="upgrade_420_notable" href="#upgrade_420_notable">Notable changes in 4.2.0</a></h5>
<ul>
<li>
The <code>org.apache.kafka.common.header.internals.RecordHeader</code> class has been updated to be read thread-safe. See <a href="https://cwiki.apache.org/confluence/x/nYmhFg">KIP-1205</a> for details.
</li>
<li>
The <code>org.apache.kafka.disallowed.login.modules</code> config was deprecated. Please use the <code>org.apache.kafka.allowed.login.modules</code>
instead.