Skip to content

Commit

Permalink
[#2353][Improvement] Fix the warning: unchecked method invocation: me…
Browse files Browse the repository at this point in the history
…thod sendCachedBuffer in class RMRecordsReader.RecordsCombiner is applied to given types
  • Loading branch information
cchung100m committed Feb 8, 2025
1 parent 934a972 commit 409d462
Showing 1 changed file with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ class RecordsFetcher extends Thread {
RecordsFetcher(int partitionId) {
this.partitionId = partitionId;
this.sleepTime = initFetchSleepTime;
this.recordBuffer = new RecordBuffer(partitionId);
this.recordBuffer = new RecordBuffer<>(partitionId);
this.nextQueue =
combiner == null ? mergeBuffers.get(partitionId) : combineBuffers.get(partitionId);
this.serverInfos = shuffleServerInfoMap.get(partitionId);
Expand Down Expand Up @@ -529,7 +529,7 @@ public void run() {
}
if (recordBuffer.size() >= maxRecordsNumPerBuffer) {
nextQueue.put(recordBuffer);
recordBuffer = new RecordBuffer(partitionId);
recordBuffer = new RecordBuffer<>(partitionId);
}
recordBuffer.addRecord(reader.getCurrentKey(), reader.getCurrentValue());
}
Expand Down Expand Up @@ -564,12 +564,12 @@ class RecordsCombiner extends Thread {
// The RecordBuffer has a capacity limit, records for the same key may be
// distributed in different RecordBuffers. So we need a cachedBuffer used
// to record the buffer of the last combine.
private RecordBuffer cached;
private RecordBuffer<K, C> cached;
private Queue nextQueue;

RecordsCombiner(int partitionId) {
this.partitionId = partitionId;
this.cached = new RecordBuffer(partitionId);
this.cached = new RecordBuffer<>(partitionId);
this.nextQueue = mergeBuffers.get(partitionId);
setName("RecordsCombiner-" + partitionId);
}
Expand All @@ -592,7 +592,7 @@ public void run() {
// we can send the cached to downstream directly.
if (cached.size() > 0 && !isSameKey(cached.getLastKey(), current.getFirstKey())) {
sendCachedBuffer(cached);
cached = new RecordBuffer(partitionId);
cached = new RecordBuffer<>(partitionId);
}

// 3 combine the current, then cache it. By this way, we can handle the specical case
Expand Down

0 comments on commit 409d462

Please sign in to comment.