Skip to content

Commit

Permalink
Add thrift plugin support thrift TMultiplexedProcessor. (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
zifeihan authored Sep 14, 2021
1 parent a27cf2a commit 0e45ed0
Show file tree
Hide file tree
Showing 8 changed files with 374 additions and 7 deletions.
6 changes: 3 additions & 3 deletions .github/PULL_REQUEST_TEMPLATE
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@

<!-- ==== 🔌 Remove this line WHEN AND ONLY WHEN you're adding a new plugin, follow the checklist 👇 ====
### Add an agent plugin to support <framework name>
- [ ] Add a test case for the new plugin, refer to [the doc](https://github.com/apache/skywalking/blob/master/docs/en/guides/Plugin-test.md)
- [ ] Add a component id in [the component-libraries.yml](https://github.com/apache/skywalking/blob/master/oap-server/server-bootstrap/src/main/resources/component-libraries.yml)
- [ ] Add a test case for the new plugin, refer to [the doc](https://github.com/apache/skywalking-java/blob/main/docs/en/setup/service-agent/java-agent/Plugin-test.md)
- [ ] Add a component id in [the component-libraries.yml](https://github.com/apache/skywalking/blob/master/oap-server/server-starter/src/main/resources/component-libraries.yml)
- [ ] Add a logo in [the UI repo](https://github.com/apache/skywalking-rocketbot-ui/tree/master/src/views/components/topology/assets)
==== 🔌 Remove this line WHEN AND ONLY WHEN you're adding a new plugin, follow the checklist 👆 ==== -->

<!-- ==== 📈 Remove this line WHEN AND ONLY WHEN you're improving the performance, follow the checklist 👇 ====
### Improve the performance of <class or module or ...>
- [ ] Add a benchmark for the improvement, refer to [the existing ones](https://github.com/apache/skywalking/blob/master/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/LinkedArrayBenchmark.java)
- [ ] Add a benchmark for the improvement, refer to [the existing ones](https://github.com/apache/skywalking-java/blob/main/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/LinkedArrayBenchmark.java)
- [ ] The benchmark result.
```text
<Paste the benchmark results here>
Expand Down
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Release Notes.
* Support mTLS for gRPC channel.
* fix the bug that plugin record wrong time elapse for lettuce plugin
* fix the bug that the wrong db.instance value displayed on Skywalking-UI when existing multi-database-instance on same host port pair.
* Add thrift plugin support thrift TMultiplexedProcessor.

#### Documentation

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ public void beforeMethod(EnhancedInstance objInst,
Object[] allArguments,
Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
ServerInProtocolWrapper in = (ServerInProtocolWrapper) allArguments[0];
in.initial(new Context(processMap));
if (allArguments[0] instanceof ServerInProtocolWrapper) {
ServerInProtocolWrapper in = (ServerInProtocolWrapper) allArguments[0];
in.initial(new Context(processMap));
}
}

@Override
Expand All @@ -61,7 +63,9 @@ public Object afterMethod(EnhancedInstance objInst,
Object[] allArguments,
Class<?>[] argumentsTypes,
Object ret) throws Throwable {
ContextManager.stopSpan();
if (allArguments[0] instanceof ServerInProtocolWrapper) {
ContextManager.stopSpan();
}
return ret;
}

Expand All @@ -71,6 +75,8 @@ public void handleMethodException(EnhancedInstance objInst,
Object[] allArguments,
Class<?>[] argumentsTypes,
Throwable t) {
ContextManager.activeSpan().log(t);
if (allArguments[0] instanceof ServerInProtocolWrapper) {
ContextManager.activeSpan().log(t);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.thrift;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.plugin.thrift.wrapper.Context;
import org.apache.skywalking.apm.plugin.thrift.wrapper.ServerInProtocolWrapper;
import org.apache.thrift.ProcessFunction;
import org.apache.thrift.TBaseAsyncProcessor;

/**
* To wrap the ProcessFunction for getting arguments of method.
*
* @see TBaseAsyncProcessor
* @see TBaseProcessorInterceptor
* @see TMultiplexedProcessorInterceptor
*/
public class TMultiplexedProcessorInterceptor implements InstanceConstructorInterceptor, InstanceMethodsAroundInterceptor {
private Map<String, ProcessFunction> processMap = new HashMap<>();

private static final ILog LOGGER = LogManager.getLogger(TMultiplexedProcessorInterceptor.class);

@Override
public void onConstruct(final EnhancedInstance objInst, final Object[] allArguments) throws Throwable {
objInst.setSkyWalkingDynamicField(processMap);
}

@Override
public void beforeMethod(EnhancedInstance objInst,
Method method,
Object[] allArguments,
Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {

ServerInProtocolWrapper in = (ServerInProtocolWrapper) allArguments[0];
in.initial(new Context(processMap));
}

@Override
public Object afterMethod(EnhancedInstance objInst,
Method method,
Object[] allArguments,
Class<?>[] argumentsTypes,
Object ret) throws Throwable {
ContextManager.stopSpan();
return ret;
}

@Override
public void handleMethodException(EnhancedInstance objInst,
Method method,
Object[] allArguments,
Class<?>[] argumentsTypes,
Throwable t) {
ContextManager.activeSpan().log(t);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.thrift;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.thrift.ProcessFunction;
import org.apache.thrift.TBaseAsyncProcessor;
import org.apache.thrift.TBaseProcessor;
import org.apache.thrift.TProcessor;

public class TMultiplexedProcessorRegisterDefaultInterceptor implements InstanceMethodsAroundInterceptor {

private static final ILog LOGGER = LogManager.getLogger(TMultiplexedProcessorRegisterDefaultInterceptor.class);

@Override
public void beforeMethod(EnhancedInstance objInst,
Method method,
Object[] allArguments,
Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {

Map<String, ProcessFunction> processMap = (Map<String, ProcessFunction>) objInst.getSkyWalkingDynamicField();
TProcessor processor = (TProcessor) allArguments[0];
processMap.putAll(getProcessMap(processor));
}

@Override
public Object afterMethod(EnhancedInstance objInst,
Method method,
Object[] allArguments,
Class<?>[] argumentsTypes,
Object ret) throws Throwable {
return ret;
}

@Override
public void handleMethodException(EnhancedInstance objInst,
Method method,
Object[] allArguments,
Class<?>[] argumentsTypes,
Throwable t) {
}

private Map<String, ProcessFunction> getProcessMap(TProcessor processor) {
Map<String, ProcessFunction> hashMap = new HashMap<>();
if (processor instanceof TBaseProcessor) {
Map<String, ProcessFunction> processMapView = ((TBaseProcessor) processor).getProcessMapView();
hashMap.putAll(processMapView);
} else if (processor instanceof TBaseAsyncProcessor) {
Map<String, ProcessFunction> processMapView = ((TBaseProcessor) processor).getProcessMapView();
hashMap.putAll(processMapView);
} else {
LOGGER.warn("Not support this processor:{}", processor.getClass().getName());
}
return hashMap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.thrift;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.thrift.ProcessFunction;
import org.apache.thrift.TBaseAsyncProcessor;
import org.apache.thrift.TBaseProcessor;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TMultiplexedProtocol;

public class TMultiplexedProcessorRegisterInterceptor implements InstanceMethodsAroundInterceptor {

private static final ILog LOGGER = LogManager.getLogger(TMultiplexedProcessorRegisterInterceptor.class);

@Override
public void beforeMethod(EnhancedInstance objInst,
Method method,
Object[] allArguments,
Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {

Map<String, ProcessFunction> processMap = (Map<String, ProcessFunction>) objInst.getSkyWalkingDynamicField();
String serviceName = (String) allArguments[0];
TProcessor processor = (TProcessor) allArguments[1];
processMap.putAll(getProcessMap(serviceName, processor));
}

@Override
public Object afterMethod(EnhancedInstance objInst,
Method method,
Object[] allArguments,
Class<?>[] argumentsTypes,
Object ret) throws Throwable {
return ret;
}

@Override
public void handleMethodException(EnhancedInstance objInst,
Method method,
Object[] allArguments,
Class<?>[] argumentsTypes,
Throwable t) {
}

private Map<String, ProcessFunction> getProcessMap(String serviceName, TProcessor processor) {
Map<String, ProcessFunction> hashMap = new HashMap<>();
if (processor instanceof TBaseProcessor) {
Map<String, ProcessFunction> processMapView = ((TBaseProcessor) processor).getProcessMapView();
processMapView.forEach((k, v) -> hashMap.put(serviceName + TMultiplexedProtocol.SEPARATOR + k, v));
} else if (processor instanceof TBaseAsyncProcessor) {
Map<String, ProcessFunction> processMapView = ((TBaseAsyncProcessor) processor).getProcessMapView();
processMapView.forEach((k, v) -> hashMap.put(serviceName + TMultiplexedProtocol.SEPARATOR + k, v));
} else {
LOGGER.warn("Not support this processor:{}", processor.getClass().getName());
}
return hashMap;
}
}
Loading

0 comments on commit 0e45ed0

Please sign in to comment.