
MINOR: Cleanups in storage module #20087


Open · mimaison wants to merge 3 commits into trunk

Conversation

@mimaison (Member) commented Jul 2, 2025:

Cleanups including:

  • Java 17 syntax: records and switch expressions (a brief sketch of both patterns follows)
  • assertEquals() argument order
  • Javadoc fixes
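
A minimal before/after sketch of the two Java 17 patterns, with hypothetical names (not code from the PR):

// Before: an immutable carrier class with explicit fields and constructor.
final class OffsetsBefore {
    public final long logStartOffset;
    public final long recoveryPoint;

    OffsetsBefore(long logStartOffset, long recoveryPoint) {
        this.logStartOffset = logStartOffset;
        this.recoveryPoint = recoveryPoint;
    }
}

// After: the record generates the constructor, accessors, equals(), hashCode() and toString().
record OffsetsAfter(long logStartOffset, long recoveryPoint) { }

// A switch expression replacing an if/else chain: arrow labels, no fall-through,
// and the compiler enforces that every path yields a value.
class Switches {
    static String describeCode(int code) {
        return switch (code) {
            case 0 -> "none";
            case 1 -> "offset out of range";
            default -> "unknown (" + code + ")";
        };
    }
}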

@github-actions bot added the core (Kafka Broker), storage (Pull requests that target the storage module), tiered-storage (Related to the Tiered Storage feature) and KIP-932 (Queues for Kafka) labels on Jul 2, 2025
@m1a2st (Collaborator) left a comment:

Thanks for this PR, left some comments

-    public final long recoveryPoint;
-    public final LogOffsetMetadata nextOffsetMetadata;
+public record LoadedLogOffsets(long logStartOffset, long recoveryPoint, LogOffsetMetadata nextOffsetMetadata) {
-    public LoadedLogOffsets(final long logStartOffset,
-                            final long recoveryPoint,
-                            final LogOffsetMetadata nextOffsetMetadata) {
Collaborator:

Should we retain equals(), hashCode(), and toString() for this record?

@mimaison (Member, Author):

Removed equals() and hashCode()
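
For context, a record derives value-based equals(), hashCode() and toString() from its components, so hand-written versions that only compare the components are redundant. A small hypothetical illustration:

import java.util.OptionalLong;

record RecordExample(OptionalLong lastDataOffset, short producerEpoch) { }

// new RecordExample(OptionalLong.of(1L), (short) 2)
//         .equals(new RecordExample(OptionalLong.of(1L), (short) 2))   -> true
// toString() -> "RecordExample[lastDataOffset=OptionalLong[1], producerEpoch=2]"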


     int i = 0;
     for (RecordBatch batch : validatedRecords.batches()) {
         assertTrue(batch.isValid());
-        assertEquals(batch.timestampType(), TimestampType.CREATE_TIME);
+        assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
         maybeCheckBaseTimestamp(timestampSeq.get(0), batch);
         assertEquals(batch.maxTimestamp(), batch.maxTimestamp());
Collaborator:

This assertion can also be removed.

Collaborator:

I think the same issue exists at L365 and L1811. We could update the actual value to TestUtils.toList(batch).stream().map(Record::timestamp).max(Long::compare).get(); the root cause is #16167.
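
A sketch of what that suggested assertion might look like, assuming TestUtils.toList accepts the batch's record iterable:

assertEquals(batch.maxTimestamp(),
        TestUtils.toList(batch).stream().map(Record::timestamp).max(Long::compare).get());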

Collaborator:

I also found this issue while reviewing this PR and had already filed a MINOR to fix it before these comments: #20093

-    public final short producerEpoch;
-
-    public LastRecord(OptionalLong lastDataOffset, short producerEpoch) {
+public record LastRecord(OptionalLong lastDataOffset, short producerEpoch) {
Collaborator:

Let's remove equals and hashCode.

@mimaison (Member, Author):

Done

@wernerdv (Contributor) commented Jul 2, 2025:

@mimaison I think you meant the 'storage' module in the PR title.

@@ -34,6 +34,12 @@
 import java.util.Collection;
 import java.util.Iterator;

+/**
+ * @param <R1> The type of records used to formulate the expectations.
Member:

The formatting of the type parameters in this javadoc comment is a little odd.

@mimaison (Member, Author):

I blame IntelliJ :) Fixed

@mimaison changed the title from "MINOR: Cleanups in server module" to "MINOR: Cleanups in storage module" on Jul 3, 2025
@chia7712 (Member) left a comment:

@mimaison thanks for this cleanup

    public int quotaWindowSizeSeconds() {
        return quotaWindowSizeSeconds;
    }

    @Override
    public String toString() {
@chia7712 (Member):

The generated toString() should be good enough, I think.

@mimaison (Member, Author):

I agree

@@ -94,7 +94,7 @@ public OptionalLong read() {
             Content content = Json.parseStringAs(text, Content.class);
             return OptionalLong.of(content.brokerEpoch);
         } catch (Exception e) {
-            logger.debug("Fail to read the clean shutdown file in " + cleanShutdownFile.toPath() + ":" + e);
+            logger.debug("Fail to read the clean shutdown file in {}:{}", cleanShutdownFile.toPath(), e);
@chia7712 (Member):

The last parameter is handled specially if it is an exception object. Perhaps we could use {} instead of {}:{}?
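
For reference, SLF4J substitutes the exception into a matching {} placeholder (no stack trace), but logs it as a Throwable, stack trace included, when it is a trailing argument without a placeholder. A minimal sketch:

// Two placeholders, two arguments: e is rendered via toString(), no stack trace.
logger.debug("Fail to read the clean shutdown file in {}:{}", cleanShutdownFile.toPath(), e);

// One placeholder: the trailing e is treated as the exception and its stack trace is logged.
logger.debug("Fail to read the clean shutdown file in {}", cleanShutdownFile.toPath(), e);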

        result = 31 * result + Long.hashCode(timestamp);
        return result;
    }

    @Override
    public String toString() {
@chia7712 (Member):

ditto

@mimaison (Member, Author):

Not here, as this also prints firstSeq and firstOffset, which are not fields.
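
For context, a record can still declare its own toString() when the useful output includes derived values rather than components; a hypothetical sketch:

record AppendInfo(long producerId, long lastOffset) {
    // Derived value, not a component: the generated toString() would omit it.
    long firstOffset() { return lastOffset - 1; }

    @Override
    public String toString() {
        return "AppendInfo(producerId=" + producerId +
                ", firstOffset=" + firstOffset() +
                ", lastOffset=" + lastOffset + ')';
    }
}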

-        result = 31 * result + Boolean.hashCode(isAborted);
-        return result;
-    }
+public record CompletedTxn(long producerId, long firstOffset, long lastOffset, boolean isAborted) {

     @Override
     public String toString() {
@chia7712 (Member):

ditto

-    IndexValue(T index) {
-        this.index = index;
-    }
+private record IndexValue<T extends AbstractIndex>(T index) implements IndexWrapper {
@chia7712 (Member):

There was an argument before about which variables are "suitable" for a record class. I prefer to use a record only if all variables are immutable, as that ensures hashCode and equals stay consistent. What do you think? It's a matter of personal taste, so any feedback is welcome.

@mimaison (Member, Author):

Yeah, it's a bit of a gray area. I'm happy to keep it as a regular class; I undid that change.
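
For illustration, a hypothetical sketch of the pitfall discussed above: when a record component is mutable, the generated hashCode() follows the component's state, which can silently break hash-based collections:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

record Holder(List<Integer> values) { }   // the List stands in for a mutable index

public class MutableRecordPitfall {
    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>(List.of(1));
        Holder key = new Holder(list);

        Map<Holder, String> map = new HashMap<>();
        map.put(key, "entry");

        list.add(2);                               // mutate the component after insertion
        System.out.println(map.containsKey(key));  // false: the key's hashCode changed
    }
}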

@chia7712 (Member):

@mimaison could you please fix the conflicts?

@mimaison (Member, Author):

Rebased on trunk. The test failure, HandlingSourceTopicDeletionIntegrationTest, seems unrelated; it's in Streams. There's a Jira for it: https://issues.apache.org/jira/browse/KAFKA-19511

@chia7712 (Member) left a comment:

@mimaison thanks for this cleanup. Overall LGTM

         if (!file.exists()) {
             return true;
         }
         return file.delete();
     } catch (final Exception e) {
-        LOGGER.error(format("Encountered error while deleting %s", file.getAbsolutePath()));
+        LOGGER.error("Encountered error while deleting {}", file.getAbsolutePath());
@chia7712 (Member):

Perhaps we should add the e to the logger:

LOGGER.error("Encountered error while deleting {}", file.getAbsolutePath(), e);

-    public Integer getExpectedFromSecondTierCount() {
-        return expectedFromSecondTierCount;
-    }
+public record ConsumableSpec(Long fetchOffset, Integer expectedTotalCount, Integer expectedFromSecondTierCount) {
@chia7712 (Member):

I checked the usage of this class, and it appears using primitive types should be fine.
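
A sketch of the suggested change, assuming no call site relies on null values:

public record ConsumableSpec(long fetchOffset, int expectedTotalCount, int expectedFromSecondTierCount) { }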

-    public Integer getEventCount() {
-        return eventCount;
-    }
+public record DeletableSpec(Integer sourceBrokerId, LocalTieredStorageEvent.EventType eventType, Integer eventCount) {
@chia7712 (Member):

ditto

-    public RemoteFetchCount getFetchCount() {
-        return fetchCount;
-    }
+public record FetchableSpec(Integer sourceBrokerId, RemoteFetchCount fetchCount) {
@chia7712 (Member):

ditto

-    public List<ProducerRecord<String, String>> getRecords() {
-        return records;
-    }
+public record OffloadableSpec(Integer sourceBrokerId, Integer baseOffset,
@chia7712 (Member):

ditto
