File tree Expand file tree Collapse file tree 2 files changed +24
-2
lines changed
main/java/org/apache/flink/runtime/checkpoint
test/java/org/apache/flink/runtime/checkpoint Expand file tree Collapse file tree 2 files changed +24
-2
lines changed Original file line number Diff line number Diff line change 1919package org .apache .flink .runtime .checkpoint ;
2020
2121import org .apache .flink .annotation .Internal ;
22+ import org .apache .flink .annotation .VisibleForTesting ;
2223import org .apache .flink .api .java .tuple .Tuple2 ;
2324import org .apache .flink .runtime .state .OperatorStateHandle ;
2425import org .apache .flink .runtime .state .OperatorStreamStateHandle ;
@@ -488,13 +489,16 @@ private static final class GroupByStateNameResults {
488489 }
489490 }
490491
491- private static final class StateEntry {
492+ @ VisibleForTesting
493+ static final class StateEntry {
492494 final List <Tuple2 <StreamStateHandle , OperatorStateHandle .StateMetaInfo >> entries ;
493495 final BitSet reportedSubtaskIndices ;
496+ final int parallelism ;
494497
495498 public StateEntry (int estimatedEntrySize , int parallelism ) {
496499 this .entries = new ArrayList <>(estimatedEntrySize );
497500 this .reportedSubtaskIndices = new BitSet (parallelism );
501+ this .parallelism = parallelism ;
498502 }
499503
500504 void addEntry (
@@ -506,7 +510,7 @@ void addEntry(
506510
507511 boolean isPartiallyReported () {
508512 return reportedSubtaskIndices .cardinality () > 0
509- && reportedSubtaskIndices .cardinality () < reportedSubtaskIndices . size () ;
513+ && reportedSubtaskIndices .cardinality () < parallelism ;
510514 }
511515 }
512516}
Original file line number Diff line number Diff line change 4747import org .apache .flink .testutils .TestingUtils ;
4848import org .apache .flink .testutils .executor .TestExecutorExtension ;
4949
50+ import org .junit .jupiter .api .Assertions ;
5051import org .junit .jupiter .api .Test ;
5152import org .junit .jupiter .api .extension .RegisterExtension ;
5253
@@ -162,6 +163,23 @@ void testRepartitionUnionState() {
162163 verifyOneKindPartitionableStateRescale (operatorState , operatorID );
163164 }
164165
166+ @ Test
167+ public void testPartiallyReported () {
168+ RoundRobinOperatorStateRepartitioner .StateEntry stateEntry =
169+ new RoundRobinOperatorStateRepartitioner .StateEntry (0 , 5 );
170+ stateEntry .addEntry (0 , null );
171+ stateEntry .addEntry (1 , null );
172+ stateEntry .addEntry (3 , null );
173+
174+ // assert partially report
175+ Assertions .assertTrue (stateEntry .isPartiallyReported ());
176+
177+ // assert fully report
178+ stateEntry .addEntry (2 , null );
179+ stateEntry .addEntry (4 , null );
180+ Assertions .assertFalse (stateEntry .isPartiallyReported ());
181+ }
182+
165183 @ Test
166184 void testRepartitionBroadcastState () {
167185 OperatorID operatorID = new OperatorID ();
You can’t perform that action at this time.
0 commit comments