Skip to content

Commit 2bb3448

Browse files
committed
POC for trace_based log record processor
1 parent 07dbde4 commit 2bb3448

File tree

8 files changed

+526
-6
lines changed

8 files changed

+526
-6
lines changed

sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/SeverityBasedLogRecordProcessorComponentProvider.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,6 @@
1717
/**
1818
* ComponentProvider for SeverityBasedLogRecordProcessor to support declarative configuration.
1919
*
20-
* <p>This provider creates a {@link SeverityBasedLogRecordProcessor} that filters log records
21-
* based on minimum severity level. Only log records with a severity level greater than or
22-
* equal to the configured minimum are forwarded to the configured downstream processors.
23-
*
2420
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
2521
* at any time.
2622
*/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.sdk.extension.incubator.fileconfig.internal;
7+
8+
import io.opentelemetry.api.incubator.config.DeclarativeConfigProperties;
9+
import io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider;
10+
import io.opentelemetry.sdk.extension.incubator.fileconfig.DeclarativeConfiguration;
11+
import io.opentelemetry.sdk.logs.LogRecordProcessor;
12+
import io.opentelemetry.sdk.logs.TraceBasedLogRecordProcessor;
13+
import java.util.ArrayList;
14+
import java.util.List;
15+
16+
/**
17+
* ComponentProvider for TraceBasedLogRecordProcessor to support declarative configuration.
18+
*
19+
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
20+
* at any time.
21+
*/
22+
public class TraceBasedLogRecordProcessorComponentProvider
23+
implements ComponentProvider<LogRecordProcessor> {
24+
25+
@Override
26+
public Class<LogRecordProcessor> getType() {
27+
return LogRecordProcessor.class;
28+
}
29+
30+
@Override
31+
public String getName() {
32+
return "trace_based";
33+
}
34+
35+
@Override
36+
public LogRecordProcessor create(DeclarativeConfigProperties config) {
37+
List<DeclarativeConfigProperties> processorConfigs = config.getStructuredList("processors");
38+
if (processorConfigs == null || processorConfigs.isEmpty()) {
39+
throw new IllegalArgumentException(
40+
"At least one processor is required for trace_based log processors");
41+
}
42+
43+
List<LogRecordProcessor> processors = new ArrayList<>();
44+
for (DeclarativeConfigProperties processorConfig : processorConfigs) {
45+
LogRecordProcessor processor =
46+
DeclarativeConfiguration.createLogRecordProcessor(processorConfig);
47+
processors.add(processor);
48+
}
49+
50+
return TraceBasedLogRecordProcessor.builder().addProcessors(processors).build();
51+
}
52+
}
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
io.opentelemetry.sdk.extension.incubator.fileconfig.ServiceResourceDetector
22
io.opentelemetry.sdk.extension.incubator.fileconfig.internal.SeverityBasedLogRecordProcessorComponentProvider
3+
io.opentelemetry.sdk.extension.incubator.fileconfig.internal.TraceBasedLogRecordProcessorComponentProvider
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.sdk.extension.incubator.fileconfig;
7+
8+
import static org.assertj.core.api.Assertions.assertThat;
9+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
10+
11+
import io.opentelemetry.api.incubator.config.DeclarativeConfigProperties;
12+
import io.opentelemetry.common.ComponentLoader;
13+
import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.TraceBasedLogRecordProcessorComponentProvider;
14+
import io.opentelemetry.sdk.logs.LogRecordProcessor;
15+
import io.opentelemetry.sdk.logs.TraceBasedLogRecordProcessor;
16+
import java.io.ByteArrayInputStream;
17+
import java.nio.charset.StandardCharsets;
18+
import java.util.Collections;
19+
import org.junit.jupiter.api.Test;
20+
21+
class TraceBasedLogRecordProcessorComponentProviderTest {
22+
23+
@Test
24+
void createTraceBasedProcessor_DirectComponentProvider() {
25+
TraceBasedLogRecordProcessorComponentProvider provider =
26+
new TraceBasedLogRecordProcessorComponentProvider();
27+
28+
assertThat(provider.getType()).isEqualTo(LogRecordProcessor.class);
29+
assertThat(provider.getName()).isEqualTo("trace_based");
30+
}
31+
32+
@Test
33+
void createTraceBasedProcessor_ValidConfig() {
34+
DeclarativeConfigProperties config =
35+
getConfig(
36+
"processors:\n" // this comment exists only to influence spotless formatting
37+
+ " - simple:\n"
38+
+ " exporter:\n"
39+
+ " console: {}\n");
40+
41+
TraceBasedLogRecordProcessorComponentProvider provider =
42+
new TraceBasedLogRecordProcessorComponentProvider();
43+
44+
LogRecordProcessor processor = provider.create(config);
45+
46+
assertThat(processor).isInstanceOf(TraceBasedLogRecordProcessor.class);
47+
48+
assertThat(processor.toString())
49+
.contains("TraceBasedLogRecordProcessor")
50+
.contains("delegate=SimpleLogRecordProcessor")
51+
.contains("logRecordExporter=SystemOutLogRecordExporter");
52+
}
53+
54+
@Test
55+
void createTraceBasedProcessor_MissingProcessors() {
56+
DeclarativeConfigProperties config = getConfig("");
57+
58+
TraceBasedLogRecordProcessorComponentProvider provider =
59+
new TraceBasedLogRecordProcessorComponentProvider();
60+
61+
assertThatThrownBy(() -> provider.create(config))
62+
.isInstanceOf(IllegalArgumentException.class)
63+
.hasMessage("At least one processor is required for trace_based log processors");
64+
}
65+
66+
@Test
67+
void createTraceBasedProcessor_EmptyProcessors() {
68+
DeclarativeConfigProperties config = getConfig("processors: []\n");
69+
70+
TraceBasedLogRecordProcessorComponentProvider provider =
71+
new TraceBasedLogRecordProcessorComponentProvider();
72+
73+
assertThatThrownBy(() -> provider.create(config))
74+
.isInstanceOf(IllegalArgumentException.class)
75+
.hasMessage("At least one processor is required for trace_based log processors");
76+
}
77+
78+
@Test
79+
void createTraceBasedProcessor_MultipleProcessors() {
80+
DeclarativeConfigProperties config =
81+
getConfig(
82+
"processors:\n"
83+
+ " - simple:\n"
84+
+ " exporter:\n"
85+
+ " console: {}\n"
86+
+ " - simple:\n"
87+
+ " exporter:\n"
88+
+ " console: {}\n");
89+
90+
TraceBasedLogRecordProcessorComponentProvider provider =
91+
new TraceBasedLogRecordProcessorComponentProvider();
92+
93+
LogRecordProcessor processor = provider.create(config);
94+
95+
assertThat(processor).isInstanceOf(TraceBasedLogRecordProcessor.class);
96+
assertThat(processor.toString()).contains("TraceBasedLogRecordProcessor");
97+
}
98+
99+
private static DeclarativeConfigProperties getConfig(String yaml) {
100+
Object yamlObj =
101+
DeclarativeConfiguration.loadYaml(
102+
new ByteArrayInputStream(yaml.getBytes(StandardCharsets.UTF_8)),
103+
Collections.emptyMap());
104+
105+
return DeclarativeConfiguration.toConfigProperties(
106+
yamlObj,
107+
ComponentLoader.forClassLoader(
108+
TraceBasedLogRecordProcessorComponentProviderTest.class.getClassLoader()));
109+
}
110+
}

sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessor.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@
1616
* Implementation of {@link LogRecordProcessor} that filters log records based on minimum severity
1717
* level and delegates to downstream processors.
1818
*
19-
* <p>This processor only forwards log records to downstream processors if the log record's severity
20-
* level is greater than or equal to the configured minimum severity level.
19+
* <p>Only log records with severity greater than or equal to the configured minimum are forwarded.
2120
*/
2221
public final class SeverityBasedLogRecordProcessor implements LogRecordProcessor {
2322

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.sdk.logs;
7+
8+
import static java.util.Objects.requireNonNull;
9+
10+
import io.opentelemetry.context.Context;
11+
import io.opentelemetry.sdk.common.CompletableResultCode;
12+
import java.util.List;
13+
14+
/**
15+
* Implementation of {@link LogRecordProcessor} that filters log records based on trace sampling
16+
* status and delegates to downstream processors.
17+
*
18+
* <p>Only log records associated with sampled traces are forwarded.
19+
*/
20+
public final class TraceBasedLogRecordProcessor implements LogRecordProcessor {
21+
22+
private final LogRecordProcessor delegate;
23+
24+
TraceBasedLogRecordProcessor(List<LogRecordProcessor> processors) {
25+
requireNonNull(processors, "processors");
26+
this.delegate = LogRecordProcessor.composite(processors);
27+
}
28+
29+
/**
30+
* Returns a new {@link TraceBasedLogRecordProcessorBuilder} to construct a {@link
31+
* TraceBasedLogRecordProcessor}.
32+
*
33+
* @return a new {@link TraceBasedLogRecordProcessorBuilder}
34+
*/
35+
public static TraceBasedLogRecordProcessorBuilder builder() {
36+
return new TraceBasedLogRecordProcessorBuilder();
37+
}
38+
39+
@Override
40+
public void onEmit(Context context, ReadWriteLogRecord logRecord) {
41+
if (logRecord.getSpanContext().isSampled()) {
42+
delegate.onEmit(context, logRecord);
43+
}
44+
}
45+
46+
@Override
47+
public CompletableResultCode shutdown() {
48+
return delegate.shutdown();
49+
}
50+
51+
@Override
52+
public CompletableResultCode forceFlush() {
53+
return delegate.forceFlush();
54+
}
55+
56+
@Override
57+
public String toString() {
58+
return "TraceBasedLogRecordProcessor{" + "delegate=" + delegate + '}';
59+
}
60+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.sdk.logs;
7+
8+
import static java.util.Objects.requireNonNull;
9+
10+
import java.util.ArrayList;
11+
import java.util.Arrays;
12+
import java.util.List;
13+
14+
/** Builder class for {@link TraceBasedLogRecordProcessor}. */
15+
public final class TraceBasedLogRecordProcessorBuilder {
16+
17+
private final List<LogRecordProcessor> processors = new ArrayList<>();
18+
19+
TraceBasedLogRecordProcessorBuilder() {}
20+
21+
/**
22+
* Adds multiple {@link LogRecordProcessor}s to the list of downstream processors.
23+
*
24+
* @param processors the processors to add
25+
* @return this builder
26+
*/
27+
public TraceBasedLogRecordProcessorBuilder addProcessors(LogRecordProcessor... processors) {
28+
requireNonNull(processors, "processors");
29+
addProcessors(Arrays.asList(processors));
30+
return this;
31+
}
32+
33+
/**
34+
* Adds multiple {@link LogRecordProcessor}s to the list of downstream processors.
35+
*
36+
* @param processors the processors to add
37+
* @return this builder
38+
*/
39+
public TraceBasedLogRecordProcessorBuilder addProcessors(
40+
Iterable<LogRecordProcessor> processors) {
41+
42+
requireNonNull(processors, "processors");
43+
for (LogRecordProcessor processor : processors) {
44+
requireNonNull(processor, "processor");
45+
this.processors.add(processor);
46+
}
47+
return this;
48+
}
49+
50+
/**
51+
* Returns a new {@link TraceBasedLogRecordProcessor} with the configuration of this builder.
52+
*
53+
* @return a new {@link TraceBasedLogRecordProcessor}
54+
* @throws IllegalArgumentException if no processors have been added
55+
*/
56+
public TraceBasedLogRecordProcessor build() {
57+
if (processors.isEmpty()) {
58+
throw new IllegalArgumentException("At least one processor must be added");
59+
}
60+
return new TraceBasedLogRecordProcessor(processors);
61+
}
62+
}

0 commit comments

Comments
 (0)