Skip to content

Commit 13fe0e6

Browse files
committed
[FLINK-36455] Deprecate async parts of committable summary
Without the async parts of the committable summary, the number of pending committables will always be 0. Failed committables will also be 0: an unexpected failure throws an error, and an expected one is silently ignored. The previous behavior, where these counts could be >0, actually led to infinite loops in the global committer.
1 parent 21c344c commit 13fe0e6

File tree

18 files changed

+150
-210
lines changed

18 files changed

+150
-210
lines changed

flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java

-1
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,6 @@ private void appendCompactingResultsToSummary(CommittableSummary<FileSinkCommitt
176176
summary.getNumberOfSubtasks(),
177177
summary.getCheckpointIdOrEOI(),
178178
summary.getNumberOfCommittables() + results.size(),
179-
summary.getNumberOfPendingCommittables() + results.size(),
180179
summary.getNumberOfFailedCommittables())));
181180
for (FileSinkCommittable committable : results) {
182181
output.collect(

flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactorOperatorTest.java

+7-9
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ void testCompact() throws Exception {
9898
// 1summary+1compacted+2cleanup
9999
ListAssert<CommittableMessage<FileSinkCommittable>> results =
100100
assertThat(harness.extractOutputValues()).hasSize(4);
101-
results.element(0, as(committableSummary())).hasPendingCommittables(3);
102101
results.element(1, as(committableWithLineage()))
103102
.hasCommittable(committable("0", "compacted-0", 10));
104103
results.element(2, as(committableWithLineage())).hasCommittable(cleanupPath("0", ".0"));
@@ -140,7 +139,6 @@ void testPassthrough() throws Exception {
140139

141140
ListAssert<CommittableMessage<FileSinkCommittable>> messages =
142141
assertThat(harness.extractOutputValues()).hasSize(3);
143-
messages.element(0, as(committableSummary())).hasPendingCommittables(2);
144142
messages.element(1, as(committableWithLineage()))
145143
.hasCommittable(cleanupInprogressRequest);
146144
messages.element(2, as(committableWithLineage())).hasCommittable(cleanupPathRequest);
@@ -207,13 +205,13 @@ void testRestore() throws Exception {
207205
// 1summary+1compacted+2cleanup * 2
208206
ListAssert<CommittableMessage<FileSinkCommittable>> results =
209207
assertThat(harness.extractOutputValues()).hasSize(8);
210-
results.element(0, as(committableSummary())).hasPendingCommittables(3);
208+
results.element(0, as(committableSummary()));
211209
results.element(1, as(committableWithLineage()))
212210
.hasCommittable(committable("0", "compacted-0", 10));
213211
results.element(2, as(committableWithLineage())).hasCommittable(cleanupPath("0", ".0"));
214212
results.element(3, as(committableWithLineage())).hasCommittable(cleanupPath("0", ".1"));
215213

216-
results.element(4, as(committableSummary())).hasPendingCommittables(3);
214+
results.element(4, as(committableSummary()));
217215
results.element(5, as(committableWithLineage()))
218216
.hasCommittable(committable("0", "compacted-2", 10));
219217
results.element(6, as(committableWithLineage())).hasCommittable(cleanupPath("0", ".2"));
@@ -311,7 +309,7 @@ void testStateHandler() throws Exception {
311309
// + 1 summary + 1 cleanup + 1 summary
312310
ListAssert<CommittableMessage<FileSinkCommittable>> results =
313311
assertThat(harness.extractOutputValues()).hasSize(18);
314-
results.element(0, as(committableSummary())).hasPendingCommittables(14);
312+
results.element(0, as(committableSummary()));
315313

316314
List<FileSinkCommittable> expectedResult =
317315
Arrays.asList(
@@ -335,11 +333,11 @@ void testStateHandler() throws Exception {
335333
.hasCommittable(expectedResult.get(i));
336334
}
337335

338-
results.element(15, as(committableSummary())).hasPendingCommittables(1);
336+
results.element(15, as(committableSummary()));
339337
results.element(16, as(committableWithLineage()))
340338
.hasCommittable(cleanupPath("0", ".6"));
341339

342-
results.element(17, as(committableSummary())).hasPendingCommittables(3);
340+
results.element(17, as(committableSummary()));
343341
}
344342
}
345343

@@ -378,7 +376,7 @@ void testStateHandlerRestore() throws Exception {
378376

379377
ListAssert<CommittableMessage<FileSinkCommittable>> results =
380378
assertThat(harness.extractOutputValues()).hasSize(3);
381-
results.element(0, as(committableSummary())).hasPendingCommittables(4);
379+
results.element(0, as(committableSummary()));
382380
results.element(1, as(committableWithLineage()))
383381
.hasCommittable(committable("0", "compacted-1", 1));
384382
results.element(2, as(committableWithLineage())).hasCommittable(cleanupPath("0", ".1"));
@@ -437,7 +435,7 @@ void testStateHandlerRestore() throws Exception {
437435

438436
ListAssert<CommittableMessage<FileSinkCommittable>> results =
439437
assertThat(harness.extractOutputValues()).hasSize(2);
440-
results.element(0, as(committableSummary())).hasPendingCommittables(1);
438+
results.element(0, as(committableSummary()));
441439
results.element(1, as(committableWithLineage())).hasCommittable(cleanupPath("0", ".2"));
442440
}
443441
}

flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java

+20-2
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,29 @@ public class CommittableSummary<CommT> implements CommittableMessage<CommT> {
3939
private final long checkpointId;
4040
/** The number of committables coming from the given subtask in the particular checkpoint. */
4141
private final int numberOfCommittables;
42+
43+
@Deprecated
4244
/** The number of committables that have not been successfully committed. */
4345
private final int numberOfPendingCommittables;
4446
/** The number of committables that are not retried and have been failed. */
4547
private final int numberOfFailedCommittables;
4648

49+
public CommittableSummary(
50+
int subtaskId,
51+
int numberOfSubtasks,
52+
long checkpointId,
53+
int numberOfCommittables,
54+
int numberOfFailedCommittables) {
55+
this(
56+
subtaskId,
57+
numberOfSubtasks,
58+
checkpointId,
59+
numberOfCommittables,
60+
0,
61+
numberOfFailedCommittables);
62+
}
63+
64+
@Deprecated
4765
public CommittableSummary(
4866
int subtaskId,
4967
int numberOfSubtasks,
@@ -75,8 +93,9 @@ public int getNumberOfCommittables() {
7593
return numberOfCommittables;
7694
}
7795

96+
@Deprecated
7897
public int getNumberOfPendingCommittables() {
79-
return numberOfPendingCommittables;
98+
return 0;
8099
}
81100

82101
public int getNumberOfFailedCommittables() {
@@ -89,7 +108,6 @@ public <NewCommT> CommittableSummary<NewCommT> map() {
89108
numberOfSubtasks,
90109
checkpointId,
91110
numberOfCommittables,
92-
numberOfPendingCommittables,
93111
numberOfFailedCommittables);
94112
}
95113

flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,7 @@ private void emit(CheckpointCommittableManager<CommT> committableManager) {
196196
numberOfSubtasks,
197197
checkpointId,
198198
committables.size(),
199-
0,
200-
0)));
199+
committableManager.getNumFailed())));
201200
for (CommT committable : committables) {
202201
output.collect(
203202
new StreamRecord<>(

flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManager.java

+19
Original file line numberDiff line numberDiff line change
@@ -66,5 +66,24 @@ public interface CheckpointCommittableManager<CommT> {
6666
void commit(Committer<CommT> committer, int maxRetries)
6767
throws IOException, InterruptedException;
6868

69+
/**
70+
* Returns the committables that have been successfully committed; that is, the
71+
* corresponding {@link org.apache.flink.api.connector.sink2.Committer.CommitRequest} was not
72+
* used to signal an error of any kind (retryable or not).
73+
*
74+
* @return successfully committed committables
75+
*/
6976
Collection<CommT> getSuccessfulCommittables();
77+
78+
/**
79+
* Returns the number of committables that have failed with a known error. With the current
80+
* semantics, {@link
81+
* org.apache.flink.api.connector.sink2.Committer.CommitRequest#signalFailedWithKnownReason(Throwable)}
82+
* discards the committable but continues processing. The returned number should be emitted
83+
* downstream in a {@link org.apache.flink.streaming.api.connector.sink2.CommittableSummary},
84+
* such that downstream can assess if all committables have been processed.
85+
*
86+
* @return number of failed committables
87+
*/
88+
int getNumFailed();
7089
}

flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java

+7
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,13 @@ public Collection<CommT> getSuccessfulCommittables() {
166166
.collect(Collectors.toList());
167167
}
168168

169+
@Override
170+
public int getNumFailed() {
171+
return subtasksCommittableManagers.values().stream()
172+
.mapToInt(SubtaskCommittableManager::getNumFailed)
173+
.sum();
174+
}
175+
169176
Stream<CommitRequestImpl<CommT>> getPendingRequests() {
170177
return subtasksCommittableManagers.values().stream()
171178
.peek(this::assertReceivedAll)

flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ public SubtaskSimpleVersionedSerializer() {
207207

208208
@Override
209209
public int getVersion() {
210-
return 1;
210+
return 2;
211211
}
212212

213213
@Override
@@ -219,7 +219,6 @@ public byte[] serialize(SubtaskCommittableManager<CommT> subtask) throws IOExcep
219219
new ArrayList<>(subtask.getRequests()),
220220
out);
221221
out.writeInt(subtask.getNumCommittables());
222-
out.writeInt(subtask.getNumDrained());
223222
out.writeInt(subtask.getNumFailed());
224223
return out.getCopyOfBuffer();
225224
}
@@ -236,7 +235,7 @@ public SubtaskCommittableManager<CommT> deserialize(int version, byte[] serializ
236235
return new SubtaskCommittableManager<>(
237236
requests,
238237
in.readInt(),
239-
in.readInt(),
238+
version >= 2 ? 0 : in.readInt(),
240239
in.readInt(),
241240
subtaskId,
242241
checkNotNull(

flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java

+15-59
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,9 @@
2525
import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;
2626

2727
import java.util.ArrayDeque;
28-
import java.util.ArrayList;
2928
import java.util.Collection;
3029
import java.util.Collections;
3130
import java.util.Deque;
32-
import java.util.Iterator;
33-
import java.util.List;
3431
import java.util.Objects;
3532
import java.util.stream.Collectors;
3633
import java.util.stream.Stream;
@@ -46,7 +43,7 @@ class SubtaskCommittableManager<CommT> {
4643
private final int numExpectedCommittables;
4744
private final long checkpointId;
4845
private final int subtaskId;
49-
private int numDrained;
46+
@Deprecated private int numDrained;
5047
private int numFailed;
5148
private final SinkCommitterMetricGroup metricGroup;
5249

@@ -65,6 +62,17 @@ class SubtaskCommittableManager<CommT> {
6562
metricGroup);
6663
}
6764

65+
SubtaskCommittableManager(
66+
Collection<CommitRequestImpl<CommT>> requests,
67+
int numExpectedCommittables,
68+
int numFailed,
69+
int subtaskId,
70+
long checkpointId,
71+
SinkCommitterMetricGroup metricGroup) {
72+
this(requests, numExpectedCommittables, 0, numFailed, subtaskId, checkpointId, metricGroup);
73+
}
74+
75+
@Deprecated
6876
SubtaskCommittableManager(
6977
Collection<CommitRequestImpl<CommT>> requests,
7078
int numExpectedCommittables,
@@ -98,7 +106,7 @@ void add(CommT committable) {
98106
* @return if all committables have been received
99107
*/
100108
boolean hasReceivedAll() {
101-
return getNumCommittables() == numExpectedCommittables;
109+
return getNumCommittables() == numExpectedCommittables + numFailed;
102110
}
103111

104112
/**
@@ -107,26 +115,15 @@ boolean hasReceivedAll() {
107115
* @return number of so far received committables
108116
*/
109117
int getNumCommittables() {
110-
return requests.size() + numDrained + numFailed;
111-
}
112-
113-
/**
114-
* Returns the number of still expected commits.
115-
*
116-
* <p>Either the committables are not yet received or the commit is still pending.
117-
*
118-
* @return number of still expected committables
119-
*/
120-
int getNumPending() {
121-
return numExpectedCommittables - (numDrained + numFailed);
118+
return requests.size();
122119
}
123120

124121
int getNumFailed() {
125122
return numFailed;
126123
}
127124

128125
boolean isFinished() {
129-
return getNumPending() == 0;
126+
return getPendingRequests().findAny().isEmpty();
130127
}
131128

132129
/**
@@ -145,43 +142,6 @@ Stream<CommT> getSuccessfulCommittables() {
145142
.map(CommitRequestImpl::getCommittable);
146143
}
147144

148-
/**
149-
* Iterates through all currently registered {@link #requests} and returns all {@link
150-
* CommittableWithLineage} that could be successfully committed.
151-
*
152-
* <p>Invoking this method does not yield the same {@link CommittableWithLineage} again. Once
153-
* retrieved they are not part of {@link #requests} anymore.
154-
*
155-
* @return list of {@link CommittableWithLineage}
156-
*/
157-
List<CommittableWithLineage<CommT>> drainCommitted() {
158-
List<CommittableWithLineage<CommT>> committed = new ArrayList<>(requests.size());
159-
for (Iterator<CommitRequestImpl<CommT>> iterator = requests.iterator();
160-
iterator.hasNext(); ) {
161-
CommitRequestImpl<CommT> request = iterator.next();
162-
if (!request.isFinished()) {
163-
continue;
164-
}
165-
if (request.getState() == CommitRequestState.FAILED) {
166-
numFailed += 1;
167-
iterator.remove();
168-
continue;
169-
} else {
170-
committed.add(
171-
new CommittableWithLineage<>(
172-
request.getCommittable(), checkpointId, subtaskId));
173-
}
174-
iterator.remove();
175-
}
176-
177-
numDrained += committed.size();
178-
return committed;
179-
}
180-
181-
int getNumDrained() {
182-
return numDrained;
183-
}
184-
185145
int getSubtaskId() {
186146
return subtaskId;
187147
}
@@ -202,7 +162,6 @@ SubtaskCommittableManager<CommT> merge(SubtaskCommittableManager<CommT> other) {
202162
Stream.concat(requests.stream(), other.requests.stream())
203163
.collect(Collectors.toList()),
204164
numExpectedCommittables + other.numExpectedCommittables,
205-
numDrained + other.numDrained,
206165
numFailed + other.numFailed,
207166
subtaskId,
208167
checkpointId,
@@ -213,7 +172,6 @@ SubtaskCommittableManager<CommT> copy() {
213172
return new SubtaskCommittableManager<>(
214173
requests.stream().map(CommitRequestImpl::copy).collect(Collectors.toList()),
215174
numExpectedCommittables,
216-
numDrained,
217175
numFailed,
218176
subtaskId,
219177
checkpointId,
@@ -254,8 +212,6 @@ public String toString() {
254212
+ checkpointId
255213
+ ", subtaskId="
256214
+ subtaskId
257-
+ ", numDrained="
258-
+ numDrained
259215
+ ", numFailed="
260216
+ numFailed
261217
+ '}';

flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializerTest.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ void testCommittableWithLinageSerDe() throws IOException {
4747
@Test
4848
void testCommittableSummarySerDe() throws IOException {
4949
final CommittableSummary<Integer> committableSummary =
50-
new CommittableSummary<>(1, 2, 3L, 4, 5, 6);
50+
new CommittableSummary<>(1, 2, 3L, 4, 5);
5151
final CommittableMessage<Integer> message =
5252
SERIALIZER.deserialize(
5353
CommittableMessageSerializer.VERSION,
@@ -58,7 +58,5 @@ void testCommittableSummarySerDe() throws IOException {
5858
assertThat(copy.getNumberOfSubtasks()).isEqualTo(2);
5959
assertThat(copy.getCheckpointIdOrEOI()).isEqualTo(3L);
6060
assertThat(copy.getNumberOfCommittables()).isEqualTo(4);
61-
assertThat(copy.getNumberOfPendingCommittables()).isEqualTo(5);
62-
assertThat(copy.getNumberOfFailedCommittables()).isEqualTo(6);
6361
}
6462
}

flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java

-4
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,6 @@ public CommittableSummaryAssert<CommT> hasOverallCommittables(int committableNum
4040
return returns(committableNumber, CommittableSummary::getNumberOfCommittables);
4141
}
4242

43-
public CommittableSummaryAssert<CommT> hasPendingCommittables(int committableNumber) {
44-
return returns(committableNumber, CommittableSummary::getNumberOfPendingCommittables);
45-
}
46-
4743
public CommittableSummaryAssert<CommT> hasFailedCommittables(int committableNumber) {
4844
return returns(committableNumber, CommittableSummary::getNumberOfFailedCommittables);
4945
}

0 commit comments

Comments
 (0)