Skip to content

Commit 7c9c80d

Browse files
committed
Fix memory leak when allocation failure in IoTConsensus queue.
1 parent 80d5aea commit 7c9c80d

File tree

2 files changed

+194
-0
lines changed

2 files changed

+194
-0
lines changed

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ private boolean reserve(long size, boolean fromQueue) {
9898
result = queueMemorySizeInByte.addAndGet(size) < maxMemorySizeForQueueInByte;
9999
if (!result) {
100100
queueMemorySizeInByte.addAndGet(-size);
101+
memorySizeInByte.addAndGet(-size);
101102
}
102103
} else {
103104
syncMemorySizeInByte.addAndGet(size);
@@ -172,6 +173,16 @@ long getSyncMemorySizeInByte() {
172173
return syncMemorySizeInByte.get();
173174
}
174175

176+
@TestOnly
177+
public Long getMaxMemorySizeInByte() {
178+
return maxMemorySizeInByte;
179+
}
180+
181+
@TestOnly
182+
public Long getMaxMemorySizeForQueueInByte() {
183+
return maxMemorySizeForQueueInByte;
184+
}
185+
175186
private static final IoTConsensusMemoryManager INSTANCE = new IoTConsensusMemoryManager();
176187

177188
public static IoTConsensusMemoryManager getInstance() {
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.consensus.iot.logdispatcher;
21+
22+
import static org.junit.Assert.assertEquals;
23+
import static org.junit.Assert.assertFalse;
24+
import static org.junit.Assert.assertTrue;
25+
26+
import java.nio.ByteBuffer;
27+
import java.util.ArrayList;
28+
import java.util.Collections;
29+
import java.util.List;
30+
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
31+
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
32+
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
33+
import org.apache.iotdb.consensus.iot.thrift.TLogEntry;
34+
import org.junit.Test;
35+
36+
public class IoTConsensusMemoryManagerTest {
37+
38+
@Test
39+
public void testAllocateQueue() {
40+
IoTConsensusMemoryManager memoryManager = IoTConsensusMemoryManager.getInstance();
41+
long maxMemory = memoryManager.getMaxMemorySizeForQueueInByte();
42+
43+
long occupiedMemory = 0;
44+
IndexedConsensusRequest request;
45+
List<IndexedConsensusRequest> requestList = new ArrayList<>();
46+
while (occupiedMemory <= maxMemory) {
47+
request = new IndexedConsensusRequest(0, Collections.singletonList(new ByteBufferConsensusRequest(
48+
ByteBuffer.wrap(new byte[4 * 1024 * 1024]))));
49+
request.buildSerializedRequests();
50+
long requestSize = request.getMemorySize();
51+
if (occupiedMemory + requestSize < maxMemory) {
52+
boolean reserved = memoryManager.reserve(request);
53+
assertTrue(reserved);
54+
occupiedMemory += requestSize;
55+
assertEquals(occupiedMemory, memoryManager.getQueueMemorySizeInByte());
56+
assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte());
57+
requestList.add(request);
58+
} else {
59+
assertFalse(memoryManager.reserve(request));
60+
break;
61+
}
62+
}
63+
assertTrue(memoryManager.getMemorySizeInByte() <= maxMemory);
64+
65+
for (IndexedConsensusRequest indexedConsensusRequest : requestList) {
66+
memoryManager.free(indexedConsensusRequest);
67+
occupiedMemory -= indexedConsensusRequest.getMemorySize();
68+
assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte());
69+
assertEquals(occupiedMemory, memoryManager.getQueueMemorySizeInByte());
70+
}
71+
}
72+
73+
@Test
74+
public void testAllocateBatch() {
75+
IoTConsensusMemoryManager memoryManager = IoTConsensusMemoryManager.getInstance();
76+
long maxMemory = memoryManager.getQueueMemorySizeInByte();
77+
78+
long occupiedMemory = 0;
79+
80+
Batch batch;
81+
int batchSize = 5;
82+
List<Batch> batchList = new ArrayList<>();
83+
while (occupiedMemory < maxMemory) {
84+
batch = new Batch(IoTConsensusConfig.newBuilder().build());
85+
for (int i = 0; i < batchSize; i++) {
86+
IndexedConsensusRequest request;
87+
request = new IndexedConsensusRequest(0, Collections.singletonList(new ByteBufferConsensusRequest(
88+
ByteBuffer.wrap(new byte[1024 * 1024]))));
89+
batch.addTLogEntry(new TLogEntry(request.getSerializedRequests(), request.getSearchIndex(), false, request.getMemorySize()));
90+
}
91+
92+
long requestSize = batch.getMemorySize();
93+
if (occupiedMemory + requestSize < maxMemory) {
94+
assertTrue(memoryManager.reserve(batch));
95+
occupiedMemory += requestSize;
96+
assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte());
97+
batchList.add(batch);
98+
} else {
99+
assertFalse(memoryManager.reserve(batch));
100+
}
101+
}
102+
assertTrue(memoryManager.getMemorySizeInByte() <= maxMemory);
103+
104+
for (Batch b : batchList) {
105+
memoryManager.free(b);
106+
occupiedMemory -= b.getMemorySize();
107+
assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte());
108+
}
109+
}
110+
111+
@Test
112+
public void testAllocateMixed() {
113+
IoTConsensusMemoryManager memoryManager = IoTConsensusMemoryManager.getInstance();
114+
long maxMemory = memoryManager.getMaxMemorySizeForQueueInByte();
115+
116+
long occupiedMemory = 0;
117+
IndexedConsensusRequest request;
118+
List<IndexedConsensusRequest> requestList = new ArrayList<>();
119+
Batch batch;
120+
int batchSize = 5;
121+
List<Batch> batchList = new ArrayList<>();
122+
123+
int i = 0;
124+
while (occupiedMemory <= maxMemory) {
125+
if (i % 2 == 0) {
126+
request = new IndexedConsensusRequest(0, Collections.singletonList(new ByteBufferConsensusRequest(
127+
ByteBuffer.wrap(new byte[4 * 1024 * 1024]))));
128+
request.buildSerializedRequests();
129+
long requestSize = request.getMemorySize();
130+
if (occupiedMemory + requestSize < maxMemory) {
131+
boolean reserved = memoryManager.reserve(request);
132+
assertTrue(reserved);
133+
occupiedMemory += requestSize;
134+
assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte());
135+
requestList.add(request);
136+
} else {
137+
assertFalse(memoryManager.reserve(request));
138+
break;
139+
}
140+
} else {
141+
batch = new Batch(IoTConsensusConfig.newBuilder().build());
142+
for (int j = 0; j < batchSize; j++) {
143+
IndexedConsensusRequest batchRequest;
144+
batchRequest = new IndexedConsensusRequest(0, Collections.singletonList(new ByteBufferConsensusRequest(
145+
ByteBuffer.wrap(new byte[1024 * 1024]))));
146+
batch.addTLogEntry(new TLogEntry(batchRequest.getSerializedRequests(), batchRequest.getSearchIndex(), false, batchRequest.getMemorySize()));
147+
}
148+
149+
long requestSize = batch.getMemorySize();
150+
if (occupiedMemory + requestSize < maxMemory) {
151+
assertTrue(memoryManager.reserve(batch));
152+
occupiedMemory += requestSize;
153+
assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte());
154+
batchList.add(batch);
155+
} else {
156+
assertFalse(memoryManager.reserve(batch));
157+
}
158+
}
159+
i ++;
160+
}
161+
assertTrue(memoryManager.getMemorySizeInByte() <= maxMemory);
162+
163+
while (!requestList.isEmpty() || !batchList.isEmpty()) {
164+
if (!requestList.isEmpty()) {
165+
request = requestList.remove(0);
166+
memoryManager.free(request);
167+
occupiedMemory -= request.getMemorySize();
168+
assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte());
169+
i --;
170+
}
171+
if (!batchList.isEmpty()) {
172+
batch = batchList.remove(0);
173+
memoryManager.free(batch);
174+
occupiedMemory -= batch.getMemorySize();
175+
assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte());
176+
i --;
177+
}
178+
}
179+
assertEquals(0, i);
180+
assertEquals(0, memoryManager.getMemorySizeInByte());
181+
assertEquals(0, memoryManager.getQueueMemorySizeInByte());
182+
}
183+
}

0 commit comments

Comments
 (0)