@@ -126,6 +126,37 @@ public void testWriteFailure(boolean failWithException) throws ExecutionExceptio
126
126
TransactionContext trx = mock (TransactionContext .class );
127
127
ContainerStateMachine .Context context = mock (ContainerStateMachine .Context .class );
128
128
when (trx .getStateMachineContext ()).thenReturn (context );
129
+
130
+ setUpMockDispatcherReturn (failWithException );
131
+ setUpMockRequestProto (context );
132
+
133
+ AtomicReference <Throwable > throwable = new AtomicReference <>(null );
134
+ Function <Throwable , ? extends Message > throwableSetter = getThrowableSetter (throwable );
135
+
136
+ stateMachine .write (entry , trx ).exceptionally (throwableSetter ).get ();
137
+ verify (dispatcher , times (1 )).dispatch (any (ContainerProtos .ContainerCommandRequestProto .class ),
138
+ any (DispatcherContext .class ));
139
+ reset (dispatcher );
140
+ assertNotNull (throwable .get ());
141
+ assertResults (failWithException , throwable );
142
+
143
+ // Writing data to another container(containerId 2) should also fail.
144
+ when (context .getRequestProto ()).thenReturn (ContainerProtos .ContainerCommandRequestProto .newBuilder ()
145
+ .setCmdType (ContainerProtos .Type .WriteChunk ).setWriteChunk (
146
+ ContainerProtos .WriteChunkRequestProto .newBuilder ().setData (ByteString .copyFromUtf8 ("Test Data" ))
147
+ .setBlockID (
148
+ ContainerProtos .DatanodeBlockID .newBuilder ().setContainerID (2 ).setLocalID (1 ).build ()).build ())
149
+ .setContainerID (2 )
150
+ .setDatanodeUuid (UUID .randomUUID ().toString ()).build ());
151
+ stateMachine .write (entry , trx ).exceptionally (throwableSetter ).get ();
152
+ verify (dispatcher , times (0 )).dispatch (any (ContainerProtos .ContainerCommandRequestProto .class ),
153
+ any (DispatcherContext .class ));
154
+ assertInstanceOf (StorageContainerException .class , throwable .get ());
155
+ StorageContainerException sce = (StorageContainerException ) throwable .get ();
156
+ assertEquals (ContainerProtos .Result .CONTAINER_UNHEALTHY , sce .getResult ());
157
+ }
158
+
159
+ public final void setUpMockDispatcherReturn (boolean failWithException ) {
129
160
if (failWithException ) {
130
161
when (dispatcher .dispatch (any (), any ())).thenThrow (new RuntimeException ());
131
162
} else {
@@ -134,45 +165,34 @@ public void testWriteFailure(boolean failWithException) throws ExecutionExceptio
134
165
.setResult (ContainerProtos .Result .CONTAINER_INTERNAL_ERROR )
135
166
.build ());
136
167
}
168
+ }
137
169
170
+ public final void setUpMockRequestProto (ContainerStateMachine .Context context ) {
138
171
when (context .getRequestProto ()).thenReturn (ContainerProtos .ContainerCommandRequestProto .newBuilder ()
139
172
.setCmdType (ContainerProtos .Type .WriteChunk ).setWriteChunk (
140
173
ContainerProtos .WriteChunkRequestProto .newBuilder ().setData (ByteString .copyFromUtf8 ("Test Data" ))
141
174
.setBlockID (
142
175
ContainerProtos .DatanodeBlockID .newBuilder ().setContainerID (1 ).setLocalID (1 ).build ()).build ())
143
176
.setContainerID (1 )
144
177
.setDatanodeUuid (UUID .randomUUID ().toString ()).build ());
145
- AtomicReference <Throwable > throwable = new AtomicReference <>(null );
178
+ }
179
+
180
+ public final Function <Throwable , ? extends Message > getThrowableSetter (AtomicReference <Throwable > throwable ) {
146
181
Function <Throwable , ? extends Message > throwableSetter = t -> {
147
182
throwable .set (t );
148
183
return null ;
149
184
};
150
- stateMachine .write (entry , trx ).exceptionally (throwableSetter ).get ();
151
- verify (dispatcher , times (1 )).dispatch (any (ContainerProtos .ContainerCommandRequestProto .class ),
152
- any (DispatcherContext .class ));
153
- reset (dispatcher );
154
- assertNotNull (throwable .get ());
185
+ return throwableSetter ;
186
+ }
187
+
188
+ public final void assertResults (boolean failWithException , AtomicReference <Throwable > throwable ) {
155
189
if (failWithException ) {
156
190
assertInstanceOf (RuntimeException .class , throwable .get ());
157
191
} else {
158
192
assertInstanceOf (StorageContainerException .class , throwable .get ());
159
193
StorageContainerException sce = (StorageContainerException ) throwable .get ();
160
194
assertEquals (ContainerProtos .Result .CONTAINER_INTERNAL_ERROR , sce .getResult ());
161
195
}
162
- // Writing data to another container(containerId 2) should also fail.
163
- when (context .getRequestProto ()).thenReturn (ContainerProtos .ContainerCommandRequestProto .newBuilder ()
164
- .setCmdType (ContainerProtos .Type .WriteChunk ).setWriteChunk (
165
- ContainerProtos .WriteChunkRequestProto .newBuilder ().setData (ByteString .copyFromUtf8 ("Test Data" ))
166
- .setBlockID (
167
- ContainerProtos .DatanodeBlockID .newBuilder ().setContainerID (2 ).setLocalID (1 ).build ()).build ())
168
- .setContainerID (2 )
169
- .setDatanodeUuid (UUID .randomUUID ().toString ()).build ());
170
- stateMachine .write (entry , trx ).exceptionally (throwableSetter ).get ();
171
- verify (dispatcher , times (0 )).dispatch (any (ContainerProtos .ContainerCommandRequestProto .class ),
172
- any (DispatcherContext .class ));
173
- assertInstanceOf (StorageContainerException .class , throwable .get ());
174
- StorageContainerException sce = (StorageContainerException ) throwable .get ();
175
- assertEquals (ContainerProtos .Result .CONTAINER_UNHEALTHY , sce .getResult ());
176
196
}
177
197
178
198
@ ParameterizedTest
@@ -186,14 +206,8 @@ public void testApplyTransactionFailure(boolean failWithException) throws Execut
186
206
ContainerStateMachine .Context context = mock (ContainerStateMachine .Context .class );
187
207
when (trx .getLogEntry ()).thenReturn (entry );
188
208
when (trx .getStateMachineContext ()).thenReturn (context );
189
- if (failWithException ) {
190
- when (dispatcher .dispatch (any (), any ())).thenThrow (new RuntimeException ());
191
- } else {
192
- when (dispatcher .dispatch (any (), any ())).thenReturn (ContainerProtos .ContainerCommandResponseProto
193
- .newBuilder ().setCmdType (ContainerProtos .Type .WriteChunk )
194
- .setResult (ContainerProtos .Result .CONTAINER_INTERNAL_ERROR )
195
- .build ());
196
- }
209
+
210
+ setUpMockDispatcherReturn (failWithException );
197
211
// Failing apply transaction on congtainer 1.
198
212
when (context .getLogProto ()).thenReturn (ContainerProtos .ContainerCommandRequestProto .newBuilder ()
199
213
.setCmdType (ContainerProtos .Type .WriteChunk ).setWriteChunk (
@@ -202,24 +216,15 @@ public void testApplyTransactionFailure(boolean failWithException) throws Execut
202
216
.setContainerID (1 )
203
217
.setDatanodeUuid (UUID .randomUUID ().toString ()).build ());
204
218
AtomicReference <Throwable > throwable = new AtomicReference <>(null );
205
- Function <Throwable , ? extends Message > throwableSetter = t -> {
206
- throwable .set (t );
207
- return null ;
208
- };
219
+ Function <Throwable , ? extends Message > throwableSetter = getThrowableSetter (throwable );
209
220
//apply transaction will fail because of runtime exception thrown by dispatcher, which marks the first
210
221
// failure on container 1.
211
222
stateMachine .applyTransaction (trx ).exceptionally (throwableSetter ).get ();
212
223
verify (dispatcher , times (1 )).dispatch (any (ContainerProtos .ContainerCommandRequestProto .class ),
213
224
any (DispatcherContext .class ));
214
225
reset (dispatcher );
215
226
assertNotNull (throwable .get ());
216
- if (failWithException ) {
217
- assertInstanceOf (RuntimeException .class , throwable .get ());
218
- } else {
219
- assertInstanceOf (StorageContainerException .class , throwable .get ());
220
- StorageContainerException sce = (StorageContainerException ) throwable .get ();
221
- assertEquals (ContainerProtos .Result .CONTAINER_INTERNAL_ERROR , sce .getResult ());
222
- }
227
+ assertResults (failWithException , throwable );
223
228
// Another apply transaction on same container 1 should fail because the previous apply transaction failed.
224
229
stateMachine .applyTransaction (trx ).exceptionally (throwableSetter ).get ();
225
230
verify (dispatcher , times (0 )).dispatch (any (ContainerProtos .ContainerCommandRequestProto .class ),
@@ -272,13 +277,7 @@ public void testWriteTimout() throws Exception {
272
277
return null ;
273
278
}).when (dispatcher ).dispatch (any (), any ());
274
279
275
- when (context .getRequestProto ()).thenReturn (ContainerProtos .ContainerCommandRequestProto .newBuilder ()
276
- .setCmdType (ContainerProtos .Type .WriteChunk ).setWriteChunk (
277
- ContainerProtos .WriteChunkRequestProto .newBuilder ().setData (ByteString .copyFromUtf8 ("Test Data" ))
278
- .setBlockID (
279
- ContainerProtos .DatanodeBlockID .newBuilder ().setContainerID (1 ).setLocalID (1 ).build ()).build ())
280
- .setContainerID (1 )
281
- .setDatanodeUuid (UUID .randomUUID ().toString ()).build ());
280
+ setUpMockRequestProto (context );
282
281
AtomicReference <Throwable > throwable = new AtomicReference <>(null );
283
282
Function <Throwable , ? extends Message > throwableSetter = t -> {
284
283
throwable .set (t );
0 commit comments