Skip to content

Commit a5d8ef1

Browse files
committed
Implement 7.3.6 and 7.3.9 - restriction on Message object
1 parent dbb78fb commit a5d8ef1

File tree

9 files changed

+380
-61
lines changed

9 files changed

+380
-61
lines changed

activemq-broker/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -159,16 +159,21 @@ protected void init() throws OpenDataException {
159159
public Map<String, Object> getFields(Object o) throws OpenDataException {
160160
ActiveMQMessage m = (ActiveMQMessage)o;
161161
Map<String, Object> rc = super.getFields(o);
162-
rc.put("JMSCorrelationID", m.getJMSCorrelationID());
163-
rc.put("JMSDestination", "" + m.getJMSDestination());
164-
rc.put("JMSMessageID", m.getJMSMessageID());
165-
rc.put("JMSReplyTo",toString(m.getJMSReplyTo()));
166-
rc.put("JMSType", m.getJMSType());
167-
rc.put("JMSDeliveryMode", m.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? "PERSISTENT" : "NON-PERSISTENT");
168-
rc.put("JMSExpiration", m.getJMSExpiration());
169-
rc.put("JMSPriority", m.getJMSPriority());
170-
rc.put("JMSRedelivered", m.getJMSRedelivered());
171-
rc.put("JMSTimestamp", new Date(m.getJMSTimestamp()));
162+
try {
163+
rc.put("JMSCorrelationID", m.getJMSCorrelationID());
164+
rc.put("JMSDestination", "" + m.getJMSDestination());
165+
rc.put("JMSMessageID", m.getJMSMessageID());
166+
rc.put("JMSReplyTo",toString(m.getJMSReplyTo()));
167+
rc.put("JMSType", m.getJMSType());
168+
rc.put("JMSDeliveryMode", m.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? "PERSISTENT" : "NON-PERSISTENT");
169+
rc.put("JMSExpiration", m.getJMSExpiration());
170+
rc.put("JMSPriority", m.getJMSPriority());
171+
rc.put("JMSRedelivered", m.getJMSRedelivered());
172+
rc.put("JMSTimestamp", new Date(m.getJMSTimestamp()));
173+
} catch (JMSException e) {
174+
throw new OpenDataException(e.getMessage());
175+
}
176+
172177
rc.put(CompositeDataConstants.JMSXGROUP_ID, m.getGroupID());
173178
rc.put(CompositeDataConstants.JMSXGROUP_SEQ, m.getGroupSequence());
174179
rc.put(CompositeDataConstants.JMSXUSER_ID, m.getUserID());

activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2136,7 +2136,12 @@ protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destin
21362136
msg.setBrokerPath(null);
21372137

21382138
msg.setTransactionId(txid);
2139-
if (connection.isCopyMessageOnSend()) {
2139+
final ActiveMQMessage originalMessage = msg;
2140+
if (connection.isCopyMessageOnSend() || completionListener != null) {
2141+
// We need to make the message inaccessible per Jakarta Messaging 3.1 - 7.3.6 & 7.3.9
2142+
// https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#restrictions-on-the-use-of-the-message-object
2143+
// To do that, we need to set a flag in the message referenced in sender thread. To avoid making
2144+
// the message inaccessible once received on the server side (even tho the WireFormat marshaller doesn't marshal that field, so it shouldn't matter)
21402145
msg = (ActiveMQMessage)msg.copy();
21412146
}
21422147
msg.setConnection(connection);
@@ -2167,10 +2172,12 @@ protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destin
21672172
// Make the Message object unaccessible and unmutable
21682173
// per Jakarta Messaging 3.1 spec section 7.3.9 and 7.3.6
21692174
numIncompletedAsyncSend.doIncrement();
2175+
originalMessage.setMessageAccessible(false);
21702176
wrapperCompletionListener = new CompletionListener() {
21712177
@Override
21722178
public void onCompletion(Message message) {
21732179
try {
2180+
originalMessage.setMessageAccessible(true);
21742181
inCompletionListenerCallback.set(true);
21752182
producerInCompletionListenerCallback.set(true);
21762183
numIncompletedAsyncSend.doDecrement();
@@ -2188,6 +2195,7 @@ public void onCompletion(Message message) {
21882195
@Override
21892196
public void onException(Message message, Exception e) {
21902197
try {
2198+
originalMessage.setMessageAccessible(true);
21912199
inCompletionListenerCallback.set(true);
21922200
completionListener.onException(message, e);
21932201
} finally {

0 commit comments

Comments
 (0)