51
51
import org .apache .ratis .proto .RaftProtos ;
52
52
import org .apache .ratis .protocol .Message ;
53
53
import org .apache .ratis .protocol .RaftGroup ;
54
- import org .apache .ratis .protocol .RaftGroupId ;
55
54
import org .apache .ratis .protocol .RaftPeer ;
56
55
import org .apache .ratis .server .DivisionInfo ;
57
56
import org .apache .ratis .server .RaftServer ;
@@ -79,6 +78,7 @@ abstract class TestContainerStateMachine {
79
78
.setNameFormat ("ChunkWriter-" + i + "-%d" )
80
79
.build ())).collect (Collectors .toList ());
81
80
private final boolean isLeader ;
81
+ private final String CONTAINER_DATA = "Test Data" ;
82
82
83
83
TestContainerStateMachine (boolean isLeader ) {
84
84
this .isLeader = isLeader ;
@@ -101,8 +101,6 @@ public void setup() throws IOException {
101
101
when (division .getInfo ()).thenReturn (info );
102
102
when (info .isLeader ()).thenReturn (isLeader );
103
103
when (ratisServer .getServerDivision (any ())).thenReturn (division );
104
- stateMachine = new ContainerStateMachine (null ,
105
- RaftGroupId .randomId (), dispatcher , controller , executor , ratisServer , conf , "containerOp" );
106
104
}
107
105
108
106
@@ -131,79 +129,27 @@ public void testWriteFailure(boolean failWithException) throws ExecutionExceptio
131
129
when (trx .getStateMachineContext ()).thenReturn (context );
132
130
133
131
setUpMockDispatcherReturn (failWithException );
134
- setUpMockRequestProtoReturn (context , "Test Data" , 1 , 1 );
132
+ setUpMockRequestProtoReturn (context , 1 , 1 );
135
133
136
- AtomicReference <Throwable > throwable = new AtomicReference <>(null );
137
- Function <Throwable , ? extends Message > throwableSetter = getThrowableSetter (throwable );
134
+ ThrowableCatcher catcher = new ThrowableCatcher ();
138
135
139
- stateMachine .write (entry , trx ).exceptionally (throwableSetter ).get ();
136
+ stateMachine .write (entry , trx ).exceptionally (catcher . asSetter () ).get ();
140
137
verify (dispatcher , times (1 )).dispatch (any (ContainerProtos .ContainerCommandRequestProto .class ),
141
138
any (DispatcherContext .class ));
142
139
reset (dispatcher );
143
- assertNotNull (throwable . get ());
144
- assertResults (failWithException , throwable );
140
+ assertNotNull (catcher . getReceived ());
141
+ assertResults (failWithException , catcher . getCaught () );
145
142
146
143
// Writing data to another container(containerId 2) should also fail.
147
- setUpMockRequestProtoReturn (context , "Test Data" , 2 , 1 );
148
- stateMachine .write (entryNext , trx ).exceptionally (throwableSetter ).get ();
144
+ setUpMockRequestProtoReturn (context , 2 , 1 );
145
+ stateMachine .write (entryNext , trx ).exceptionally (catcher . asSetter () ).get ();
149
146
verify (dispatcher , times (0 )).dispatch (any (ContainerProtos .ContainerCommandRequestProto .class ),
150
147
any (DispatcherContext .class ));
151
- assertInstanceOf (StorageContainerException .class , throwable . get ());
152
- StorageContainerException sce = (StorageContainerException ) throwable . get ();
148
+ assertInstanceOf (StorageContainerException .class , catcher . getReceived ());
149
+ StorageContainerException sce = (StorageContainerException ) catcher . getReceived ();
153
150
assertEquals (ContainerProtos .Result .CONTAINER_UNHEALTHY , sce .getResult ());
154
151
}
155
152
156
- public final void setUpMockDispatcherReturn (boolean failWithException ) {
157
- if (failWithException ) {
158
- when (dispatcher .dispatch (any (), any ())).thenThrow (new RuntimeException ());
159
- } else {
160
- when (dispatcher .dispatch (any (), any ())).thenReturn (ContainerProtos .ContainerCommandResponseProto
161
- .newBuilder ().setCmdType (ContainerProtos .Type .WriteChunk )
162
- .setResult (ContainerProtos .Result .CONTAINER_INTERNAL_ERROR )
163
- .build ());
164
- }
165
- }
166
-
167
- public final void setUpMockRequestProtoReturn (ContainerStateMachine .Context context , String content ,
168
- int containerId , int localId ) {
169
- when (context .getRequestProto ()).thenReturn (ContainerProtos .ContainerCommandRequestProto .newBuilder ()
170
- .setCmdType (ContainerProtos .Type .WriteChunk ).setWriteChunk (
171
- ContainerProtos .WriteChunkRequestProto .newBuilder ().setData (ByteString .copyFromUtf8 (content ))
172
- .setBlockID (
173
- ContainerProtos .DatanodeBlockID .newBuilder ().setContainerID (containerId )
174
- .setLocalID (localId ).build ()).build ())
175
- .setContainerID (containerId )
176
- .setDatanodeUuid (UUID .randomUUID ().toString ()).build ());
177
- }
178
-
179
- public final Function <Throwable , ? extends Message > getThrowableSetter (AtomicReference <Throwable > throwable ) {
180
- Function <Throwable , ? extends Message > throwableSetter = t -> {
181
- throwable .set (t );
182
- return null ;
183
- };
184
- return throwableSetter ;
185
- }
186
-
187
- public final void assertResults (boolean failWithException , AtomicReference <Throwable > throwable ) {
188
- if (failWithException ) {
189
- assertInstanceOf (RuntimeException .class , throwable .get ());
190
- } else {
191
- assertInstanceOf (StorageContainerException .class , throwable .get ());
192
- StorageContainerException sce = (StorageContainerException ) throwable .get ();
193
- assertEquals (ContainerProtos .Result .CONTAINER_INTERNAL_ERROR , sce .getResult ());
194
- }
195
- }
196
-
197
- public final void setUpLogProtoReturn (ContainerStateMachine .Context context , int containerId , int localId ) {
198
- when (context .getLogProto ()).thenReturn (ContainerProtos .ContainerCommandRequestProto .newBuilder ()
199
- .setCmdType (ContainerProtos .Type .WriteChunk ).setWriteChunk (
200
- ContainerProtos .WriteChunkRequestProto .newBuilder ().setBlockID (
201
- ContainerProtos .DatanodeBlockID .newBuilder ().
202
- setContainerID (containerId ).setLocalID (localId ).build ()).build ())
203
- .setContainerID (containerId )
204
- .setDatanodeUuid (UUID .randomUUID ().toString ()).build ());
205
- }
206
-
207
153
@ ParameterizedTest
208
154
@ ValueSource (booleans = {true , false })
209
155
public void testApplyTransactionFailure (boolean failWithException ) throws ExecutionException ,
@@ -219,36 +165,35 @@ public void testApplyTransactionFailure(boolean failWithException) throws Execut
219
165
setUpMockDispatcherReturn (failWithException );
220
166
// Failing apply transaction on congtainer 1.
221
167
setUpLogProtoReturn (context , 1 , 1 );
222
- AtomicReference <Throwable > throwable = new AtomicReference <>(null );
223
- Function <Throwable , ? extends Message > throwableSetter = getThrowableSetter (throwable );
168
+ ThrowableCatcher catcher = new ThrowableCatcher ();
224
169
//apply transaction will fail because of runtime exception thrown by dispatcher, which marks the first
225
170
// failure on container 1.
226
- stateMachine .applyTransaction (trx ).exceptionally (throwableSetter ).get ();
171
+ stateMachine .applyTransaction (trx ).exceptionally (catcher . asSetter () ).get ();
227
172
verify (dispatcher , times (1 )).dispatch (any (ContainerProtos .ContainerCommandRequestProto .class ),
228
173
any (DispatcherContext .class ));
229
174
reset (dispatcher );
230
- assertNotNull (throwable . get ());
231
- assertResults (failWithException , throwable );
175
+ assertNotNull (catcher . getCaught ());
176
+ assertResults (failWithException , catcher . getCaught () );
232
177
// Another apply transaction on same container 1 should fail because the previous apply transaction failed.
233
- stateMachine .applyTransaction (trx ).exceptionally (throwableSetter ).get ();
178
+ stateMachine .applyTransaction (trx ).exceptionally (catcher . asSetter () ).get ();
234
179
verify (dispatcher , times (0 )).dispatch (any (ContainerProtos .ContainerCommandRequestProto .class ),
235
180
any (DispatcherContext .class ));
236
- assertInstanceOf (StorageContainerException .class , throwable . get ());
237
- StorageContainerException sce = (StorageContainerException ) throwable . get ();
181
+ assertInstanceOf (StorageContainerException .class , catcher . getReceived ());
182
+ StorageContainerException sce = (StorageContainerException ) catcher . getReceived ();
238
183
assertEquals (ContainerProtos .Result .CONTAINER_UNHEALTHY , sce .getResult ());
239
184
240
185
// Another apply transaction on a different container 2 shouldn't fail because the previous apply transaction
241
186
// failure was only on container 1.
242
187
setUpLogProtoReturn (context , 2 , 1 );
243
188
reset (dispatcher );
244
- throwable .set (null );
189
+ catcher . getCaught () .set (null );
245
190
when (dispatcher .dispatch (any (), any ())).thenReturn (ContainerProtos .ContainerCommandResponseProto
246
191
.newBuilder ().setCmdType (ContainerProtos .Type .WriteChunk ).setResult (ContainerProtos .Result .SUCCESS )
247
192
.build ());
248
- Message succcesfulTransaction = stateMachine .applyTransaction (trx ).exceptionally (throwableSetter ).get ();
193
+ Message succcesfulTransaction = stateMachine .applyTransaction (trx ).exceptionally (catcher . asSetter () ).get ();
249
194
verify (dispatcher , times (1 )).dispatch (any (ContainerProtos .ContainerCommandRequestProto .class ),
250
195
any (DispatcherContext .class ));
251
- assertNull (throwable . get ());
196
+ assertNull (catcher . getReceived ());
252
197
ContainerProtos .ContainerCommandResponseProto resp =
253
198
ContainerProtos .ContainerCommandResponseProto .parseFrom (succcesfulTransaction .getContent ());
254
199
assertEquals (ContainerProtos .Result .SUCCESS , resp .getResult ());
@@ -275,26 +220,90 @@ public void testWriteTimout() throws Exception {
275
220
return null ;
276
221
}).when (dispatcher ).dispatch (any (), any ());
277
222
278
- setUpMockRequestProtoReturn (context , "Test data" , 1 , 1 );
279
- AtomicReference <Throwable > throwable = new AtomicReference <>(null );
280
- Function <Throwable , ? extends Message > throwableSetter = t -> {
281
- throwable .set (t );
282
- return null ;
283
- };
223
+ setUpMockRequestProtoReturn (context , 1 , 1 );
224
+ ThrowableCatcher catcher = new ThrowableCatcher ();
225
+
284
226
Field writeChunkWaitMaxNs = stateMachine .getClass ().getDeclaredField ("writeChunkWaitMaxNs" );
285
227
writeChunkWaitMaxNs .setAccessible (true );
286
228
writeChunkWaitMaxNs .set (stateMachine , 1000_000_000 );
287
229
CompletableFuture <Message > firstWrite = stateMachine .write (entry , trx );
288
230
Thread .sleep (2000 );
289
231
CompletableFuture <Message > secondWrite = stateMachine .write (entryNext , trx );
290
- firstWrite .exceptionally (throwableSetter ).get ();
291
- assertNotNull (throwable . get ());
292
- assertInstanceOf (InterruptedException .class , throwable . get ());
293
-
294
- secondWrite .exceptionally (throwableSetter ).get ();
295
- assertNotNull (throwable . get ());
296
- assertInstanceOf (StorageContainerException .class , throwable . get ());
297
- StorageContainerException sce = (StorageContainerException ) throwable . get ();
232
+ firstWrite .exceptionally (catcher . asSetter () ).get ();
233
+ assertNotNull (catcher . getCaught ());
234
+ assertInstanceOf (InterruptedException .class , catcher . getReceived ());
235
+
236
+ secondWrite .exceptionally (catcher . asSetter () ).get ();
237
+ assertNotNull (catcher . getReceived ());
238
+ assertInstanceOf (StorageContainerException .class , catcher . getReceived ());
239
+ StorageContainerException sce = (StorageContainerException ) catcher . getReceived ();
298
240
assertEquals (ContainerProtos .Result .CONTAINER_INTERNAL_ERROR , sce .getResult ());
299
241
}
242
+
243
+ private void setUpMockDispatcherReturn (boolean failWithException ) {
244
+ if (failWithException ) {
245
+ when (dispatcher .dispatch (any (), any ())).thenThrow (new RuntimeException ());
246
+ } else {
247
+ when (dispatcher .dispatch (any (), any ())).thenReturn (ContainerProtos .ContainerCommandResponseProto
248
+ .newBuilder ().setCmdType (ContainerProtos .Type .WriteChunk )
249
+ .setResult (ContainerProtos .Result .CONTAINER_INTERNAL_ERROR )
250
+ .build ());
251
+ }
252
+ }
253
+
254
+ private void setUpMockRequestProtoReturn (ContainerStateMachine .Context context ,
255
+ int containerId , int localId ) {
256
+ when (context .getRequestProto ()).thenReturn (ContainerProtos .ContainerCommandRequestProto .newBuilder ()
257
+ .setCmdType (ContainerProtos .Type .WriteChunk ).setWriteChunk (
258
+ ContainerProtos .WriteChunkRequestProto .newBuilder ().setData (ByteString .copyFromUtf8 (CONTAINER_DATA ))
259
+ .setBlockID (
260
+ ContainerProtos .DatanodeBlockID .newBuilder ().setContainerID (containerId )
261
+ .setLocalID (localId ).build ()).build ())
262
+ .setContainerID (containerId )
263
+ .setDatanodeUuid (UUID .randomUUID ().toString ()).build ());
264
+ }
265
+
266
+ private void assertResults (boolean failWithException , AtomicReference <Throwable > throwable ) {
267
+ if (failWithException ) {
268
+ assertInstanceOf (RuntimeException .class , throwable .get ());
269
+ } else {
270
+ assertInstanceOf (StorageContainerException .class , throwable .get ());
271
+ StorageContainerException sce = (StorageContainerException ) throwable .get ();
272
+ assertEquals (ContainerProtos .Result .CONTAINER_INTERNAL_ERROR , sce .getResult ());
273
+ }
274
+ }
275
+
276
+ private void setUpLogProtoReturn (ContainerStateMachine .Context context , int containerId , int localId ) {
277
+ when (context .getLogProto ()).thenReturn (ContainerProtos .ContainerCommandRequestProto .newBuilder ()
278
+ .setCmdType (ContainerProtos .Type .WriteChunk ).setWriteChunk (
279
+ ContainerProtos .WriteChunkRequestProto .newBuilder ().setBlockID (
280
+ ContainerProtos .DatanodeBlockID .newBuilder ().
281
+ setContainerID (containerId ).setLocalID (localId ).build ()).build ())
282
+ .setContainerID (containerId )
283
+ .setDatanodeUuid (UUID .randomUUID ().toString ()).build ());
284
+ }
285
+
286
+ private static class ThrowableCatcher {
287
+
288
+ private final AtomicReference <Throwable > caught = new AtomicReference <>(null );
289
+
290
+ public Function <Throwable , ? extends Message > asSetter () {
291
+ return t -> {
292
+ caught .set (t );
293
+ return null ;
294
+ };
295
+ }
296
+
297
+ public AtomicReference <Throwable > getCaught () {
298
+ return caught ;
299
+ }
300
+
301
+ public Throwable getReceived () {
302
+ return caught .get ();
303
+ }
304
+
305
+ public void reset () {
306
+ caught .set (null );
307
+ }
308
+ }
300
309
}
0 commit comments