Skip to content

Commit

Permalink
[Feature] Support for tracking in spring gateway versions 4.1.2 and a…
Browse files Browse the repository at this point in the history
…bove #12925
  • Loading branch information
yuqianwei committed Jan 10, 2025
1 parent b358267 commit d037668
Show file tree
Hide file tree
Showing 48 changed files with 3,536 additions and 1 deletion.
1 change: 1 addition & 0 deletions .github/workflows/plugins-jdk17-test.1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ jobs:
- spring-6.x-scenario
- resteasy-6.x-scenario
- gateway-4.x-scenario
- gateway-4.1.2x-scenario
- httpexchange-scenario
- activemq-artemis-2.x-scenario
- c3p0-0.9.0.x-0.9.1.x-scenario
Expand Down
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Release Notes.
field as new propagation mechanism, to better support async scenarios.
* Add Caffeine plugin as optional.
* Add Undertow 2.1.7.final+ worker thread pool metrics.
* Support for tracking in spring gateway versions 4.1.2 and above.

All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/222?closed=1)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.skywalking.apm.plugin.spring.cloud.gateway.v412x;

import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.ServerWebExchangeDecorator;
import org.springframework.web.server.adapter.DefaultServerWebExchange;

import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.SPRING_CLOUD_GATEWAY;

public class GatewayFilterV412Interceptor implements InstanceMethodsAroundInterceptor {

private static final ThreadLocal<AtomicInteger> STACK_DEEP = ThreadLocal.withInitial(() -> new AtomicInteger(0));

@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
if (isEntry()) {
ServerWebExchange exchange = (ServerWebExchange) allArguments[0];

EnhancedInstance enhancedInstance = getInstance(exchange);

AbstractSpan span = ContextManager.createLocalSpan("SpringCloudGateway/GatewayFilter");
if (enhancedInstance != null && enhancedInstance.getSkyWalkingDynamicField() != null) {
ContextManager.continued((ContextSnapshot) enhancedInstance.getSkyWalkingDynamicField());
}
span.setComponent(SPRING_CLOUD_GATEWAY);
}
}

public static EnhancedInstance getInstance(Object o) {
EnhancedInstance instance = null;
if (o instanceof DefaultServerWebExchange) {
instance = (EnhancedInstance) o;
} else if (o instanceof ServerWebExchangeDecorator) {
ServerWebExchange delegate = ((ServerWebExchangeDecorator) o).getDelegate();
return getInstance(delegate);
}
return instance;
}

@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
if (isExit()) {
if (ContextManager.isActive()) {
ContextManager.stopSpan();
}
}
return ret;
}

@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().log(t);
}

private boolean isEntry() {
return STACK_DEEP.get().getAndIncrement() == 0;
}

