Skip to content

Commit 6c13ecb

Browse files
committed
kafka working
1 parent ba32a8d commit 6c13ecb

File tree

7 files changed

+255
-10
lines changed

7 files changed

+255
-10
lines changed

instrumentation/spring/spring-boot-autoconfigure/build.gradle.kts

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,8 @@ dependencies {
122122
add("javaSpring4CompileOnly", files(sourceSets.main.get().output.classesDirs))
123123
// add("javaSpring4CompileOnly", "org.springframework.boot:spring-boot-starter-web:4.0.0")
124124
add("javaSpring4CompileOnly", "org.springframework.boot:spring-boot-starter-kafka:4.0.0")
125-
// add("javaSpring4CompileOnly", "org.springframework.boot:spring-boot-autoconfigure:4.0.0")
126-
// add("javaSpring4CompileOnly", "org.springframework.boot:spring-boot-jdbc:4.0.0")
125+
add("javaSpring4CompileOnly", "org.springframework.boot:spring-boot-autoconfigure:4.0.0")
126+
add("javaSpring4CompileOnly", "org.springframework.boot:spring-boot-jdbc:4.0.0")
127127
add("javaSpring4CompileOnly", "org.springframework.boot:spring-boot-starter-jdbc:4.0.0")
128128
add("javaSpring4CompileOnly", "org.springframework.boot:spring-boot-restclient:4.0.0")
129129
// add("javaSpring4CompileOnly", "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
@@ -202,14 +202,18 @@ testing {
202202
val testSpring4 by registering(JvmTestSuite::class) {
203203
dependencies {
204204
implementation(project())
205-
implementation("org.springframework.boot:spring-boot-starter-web:4.0.0")
205+
// implementation("org.springframework.boot:spring-boot-starter-web:4.0.0")
206+
implementation("org.springframework.boot:spring-boot-starter-jdbc:4.0.0")
206207
implementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
207-
implementation(project(":instrumentation:spring:spring-web:spring-web-3.1:library"))
208-
implementation(project(":instrumentation:spring:spring-webmvc:spring-webmvc-6.0:library"))
209-
implementation("jakarta.servlet:jakarta.servlet-api:6.1.0")
208+
implementation("org.springframework.boot:spring-boot-restclient:4.0.0")
209+
implementation("org.springframework.boot:spring-boot-starter-kafka:4.0.0")
210+
// implementation(project(":instrumentation:spring:spring-web:spring-web-3.1:library"))
211+
// implementation(project(":instrumentation:spring:spring-webmvc:spring-webmvc-6.0:library"))
212+
// implementation("jakarta.servlet:jakarta.servlet-api:6.1.0")
210213
implementation("org.springframework.boot:spring-boot-starter-test:4.0.0") {
211214
exclude("org.junit.vintage", "junit-vintage-engine")
212215
}
216+
runtimeOnly("com.h2database:h2:1.4.197")
213217
}
214218
}
215219

instrumentation/spring/spring-boot-autoconfigure/src/main/javaSpring4/io/opentelemetry/instrumentation/spring/autoconfigure/internal/instrumentation/jdbc/JdbcInstrumentationSpringBoot4AutoConfiguration.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.springframework.beans.factory.ObjectProvider;
1313
import org.springframework.boot.autoconfigure.AutoConfiguration;
1414
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
15+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
1516
import org.springframework.boot.jdbc.autoconfigure.DataSourceAutoConfiguration;
1617
import org.springframework.context.annotation.Bean;
1718
import org.springframework.context.annotation.Configuration;
@@ -24,6 +25,8 @@
2425
@AutoConfiguration(after = DataSourceAutoConfiguration.class)
2526
@ConditionalOnBean({DataSource.class})
2627
@Configuration(proxyBeanMethods = false)
28+
@ConditionalOnMissingClass(
29+
"org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration")
2730
public class JdbcInstrumentationSpringBoot4AutoConfiguration {
2831

2932
// For error prone
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.spring.autoconfigure.internal.instrumentation.kafka;
7+
8+
import io.opentelemetry.instrumentation.spring.kafka.v2_7.SpringKafkaTelemetry;
9+
import java.lang.reflect.Field;
10+
import java.util.function.Supplier;
11+
import javax.annotation.Nullable;
12+
import org.springframework.beans.factory.config.BeanPostProcessor;
13+
import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory;
14+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
15+
import org.springframework.kafka.listener.BatchInterceptor;
16+
import org.springframework.kafka.listener.RecordInterceptor;
17+
18+
class ConcurrentKafkaListenerContainerFactorySpringBoot4PostProcessor implements BeanPostProcessor {
19+
20+
private final Supplier<SpringKafkaTelemetry> springKafkaTelemetry;
21+
22+
ConcurrentKafkaListenerContainerFactorySpringBoot4PostProcessor(
23+
Supplier<SpringKafkaTelemetry> springKafkaTelemetry) {
24+
this.springKafkaTelemetry = springKafkaTelemetry;
25+
}
26+
27+
@SuppressWarnings("unchecked") // we check the bean type before casting
28+
@Override
29+
public Object postProcessAfterInitialization(Object bean, String beanName) {
30+
if (!(bean instanceof ConcurrentKafkaListenerContainerFactory)) {
31+
return bean;
32+
}
33+
34+
ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory =
35+
(ConcurrentKafkaListenerContainerFactory<Object, Object>) bean;
36+
SpringKafkaTelemetry springKafkaTelemetry = this.springKafkaTelemetry.get();
37+
38+
// use reflection to read existing values to avoid overwriting user configured interceptors
39+
BatchInterceptor<Object, Object> batchInterceptor =
40+
readField(listenerContainerFactory, "batchInterceptor", BatchInterceptor.class);
41+
RecordInterceptor<Object, Object> recordInterceptor =
42+
readField(listenerContainerFactory, "recordInterceptor", RecordInterceptor.class);
43+
listenerContainerFactory.setBatchInterceptor(
44+
springKafkaTelemetry.createBatchInterceptor(batchInterceptor));
45+
listenerContainerFactory.setRecordInterceptor(
46+
springKafkaTelemetry.createRecordInterceptor(recordInterceptor));
47+
48+
return listenerContainerFactory;
49+
}
50+
51+
@Nullable
52+
private static <T> T readField(Object container, String filedName, Class<T> fieldType) {
53+
try {
54+
Field field = AbstractKafkaListenerContainerFactory.class.getDeclaredField(filedName);
55+
field.setAccessible(true);
56+
return fieldType.cast(field.get(container));
57+
} catch (Exception exception) {
58+
return null;
59+
}
60+
}
61+
}

instrumentation/spring/spring-boot-autoconfigure/src/main/javaSpring4/io/opentelemetry/instrumentation/spring/autoconfigure/internal/instrumentation/kafka/KafkaInstrumentationSpringBoot4AutoConfiguration.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,11 @@ static SpringKafkaTelemetry getTelemetry(
6363
havingValue = "true",
6464
matchIfMissing = true)
6565
@ConditionalOnMissingBean
66-
static ConcurrentKafkaListenerContainerFactoryPostProcessor
66+
static ConcurrentKafkaListenerContainerFactorySpringBoot4PostProcessor
6767
otelKafkaListenerContainerFactoryBeanPostProcessor(
6868
ObjectProvider<OpenTelemetry> openTelemetryProvider,
6969
ObjectProvider<InstrumentationConfig> configProvider) {
70-
return new ConcurrentKafkaListenerContainerFactoryPostProcessor(
70+
return new ConcurrentKafkaListenerContainerFactorySpringBoot4PostProcessor(
7171
() -> getTelemetry(openTelemetryProvider, configProvider));
7272
}
7373
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.spring.autoconfigure.internal.instrumentation.jdbc;
7+
8+
import static io.opentelemetry.instrumentation.testing.junit.db.SemconvStabilityUtil.maybeStable;
9+
import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_STATEMENT;
10+
import static org.assertj.core.api.Assertions.assertThat;
11+
12+
import io.opentelemetry.api.OpenTelemetry;
13+
import io.opentelemetry.instrumentation.api.incubator.config.internal.InstrumentationConfig;
14+
import io.opentelemetry.instrumentation.spring.autoconfigure.internal.properties.ConfigPropertiesBridge;
15+
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
16+
import io.opentelemetry.sdk.autoconfigure.spi.internal.DefaultConfigProperties;
17+
import java.sql.Connection;
18+
import java.sql.Statement;
19+
import java.util.Collections;
20+
import javax.sql.DataSource;
21+
import org.junit.jupiter.api.Test;
22+
import org.junit.jupiter.api.extension.RegisterExtension;
23+
import org.springframework.aop.framework.Advised;
24+
import org.springframework.aop.support.AopUtils;
25+
import org.springframework.boot.autoconfigure.AutoConfigurations;
26+
import org.springframework.boot.jdbc.autoconfigure.DataSourceAutoConfiguration;
27+
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
28+
29+
class JdbcInstrumentationAutoConfigurationTest {
30+
31+
@RegisterExtension
32+
static final LibraryInstrumentationExtension testing = LibraryInstrumentationExtension.create();
33+
34+
private final ApplicationContextRunner runner =
35+
new ApplicationContextRunner()
36+
.withBean(
37+
InstrumentationConfig.class,
38+
() ->
39+
new ConfigPropertiesBridge(
40+
DefaultConfigProperties.createFromMap(Collections.emptyMap())))
41+
.withConfiguration(
42+
AutoConfigurations.of(
43+
JdbcInstrumentationSpringBoot4AutoConfiguration.class,
44+
DataSourceAutoConfiguration.class))
45+
.withBean("openTelemetry", OpenTelemetry.class, testing::getOpenTelemetry);
46+
47+
@SuppressWarnings("deprecation") // using deprecated semconv
48+
@Test
49+
void statementSanitizerEnabledByDefault() {
50+
runner.run(
51+
context -> {
52+
DataSource dataSource = context.getBean(DataSource.class);
53+
54+
assertThat(AopUtils.isAopProxy(dataSource)).isTrue();
55+
assertThat(dataSource.getClass().getSimpleName()).isNotEqualTo("HikariDataSource");
56+
// unwrap the instrumented data source to get the original data source
57+
Object original = ((Advised) dataSource).getTargetSource().getTarget();
58+
assertThat(AopUtils.isAopProxy(original)).isFalse();
59+
assertThat(original.getClass().getSimpleName()).isEqualTo("HikariDataSource");
60+
61+
try (Connection connection = dataSource.getConnection()) {
62+
try (Statement statement = connection.createStatement()) {
63+
statement.execute("SELECT 1");
64+
}
65+
}
66+
67+
testing.waitAndAssertTraces(
68+
trace ->
69+
trace.hasSpansSatisfyingExactly(
70+
span -> span.hasAttribute(maybeStable(DB_STATEMENT), "SELECT ?")));
71+
});
72+
}
73+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.spring.autoconfigure.internal.instrumentation.web;
7+
8+
import static org.assertj.core.api.Assertions.assertThat;
9+
10+
import io.opentelemetry.api.OpenTelemetry;
11+
import io.opentelemetry.instrumentation.api.incubator.config.internal.InstrumentationConfig;
12+
import io.opentelemetry.instrumentation.spring.autoconfigure.internal.properties.ConfigPropertiesBridge;
13+
import io.opentelemetry.sdk.autoconfigure.spi.internal.DefaultConfigProperties;
14+
import java.util.Collections;
15+
import org.junit.jupiter.api.Test;
16+
import org.springframework.boot.autoconfigure.AutoConfigurations;
17+
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
18+
import org.springframework.web.client.RestTemplate;
19+
20+
class SpringWebInstrumentationAutoConfigurationTest {
21+
22+
private final ApplicationContextRunner contextRunner =
23+
new ApplicationContextRunner()
24+
.withBean(OpenTelemetry.class, OpenTelemetry::noop)
25+
.withBean(
26+
InstrumentationConfig.class,
27+
() ->
28+
new ConfigPropertiesBridge(
29+
DefaultConfigProperties.createFromMap(Collections.emptyMap())))
30+
.withBean(RestTemplate.class, RestTemplate::new)
31+
.withConfiguration(
32+
AutoConfigurations.of(SpringWebInstrumentationSpringBoot4AutoConfiguration.class));
33+
34+
/**
35+
* Tests that users create {@link RestTemplate} bean is instrumented.
36+
*
37+
* <pre>{@code
38+
* @Bean public RestTemplate restTemplate() {
39+
* return new RestTemplate();
40+
* }
41+
* }</pre>
42+
*/
43+
@Test
44+
void instrumentationEnabled() {
45+
contextRunner
46+
.withPropertyValues("otel.instrumentation.spring-web.enabled=true")
47+
.withPropertyValues("otel.instrumentation.common.default-enabled=false")
48+
.run(
49+
context -> {
50+
assertThat(
51+
context.getBean(
52+
"otelRestTemplateBeanPostProcessor", RestTemplateBeanPostProcessor.class))
53+
.isNotNull();
54+
55+
assertThat(
56+
context.getBean(RestTemplate.class).getInterceptors().stream()
57+
.filter(
58+
rti ->
59+
rti.getClass()
60+
.getName()
61+
.startsWith("io.opentelemetry.instrumentation"))
62+
.count())
63+
.isEqualTo(1);
64+
});
65+
}
66+
67+
@Test
68+
void instrumentationDisabled() {
69+
contextRunner
70+
.withPropertyValues("otel.instrumentation.spring-web.enabled=false")
71+
.run(
72+
context ->
73+
assertThat(context.containsBean("otelRestTemplateBeanPostProcessor")).isFalse());
74+
}
75+
76+
@Test
77+
void instrumentationDisabledButAllEnabled() {
78+
contextRunner
79+
.withPropertyValues("otel.instrumentation.spring-web.enabled=false")
80+
.withPropertyValues("otel.instrumentation.common.default-enabled=true")
81+
.run(
82+
context ->
83+
assertThat(context.containsBean("otelRestTemplateBeanPostProcessor")).isFalse());
84+
}
85+
86+
@Test
87+
void allInstrumentationDisabled() {
88+
contextRunner
89+
.withPropertyValues("otel.instrumentation.common.default-enabled=false")
90+
.run(
91+
context ->
92+
assertThat(context.containsBean("otelRestTemplateBeanPostProcessor")).isFalse());
93+
}
94+
95+
@Test
96+
void defaultConfiguration() {
97+
contextRunner.run(
98+
context ->
99+
assertThat(
100+
context.getBean(
101+
"otelRestTemplateBeanPostProcessor", RestTemplateBeanPostProcessor.class))
102+
.isNotNull());
103+
}
104+
}

smoke-tests-otel-starter/spring-boot-4/src/test/java/io/opentelemetry/spring/smoketest/AbstractJvmKafkaSpringStarterSmokeTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import io.opentelemetry.api.OpenTelemetry;
99
import io.opentelemetry.instrumentation.spring.autoconfigure.OpenTelemetryAutoConfiguration;
10-
import io.opentelemetry.instrumentation.spring.autoconfigure.internal.instrumentation.kafka.KafkaInstrumentationAutoConfiguration;
10+
import io.opentelemetry.instrumentation.spring.autoconfigure.internal.instrumentation.kafka.KafkaInstrumentationSpringBoot4AutoConfiguration;
1111
import io.opentelemetry.instrumentation.spring.autoconfigure.internal.instrumentation.thread.ThreadDetailsAutoConfiguration;
1212
import java.time.Duration;
1313
import org.junit.jupiter.api.AfterAll;
@@ -54,7 +54,7 @@ void setUpContext() {
5454
ThreadDetailsAutoConfiguration.class,
5555
SpringSmokeOtelConfiguration.class,
5656
KafkaAutoConfiguration.class,
57-
KafkaInstrumentationAutoConfiguration.class,
57+
KafkaInstrumentationSpringBoot4AutoConfiguration.class,
5858
KafkaConfig.class))
5959
.withPropertyValues(
6060
"otel.instrumentation.kafka.experimental-span-attributes=true",

0 commit comments

Comments
 (0)