Skip to content

Commit 436a19b

Browse files
authored
Rename RequestBatchBuffer methods from 'flushable' to 'extract' to clarify it doesn't do the flushing (#6485)
* Rename RequestBatchBuffer methods from 'flushable' to 'extract' to clarify it doesn't do the flushing * Handled review comment
1 parent 4d9db75 commit 436a19b

File tree

4 files changed

+49
-49
lines changed

4 files changed

+49
-49
lines changed

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,18 +67,18 @@ public void forEach(BiConsumer<String, RequestBatchBuffer<RequestT, ResponseT>>
6767
batchContextMap.forEach(action);
6868
}
6969

70-
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests(String batchKey) {
71-
return batchContextMap.get(batchKey).flushableRequests();
70+
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> extractBatchIfReady(String batchKey) {
71+
return batchContextMap.get(batchKey).extractBatchIfReady();
7272
}
7373

74-
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequestsOnByteLimitBeforeAdd(String batchKey,
74+
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> extractBatchIfSizeExceeded(String batchKey,
7575
RequestT request) {
76-
return batchContextMap.get(batchKey).flushableRequestsOnByteLimitBeforeAdd(request);
76+
return batchContextMap.get(batchKey).extractBatchIfSizeExceeded(request);
7777
}
7878

79-
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableScheduledRequests(String batchKey,
79+
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> extractEntriesForScheduledFlush(String batchKey,
8080
int maxBatchItems) {
81-
return batchContextMap.get(batchKey).flushableScheduledRequests(maxBatchItems);
81+
return batchContextMap.get(batchKey).extractEntriesForScheduledFlush(maxBatchItems);
8282
}
8383

8484
public void cancelScheduledFlush(String batchKey) {

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,10 @@ public RequestBatchBuffer(ScheduledFuture<?> scheduledFlush,
6464
this.maxBatchSizeInBytes = maxBatchSizeInBytes;
6565
}
6666

67-
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests() {
67+
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> extractBatchIfReady() {
6868
synchronized (flushLock) {
6969
return (isByteSizeThresholdCrossed(0) || isMaxBatchSizeLimitReached())
70-
? extractFlushedEntries(maxBatchItems)
70+
? extractEntries(maxBatchItems)
7171
: Collections.emptyMap();
7272
}
7373
}
@@ -77,12 +77,12 @@ private boolean isMaxBatchSizeLimitReached() {
7777
return idToBatchContext.size() >= maxBatchItems;
7878
}
7979

80-
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequestsOnByteLimitBeforeAdd(RequestT request) {
80+
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> extractBatchIfSizeExceeded(RequestT request) {
8181
synchronized (flushLock) {
8282
if (maxBatchSizeInBytes > 0 && !idToBatchContext.isEmpty()) {
8383
int incomingRequestBytes = RequestPayloadCalculator.calculateMessageSize(request).orElse(0);
8484
if (isByteSizeThresholdCrossed(incomingRequestBytes)) {
85-
return extractFlushedEntries(maxBatchItems);
85+
return extractEntries(maxBatchItems);
8686
}
8787
}
8888
return Collections.emptyMap();
@@ -100,16 +100,16 @@ private boolean isByteSizeThresholdCrossed(int incomingRequestBytes) {
100100
return totalPayloadSize > maxBatchSizeInBytes;
101101
}
102102

103-
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableScheduledRequests(int maxBatchItems) {
103+
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> extractEntriesForScheduledFlush(int maxBatchItems) {
104104
synchronized (flushLock) {
105105
if (!idToBatchContext.isEmpty()) {
106-
return extractFlushedEntries(maxBatchItems);
106+
return extractEntries(maxBatchItems);
107107
}
108108
return Collections.emptyMap();
109109
}
110110
}
111111

112-
private Map<String, BatchingExecutionContext<RequestT, ResponseT>> extractFlushedEntries(int maxBatchItems) {
112+
private Map<String, BatchingExecutionContext<RequestT, ResponseT>> extractEntries(int maxBatchItems) {
113113
LinkedHashMap<String, BatchingExecutionContext<RequestT, ResponseT>> requestEntries = new LinkedHashMap<>();
114114
String nextEntry;
115115
while (requestEntries.size() < maxBatchItems && hasNextBatchEntry()) {

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,9 @@ public CompletableFuture<ResponseT> batchRequest(RequestT request) {
7272
String batchKey = getBatchKey(request);
7373
// Handle potential byte size overflow only if there are request in map and if feature enabled
7474
if (requestsAndResponsesMaps.contains(batchKey) && batchConfiguration.maxBatchBytesSize() > 0) {
75-
Optional.of(requestsAndResponsesMaps.flushableRequestsOnByteLimitBeforeAdd(batchKey, request))
76-
.filter(flushableRequests -> !flushableRequests.isEmpty())
77-
.ifPresent(flushableRequests -> manualFlushBuffer(batchKey, flushableRequests));
75+
Optional.of(requestsAndResponsesMaps.extractBatchIfSizeExceeded(batchKey, request))
76+
.filter(extractedEntries -> !extractedEntries.isEmpty())
77+
.ifPresent(extractedEntries -> manualFlushBuffer(batchKey, extractedEntries));
7878
}
7979

8080
// Add request and response to the map, scheduling a flush if necessary
@@ -86,9 +86,9 @@ public CompletableFuture<ResponseT> batchRequest(RequestT request) {
8686
response);
8787

8888
// Immediately flush if the batch is full
89-
Optional.of(requestsAndResponsesMaps.flushableRequests(batchKey))
90-
.filter(flushableRequests -> !flushableRequests.isEmpty())
91-
.ifPresent(flushableRequests -> manualFlushBuffer(batchKey, flushableRequests));
89+
Optional.of(requestsAndResponsesMaps.extractBatchIfReady(batchKey))
90+
.filter(extractedEntries -> !extractedEntries.isEmpty())
91+
.ifPresent(extractedEntries -> manualFlushBuffer(batchKey, extractedEntries));
9292

9393
} catch (Exception e) {
9494
response.completeExceptionally(e);
@@ -153,21 +153,21 @@ private ScheduledFuture<?> scheduleBufferFlush(String batchKey, long timeOutInMs
153153
}
154154

155155
private void performScheduledFlush(String batchKey) {
156-
Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests =
157-
requestsAndResponsesMaps.flushableScheduledRequests(batchKey, maxBatchItems);
158-
if (!flushableRequests.isEmpty()) {
159-
flushBuffer(batchKey, flushableRequests);
156+
Map<String, BatchingExecutionContext<RequestT, ResponseT>> extractedEntries =
157+
requestsAndResponsesMaps.extractEntriesForScheduledFlush(batchKey, maxBatchItems);
158+
if (!extractedEntries.isEmpty()) {
159+
flushBuffer(batchKey, extractedEntries);
160160
}
161161
}
162162

163163
public void close() {
164164
requestsAndResponsesMaps.forEach((batchKey, batchBuffer) -> {
165165
requestsAndResponsesMaps.cancelScheduledFlush(batchKey);
166-
Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests =
167-
requestsAndResponsesMaps.flushableRequests(batchKey);
166+
Map<String, BatchingExecutionContext<RequestT, ResponseT>>
167+
extractedEntries = requestsAndResponsesMaps.extractBatchIfReady(batchKey);
168168

169-
while (!flushableRequests.isEmpty()) {
170-
flushBuffer(batchKey, flushableRequests);
169+
while (!extractedEntries.isEmpty()) {
170+
flushBuffer(batchKey, extractedEntries);
171171
}
172172

173173
});

services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -52,23 +52,23 @@ void whenPutRequestThenBufferContainsRequest() {
5252
}
5353

5454
@Test
55-
void whenFlushableRequestsThenReturnRequestsUpToMaxBatchItems() {
55+
void whenExtractBatchIfReadyThenReturnRequestsUpToMaxBatchItems() {
5656
batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize);
5757
CompletableFuture<String> response = new CompletableFuture<>();
5858
batchBuffer.put("request1", response);
59-
Map<String, BatchingExecutionContext<String, String>> flushedRequests = batchBuffer.flushableRequests();
60-
assertEquals(1, flushedRequests.size());
61-
assertTrue(flushedRequests.containsKey("0"));
59+
Map<String, BatchingExecutionContext<String, String>> extractedEntries = batchBuffer.extractBatchIfReady();
60+
assertEquals(1, extractedEntries.size());
61+
assertTrue(extractedEntries.containsKey("0"));
6262
}
6363

6464
@Test
65-
void whenFlushableScheduledRequestsThenReturnAllRequests() {
65+
void whenExtractEntriesForScheduledFlushThenReturnAllRequests() {
6666
batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize);
6767
CompletableFuture<String> response = new CompletableFuture<>();
6868
batchBuffer.put("request1", response);
69-
Map<String, BatchingExecutionContext<String, String>> flushedRequests = batchBuffer.flushableScheduledRequests(1);
70-
assertEquals(1, flushedRequests.size());
71-
assertTrue(flushedRequests.containsKey("0"));
69+
Map<String, BatchingExecutionContext<String, String>> extractedEntries = batchBuffer.extractEntriesForScheduledFlush(1);
70+
assertEquals(1, extractedEntries.size());
71+
assertTrue(extractedEntries.containsKey("0"));
7272
}
7373

7474
@Test
@@ -119,28 +119,28 @@ void whenClearBufferThenBufferIsEmpty() {
119119
}
120120

121121
@Test
122-
void whenExtractFlushedEntriesThenReturnCorrectEntries() {
122+
void whenExtractEntriesThenReturnCorrectEntries() {
123123
batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 5, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize);
124124
for (int i = 0; i < 5; i++) {
125125
batchBuffer.put("request" + i, new CompletableFuture<>());
126126
}
127-
Map<String, BatchingExecutionContext<String, String>> flushedEntries = batchBuffer.flushableRequests();
128-
assertEquals(5, flushedEntries.size());
127+
Map<String, BatchingExecutionContext<String, String>> extractedEntries = batchBuffer.extractBatchIfReady();
128+
assertEquals(5, extractedEntries.size());
129129
}
130130

131131
@Test
132132
void whenHasNextBatchEntryThenReturnTrue() {
133133
batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize);
134134
batchBuffer.put("request1", new CompletableFuture<>());
135-
assertTrue(batchBuffer.flushableRequests().containsKey("0"));
135+
assertTrue(batchBuffer.extractBatchIfReady().containsKey("0"));
136136
}
137137

138138

139139
@Test
140140
void whenNextBatchEntryThenReturnNextEntryId() {
141141
batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize);
142142
batchBuffer.put("request1", new CompletableFuture<>());
143-
assertEquals("0", batchBuffer.flushableRequests().keySet().iterator().next());
143+
assertEquals("0", batchBuffer.extractBatchIfReady().keySet().iterator().next());
144144
}
145145

146146
@Test
@@ -151,9 +151,9 @@ void whenRequestPassedWithLessBytesinArgs_thenCheckForSizeOnly_andDonotFlush() {
151151
batchBuffer.put(SendMessageRequest.builder().build(),
152152
new CompletableFuture<>());
153153
}
154-
Map<String, BatchingExecutionContext<SendMessageRequest, SendMessageResponse>> flushedEntries =
155-
batchBuffer.flushableRequestsOnByteLimitBeforeAdd(SendMessageRequest.builder().messageBody("Hi").build());
156-
assertEquals(0, flushedEntries.size());
154+
Map<String, BatchingExecutionContext<SendMessageRequest, SendMessageResponse>> extractedEntries =
155+
batchBuffer.extractBatchIfSizeExceeded(SendMessageRequest.builder().messageBody("Hi").build());
156+
assertEquals(0, extractedEntries.size());
157157
}
158158

159159

@@ -166,9 +166,9 @@ void testFlushWhenPayloadExceedsMaxSize() {
166166
String largeMessageBody = createLargeString('a',245_760);
167167
batchBuffer.put(SendMessageRequest.builder().messageBody(largeMessageBody).build(),
168168
new CompletableFuture<>());
169-
Map<String, BatchingExecutionContext<SendMessageRequest, SendMessageResponse>> flushedEntries =
170-
batchBuffer.flushableRequestsOnByteLimitBeforeAdd(SendMessageRequest.builder().messageBody("NewMessage").build());
171-
assertEquals(1, flushedEntries.size());
169+
Map<String, BatchingExecutionContext<SendMessageRequest, SendMessageResponse>> extractedEntries =
170+
batchBuffer.extractBatchIfSizeExceeded(SendMessageRequest.builder().messageBody("NewMessage").build());
171+
assertEquals(1, extractedEntries.size());
172172
}
173173

174174
@Test
@@ -181,11 +181,11 @@ void testFlushWhenCumulativePayloadExceedsMaxSize() {
181181
new CompletableFuture<>());
182182
batchBuffer.put(SendMessageRequest.builder().messageBody(largeMessageBody).build(),
183183
new CompletableFuture<>());
184-
Map<String, BatchingExecutionContext<SendMessageRequest, SendMessageResponse>> flushedEntries =
185-
batchBuffer.flushableRequestsOnByteLimitBeforeAdd(SendMessageRequest.builder().messageBody("NewMessage").build());
184+
Map<String, BatchingExecutionContext<SendMessageRequest, SendMessageResponse>> extractedEntries =
185+
batchBuffer.extractBatchIfSizeExceeded(SendMessageRequest.builder().messageBody("NewMessage").build());
186186

187187
//Flushes both the messages since thier sum is greater than 256Kb
188-
assertEquals(2, flushedEntries.size());
188+
assertEquals(2, extractedEntries.size());
189189
}
190190

191191

0 commit comments

Comments
 (0)