private boolean isExit() {
boolean isExit = STACK_DEEP.get().decrementAndGet() == 0;
if (isExit) {
STACK_DEEP.remove();
}
return isExit;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.skywalking.apm.plugin.spring.cloud.gateway.v412x;

import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.plugin.spring.cloud.gateway.v4x.define.EnhanceObjectCache;

import java.lang.reflect.Method;

public class HttpClientConnectDuplicateV412Interceptor implements InstanceMethodsAroundInterceptor {

@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {

}

@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
if (objInst.getSkyWalkingDynamicField() != null) {
EnhanceObjectCache enhanceObjectCache = (EnhanceObjectCache) objInst.getSkyWalkingDynamicField();
if (ret instanceof EnhancedInstance) {
EnhancedInstance retEnhancedInstance = (EnhancedInstance) ret;
retEnhancedInstance.setSkyWalkingDynamicField(enhanceObjectCache);
}
}
return ret;
}

@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.skywalking.apm.plugin.spring.cloud.gateway.v412x;

import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.plugin.spring.cloud.gateway.v4x.define.EnhanceObjectCache;

import java.lang.reflect.Method;

public class HttpClientConnectRequestV412Interceptor implements InstanceMethodsAroundInterceptor {

@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {

}

@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
if (objInst.getSkyWalkingDynamicField() != null) {
if (ret instanceof EnhancedInstance) {
EnhancedInstance retEnhancedInstance = (EnhancedInstance) ret;
Object retSkyWalkingDynamicField = retEnhancedInstance.getSkyWalkingDynamicField();
if (retSkyWalkingDynamicField != null) {
EnhanceObjectCache retEnhanceObjectCache = (EnhanceObjectCache) retSkyWalkingDynamicField;
EnhanceObjectCache objEnhanceObjectCache = (EnhanceObjectCache) objInst.getSkyWalkingDynamicField();
retEnhanceObjectCache.setContextSnapshot(objEnhanceObjectCache.getContextSnapshot());
}
}
}
return ret;
}

@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.skywalking.apm.plugin.spring.cloud.gateway.v412x;

import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
import org.apache.skywalking.apm.plugin.spring.cloud.gateway.v4x.define.EnhanceObjectCache;
import reactor.netty.http.client.HttpClientConfig;

/**
* Intercept the constructor and inject {@link EnhanceObjectCache}.
* <p>
* The first constructor argument is {@link HttpClientConfig} class instance which can get the
* request uri string.
*/
public class HttpClientFinalizerConstructorV412Interceptor implements InstanceConstructorInterceptor {

@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
final HttpClientConfig httpClientConfig = (HttpClientConfig) allArguments[0];
if (httpClientConfig == null) {
return;
}
final EnhanceObjectCache enhanceObjectCache = new EnhanceObjectCache();
enhanceObjectCache.setUrl(httpClientConfig.uri());
objInst.setSkyWalkingDynamicField(enhanceObjectCache);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.skywalking.apm.plugin.spring.cloud.gateway.v412x;

import io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.plugin.spring.cloud.gateway.v4x.define.EnhanceObjectCache;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClientResponse;

import java.lang.reflect.Method;
import java.util.function.BiFunction;

/**
* This class intercept <code>responseConnection</code> method.
* <p>
* After downstream service response, finish the span in the {@link EnhanceObjectCache}.
*/
public class HttpClientFinalizerResponseConnectionV412Interceptor implements InstanceMethodsAroundInterceptor {

@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) {
BiFunction<? super HttpClientResponse, ? super Connection, ? extends Publisher> finalReceiver = (BiFunction<? super HttpClientResponse, ? super Connection, ? extends Publisher>) allArguments[0];
EnhanceObjectCache cache = (EnhanceObjectCache) objInst.getSkyWalkingDynamicField();
allArguments[0] = (BiFunction<HttpClientResponse, Connection, Publisher>) (response, connection) -> {
Publisher publisher = finalReceiver.apply(response, connection);
if (cache == null) {
return publisher;
}
// receive the response.
if (cache.getSpan() != null) {
if (response.status().code() >= HttpResponseStatus.BAD_REQUEST.code()) {
cache.getSpan().errorOccurred();
}
Tags.HTTP_RESPONSE_STATUS_CODE.set(cache.getSpan(), response.status().code());
}

return publisher;
};
}

@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) {
Flux<?> responseFlux = (Flux<?>) ret;

responseFlux = responseFlux
.doOnError(e -> {
EnhanceObjectCache cache = (EnhanceObjectCache) objInst.getSkyWalkingDynamicField();
if (cache == null) {
return;
}

if (cache.getSpan() != null) {
cache.getSpan().errorOccurred();
cache.getSpan().log(e);
}
})
.doFinally(signalType -> {
EnhanceObjectCache cache = (EnhanceObjectCache) objInst.getSkyWalkingDynamicField();
if (cache == null) {
return;
}
// do finally. Finish the span.
if (cache.getSpan() != null) {
if (signalType == SignalType.CANCEL) {
cache.getSpan().errorOccurred();
}
cache.getSpan().asyncFinish();
}

if (cache.getSpan1() != null) {
cache.getSpan1().asyncFinish();
}
});

return responseFlux;
}

@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {

}
}
Loading

0 comments on commit d037668

Please sign in to comment.