@@ -129,6 +129,37 @@ public void testWriteFailure(boolean failWithException) throws ExecutionExceptio
129
129
TransactionContext trx = mock (TransactionContext .class );
130
130
ContainerStateMachine .Context context = mock (ContainerStateMachine .Context .class );
131
131
when (trx .getStateMachineContext ()).thenReturn (context );
132
+
133
+ setUpMockDispatcherReturn (failWithException );
134
+ setUpMockRequestProto (context );
135
+
136
+ AtomicReference <Throwable > throwable = new AtomicReference <>(null );
137
+ Function <Throwable , ? extends Message > throwableSetter = getThrowableSetter (throwable );
138
+
139
+ stateMachine .write (entry , trx ).exceptionally (throwableSetter ).get ();
140
+ verify (dispatcher , times (1 )).dispatch (any (ContainerProtos .ContainerCommandRequestProto .class ),
141
+ any (DispatcherContext .class ));
142
+ reset (dispatcher );
143
+ assertNotNull (throwable .get ());
144
+ assertResults (failWithException , throwable );
145
+
146
+ // Writing data to another container(containerId 2) should also fail.
147
+ when (context .getRequestProto ()).thenReturn (ContainerProtos .ContainerCommandRequestProto .newBuilder ()
148
+ .setCmdType (ContainerProtos .Type .WriteChunk ).setWriteChunk (
149
+ ContainerProtos .WriteChunkRequestProto .newBuilder ().setData (ByteString .copyFromUtf8 ("Test Data" ))
150
+ .setBlockID (
151
+ ContainerProtos .DatanodeBlockID .newBuilder ().setContainerID (2 ).setLocalID (1 ).build ()).build ())
152
+ .setContainerID (2 )
153
+ .setDatanodeUuid (UUID .randomUUID ().toString ()).build ());
154
+ stateMachine .write (entryNext , trx ).exceptionally (throwableSetter ).get ();
155
+ verify (dispatcher , times (0 )).dispatch (any (ContainerProtos .ContainerCommandRequestProto .class ),
156
+ any (DispatcherContext .class ));
157
+ assertInstanceOf (StorageContainerException .class , throwable .get ());
158
+ StorageContainerException sce = (StorageContainerException ) throwable .get ();
159
+ assertEquals (ContainerProtos .Result .CONTAINER_UNHEALTHY , sce .getResult ());
160
+ }
161
+
162
+ public final void setUpMockDispatcherReturn (boolean failWithException ) {
132
163
if (failWithException ) {
133
164
when (dispatcher .dispatch (any (), any ())).thenThrow (new RuntimeException ());
134
165
} else {
@@ -137,45 +168,34 @@ public void testWriteFailure(boolean failWithException) throws ExecutionExceptio
137
168
.setResult (ContainerProtos .Result .CONTAINER_INTERNAL_ERROR )
138
169
.build ());
139
170
}
171
+ }
140
172
173
+ public final void setUpMockRequestProto (ContainerStateMachine .Context context ) {
141
174
when (context .getRequestProto ()).thenReturn (ContainerProtos .ContainerCommandRequestProto .newBuilder ()
142
175
.setCmdType (ContainerProtos .Type .WriteChunk ).setWriteChunk (
143
176
ContainerProtos .WriteChunkRequestProto .newBuilder ().setData (ByteString .copyFromUtf8 ("Test Data" ))
144
177
.setBlockID (
145
178
ContainerProtos .DatanodeBlockID .newBuilder ().setContainerID (1 ).setLocalID (1 ).build ()).build ())
146
179
.setContainerID (1 )
147
180
.setDatanodeUuid (UUID .randomUUID ().toString ()).build ());
148
- AtomicReference <Throwable > throwable = new AtomicReference <>(null );
181
+ }
182
+
183
+ public final Function <Throwable , ? extends Message > getThrowableSetter (AtomicReference <Throwable > throwable ) {
149
184
Function <Throwable , ? extends Message > throwableSetter = t -> {
150
185
throwable .set (t );
151
186
return null ;
152
187
};
153
- stateMachine .write (entry , trx ).exceptionally (throwableSetter ).get ();
154
- verify (dispatcher , times (1 )).dispatch (any (ContainerProtos .ContainerCommandRequestProto .class ),
155
- any (DispatcherContext .class ));
156
- reset (dispatcher );
157
- assertNotNull (throwable .get ());
188
+ return throwableSetter ;
189
+ }
190
+
191
+ public final void assertResults (boolean failWithException , AtomicReference <Throwable > throwable ) {
158
192
if (failWithException ) {
159
193
assertInstanceOf (RuntimeException .class , throwable .get ());
160
194
} else {
161
195
assertInstanceOf (StorageContainerException .class , throwable .get ());
162
196
StorageContainerException sce = (StorageContainerException ) throwable .get ();
163
197
assertEquals (ContainerProtos .Result .CONTAINER_INTERNAL_ERROR , sce .getResult ());
164
198
}
165
- // Writing data to another container(containerId 2) should also fail.
166
- when (context .getRequestProto ()).thenReturn (ContainerProtos .ContainerCommandRequestProto .newBuilder ()
167
- .setCmdType (ContainerProtos .Type .WriteChunk ).setWriteChunk (
168
- ContainerProtos .WriteChunkRequestProto .newBuilder ().setData (ByteString .copyFromUtf8 ("Test Data" ))
169
- .setBlockID (
170
- ContainerProtos .DatanodeBlockID .newBuilder ().setContainerID (2 ).setLocalID (1 ).build ()).build ())
171
- .setContainerID (2 )
172
- .setDatanodeUuid (UUID .randomUUID ().toString ()).build ());
173
- stateMachine .write (entryNext , trx ).exceptionally (throwableSetter ).get ();
174
- verify (dispatcher , times (0 )).dispatch (any (ContainerProtos .ContainerCommandRequestProto .class ),
175
- any (DispatcherContext .class ));
176
- assertInstanceOf (StorageContainerException .class , throwable .get ());
177
- StorageContainerException sce = (StorageContainerException ) throwable .get ();
178
- assertEquals (ContainerProtos .Result .CONTAINER_UNHEALTHY , sce .getResult ());
179
199
}
180
200
181
201
@ ParameterizedTest
@@ -189,14 +209,8 @@ public void testApplyTransactionFailure(boolean failWithException) throws Execut
189
209
ContainerStateMachine .Context context = mock (ContainerStateMachine .Context .class );
190
210
when (trx .getLogEntry ()).thenReturn (entry );
191
211
when (trx .getStateMachineContext ()).thenReturn (context );
192
- if (failWithException ) {
193
- when (dispatcher .dispatch (any (), any ())).thenThrow (new RuntimeException ());
194
- } else {
195
- when (dispatcher .dispatch (any (), any ())).thenReturn (ContainerProtos .ContainerCommandResponseProto
196
- .newBuilder ().setCmdType (ContainerProtos .Type .WriteChunk )
197
- .setResult (ContainerProtos .Result .CONTAINER_INTERNAL_ERROR )
198
- .build ());
199
- }
212
+
213
+ setUpMockDispatcherReturn (failWithException );
200
214
// Failing apply transaction on congtainer 1.
201
215
when (context .getLogProto ()).thenReturn (ContainerProtos .ContainerCommandRequestProto .newBuilder ()
202
216
.setCmdType (ContainerProtos .Type .WriteChunk ).setWriteChunk (
@@ -205,24 +219,15 @@ public void testApplyTransactionFailure(boolean failWithException) throws Execut
205
219
.setContainerID (1 )
206
220
.setDatanodeUuid (UUID .randomUUID ().toString ()).build ());
207
221
AtomicReference <Throwable > throwable = new AtomicReference <>(null );
208
- Function <Throwable , ? extends Message > throwableSetter = t -> {
209
- throwable .set (t );
210
- return null ;
211
- };
222
+ Function <Throwable , ? extends Message > throwableSetter = getThrowableSetter (throwable );
212
223
//apply transaction will fail because of runtime exception thrown by dispatcher, which marks the first
213
224
// failure on container 1.
214
225
stateMachine .applyTransaction (trx ).exceptionally (throwableSetter ).get ();
215
226
verify (dispatcher , times (1 )).dispatch (any (ContainerProtos .ContainerCommandRequestProto .class ),
216
227
any (DispatcherContext .class ));
217
228
reset (dispatcher );
218
229
assertNotNull (throwable .get ());
219
- if (failWithException ) {
220
- assertInstanceOf (RuntimeException .class , throwable .get ());
221
- } else {
222
- assertInstanceOf (StorageContainerException .class , throwable .get ());
223
- StorageContainerException sce = (StorageContainerException ) throwable .get ();
224
- assertEquals (ContainerProtos .Result .CONTAINER_INTERNAL_ERROR , sce .getResult ());
225
- }
230
+ assertResults (failWithException , throwable );
226
231
// Another apply transaction on same container 1 should fail because the previous apply transaction failed.
227
232
stateMachine .applyTransaction (trx ).exceptionally (throwableSetter ).get ();
228
233
verify (dispatcher , times (0 )).dispatch (any (ContainerProtos .ContainerCommandRequestProto .class ),
@@ -275,13 +280,7 @@ public void testWriteTimout() throws Exception {
275
280
return null ;
276
281
}).when (dispatcher ).dispatch (any (), any ());
277
282
278
- when (context .getRequestProto ()).thenReturn (ContainerProtos .ContainerCommandRequestProto .newBuilder ()
279
- .setCmdType (ContainerProtos .Type .WriteChunk ).setWriteChunk (
280
- ContainerProtos .WriteChunkRequestProto .newBuilder ().setData (ByteString .copyFromUtf8 ("Test Data" ))
281
- .setBlockID (
282
- ContainerProtos .DatanodeBlockID .newBuilder ().setContainerID (1 ).setLocalID (1 ).build ()).build ())
283
- .setContainerID (1 )
284
- .setDatanodeUuid (UUID .randomUUID ().toString ()).build ());
283
+ setUpMockRequestProto (context );
285
284
AtomicReference <Throwable > throwable = new AtomicReference <>(null );
286
285
Function <Throwable , ? extends Message > throwableSetter = t -> {
287
286
throwable .set (t );
0 commit comments