Skip to content

Commit 6d7bec8

Browse files
committed
Address Gabor's comments
1 parent 4f49de1 commit 6d7bec8

File tree

3 files changed

+23
-14
lines changed

3 files changed

+23
-14
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/InflightDataRescalingDescriptor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.ObjectStreamException;
2121
import java.io.Serializable;
2222
import java.util.Arrays;
23+
import java.util.Collections;
2324
import java.util.Objects;
2425
import java.util.Set;
2526

@@ -116,7 +117,7 @@ public static class InflightDataGateOrPartitionRescalingDescriptor implements Se
116117
new InflightDataGateOrPartitionRescalingDescriptor(
117118
new int[0],
118119
RescaleMappings.identity(0, 0),
119-
java.util.Collections.emptySet(),
120+
Collections.emptySet(),
120121
MappingType.IDENTITY) {
121122

122123
private static final long serialVersionUID = 1L;

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateAssignment.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -505,12 +505,12 @@ public boolean hasInFlightDataForInputGate(int gateIndex) {
505505
TaskStateAssignment upstreamAssignment = getUpstreamAssignments()[gateIndex];
506506
if (upstreamAssignment != null && upstreamAssignment.hasOutputState()) {
507507
IntermediateResult inputResult = executionJobVertex.getInputs().get(gateIndex);
508-
int partitionIndex =
509-
Arrays.asList(inputResult.getProducer().getProducedDataSets())
510-
.indexOf(inputResult);
511-
512-
if (partitionIndex != -1) {
513-
return upstreamAssignment.outputStatePartitions.contains(partitionIndex);
508+
IntermediateDataSetID resultId = inputResult.getId();
509+
IntermediateResult[] producedDataSets = inputResult.getProducer().getProducedDataSets();
510+
for (int i = 0; i < producedDataSets.length; i++) {
511+
if (producedDataSets[i].getId().equals(resultId)) {
512+
return upstreamAssignment.outputStatePartitions.contains(i);
513+
}
514514
}
515515
}
516516

@@ -529,11 +529,12 @@ public boolean hasInFlightDataForResultPartition(int partitionIndex) {
529529
if (downstreamAssignment != null && downstreamAssignment.hasInputState()) {
530530
IntermediateResult producedResult =
531531
executionJobVertex.getProducedDataSets()[partitionIndex];
532-
int gateIndex =
533-
downstreamAssignment.executionJobVertex.getInputs().indexOf(producedResult);
534-
535-
if (gateIndex != -1) {
536-
return downstreamAssignment.inputStateGates.contains(gateIndex);
532+
IntermediateDataSetID resultId = producedResult.getId();
533+
List<IntermediateResult> inputs = downstreamAssignment.executionJobVertex.getInputs();
534+
for (int i = 0; i < inputs.size(); i++) {
535+
if (inputs.get(i).getId().equals(resultId)) {
536+
return downstreamAssignment.inputStateGates.contains(i);
537+
}
537538
}
538539
}
539540
return false;

flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,15 @@ private StreamExecutionEnvironment getUnalignedCheckpointEnv(@Nullable String re
143143
temporaryFolder.newFolder().toURI().toString());
144144
conf.set(CheckpointingOptions.ENABLE_UNALIGNED, true);
145145
// Decrease the memory segment size to keep the test from being too slow, for two reasons:
146-
// 1. Recovery phase needs to consume all inflight buffers
147-
// 2. Forward or rescale exchange does not support unaligned checkpoint.
146+
// 1. When a Flink job recovers from an unaligned checkpoint, it has to consume all inflight
147+
// buffers during the recovery phase. The smaller the buffer size, the fewer records are
148+
// snapshotted during the checkpoint, so fewer records need to be consumed
149+
// during recovery.
150+
// 2. Forward and rescale exchanges do not support unaligned checkpoints, which means they
151+
// still use aligned checkpoints even if unaligned checkpoints are
152+
// enabled. All buffers (records) before the barrier must be consumed for an aligned
153+
// checkpoint. The smaller the buffer size, the fewer records need to be consumed during
154+
// an aligned checkpoint.
148155
conf.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("1 kb"));
149156
// To prevent the picked checkpoint from being deleted
150157
conf.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 50);

0 commit comments

Comments
 (0)