Skip to content

[FLINK-38022][table] Throw exception in main thread in tests when any exception is thrown in async thread during looking up #26752

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;

import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.apache.flink.util.Preconditions.checkArgument;
Expand Down Expand Up @@ -134,6 +135,8 @@ public class MockEnvironment implements Environment, AutoCloseable {

private Optional<? extends Throwable> actualExternalFailureCause = Optional.empty();

private Optional<Consumer<Throwable>> externalFailureCauseConsumer = Optional.empty();

private final TaskMetricGroup taskMetricGroup;

private final ExternalResourceInfoProvider externalResourceInfoProvider;
Expand Down Expand Up @@ -413,6 +416,11 @@ public TaskOperatorEventGateway getOperatorCoordinatorEventGateway() {

@Override
public void failExternally(Throwable cause) {
if (externalFailureCauseConsumer.isPresent()) {
externalFailureCauseConsumer.get().accept(cause);
return;
}

if (!expectedExternalFailureCause.isPresent()) {
throw new UnsupportedOperationException(
"MockEnvironment does not support external task failure.");
Expand Down Expand Up @@ -483,4 +491,8 @@ public void setExpectedExternalFailureCause(Class<? extends Throwable> expectedT
public Optional<? extends Throwable> getActualExternalFailureCause() {
return actualExternalFailureCause;
}

public void setExternalFailureCauseConsumer(Consumer<Throwable> externalFailureCauseConsumer) {
this.externalFailureCauseConsumer = Optional.of(externalFailureCauseConsumer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public int numKeyedStateEntries() {
}
}

public void endAllInput() throws Exception {
public void endAllInputs() throws Exception {
TwoInputStreamOperator<IN1, IN2, OUT> op = (TwoInputStreamOperator<IN1, IN2, OUT>) operator;
if (op instanceof BoundedMultiInput) {
((BoundedMultiInput) op).endInput(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,10 @@ public class StreamingDeltaJoinOperator

private final boolean[] isInputEnded;

// ---------------------------- Metrics -----------------------------------
private final transient AtomicInteger totalInflightNum = new AtomicInteger(0);

// ---------------------------- Metrics -----------------------------------

private final transient AtomicLong asyncIOTime = new AtomicLong(Long.MIN_VALUE);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
Expand All @@ -71,6 +72,7 @@

import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Test class for {@link StreamingDeltaJoinOperator}. */
public class StreamingDeltaJoinOperatorTest {
Expand Down Expand Up @@ -155,12 +157,22 @@ VarCharType.STRING_TYPE, new IntType(), new BooleanType()

private RowDataHarnessAssertor assertor;

private Optional<Throwable> latestException = Optional.empty();

@BeforeEach
public void beforeEach() throws Exception {
testHarness = createDeltaJoinOperatorTestHarness();
testHarness.setup();
testHarness.open();
StreamingDeltaJoinOperator operator = unwrapOperator(testHarness);
// set external failure cause consumer to prevent hang
testHarness
.getEnvironment()
.setExternalFailureCauseConsumer(
error -> {
latestException = Optional.of(error);
// DO NOT throw exception up again to avoid hang
});
operator.setAsyncExecutionController(
new MyAsyncExecutionControllerDelegate(operator.getAsyncExecutionController()));
prepareOperatorRuntimeInfo(operator);
Expand Down Expand Up @@ -199,6 +211,8 @@ public void afterEach() throws Exception {
testHarness.close();
leftTableCurrentData.clear();
rightTableCurrentData.clear();
latestException = Optional.empty();
MyAsyncFunction.clearExpectedThrownException();
}

@Test
Expand Down Expand Up @@ -232,7 +246,7 @@ void testJoinBothAppendOnlyTables() throws Exception {
testHarness.processElement2(rightRecord4);
testHarness.processElement2(rightRecord5);

testHarness.endAllInput();
waitAllDataProcessed();

final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

Expand Down Expand Up @@ -310,7 +324,7 @@ void testBlockingWithSameJoinKey() throws Exception {

MyAsyncFunction.release();

testHarness.endAllInput();
waitAllDataProcessed();
final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
expectedOutput.add(insertRecord(100, true, "jklk1", "jklk1", 300, true));
expectedOutput.add(insertRecord(100, false, "jklk2", "jklk2", 300, false));
Expand Down Expand Up @@ -366,7 +380,7 @@ void testTableDataVisibleBeforeJoin() throws Exception {
testHarness.processElement1(leftRecord3);
testHarness.processElement2(rightRecord3);

testHarness.endAllInput();
waitAllDataProcessed();

final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

Expand Down Expand Up @@ -434,9 +448,12 @@ void testCheckpointAndRestore() throws Exception {

// checkpointing
OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);

// release async function to avoid timeout when closing
MyAsyncFunction.release();
testHarness.close();
assertThat(testHarness.getOutput()).isEmpty();

MyAsyncFunction.block();
// restoring
testHarness = createDeltaJoinOperatorTestHarness();

Expand All @@ -446,6 +463,7 @@ void testCheckpointAndRestore() throws Exception {
operator.setAsyncExecutionController(
new MyAsyncExecutionControllerDelegate(operator.getAsyncExecutionController()));

latestException = Optional.empty();
testHarness.initializeState(snapshot);

testHarness.open();
Expand All @@ -462,7 +480,7 @@ void testCheckpointAndRestore() throws Exception {

MyAsyncFunction.release();

testHarness.endAllInput();
waitAllDataProcessed();
final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
expectedOutput.add(insertRecord(100, true, "jklk1", "jklk1", 300, true));
expectedOutput.add(insertRecord(200, true, "jklk1", "jklk1", 300, true));
Expand Down Expand Up @@ -504,7 +522,7 @@ void testClearLegacyStateWhenCheckpointing() throws Exception {
assertThat(testHarness.numKeyedStateEntries()).isEqualTo(2);

MyAsyncFunction.release();
testHarness.endAllInput();
waitAllDataProcessed();

MyAsyncFunction.block();

Expand All @@ -515,7 +533,7 @@ void testClearLegacyStateWhenCheckpointing() throws Exception {
assertThat(testHarness.numKeyedStateEntries()).isEqualTo(1);

MyAsyncFunction.release();
testHarness.endAllInput();
waitAllDataProcessed();

testHarness.snapshot(2L, 0L);
assertThat(testHarness.numKeyedStateEntries()).isEqualTo(0);
Expand All @@ -529,6 +547,33 @@ void testClearLegacyStateWhenCheckpointing() throws Exception {
"result mismatch", expectedOutput, testHarness.getOutput());
}

@Test
void testMeetExceptionWhenLookup() throws Exception {
Throwable expectedException = new IllegalStateException("Mock to fail");
MyAsyncFunction.setExpectedThrownException(expectedException);

StreamRecord<RowData> record = insertRecord(100, true, "jklk1");
testHarness.processElement1(record);

// IllegalStateException(Failed to wait all data processed)
// +- Exception(Could not complete the stream element ...)
// +- RuntimeException(Failed to lookup table)
// +- Actual Exception
assertThatThrownBy(this::waitAllDataProcessed)
.cause()
.cause()
.cause()
.isEqualTo(expectedException);
}

private void waitAllDataProcessed() throws Exception {
testHarness.endAllInputs();
if (latestException.isPresent()) {
throw new IllegalStateException(
"Failed to wait all data processed", latestException.get());
}
}

private KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData>
createDeltaJoinOperatorTestHarness() throws Exception {
TaskMailbox mailbox = new TaskMailboxImpl();
Expand Down Expand Up @@ -639,9 +684,13 @@ private static void insertTableData(StreamRecord<RowData> record, boolean insert
RowData rowData = record.getValue();
try {
if (insertLeftTable) {
leftTableCurrentData.add(rowData);
synchronized (leftTableCurrentData) {
leftTableCurrentData.add(rowData);
}
} else {
rightTableCurrentData.add(rowData);
synchronized (rightTableCurrentData) {
rightTableCurrentData.add(rowData);
}
}
} catch (Exception e) {
throw new IllegalStateException("Failed to insert table data", e);
Expand All @@ -664,6 +713,8 @@ public static class MyAsyncFunction extends RichAsyncFunction<RowData, Object> {

private static final AtomicInteger rightInvokeCount = new AtomicInteger(0);

private static Optional<Throwable> expectedThrownException = Optional.empty();

// ===== runtime info =====
private Boolean treatRightAsLookupTable;

Expand All @@ -679,11 +730,23 @@ public static void release() {
Objects.requireNonNull(lock).countDown();
}

public static void setExpectedThrownException(Throwable t) {
expectedThrownException = Optional.of(t);
}

public static void clearExpectedThrownException() {
expectedThrownException = Optional.empty();
}

@Override
public void asyncInvoke(final RowData input, final ResultFuture<Object> resultFuture) {
executorService.submit(
() -> {
try {
if (expectedThrownException.isPresent()) {
throw expectedThrownException.get();
}

if (lock != null) {
lock.await();
}
Expand All @@ -692,13 +755,17 @@ public void asyncInvoke(final RowData input, final ResultFuture<Object> resultFu
RowDataKeySelector streamSideJoinKeySelector;
RowDataKeySelector lookupSideJoinKeySelector;
if (Objects.requireNonNull(treatRightAsLookupTable)) {
lookupTableData = new LinkedList<>(rightTableCurrentData);
synchronized (rightTableCurrentData) {
lookupTableData = new LinkedList<>(rightTableCurrentData);
}

streamSideJoinKeySelector = leftJoinKeySelector.copy();
lookupSideJoinKeySelector = rightJoinKeySelector.copy();
leftInvokeCount.incrementAndGet();
} else {
lookupTableData = new LinkedList<>(leftTableCurrentData);
synchronized (leftTableCurrentData) {
lookupTableData = new LinkedList<>(leftTableCurrentData);
}

streamSideJoinKeySelector = rightJoinKeySelector.copy();
lookupSideJoinKeySelector = leftJoinKeySelector.copy();
Expand All @@ -715,7 +782,7 @@ public void asyncInvoke(final RowData input, final ResultFuture<Object> resultFu
}

resultFuture.complete(results);
} catch (Exception e) {
} catch (Throwable e) {
resultFuture.completeExceptionally(
new RuntimeException("Failed to look up table", e));
}
Expand Down