Skip to content

Commit 2eb0d66

Browse files
authored
AMQ-9747 - Handle IOExceptionHandler thrown exceptions in KahaDB (#1474)
Update the KahaDB scheduled tasks to catch any runtime exceptions thrown by the configured IOExceptionHandler. This will prevent the tasks from being killed and no longer running if the IOExceptionHandler does not shut down the broker.
1 parent d8e9000 commit 2eb0d66

File tree

2 files changed

+97
-7
lines changed

2 files changed

+97
-7
lines changed

activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -448,10 +448,10 @@ public void run() {
448448
}
449449
} catch (IOException ioe) {
450450
LOG.error("Checkpoint failed", ioe);
451-
brokerService.handleIOException(ioe);
451+
handleIOException("CheckpointRunner", ioe);
452452
} catch (Throwable e) {
453453
LOG.error("Checkpoint failed", e);
454-
brokerService.handleIOException(IOExceptionSupport.create(e));
454+
handleIOException("CheckpointRunner", IOExceptionSupport.create(e));
455455
}
456456
}
457457
}
@@ -2120,10 +2120,10 @@ public void run() {
21202120
forwarded = true;
21212121
} catch (IOException ioe) {
21222122
LOG.error("Forwarding of acks failed", ioe);
2123-
brokerService.handleIOException(ioe);
2123+
handleIOException("AckCompactionRunner", ioe);
21242124
} catch (Throwable e) {
21252125
LOG.error("Forwarding of acks failed", e);
2126-
brokerService.handleIOException(IOExceptionSupport.create(e));
2126+
handleIOException("AckCompactionRunner", IOExceptionSupport.create(e));
21272127
}
21282128
} finally {
21292129
checkpointLock.readLock().unlock();
@@ -2136,10 +2136,10 @@ public void run() {
21362136
}
21372137
} catch (IOException ioe) {
21382138
LOG.error("Checkpoint failed", ioe);
2139-
brokerService.handleIOException(ioe);
2139+
handleIOException("AckCompactionRunner", ioe);
21402140
} catch (Throwable e) {
21412141
LOG.error("Checkpoint failed", e);
2142-
brokerService.handleIOException(IOExceptionSupport.create(e));
2142+
handleIOException("AckCompactionRunner", IOExceptionSupport.create(e));
21432143
}
21442144
}
21452145
}
@@ -4274,4 +4274,27 @@ protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, Clas
42744274
}
42754275

42764276
}
4277+
4278+
/*
4279+
* Execute the configured IOExceptionHandler when an IOException is thrown during
4280+
* task execution and handle any runtime exceptions that the handler itself might throw.
4281+
*
4282+
* By default, the DefaultIOExceptionHandler will stop the broker when handling an IOException,
4283+
* however, if DefaultIOExceptionHandler is configured with startStopConnectors to be true
4284+
* it will throw a SuppressReplyException and not stop the broker. It's also possible another
4285+
* custom implementation of IOExceptionHandler could throw a runtime exception.
4286+
*
4287+
* This method will now handle and log those runtime exceptions so that the task will not
4288+
* die and will continue to execute future iterations if the broker is not shut down.
4289+
*/
4290+
private void handleIOException(String taskName, IOException ioe) {
4291+
try {
4292+
brokerService.handleIOException(ioe);
4293+
} catch (RuntimeException e) {
4294+
LOG.warn("IOException handler threw exception in task {} with "
4295+
+ "error: {}, continuing.", taskName,
4296+
e.getMessage());
4297+
LOG.debug(e.getMessage(), e);
4298+
}
4299+
}
42774300
}

activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,15 @@
2727
import jakarta.jms.MessageProducer;
2828
import jakarta.jms.Session;
2929

30+
import java.util.concurrent.atomic.AtomicInteger;
3031
import junit.framework.TestCase;
3132

3233
import org.apache.activemq.ActiveMQConnectionFactory;
3334
import org.apache.activemq.broker.BrokerService;
3435
import org.apache.activemq.command.ActiveMQQueue;
36+
import org.apache.activemq.util.DefaultIOExceptionHandler;
37+
import org.apache.activemq.util.IOExceptionHandler;
38+
import org.apache.activemq.util.Wait;
3539
import org.apache.logging.log4j.Level;
3640
import org.apache.logging.log4j.LogManager;
3741
import org.apache.logging.log4j.core.LogEvent;
@@ -236,6 +240,69 @@ public void append(LogEvent event) {
236240
assertFalse("Did not replay any records from the journal", didSomeRecovery.get());
237241
}
238242

243+
244+
/**
245+
* Test the checkpoint runner task continues to run if the configured
246+
* IOExceptionHandler throws a runtime exception while processing
247+
* the IOException it is handling and the broker is not shut down. This
248+
* could happen if using the DefaultIOExceptionHandler and startStopConnectors
249+
* is set to true, or if a user provides their own IOExceptionHandler that
250+
* throws an exception.
251+
*/
252+
public void testCheckpointExceptionKeepRunning() throws Exception {
253+
testCheckpointIOException(true);
254+
}
255+
256+
/**
257+
* Test the broker shuts down by when DefaultIOExceptionHandler
258+
* handles an IOException thrown by the checkpoint runner task. This is the
259+
* default behavior of the broker if not configured with a custom
260+
* IOExceptionHandler and startStopConnectors is false
261+
*/
262+
public void testCheckpointExceptionShutdown() throws Exception {
263+
testCheckpointIOException(false);
264+
}
265+
266+
private void testCheckpointIOException(boolean startStopConnectors) throws Exception {
267+
final AtomicInteger iterations = new AtomicInteger();
268+
// Create a store that always throws an IOException when checkpoint is called
269+
final KahaDBStore kaha = new KahaDBStore() {
270+
@Override
271+
protected void checkpointCleanup(boolean cleanup) throws IOException {
272+
iterations.incrementAndGet();
273+
throw new IOException("fail");
274+
}
275+
};
276+
kaha.setDirectory(new File("target/activemq-data/kahadb"));
277+
kaha.deleteAllMessages();
278+
// Set the checkpoint interval to be very short so we can quickly
279+
// check number of iterations
280+
kaha.setCheckpointInterval(100);
281+
282+
BrokerService broker = createBroker(kaha);
283+
DefaultIOExceptionHandler ioExceptionHandler = new DefaultIOExceptionHandler();
284+
ioExceptionHandler.setStopStartConnectors(startStopConnectors);
285+
broker.setIoExceptionHandler(ioExceptionHandler);
286+
broker.start();
287+
288+
try {
289+
if (startStopConnectors) {
290+
// If startStopConnectors is true, the task should continue with future iterations
291+
// as the SuppressReplyException that will be thrown is now handled so just verify
292+
// we see 10 iterations which should happen quickly
293+
assertTrue(Wait.waitFor(() -> iterations.get() == 10, 2000, 100));
294+
// broker should not be stopped
295+
assertFalse(broker.isStopped());
296+
} else {
297+
// If startStopConnectors is false, an IOException should shut down the broker
298+
// which is the normal behavior
299+
assertTrue(Wait.waitFor(broker::isStopped, 2000, 100));
300+
}
301+
} finally {
302+
broker.stop();
303+
}
304+
}
305+
239306
private void assertExistsAndDelete(File file) {
240307
assertTrue(file.exists());
241308
file.delete();
@@ -281,4 +348,4 @@ private String createContent(int i) {
281348
return sb.toString();
282349
}
283350

284-
}
351+
}

0 commit comments

Comments
 (0)