Skip to content

Commit

Permalink
[fix][artemis] Fix Artemis asynchronous send and acknowledgement conf…
Browse files Browse the repository at this point in the history
…iguration (#343)

* [fix][artemis] Fix Artemis confirmation windows size configuration

- set to 1MB which is a more sensible default value
- see https://activemq.apache.org/components/artemis/documentation/javadocs/javadoc-latest/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.html
  - configuring confirmation window is required for async message sending with Artemis

* [fix][artemis] Fix reporting of failed sends

* [fix][artemis] Use async sending and acknowledgements

* Reformat with spotless
  • Loading branch information
lhotari authored Oct 24, 2022
1 parent d7cc6c3 commit 7175eca
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,14 @@ public void initialize(File configurationFile, StatsLogger statsLogger) throws I
log.info("ActiveMQ Artemis driver configuration: {}", writer.writeValueAsString(config));
try {
ServerLocator serverLocator = ActiveMQClient.createServerLocator(config.brokerAddress);
serverLocator.setConfirmationWindowSize(1000);
// confirmation window size is in bytes, set to 1MB
serverLocator.setConfirmationWindowSize(1024 * 1024);
// use asynchronous sending of messages where SendAcknowledgementHandler reports
// success/failure
serverLocator.setBlockOnDurableSend(false);
serverLocator.setBlockOnNonDurableSend(false);
// use async acknowledgement
serverLocator.setBlockOnAcknowledge(false);

sessionFactory = serverLocator.createSessionFactory();
session = sessionFactory.createSession();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;

public class ArtemisBenchmarkProducer implements BenchmarkProducer {

Expand Down Expand Up @@ -51,8 +53,16 @@ public CompletableFuture<Void> sendAsync(Optional<String> key, byte[] payload) {
try {
producer.send(
msg,
message -> {
future.complete(null);
new SendAcknowledgementHandler() {
@Override
public void sendAcknowledged(Message message) {
future.complete(null);
}

@Override
public void sendFailed(Message message, Exception e) {
future.completeExceptionally(e);
}
});
} catch (ActiveMQException e) {
future.completeExceptionally(e);
Expand Down

0 comments on commit 7175eca

Please sign in to comment.