@@ -128,7 +128,7 @@ public void testWriteFailure(boolean failWithException) throws ExecutionExceptio
128
128
when (trx .getStateMachineContext ()).thenReturn (context );
129
129
130
130
setUpMockDispatcherReturn (failWithException );
131
- setUpMockRequestProto (context );
131
+ setUpMockRequestProtoReturn (context , "Test Data" , 1 , 1 );
132
132
133
133
AtomicReference <Throwable > throwable = new AtomicReference <>(null );
134
134
Function <Throwable , ? extends Message > throwableSetter = getThrowableSetter (throwable );
@@ -141,13 +141,8 @@ public void testWriteFailure(boolean failWithException) throws ExecutionExceptio
141
141
assertResults (failWithException , throwable );
142
142
143
143
// Writing data to another container(containerId 2) should also fail.
144
- when (context .getRequestProto ()).thenReturn (ContainerProtos .ContainerCommandRequestProto .newBuilder ()
145
- .setCmdType (ContainerProtos .Type .WriteChunk ).setWriteChunk (
146
- ContainerProtos .WriteChunkRequestProto .newBuilder ().setData (ByteString .copyFromUtf8 ("Test Data" ))
147
- .setBlockID (
148
- ContainerProtos .DatanodeBlockID .newBuilder ().setContainerID (2 ).setLocalID (1 ).build ()).build ())
149
- .setContainerID (2 )
150
- .setDatanodeUuid (UUID .randomUUID ().toString ()).build ());
144
+ setUpMockRequestProtoReturn (context , "Test Data" , 2 , 1 );
145
+
151
146
stateMachine .write (entry , trx ).exceptionally (throwableSetter ).get ();
152
147
verify (dispatcher , times (0 )).dispatch (any (ContainerProtos .ContainerCommandRequestProto .class ),
153
148
any (DispatcherContext .class ));
@@ -167,22 +162,31 @@ public final void setUpMockDispatcherReturn(boolean failWithException) {
167
162
}
168
163
}
169
164
170
- public final void setUpMockRequestProto (ContainerStateMachine .Context context ) {
165
+ public final void setUpMockRequestProtoReturn (ContainerStateMachine .Context context , String content ,
166
+ int containerId , int localId ) {
171
167
when (context .getRequestProto ()).thenReturn (ContainerProtos .ContainerCommandRequestProto .newBuilder ()
172
168
.setCmdType (ContainerProtos .Type .WriteChunk ).setWriteChunk (
173
- ContainerProtos .WriteChunkRequestProto .newBuilder ().setData (ByteString .copyFromUtf8 ("Test Data" ))
169
+ ContainerProtos .WriteChunkRequestProto .newBuilder ().setData (ByteString .copyFromUtf8 (content ))
174
170
.setBlockID (
175
- ContainerProtos .DatanodeBlockID .newBuilder ().setContainerID (1 ).setLocalID (1 ).build ()).build ())
176
- .setContainerID (1 )
171
+ ContainerProtos .DatanodeBlockID .newBuilder ().setContainerID (containerId ).setLocalID (localId ).build ()).build ())
172
+ .setContainerID (containerId )
173
+ .setDatanodeUuid (UUID .randomUUID ().toString ()).build ());
174
+ }
175
+
176
+ public final void setUpMockLogProtoReturn (ContainerStateMachine .Context context , int containerId , int localId ) {
177
+ when (context .getLogProto ()).thenReturn (ContainerProtos .ContainerCommandRequestProto .newBuilder ()
178
+ .setCmdType (ContainerProtos .Type .WriteChunk ).setWriteChunk (
179
+ ContainerProtos .WriteChunkRequestProto .newBuilder ().setBlockID (
180
+ ContainerProtos .DatanodeBlockID .newBuilder ().setContainerID (containerId ).setLocalID (localId ).build ()).build ())
181
+ .setContainerID (containerId )
177
182
.setDatanodeUuid (UUID .randomUUID ().toString ()).build ());
178
183
}
179
184
180
185
public final Function <Throwable , ? extends Message > getThrowableSetter (AtomicReference <Throwable > throwable ) {
181
- Function < Throwable , ? extends Message > throwableSetter = t -> {
186
+ return t -> {
182
187
throwable .set (t );
183
188
return null ;
184
189
};
185
- return throwableSetter ;
186
190
}
187
191
188
192
public final void assertResults (boolean failWithException , AtomicReference <Throwable > throwable ) {
@@ -209,12 +213,8 @@ public void testApplyTransactionFailure(boolean failWithException) throws Execut
209
213
210
214
setUpMockDispatcherReturn (failWithException );
211
215
// Failing apply transaction on congtainer 1.
212
- when (context .getLogProto ()).thenReturn (ContainerProtos .ContainerCommandRequestProto .newBuilder ()
213
- .setCmdType (ContainerProtos .Type .WriteChunk ).setWriteChunk (
214
- ContainerProtos .WriteChunkRequestProto .newBuilder ().setBlockID (
215
- ContainerProtos .DatanodeBlockID .newBuilder ().setContainerID (1 ).setLocalID (1 ).build ()).build ())
216
- .setContainerID (1 )
217
- .setDatanodeUuid (UUID .randomUUID ().toString ()).build ());
216
+ setUpMockLogProtoReturn (context , 1 , 1 );
217
+
218
218
AtomicReference <Throwable > throwable = new AtomicReference <>(null );
219
219
Function <Throwable , ? extends Message > throwableSetter = getThrowableSetter (throwable );
220
220
//apply transaction will fail because of runtime exception thrown by dispatcher, which marks the first
@@ -235,12 +235,7 @@ public void testApplyTransactionFailure(boolean failWithException) throws Execut
235
235
236
236
// Another apply transaction on a different container 2 shouldn't fail because the previous apply transaction
237
237
// failure was only on container 1.
238
- when (context .getLogProto ()).thenReturn (ContainerProtos .ContainerCommandRequestProto .newBuilder ()
239
- .setCmdType (ContainerProtos .Type .WriteChunk ).setWriteChunk (
240
- ContainerProtos .WriteChunkRequestProto .newBuilder ().setBlockID (
241
- ContainerProtos .DatanodeBlockID .newBuilder ().setContainerID (2 ).setLocalID (1 ).build ()).build ())
242
- .setContainerID (2 )
243
- .setDatanodeUuid (UUID .randomUUID ().toString ()).build ());
238
+ setUpMockLogProtoReturn (context , 2 , 1 );
244
239
245
240
reset (dispatcher );
246
241
throwable .set (null );
@@ -277,7 +272,7 @@ public void testWriteTimout() throws Exception {
277
272
return null ;
278
273
}).when (dispatcher ).dispatch (any (), any ());
279
274
280
- setUpMockRequestProto (context );
275
+ setUpMockRequestProtoReturn (context , "Test Data" , 1 , 1 );
281
276
AtomicReference <Throwable > throwable = new AtomicReference <>(null );
282
277
Function <Throwable , ? extends Message > throwableSetter = getThrowableSetter (throwable );
283
278
Field writeChunkWaitMaxNs = stateMachine .getClass ().getDeclaredField ("writeChunkWaitMaxNs" );
0 commit comments