Skip to content

Commit

Permalink
Support tracing for async producing, batch sync consuming, and batch …
Browse files Browse the repository at this point in the history
…async consuming in rocketMQ-client-java-5.x-plugin (#665)
  • Loading branch information
CzyerChen authored Jan 21, 2024
1 parent d5b99f9 commit bbb177a
Show file tree
Hide file tree
Showing 19 changed files with 1,163 additions and 131 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Release Notes.
* Fix re-transform bug when plugin enhanced class proxy parent method.
* Fix error HTTP status codes not recording as SLA failures in Vert.x plugins.
* Support for HttpExchange request tracing
* Support tracing for async producing, batch sync consuming, and batch async consuming in rocketMQ-client-java-5.x-plugin.

#### Documentation

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.StringTag;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
Expand All @@ -37,6 +38,7 @@
public class MessageListenerInterceptor implements InstanceMethodsAroundInterceptor {

public static final String CONSUMER_OPERATION_NAME_PREFIX = "RocketMQ/";
public static final StringTag MQ_MESSAGE_ID = new StringTag("mq.message.id");

@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
Expand All @@ -47,7 +49,7 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
AbstractSpan span = ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX + messageView.getTopic()
+ "/Consumer", contextCarrier);
Tags.MQ_TOPIC.set(span, messageView.getTopic());

span.tag(MQ_MESSAGE_ID, messageView.getMessageId().toString());
Object skyWalkingDynamicField = objInst.getSkyWalkingDynamicField();
if (skyWalkingDynamicField != null) {
ConsumerEnhanceInfos consumerEnhanceInfos = (ConsumerEnhanceInfos) skyWalkingDynamicField;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5;

import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.java.impl.ClientImpl;
import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.StringTag;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.agent.core.util.CollectionUtil;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.util.StringUtil;

/**
* {@link MessageSendAsyncInterceptor} create exit span when the method {@link org.apache.rocketmq.client.java.impl.producer.ProducerImpl#sendAsync(org.apache.rocketmq.client.apis.message.Message)}
* execute
*/
public class MessageSendAsyncInterceptor implements InstanceMethodsAroundInterceptor {

public static final String ASYNC_SEND_OPERATION_NAME_PREFIX = "RocketMQ/";
public static final StringTag MQ_MESSAGE_ID = new StringTag("mq.message.id");
public static final StringTag MQ_MESSAGE_KEYS = new StringTag("mq.message.keys");
public static final StringTag MQ_MESSAGE_TAGS = new StringTag("mq.message.tags");

@Override
public void beforeMethod(EnhancedInstance objInst,
Method method,
Object[] allArguments,
Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
Message message = (Message) allArguments[0];
ClientImpl producerImpl = (ClientImpl) objInst;

ContextCarrier contextCarrier = new ContextCarrier();
String namingServiceAddress = producerImpl.getClientConfiguration().getEndpoints();
AbstractSpan span = ContextManager.createExitSpan(
buildOperationName(message.getTopic()), contextCarrier, namingServiceAddress);
span.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER);
Tags.MQ_BROKER.set(span, namingServiceAddress);
Tags.MQ_TOPIC.set(span, message.getTopic());
if (RocketMqClientJavaPluginConfig.Plugin.Rocketmqclient.COLLECT_MESSAGE_KEYS) {
Collection<String> keys = message.getKeys();
if (!CollectionUtil.isEmpty(keys)) {
span.tag(MQ_MESSAGE_KEYS, String.join(",", keys));
}
}
if (RocketMqClientJavaPluginConfig.Plugin.Rocketmqclient.COLLECT_MESSAGE_TAGS) {
Optional<String> tag = message.getTag();
tag.ifPresent(s -> span.tag(MQ_MESSAGE_TAGS, s));
}

contextCarrier.extensionInjector().injectSendingTimestamp();
SpanLayer.asMQ(span);

Map<String, String> properties = message.getProperties();
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
if (!StringUtil.isEmpty(next.getHeadValue())) {
properties.put(next.getHeadKey(), next.getHeadValue());
}
}

MessageBuilder messageBuilder = new MessageBuilderImpl();
messageBuilder.setTopic(message.getTopic());
if (message.getTag().isPresent()) {
messageBuilder.setTag(message.getTag().get());
}
messageBuilder.setKeys(message.getKeys().toArray(new String[0]));
if (message.getMessageGroup().isPresent()) {
messageBuilder.setMessageGroup(message.getMessageGroup().get());
}

byte[] body = new byte[message.getBody().limit()];
message.getBody().get(body);
messageBuilder.setBody(body);
if (message.getDeliveryTimestamp().isPresent()) {
messageBuilder.setDeliveryTimestamp(message.getDeliveryTimestamp().get());
}

properties.forEach(messageBuilder::addProperty);
allArguments[0] = messageBuilder.build();
}

@Override
public Object afterMethod(EnhancedInstance objInst,
Method method,
Object[] allArguments,
Class<?>[] argumentsTypes,
Object ret) throws Throwable {
CompletableFuture<SendReceipt> future = (CompletableFuture<SendReceipt>) ret;
AbstractSpan span = ContextManager.activeSpan();
span.prepareForAsync();
ContextManager.stopSpan();
return future.whenCompleteAsync((sendReceipt, throwable) -> {
if (null != throwable) {
span.log(throwable);
span.errorOccurred();
span.asyncFinish();
return;
}
if (sendReceipt == null || sendReceipt.getMessageId() == null) {
span.asyncFinish();
return;
}
span.tag(MQ_MESSAGE_ID, sendReceipt.getMessageId().toString());
span.asyncFinish();
});
}

@Override
public void handleMethodException(EnhancedInstance objInst,
Method method,
Object[] allArguments,
Class<?>[] argumentsTypes,
Throwable t) {
ContextManager.activeSpan().log(t);
}

private String buildOperationName(String topicName) {
return ASYNC_SEND_OPERATION_NAME_PREFIX + topicName + "/Producer";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.StringTag;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
Expand All @@ -50,6 +51,9 @@
public class MessageSendInterceptor implements InstanceMethodsAroundInterceptor {

public static final String ASYNC_SEND_OPERATION_NAME_PREFIX = "RocketMQ/";
public static final StringTag MQ_MESSAGE_ID = new StringTag("mq.message.id");
public static final StringTag MQ_MESSAGE_KEYS = new StringTag("mq.message.keys");
public static final StringTag MQ_MESSAGE_TAGS = new StringTag("mq.message.tags");

@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
Expand All @@ -62,13 +66,17 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
span.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER);
Tags.MQ_BROKER.set(span, namingServiceAddress);
Tags.MQ_TOPIC.set(span, message.getTopic());
Collection<String> keys = message.getKeys();
if (!CollectionUtil.isEmpty(keys)) {
span.tag(Tags.ofKey("mq.message.keys"), keys.stream().collect(Collectors.joining(",")));
if (RocketMqClientJavaPluginConfig.Plugin.Rocketmqclient.COLLECT_MESSAGE_KEYS) {
Collection<String> keys = message.getKeys();
if (!CollectionUtil.isEmpty(keys)) {
span.tag(MQ_MESSAGE_KEYS, keys.stream().collect(Collectors.joining(",")));
}
}
Optional<String> tag = message.getTag();
if (tag.isPresent()) {
span.tag(Tags.ofKey("mq.message.tags"), tag.get());
if (RocketMqClientJavaPluginConfig.Plugin.Rocketmqclient.COLLECT_MESSAGE_TAGS) {
Optional<String> tag = message.getTag();
if (tag.isPresent()) {
span.tag(MQ_MESSAGE_TAGS, tag.get());
}
}

contextCarrier.extensionInjector().injectSendingTimestamp();
Expand Down Expand Up @@ -108,7 +116,7 @@ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allA
SendReceipt sendReceipt = (SendReceipt) ret;
if (sendReceipt != null && sendReceipt.getMessageId() != null) {
AbstractSpan activeSpan = ContextManager.activeSpan();
activeSpan.tag(Tags.ofKey("mq.message.id"), sendReceipt.getMessageId().toString());
activeSpan.tag(MQ_MESSAGE_ID, sendReceipt.getMessageId().toString());
}
ContextManager.stopSpan();
return ret;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5;

import org.apache.skywalking.apm.agent.core.boot.PluginConfig;

public class RocketMqClientJavaPluginConfig {
public static class Plugin {
@PluginConfig(root = RocketMqClientJavaPluginConfig.class)
public static class Rocketmqclient {
/**
* This config item controls that whether the RocketMqClientJava plugin should collect the keys of the message.
*/
public static boolean COLLECT_MESSAGE_KEYS = false;
/**
* This config item controls that whether the RocketMqClientJava plugin should collect the tags of the message.
*/
public static boolean COLLECT_MESSAGE_TAGS = false;
}
}
}
Loading

0 comments on commit bbb177a

Please sign in to comment.