From b13c51e0181bb4d46cd5681fa4cd5238684f9136 Mon Sep 17 00:00:00 2001 From: xuyang Date: Fri, 4 Jul 2025 15:56:00 +0800 Subject: [PATCH 1/2] [FLINK-38022][table] Throw exception in main thread in tests when any exception is thrown in async thread during looking up Furthermore, this PR also fixes a potential ConcurrentModificationException when copying lists in tests. --- .../operators/testutils/MockEnvironment.java | 12 +++ ...eyedTwoInputStreamOperatorTestHarness.java | 2 +- .../deltajoin/StreamingDeltaJoinOperator.java | 3 +- .../StreamingDeltaJoinOperatorTest.java | 91 ++++++++++++++++--- 4 files changed, 94 insertions(+), 14 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index ec9bfcb917de9..1b4a7bb817252 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -72,6 +72,7 @@ import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.function.Consumer; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId; import static org.apache.flink.util.Preconditions.checkArgument; @@ -134,6 +135,8 @@ public class MockEnvironment implements Environment, AutoCloseable { private Optional actualExternalFailureCause = Optional.empty(); + private Optional> externalFailureCauseConsumer = Optional.empty(); + private final TaskMetricGroup taskMetricGroup; private final ExternalResourceInfoProvider externalResourceInfoProvider; @@ -413,6 +416,11 @@ public TaskOperatorEventGateway getOperatorCoordinatorEventGateway() { @Override public void failExternally(Throwable cause) { + if (externalFailureCauseConsumer.isPresent()) { + 
externalFailureCauseConsumer.get().accept(cause); + return; + } + if (!expectedExternalFailureCause.isPresent()) { throw new UnsupportedOperationException( "MockEnvironment does not support external task failure."); @@ -483,4 +491,8 @@ public void setExpectedExternalFailureCause(Class expectedT public Optional getActualExternalFailureCause() { return actualExternalFailureCause; } + + public void setExternalFailureCauseConsumer(Consumer externalFailureCauseConsumer) { + this.externalFailureCauseConsumer = Optional.of(externalFailureCauseConsumer); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java index d3b1559581511..f4e4e9ef90706 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java @@ -106,7 +106,7 @@ public int numKeyedStateEntries() { } } - public void endAllInput() throws Exception { + public void endAllInputs() throws Exception { TwoInputStreamOperator op = (TwoInputStreamOperator) operator; if (op instanceof BoundedMultiInput) { ((BoundedMultiInput) op).endInput(1); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperator.java index 19b7a30d60390..61edefbc6182b 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperator.java @@ -143,9 +143,10 @@ public class 
StreamingDeltaJoinOperator private final boolean[] isInputEnded; - // ---------------------------- Metrics ----------------------------------- private final transient AtomicInteger totalInflightNum = new AtomicInteger(0); + // ---------------------------- Metrics ----------------------------------- + private final transient AtomicLong asyncIOTime = new AtomicLong(Long.MIN_VALUE); /** diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperatorTest.java index bd92b6cb950d6..fb61c72cc38d2 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperatorTest.java @@ -61,6 +61,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -71,6 +72,7 @@ import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test class for {@link StreamingDeltaJoinOperator}. 
*/ public class StreamingDeltaJoinOperatorTest { @@ -155,12 +157,22 @@ VarCharType.STRING_TYPE, new IntType(), new BooleanType() private RowDataHarnessAssertor assertor; + private Optional latestException = Optional.empty(); + @BeforeEach public void beforeEach() throws Exception { testHarness = createDeltaJoinOperatorTestHarness(); testHarness.setup(); testHarness.open(); StreamingDeltaJoinOperator operator = unwrapOperator(testHarness); + // set external failure cause consumer to prevent hang + testHarness + .getEnvironment() + .setExternalFailureCauseConsumer( + error -> { + latestException = Optional.of(error); + // DO NOT throw exception up again to avoid hang + }); operator.setAsyncExecutionController( new MyAsyncExecutionControllerDelegate(operator.getAsyncExecutionController())); prepareOperatorRuntimeInfo(operator); @@ -199,6 +211,8 @@ public void afterEach() throws Exception { testHarness.close(); leftTableCurrentData.clear(); rightTableCurrentData.clear(); + latestException = Optional.empty(); + MyAsyncFunction.clearException(); } @Test @@ -232,7 +246,7 @@ void testJoinBothAppendOnlyTables() throws Exception { testHarness.processElement2(rightRecord4); testHarness.processElement2(rightRecord5); - testHarness.endAllInput(); + waitAllDataProcessed(); final ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); @@ -310,7 +324,7 @@ void testBlockingWithSameJoinKey() throws Exception { MyAsyncFunction.release(); - testHarness.endAllInput(); + waitAllDataProcessed(); final ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); expectedOutput.add(insertRecord(100, true, "jklk1", "jklk1", 300, true)); expectedOutput.add(insertRecord(100, false, "jklk2", "jklk2", 300, false)); @@ -366,7 +380,7 @@ void testTableDataVisibleBeforeJoin() throws Exception { testHarness.processElement1(leftRecord3); testHarness.processElement2(rightRecord3); - testHarness.endAllInput(); + waitAllDataProcessed(); final ConcurrentLinkedQueue expectedOutput = 
new ConcurrentLinkedQueue<>(); @@ -434,9 +448,12 @@ void testCheckpointAndRestore() throws Exception { // checkpointing OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L); + + // release async function to avoid timeout when closing + MyAsyncFunction.release(); testHarness.close(); - assertThat(testHarness.getOutput()).isEmpty(); + MyAsyncFunction.block(); // restoring testHarness = createDeltaJoinOperatorTestHarness(); @@ -446,6 +463,7 @@ void testCheckpointAndRestore() throws Exception { operator.setAsyncExecutionController( new MyAsyncExecutionControllerDelegate(operator.getAsyncExecutionController())); + latestException = Optional.empty(); testHarness.initializeState(snapshot); testHarness.open(); @@ -462,7 +480,7 @@ void testCheckpointAndRestore() throws Exception { MyAsyncFunction.release(); - testHarness.endAllInput(); + waitAllDataProcessed(); final ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); expectedOutput.add(insertRecord(100, true, "jklk1", "jklk1", 300, true)); expectedOutput.add(insertRecord(200, true, "jklk1", "jklk1", 300, true)); @@ -504,7 +522,7 @@ void testClearLegacyStateWhenCheckpointing() throws Exception { assertThat(testHarness.numKeyedStateEntries()).isEqualTo(2); MyAsyncFunction.release(); - testHarness.endAllInput(); + waitAllDataProcessed(); MyAsyncFunction.block(); @@ -515,7 +533,7 @@ void testClearLegacyStateWhenCheckpointing() throws Exception { assertThat(testHarness.numKeyedStateEntries()).isEqualTo(1); MyAsyncFunction.release(); - testHarness.endAllInput(); + waitAllDataProcessed(); testHarness.snapshot(2L, 0L); assertThat(testHarness.numKeyedStateEntries()).isEqualTo(0); @@ -529,6 +547,33 @@ void testClearLegacyStateWhenCheckpointing() throws Exception { "result mismatch", expectedOutput, testHarness.getOutput()); } + @Test + void testMeetExceptionWhenLookup() throws Exception { + Throwable expectedException = new IllegalStateException("Mock to fail"); + 
MyAsyncFunction.throwException(expectedException); + + StreamRecord record = insertRecord(100, true, "jklk1"); + testHarness.processElement1(record); + + // IllegalStateException(Failed to wait all data processed) + // +- Exception(Could not complete the stream element ...) + // +- RuntimeException(Failed to look up table) + // +- Actual Exception + assertThatThrownBy(this::waitAllDataProcessed) + .cause() + .cause() + .cause() + .isEqualTo(expectedException); + } + + private void waitAllDataProcessed() throws Exception { + testHarness.endAllInputs(); + if (latestException.isPresent()) { + throw new IllegalStateException( + "Failed to wait all data processed", latestException.get()); + } + } + private KeyedTwoInputStreamOperatorTestHarness createDeltaJoinOperatorTestHarness() throws Exception { TaskMailbox mailbox = new TaskMailboxImpl(); @@ -639,9 +684,13 @@ private static void insertTableData(StreamRecord record, boolean insert RowData rowData = record.getValue(); try { if (insertLeftTable) { - leftTableCurrentData.add(rowData); + synchronized (leftTableCurrentData) { + leftTableCurrentData.add(rowData); + } } else { - rightTableCurrentData.add(rowData); + synchronized (rightTableCurrentData) { + rightTableCurrentData.add(rowData); + } } } catch (Exception e) { throw new IllegalStateException("Failed to insert table data", e); @@ -664,6 +713,8 @@ public static class MyAsyncFunction extends RichAsyncFunction { private static final AtomicInteger rightInvokeCount = new AtomicInteger(0); + private static Optional expectedThrownException = Optional.empty(); + // ===== runtime info ===== private Boolean treatRightAsLookupTable; @@ -679,11 +730,23 @@ public static void release() { Objects.requireNonNull(lock).countDown(); } + public static void throwException(Throwable t) { + expectedThrownException = Optional.of(t); + } + + public static void clearException() { + expectedThrownException = Optional.empty(); + } + @Override public void asyncInvoke(final RowData input, 
final ResultFuture resultFuture) { executorService.submit( () -> { try { + if (expectedThrownException.isPresent()) { + throw expectedThrownException.get(); + } + if (lock != null) { lock.await(); } @@ -692,13 +755,17 @@ public void asyncInvoke(final RowData input, final ResultFuture resultFu RowDataKeySelector streamSideJoinKeySelector; RowDataKeySelector lookupSideJoinKeySelector; if (Objects.requireNonNull(treatRightAsLookupTable)) { - lookupTableData = new LinkedList<>(rightTableCurrentData); + synchronized (rightTableCurrentData) { + lookupTableData = new LinkedList<>(rightTableCurrentData); + } streamSideJoinKeySelector = leftJoinKeySelector.copy(); lookupSideJoinKeySelector = rightJoinKeySelector.copy(); leftInvokeCount.incrementAndGet(); } else { - lookupTableData = new LinkedList<>(leftTableCurrentData); + synchronized (leftTableCurrentData) { + lookupTableData = new LinkedList<>(leftTableCurrentData); + } streamSideJoinKeySelector = rightJoinKeySelector.copy(); lookupSideJoinKeySelector = leftJoinKeySelector.copy(); @@ -715,7 +782,7 @@ public void asyncInvoke(final RowData input, final ResultFuture resultFu } resultFuture.complete(results); - } catch (Exception e) { + } catch (Throwable e) { resultFuture.completeExceptionally( new RuntimeException("Failed to look up table", e)); } From f532383218b8badbe356dfef8a4b627d40c3ed9f Mon Sep 17 00:00:00 2001 From: xuyang Date: Mon, 7 Jul 2025 11:41:39 +0800 Subject: [PATCH 2/2] address comment --- .../join/deltajoin/StreamingDeltaJoinOperatorTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperatorTest.java index fb61c72cc38d2..e09450fd9f27f 100644 --- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperatorTest.java @@ -212,7 +212,7 @@ public void afterEach() throws Exception { leftTableCurrentData.clear(); rightTableCurrentData.clear(); latestException = Optional.empty(); - MyAsyncFunction.clearException(); + MyAsyncFunction.clearExpectedThrownException(); } @Test @@ -550,7 +550,7 @@ void testClearLegacyStateWhenCheckpointing() throws Exception { @Test void testMeetExceptionWhenLookup() throws Exception { Throwable expectedException = new IllegalStateException("Mock to fail"); - MyAsyncFunction.throwException(expectedException); + MyAsyncFunction.setExpectedThrownException(expectedException); StreamRecord record = insertRecord(100, true, "jklk1"); testHarness.processElement1(record); @@ -730,11 +730,11 @@ public static void release() { Objects.requireNonNull(lock).countDown(); } - public static void throwException(Throwable t) { + public static void setExpectedThrownException(Throwable t) { expectedThrownException = Optional.of(t); } - public static void clearException() { + public static void clearExpectedThrownException() { expectedThrownException = Optional.empty(); }