@@ -123,12 +123,15 @@ public void testWriteFailure(boolean failWithException) throws ExecutionExceptio
123
123
RaftProtos .LogEntryProto entry = mock (RaftProtos .LogEntryProto .class );
124
124
when (entry .getTerm ()).thenReturn (1L );
125
125
when (entry .getIndex ()).thenReturn (1L );
126
+ RaftProtos .LogEntryProto entryNext = mock (RaftProtos .LogEntryProto .class );
127
+ when (entryNext .getTerm ()).thenReturn (1L );
128
+ when (entryNext .getIndex ()).thenReturn (2L );
126
129
TransactionContext trx = mock (TransactionContext .class );
127
130
ContainerStateMachine .Context context = mock (ContainerStateMachine .Context .class );
128
131
when (trx .getStateMachineContext ()).thenReturn (context );
129
132
130
133
setUpMockDispatcherReturn (failWithException );
131
- setUpMockRequestProtoReturn (context , "Test Data" , 1 , 1 );
134
+ setUpMockRequestProto (context );
132
135
133
136
AtomicReference <Throwable > throwable = new AtomicReference <>(null );
134
137
Function <Throwable , ? extends Message > throwableSetter = getThrowableSetter (throwable );
@@ -141,9 +144,14 @@ public void testWriteFailure(boolean failWithException) throws ExecutionExceptio
141
144
assertResults (failWithException , throwable );
142
145
143
146
// Writing data to another container(containerId 2) should also fail.
144
- setUpMockRequestProtoReturn (context , "Test Data" , 2 , 1 );
145
-
146
- stateMachine .write (entry , trx ).exceptionally (throwableSetter ).get ();
147
+ when (context .getRequestProto ()).thenReturn (ContainerProtos .ContainerCommandRequestProto .newBuilder ()
148
+ .setCmdType (ContainerProtos .Type .WriteChunk ).setWriteChunk (
149
+ ContainerProtos .WriteChunkRequestProto .newBuilder ().setData (ByteString .copyFromUtf8 ("Test Data" ))
150
+ .setBlockID (
151
+ ContainerProtos .DatanodeBlockID .newBuilder ().setContainerID (2 ).setLocalID (1 ).build ()).build ())
152
+ .setContainerID (2 )
153
+ .setDatanodeUuid (UUID .randomUUID ().toString ()).build ());
154
+ stateMachine .write (entryNext , trx ).exceptionally (throwableSetter ).get ();
147
155
verify (dispatcher , times (0 )).dispatch (any (ContainerProtos .ContainerCommandRequestProto .class ),
148
156
any (DispatcherContext .class ));
149
157
assertInstanceOf (StorageContainerException .class , throwable .get ());
@@ -162,33 +170,22 @@ public final void setUpMockDispatcherReturn(boolean failWithException) {
162
170
}
163
171
}
164
172
165
- public final void setUpMockRequestProtoReturn (ContainerStateMachine .Context context , String content ,
166
- int containerId , int localId ) {
173
+ public final void setUpMockRequestProto (ContainerStateMachine .Context context ) {
167
174
when (context .getRequestProto ()).thenReturn (ContainerProtos .ContainerCommandRequestProto .newBuilder ()
168
175
.setCmdType (ContainerProtos .Type .WriteChunk ).setWriteChunk (
169
- ContainerProtos .WriteChunkRequestProto .newBuilder ().setData (ByteString .copyFromUtf8 (content ))
176
+ ContainerProtos .WriteChunkRequestProto .newBuilder ().setData (ByteString .copyFromUtf8 ("Test Data" ))
170
177
.setBlockID (
171
- ContainerProtos .DatanodeBlockID .newBuilder ().setContainerID (containerId )
172
- .setLocalID (localId ).build ()).build ())
173
- .setContainerID (containerId )
174
- .setDatanodeUuid (UUID .randomUUID ().toString ()).build ());
175
- }
176
-
177
- public final void setUpMockLogProtoReturn (ContainerStateMachine .Context context , int containerId , int localId ) {
178
- when (context .getLogProto ()).thenReturn (ContainerProtos .ContainerCommandRequestProto .newBuilder ()
179
- .setCmdType (ContainerProtos .Type .WriteChunk ).setWriteChunk (
180
- ContainerProtos .WriteChunkRequestProto .newBuilder ().setBlockID (
181
- ContainerProtos .DatanodeBlockID .newBuilder ().setContainerID (containerId )
182
- .setLocalID (localId ).build ()).build ())
183
- .setContainerID (containerId )
178
+ ContainerProtos .DatanodeBlockID .newBuilder ().setContainerID (1 ).setLocalID (1 ).build ()).build ())
179
+ .setContainerID (1 )
184
180
.setDatanodeUuid (UUID .randomUUID ().toString ()).build ());
185
181
}
186
182
187
183
public final Function <Throwable , ? extends Message > getThrowableSetter (AtomicReference <Throwable > throwable ) {
188
- return t -> {
184
+ Function < Throwable , ? extends Message > throwableSetter = t -> {
189
185
throwable .set (t );
190
186
return null ;
191
187
};
188
+ return throwableSetter ;
192
189
}
193
190
194
191
public final void assertResults (boolean failWithException , AtomicReference <Throwable > throwable ) {
@@ -215,8 +212,12 @@ public void testApplyTransactionFailure(boolean failWithException) throws Execut
215
212
216
213
setUpMockDispatcherReturn (failWithException );
217
214
// Failing apply transaction on congtainer 1.
218
- setUpMockLogProtoReturn (context , 1 , 1 );
219
-
215
+ when (context .getLogProto ()).thenReturn (ContainerProtos .ContainerCommandRequestProto .newBuilder ()
216
+ .setCmdType (ContainerProtos .Type .WriteChunk ).setWriteChunk (
217
+ ContainerProtos .WriteChunkRequestProto .newBuilder ().setBlockID (
218
+ ContainerProtos .DatanodeBlockID .newBuilder ().setContainerID (1 ).setLocalID (1 ).build ()).build ())
219
+ .setContainerID (1 )
220
+ .setDatanodeUuid (UUID .randomUUID ().toString ()).build ());
220
221
AtomicReference <Throwable > throwable = new AtomicReference <>(null );
221
222
Function <Throwable , ? extends Message > throwableSetter = getThrowableSetter (throwable );
222
223
//apply transaction will fail because of runtime exception thrown by dispatcher, which marks the first
@@ -237,7 +238,12 @@ public void testApplyTransactionFailure(boolean failWithException) throws Execut
237
238
238
239
// Another apply transaction on a different container 2 shouldn't fail because the previous apply transaction
239
240
// failure was only on container 1.
240
- setUpMockLogProtoReturn (context , 2 , 1 );
241
+ when (context .getLogProto ()).thenReturn (ContainerProtos .ContainerCommandRequestProto .newBuilder ()
242
+ .setCmdType (ContainerProtos .Type .WriteChunk ).setWriteChunk (
243
+ ContainerProtos .WriteChunkRequestProto .newBuilder ().setBlockID (
244
+ ContainerProtos .DatanodeBlockID .newBuilder ().setContainerID (2 ).setLocalID (1 ).build ()).build ())
245
+ .setContainerID (2 )
246
+ .setDatanodeUuid (UUID .randomUUID ().toString ()).build ());
241
247
242
248
reset (dispatcher );
243
249
throwable .set (null );
@@ -274,9 +280,12 @@ public void testWriteTimout() throws Exception {
274
280
return null ;
275
281
}).when (dispatcher ).dispatch (any (), any ());
276
282
277
- setUpMockRequestProtoReturn (context , "Test Data" , 1 , 1 );
283
+ setUpMockRequestProto (context );
278
284
AtomicReference <Throwable > throwable = new AtomicReference <>(null );
279
- Function <Throwable , ? extends Message > throwableSetter = getThrowableSetter (throwable );
285
+ Function <Throwable , ? extends Message > throwableSetter = t -> {
286
+ throwable .set (t );
287
+ return null ;
288
+ };
280
289
Field writeChunkWaitMaxNs = stateMachine .getClass ().getDeclaredField ("writeChunkWaitMaxNs" );
281
290
writeChunkWaitMaxNs .setAccessible (true );
282
291
writeChunkWaitMaxNs .set (stateMachine , 1000_000_000 );
0 commit comments