Conversation

@jeffxiang jeffxiang commented Nov 3, 2025

offsetsForTimes() is now implemented using the metadata file in S3 to perform a more efficient search process. The mechanism is as follows:

  • Retrieve metadata file from S3 for the particular topic-partition
  • Get the topic-partition's timeindex from the metadata, which provides each segment's last modified timestamp on Kafka
  • Locate the segment with the greatest last-modified timestamp less than or equal to the target timestamp. This gives us the earliest segment that we should search from. For example, if we have:
000100.log --> timestamp = 100
000200.log --> timestamp = 200
000300.log --> timestamp = 300
...

and we query offsetsForTimes(timestamp=250), we start searching from 000200.log. Even though 000200.log may not contain our target timestamp (its last modified timestamp is 200 while we're searching for timestamp=250), we still want to start the search here in case the timeindex has a gap immediately following this segment.

  • We search linearly from this segment onward by loading each segment's .timeindex file and examining the last entry, which corresponds to the last record in that segment. If that entry's timestamp is less than the target timestamp, we continue to the next segment. Usually the very next segment is the target segment, unless there is a gap in the metadata timeindex entries
  • If the last entry in the segment's .timeindex file has a timestamp greater than or equal to the target timestamp, we have found the segment
  • Once we have located the segment, we extract the greatest segment timeindex entry whose timestamp is less than or equal to the target timestamp
  • Starting from that entry's offset, we perform a linear scan forward through the records to locate the first offset with a timestamp >= the target timestamp
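The two-phase lookup above can be sketched with a plain `NavigableMap` standing in for the metadata timeindex (a hypothetical simplification — `TimestampSearchSketch` and `findStartSegment` are illustrative names, not the PR's actual classes; the real implementation reads the metadata file from S3):

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch of phase 1 of the search described above: a floor lookup on the
// metadata timeindex, which maps each segment's last-modified timestamp
// to that segment's base offset.
public class TimestampSearchSketch {

    // Returns the base offset of the earliest segment to search from:
    // the segment with the greatest last-modified timestamp <= target,
    // or the first segment if the target precedes every entry.
    static long findStartSegment(NavigableMap<Long, Long> timeIndex, long targetTimestamp) {
        var entry = timeIndex.floorEntry(targetTimestamp);
        return entry != null ? entry.getValue() : timeIndex.firstEntry().getValue();
    }

    public static void main(String[] args) {
        NavigableMap<Long, Long> timeIndex = new TreeMap<>();
        timeIndex.put(100L, 100L);  // 000100.log, last modified at t=100
        timeIndex.put(200L, 200L);  // 000200.log, last modified at t=200
        timeIndex.put(300L, 300L);  // 000300.log, last modified at t=300

        // Query for timestamp=250: start from 000200.log, as in the example above.
        System.out.println(findStartSegment(timeIndex, 250L)); // prints 200
    }
}
```

Phase 2 (the linear scan across segment .timeindex files and then records) proceeds from the returned segment onward, as the bullets describe.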

@jeffxiang jeffxiang requested a review from a team as a code owner November 3, 2025 23:30
@jeffxiang jeffxiang changed the title from "Consumer seek using metadata implementation" to "Consumer timestamp search using metadata implementation" Nov 20, 2025
Comment on lines +513 to +533
S3Records segmentRecords = null;
try {
segmentRecords = S3Records.open(
logObject.getLeft(),
logObject.getMiddle(),
startPosition,
false,
true,
logObject.getRight().intValue(),
true);

Iterator<S3ChannelRecordBatch> batches = segmentRecords.batchesFrom(startPosition).iterator();
while (batches.hasNext()) {
S3ChannelRecordBatch batch = batches.next();
for (Record record : batch) {
if (record.timestamp() >= targetTimestamp) {
return Optional.of(new OffsetAndTimestamp(record.offset(), record.timestamp(), Optional.empty()));
}
}
}
} catch (IOException e) {


Any chance of resource leak in case of an exception? Should this be a try-with-resources block instead?

Contributor Author

In the finally block we close the segmentRecords.
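For reference, the two cleanup patterns under discussion are equivalent when the resource is `AutoCloseable`. A minimal sketch using a stand-in resource (`FakeRecords` is hypothetical; whether `S3Records` itself implements `AutoCloseable` is not shown in this thread):

```java
// Sketch of the two equivalent cleanup patterns from the review thread.
public class CleanupSketch {
    static class FakeRecords implements AutoCloseable {
        boolean closed = false;
        @Override public void close() { closed = true; }
    }

    // Pattern described in the reply: explicit close in a finally block.
    static FakeRecords withFinally() {
        FakeRecords records = null;
        try {
            records = new FakeRecords();
            // ... scan batches ...
        } finally {
            if (records != null) {
                records.close();
            }
        }
        return records;
    }

    // Equivalent try-with-resources form suggested by the reviewer;
    // close() runs automatically on every exit path, including exceptions.
    static FakeRecords withTryWithResources() {
        try (FakeRecords records = new FakeRecords()) {
            // ... scan batches ...
            return records;
        }
    }

    public static void main(String[] args) {
        System.out.println(withFinally().closed);          // true
        System.out.println(withTryWithResources().closed); // true
    }
}
```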

return Optional.empty();
}

TimeIndex timeIndex = metadataOptional.get().getTimeIndex();


Do we need a null check for metadataOptional.get() to avoid NPE?

Contributor Author

We never return null for metadataOptional or put a null value into the Optional&lt;TopicPartitionMetadata&gt;. The only check needed is Optional.isPresent(), which is done above.
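This invariant is enforced by `java.util.Optional` itself: `Optional.of` rejects null at construction time, so a present Optional can never hold null (a null value would have to go through `Optional.ofNullable`, which produces an empty Optional instead). A small standalone illustration:

```java
import java.util.Optional;

public class OptionalNullSketch {
    public static void main(String[] args) {
        // Optional.of(...) throws NullPointerException on a null argument,
        // so if the code only ever builds the metadata Optional with
        // Optional.of, isPresent() is the only check needed before get().
        boolean threw = false;
        try {
            Optional.of(null);
        } catch (NullPointerException e) {
            threw = true;
        }
        System.out.println(threw); // true

        // Optional.ofNullable(null) yields an empty Optional instead of
        // a present-but-null one.
        System.out.println(Optional.ofNullable(null).isPresent()); // false
    }
}
```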

* 2. Find the largest timestamp in the segment's timeindex file
* 3. If the largest timestamp in the segment's timeindex file is less than the target timestamp, we continue to the next segment.
* 4. If the largest timestamp in the segment's timeindex file is greater than or equal to the target timestamp, we have found the segment.
* 5. Find the segment timeindex entry with a timestamp less than or equal to the target timestamp.


For step 5, do we perform a binary search?

Contributor Author

Implicitly, yes. We do this by putting the entries into a sorted ConcurrentSkipListSet within the TimeIndex.load() method and then calling ConcurrentSkipListSet.floor() method. This is done in https://github.com/pinterest/tiered-storage/blob/consumer_seek/ts-common/src/main/java/com/pinterest/kafka/tieredstorage/common/metadata/TimeIndex.java#L296
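The `floor()` semantics relied on here are standard `ConcurrentSkipListSet` behavior — the greatest element less than or equal to the query, found via the skip list's logarithmic-time probe. A quick illustration with bare timestamps standing in for full timeindex entries:

```java
import java.util.concurrent.ConcurrentSkipListSet;

public class FloorSketch {
    public static void main(String[] args) {
        // Timeindex entry timestamps, kept sorted by the skip list.
        ConcurrentSkipListSet<Long> timestamps = new ConcurrentSkipListSet<>();
        timestamps.add(100L);
        timestamps.add(200L);
        timestamps.add(300L);

        // floor(t) returns the greatest element <= t: exactly the
        // "greatest entry less than or equal to the target" step above.
        System.out.println(timestamps.floor(250L)); // 200
        System.out.println(timestamps.floor(300L)); // 300
        System.out.println(timestamps.floor(50L));  // null (no entry at or below 50)
    }
}
```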

@jeffxiang jeffxiang merged commit a19031f into main Nov 20, 2025
1 check passed
@jeffxiang jeffxiang deleted the consumer_seek branch November 20, 2025 03:14