diff --git a/dd-java-agent/agent-jmxfetch/src/main/java/datadog/trace/agent/jmxfetch/JMXFetch.java b/dd-java-agent/agent-jmxfetch/src/main/java/datadog/trace/agent/jmxfetch/JMXFetch.java index ea2a79f49c2..6d5b7f21fa4 100644 --- a/dd-java-agent/agent-jmxfetch/src/main/java/datadog/trace/agent/jmxfetch/JMXFetch.java +++ b/dd-java-agent/agent-jmxfetch/src/main/java/datadog/trace/agent/jmxfetch/JMXFetch.java @@ -116,6 +116,7 @@ private static void run(final StatsDClientManager statsDClientManager, final Con .refreshBeansPeriod(refreshBeansPeriod) .globalTags(globalTags) .reporter(reporter) + .jmxfetchTelemetry(config.isTelemetryJmxEnabled()) .connectionFactory(new AgentConnectionFactory()); if (config.isJmxFetchMultipleRuntimeServicesEnabled()) { diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java index c7832b1a224..bd2328d5e3e 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java @@ -97,6 +97,7 @@ public final class GeneralConfig { public static final String TELEMETRY_DEPENDENCY_RESOLUTION_QUEUE_SIZE = "telemetry.dependency-resolution.queue.size"; public static final String TELEMETRY_DEBUG_REQUESTS_ENABLED = "telemetry.debug.requests.enabled"; + public static final String TELEMETRY_JMX_ENABLED = "telemetry.jmx.enabled"; public static final String AGENTLESS_LOG_SUBMISSION_ENABLED = "agentless.log.submission.enabled"; public static final String AGENTLESS_LOG_SUBMISSION_QUEUE_SIZE = "agentless.log.submission.queue.size"; diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceDumpJsonExporter.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceDumpJsonExporter.java index e2b3c0fc976..ec830c55cd5 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceDumpJsonExporter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceDumpJsonExporter.java @@ -3,12 +3,9 @@ import com.squareup.moshi.JsonAdapter; import com.squareup.moshi.Moshi; import com.squareup.moshi.Types; -import datadog.trace.api.flare.TracerFlare; import datadog.trace.core.DDSpan; -import java.io.IOException; import java.util.Collection; import java.util.List; -import java.util.zip.ZipOutputStream; public class TraceDumpJsonExporter implements Writer { @@ -17,11 +14,9 @@ public class TraceDumpJsonExporter implements Writer { .add(DDSpanJsonAdapter.buildFactory(false)) .build() .adapter(Types.newParameterizedType(Collection.class, DDSpan.class)); - private StringBuilder dumpText; - private ZipOutputStream zip; + private final StringBuilder dumpText; - public TraceDumpJsonExporter(ZipOutputStream zip) { - this.zip = zip; + public TraceDumpJsonExporter() { dumpText = new StringBuilder(); } @@ -32,7 +27,8 @@ public void write(final Collection trace) { @Override public void write(List trace) { - // Do nothing + Collection collectionTrace = trace; + write(collectionTrace); } @Override @@ -42,14 +38,14 @@ public void start() { @Override public boolean flush() { - try { - TracerFlare.addText(zip, "pending_traces.txt", dumpText.toString()); - } catch (IOException e) { - // do nothing - } + // do nothing return true; } + public String getDumpJson() { + return dumpText.toString(); + } + @Override public void close() { // do nothing diff --git a/dd-trace-core/src/main/java/datadog/trace/core/LongRunningTracesTracker.java b/dd-trace-core/src/main/java/datadog/trace/core/LongRunningTracesTracker.java index 5d2e32fecf6..98f06c65024 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/LongRunningTracesTracker.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/LongRunningTracesTracker.java @@ -1,14 +1,29 @@ package datadog.trace.core; +import static java.util.Comparator.comparingLong; + import datadog.communication.ddagent.DDAgentFeaturesDiscovery; import datadog.communication.ddagent.SharedCommunicationObjects; import datadog.trace.api.Config; +import datadog.trace.api.config.TracerConfig; +import datadog.trace.api.flare.TracerFlare; +import datadog.trace.common.writer.TraceDumpJsonExporter; import datadog.trace.core.monitor.HealthMetrics; +import java.io.IOException; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.zip.ZipOutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LongRunningTracesTracker implements TracerFlare.Reporter { + private static final Logger LOGGER = LoggerFactory.getLogger(LongRunningTracesTracker.class); + private static final int MAX_DUMPED_TRACES = 50; + private static final Comparator TRACE_BY_START_TIME = + comparingLong(PendingTrace::getRunningTraceStartTime); -public class LongRunningTracesTracker { private final DDAgentFeaturesDiscovery features; private final HealthMetrics healthMetrics; private long lastFlushMilli = 0; @@ -21,6 +36,7 @@ public class LongRunningTracesTracker { private int dropped = 0; private int write = 0; private int expired = 0; + private int droppedSampling = 0; public static final int NOT_TRACKED = -1; public static final int UNDEFINED = 0; @@ -41,6 +57,18 @@ public LongRunningTracesTracker( (int) TimeUnit.SECONDS.toMillis(config.getLongRunningTraceFlushInterval()); this.features = sharedCommunicationObjects.featuresDiscovery(config); this.healthMetrics = healthMetrics; + + if (!features.supportsLongRunning()) { + LOGGER.warn( + "Long running trace tracking is enabled via {}, however the Datadog Agent version {} does not support receiving long running traces. " + + "Long running traces will be tracked locally in memory (up to {} traces) but will NOT be automatically reported to the agent. " + + "Long running traces are included in tracer flares.", + "dd." + TracerConfig.TRACE_LONG_RUNNING_ENABLED, + features.getVersion() != null ? features.getVersion() : "unknown", + maxTrackedTraces); + } + + TracerFlare.addReporter(this); } public boolean add(PendingTraceBuffer.Element element) { @@ -56,7 +84,7 @@ public boolean add(PendingTraceBuffer.Element element) { return true; } - private void addTrace(PendingTrace trace) { + private synchronized void addTrace(PendingTrace trace) { if (trace.empty()) { return; } @@ -67,7 +95,7 @@ private void addTrace(PendingTrace trace) { traceArray.add(trace); } - public void flushAndCompact(long nowMilli) { + public synchronized void flushAndCompact(long nowMilli) { if (nowMilli < lastFlushMilli + TimeUnit.SECONDS.toMillis(1)) { return; } @@ -78,7 +106,7 @@ public void flushAndCompact(long nowMilli) { cleanSlot(i); continue; } - if (trace.empty() || !features.supportsLongRunning()) { + if (trace.empty()) { trace.compareAndSetLongRunningState(WRITE_RUNNING_SPANS, NOT_TRACKED); cleanSlot(i); continue; @@ -92,12 +120,15 @@ public void flushAndCompact(long nowMilli) { if (shouldFlush(nowMilli, trace)) { if (negativeOrNullPriority(trace)) { trace.compareAndSetLongRunningState(TRACKED, NOT_TRACKED); + droppedSampling++; cleanSlot(i); continue; } - trace.compareAndSetLongRunningState(TRACKED, WRITE_RUNNING_SPANS); - write++; - trace.write(); + if (features.supportsLongRunning()) { + trace.compareAndSetLongRunningState(TRACKED, WRITE_RUNNING_SPANS); + write++; + trace.write(); + } } i++; } @@ -134,9 +165,28 @@ private boolean negativeOrNullPriority(PendingTrace trace) { } private void flushStats() { - healthMetrics.onLongRunningUpdate(dropped, write, expired); + healthMetrics.onLongRunningUpdate(dropped, write, expired, droppedSampling); dropped = 0; write = 0; expired = 0; + droppedSampling = 0; + } + + public synchronized String getTracesAsJson() { + try (TraceDumpJsonExporter writer = new TraceDumpJsonExporter()) { + List traces = new ArrayList<>(traceArray); + traces.sort(TRACE_BY_START_TIME); + + int limit = Math.min(traces.size(), MAX_DUMPED_TRACES); + for (int i = 0; i < limit; i++) { + writer.write(traces.get(i).getSpans()); + } + return writer.getDumpJson(); + } + } + + @Override + public void addReportToFlare(ZipOutputStream zip) throws IOException { + TracerFlare.addText(zip, "long_running_traces.txt", getTracesAsJson()); } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java b/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java index 0057eb2ce7d..14825963487 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java @@ -54,6 +54,7 @@ public interface Element { } private static class DelayingPendingTraceBuffer extends PendingTraceBuffer { + private static final Logger LOGGER = LoggerFactory.getLogger(DelayingPendingTraceBuffer.class); private static final long FORCE_SEND_DELAY_MS = TimeUnit.SECONDS.toMillis(5); private static final long SEND_DELAY_NS = TimeUnit.MILLISECONDS.toNanos(500); private static final long SLEEP_TIME_MS = 100; @@ -64,6 +65,7 @@ private static class DelayingPendingTraceBuffer extends PendingTraceBuffer { private final MpscBlockingConsumerArrayQueue queue; private final Thread worker; private final TimeSource timeSource; + private final HealthMetrics healthMetrics; private volatile boolean closed = false; private final AtomicInteger flushCounter = new AtomicInteger(0); @@ -71,6 +73,23 @@ private static class DelayingPendingTraceBuffer extends PendingTraceBuffer { private final LongRunningTracesTracker runningTracesTracker; + private void dumpTraces() { + if (worker.isAlive()) { + int count = dumpCounter.get(); + int loop = 1; + boolean signaled = queue.offer(DUMP_ELEMENT); + while (!closed && !signaled) { + yieldOrSleep(loop++); + signaled = queue.offer(DUMP_ELEMENT); + } + int newCount = dumpCounter.get(); + while (!closed && count >= newCount) { + yieldOrSleep(loop++); + newCount = dumpCounter.get(); + } + } + } + public boolean longRunningSpansEnabled() { return runningTracesTracker != null; } @@ -85,6 +104,7 @@ public void enqueue(Element pendingTrace) { if (!pendingTrace.writeOnBufferFull()) { return; } + healthMetrics.onPendingWriteAround(); pendingTrace.write(); } } @@ -136,6 +156,18 @@ public void flush() { } } + private String getDumpJson() { + try (TraceDumpJsonExporter writer = new TraceDumpJsonExporter()) { + for (Element e : DumpDrain.DUMP_DRAIN.collectTraces()) { + if (e instanceof PendingTrace) { + PendingTrace trace = (PendingTrace) e; + writer.write(trace.getSpans()); + } + } + return writer.getDumpJson(); + } + } + private static final class WriteDrain implements MessagePassingQueue.Consumer { private static final WriteDrain WRITE_DRAIN = new WriteDrain(); @@ -286,6 +318,17 @@ public void run() { } } + @Override + public String getTracesAsJson() throws IOException { + dumpTraces(); + String json = getDumpJson(); + if (json.isEmpty()) { + return "[]"; + } else { + return json; + } + } + public DelayingPendingTraceBuffer( int bufferSize, TimeSource timeSource, @@ -295,6 +338,7 @@ public DelayingPendingTraceBuffer( this.queue = new MpscBlockingConsumerArrayQueue<>(bufferSize); this.worker = newAgentThread(TRACE_MONITOR, new Worker()); this.timeSource = timeSource; + this.healthMetrics = healthMetrics; boolean runningSpansEnabled = config.isLongRunningTraceEnabled(); this.runningTracesTracker = runningSpansEnabled @@ -321,6 +365,11 @@ public void enqueue(Element pendingTrace) { log.debug( "PendingTrace enqueued but won't be reported. Root span: {}", pendingTrace.getRootSpan()); } + + @Override + public String getTracesAsJson() { + return "[]"; + } } public static PendingTraceBuffer delaying( @@ -345,6 +394,8 @@ public static PendingTraceBuffer discarding() { public abstract void enqueue(Element pendingTrace); + public abstract String getTracesAsJson() throws IOException; + private static class TracerDump implements TracerFlare.Reporter { private final DelayingPendingTraceBuffer buffer; @@ -354,32 +405,12 @@ private TracerDump(DelayingPendingTraceBuffer buffer) { @Override public void prepareForFlare() { - if (buffer.worker.isAlive()) { - int count = buffer.dumpCounter.get(); - int loop = 1; - boolean signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DUMP_ELEMENT); - while (!buffer.closed && !signaled) { - buffer.yieldOrSleep(loop++); - signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DUMP_ELEMENT); - } - int newCount = buffer.dumpCounter.get(); - while (!buffer.closed && count >= newCount) { - buffer.yieldOrSleep(loop++); - newCount = buffer.dumpCounter.get(); - } - } + buffer.dumpTraces(); } @Override public void addReportToFlare(ZipOutputStream zip) throws IOException { - TraceDumpJsonExporter writer = new TraceDumpJsonExporter(zip); - for (Element e : DelayingPendingTraceBuffer.DumpDrain.DUMP_DRAIN.collectTraces()) { - if (e instanceof PendingTrace) { - PendingTrace trace = (PendingTrace) e; - writer.write(trace.getSpans()); - } - } - writer.flush(); + TracerFlare.addText(zip, "pending_traces.txt", buffer.getDumpJson()); } } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java index ea6ffdebf0b..d43bfc9c107 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java @@ -71,7 +71,10 @@ public void onSend( public void onFailedSend( final int traceCount, final int sizeInBytes, final RemoteApi.Response response) {} - public void onLongRunningUpdate(final int dropped, final int write, final int expired) {} + public void onPendingWriteAround() {} + + public void onLongRunningUpdate( + final int dropped, final int write, final int expired, final int droppedSampling) {} /** * Report that a trace has been used to compute client stats. diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java index 6d00ac646a5..47d3de5a122 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java @@ -84,9 +84,12 @@ public class TracerHealthMetrics extends HealthMetrics implements AutoCloseable private final LongAdder scopeCloseErrors = new LongAdder(); private final LongAdder userScopeCloseErrors = new LongAdder(); + private final LongAdder pendingWriteAround = new LongAdder(); + private final LongAdder longRunningTracesWrite = new LongAdder(); private final LongAdder longRunningTracesDropped = new LongAdder(); private final LongAdder longRunningTracesExpired = new LongAdder(); + private final LongAdder longRunningTracesDroppedSampling = new LongAdder(); private final LongAdder clientStatsProcessedSpans = new LongAdder(); private final LongAdder clientStatsProcessedTraces = new LongAdder(); @@ -295,10 +298,17 @@ public void onFailedSend( } @Override - public void onLongRunningUpdate(final int dropped, final int write, final int expired) { + public void onPendingWriteAround() { + pendingWriteAround.increment(); + } + + @Override + public void onLongRunningUpdate( + final int dropped, final int write, final int expired, final int droppedSampling) { longRunningTracesWrite.add(write); longRunningTracesDropped.add(dropped); longRunningTracesExpired.add(expired); + longRunningTracesDroppedSampling.add(droppedSampling); } private void onSendAttempt( @@ -470,12 +480,19 @@ public void run(TracerHealthMetrics target) { reportIfChanged( target.statsd, "scope.user.close.error", target.userScopeCloseErrors, NO_TAGS); + reportIfChanged(target.statsd, "pending.write_around", target.pendingWriteAround, NO_TAGS); + reportIfChanged( target.statsd, "long-running.write", target.longRunningTracesWrite, NO_TAGS); reportIfChanged( target.statsd, "long-running.dropped", target.longRunningTracesDropped, NO_TAGS); reportIfChanged( target.statsd, "long-running.expired", target.longRunningTracesExpired, NO_TAGS); + reportIfChanged( + target.statsd, + "long-running.dropped_sampling", + target.longRunningTracesDroppedSampling, + NO_TAGS); reportIfChanged( target.statsd, "stats.traces_in", target.clientStatsProcessedTraces, NO_TAGS); @@ -599,12 +616,17 @@ public String summary() { + "\nuserScopeCloseErrors=" + userScopeCloseErrors.sum() + "\n" + + "\npendingWriteAround=" + + pendingWriteAround.sum() + + "\n" + "\nlongRunningTracesWrite=" + longRunningTracesWrite.sum() + "\nlongRunningTracesDropped=" + longRunningTracesDropped.sum() + "\nlongRunningTracesExpired=" + longRunningTracesExpired.sum() + + "\nlongRunningTracesDroppedSampling=" + + longRunningTracesDroppedSampling.sum() + "\n" + "\nclientStatsRequests=" + clientStatsRequests.sum() diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/LongRunningTracesTrackerTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/LongRunningTracesTrackerTest.groovy index 7edd12cf2ed..915e358305f 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/LongRunningTracesTrackerTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/LongRunningTracesTrackerTest.groovy @@ -114,7 +114,7 @@ class LongRunningTracesTrackerTest extends DDSpecification { trace.longRunningTrackedState == LongRunningTracesTracker.EXPIRED } - def "agent disabled feature"() { + def "trace remains tracked but not written when agent long running feature not available"() { given: def trace = newTraceToTrack() tracker.add(trace) @@ -124,7 +124,9 @@ class LongRunningTracesTrackerTest extends DDSpecification { then: 1 * features.supportsLongRunning() >> false - tracker.traceArray.size() == 0 + tracker.traceArray.size() == 1 + tracker.traceArray[0].longRunningTrackedState == LongRunningTracesTracker.TRACKED + tracker.traceArray[0].getLastWriteTime() == 0 } def flushAt(long timeMilli) { @@ -192,4 +194,44 @@ class LongRunningTracesTrackerTest extends DDSpecification { PrioritySampling.USER_KEEP | 1 | LongRunningTracesTracker.WRITE_RUNNING_SPANS PrioritySampling.SAMPLER_KEEP | 1 | LongRunningTracesTracker.WRITE_RUNNING_SPANS } + + def "getTracesAsJson with no traces"() { + when: + def json = tracker.getTracesAsJson() + + then: + json == "" + } + + def "getTracesAsJson with traces"() { + given: + def trace = newTraceToTrack() + tracker.add(trace) + + when: + def json = tracker.getTracesAsJson() + + then: + json != null + !json.isEmpty() + json.contains('"service"') + json.contains('"name"') + } + + def "testing tracer flare dump with trace"() { + given: + def trace = newTraceToTrack() + tracker.add(trace) + + when: + def entries = PendingTraceBufferTest.buildAndExtractZip() + + then: + entries.size() == 1 + entries.containsKey("long_running_traces.txt") + + def jsonContent = entries["long_running_traces.txt"] as String + jsonContent.contains('"service"') + jsonContent.contains('"name"') + } } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy index c9bf72beee7..ccf1759b851 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy @@ -582,7 +582,7 @@ class PendingTraceBufferTest extends DDSpecification { return DDSpan.create("test", 0, context, null) } - def buildAndExtractZip() { + def static buildAndExtractZip() { TracerFlare.prepareForFlare() def out = new ByteArrayOutputStream() try (ZipOutputStream zip = new ZipOutputStream(out)) { diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index d828b06638d..064cfc360ac 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -381,6 +381,7 @@ import static datadog.trace.api.config.GeneralConfig.TELEMETRY_DEPENDENCY_RESOLUTION_QUEUE_SIZE; import static datadog.trace.api.config.GeneralConfig.TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL; import static datadog.trace.api.config.GeneralConfig.TELEMETRY_HEARTBEAT_INTERVAL; +import static datadog.trace.api.config.GeneralConfig.TELEMETRY_JMX_ENABLED; import static datadog.trace.api.config.GeneralConfig.TELEMETRY_LOG_COLLECTION_ENABLED; import static datadog.trace.api.config.GeneralConfig.TELEMETRY_METRICS_ENABLED; import static datadog.trace.api.config.GeneralConfig.TELEMETRY_METRICS_INTERVAL; @@ -1205,6 +1206,7 @@ public static String getHostName() { private final boolean isTelemetryDependencyServiceEnabled; private final boolean telemetryMetricsEnabled; private final boolean isTelemetryLogCollectionEnabled; + private final boolean isTelemetryJmxEnabled; private final int telemetryDependencyResolutionQueueSize; private final boolean azureAppServices; @@ -2058,6 +2060,8 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment()) && configProvider.getBoolean( TELEMETRY_LOG_COLLECTION_ENABLED, DEFAULT_TELEMETRY_LOG_COLLECTION_ENABLED); + isTelemetryJmxEnabled = configProvider.getBoolean(TELEMETRY_JMX_ENABLED, false); + isTelemetryDependencyServiceEnabled = configProvider.getBoolean( TELEMETRY_DEPENDENCY_COLLECTION_ENABLED, @@ -3577,6 +3581,10 @@ public boolean isTelemetryLogCollectionEnabled() { return isTelemetryLogCollectionEnabled; } + public boolean isTelemetryJmxEnabled() { + return isTelemetryJmxEnabled; + } + public int getTelemetryDependencyResolutionQueueSize() { return telemetryDependencyResolutionQueueSize; } diff --git a/internal-api/src/main/java/datadog/trace/api/flare/TracerFlare.java b/internal-api/src/main/java/datadog/trace/api/flare/TracerFlare.java index f2ba39041a4..907024f2cd9 100644 --- a/internal-api/src/main/java/datadog/trace/api/flare/TracerFlare.java +++ b/internal-api/src/main/java/datadog/trace/api/flare/TracerFlare.java @@ -4,6 +4,8 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -67,6 +69,19 @@ public static void addReporter(Reporter reporter) { reporters.put(reporter.getClass(), reporter); } + public static Collection getReporters() { + return Collections.unmodifiableCollection(reporters.values()); + } + + public static Reporter getReporter(String className) { + for (Reporter reporter : getReporters()) { + if (reporter.getClass().getName().equals(className)) { + return reporter; + } + } + return null; + } + public static void addText(ZipOutputStream zip, String section, String text) throws IOException { zip.putNextEntry(new ZipEntry(section)); if (null != text) { diff --git a/utils/flare-utils/build.gradle.kts b/utils/flare-utils/build.gradle.kts index f718826c226..1952fea5cea 100644 --- a/utils/flare-utils/build.gradle.kts +++ b/utils/flare-utils/build.gradle.kts @@ -12,4 +12,7 @@ dependencies { implementation(project(":utils:version-utils")) implementation(project(":internal-api")) implementation(libs.slf4j) + + testImplementation(project(":utils:test-utils")) + testImplementation(project(":dd-trace-api")) } diff --git a/utils/flare-utils/src/main/java/datadog/flare/TracerFlareManager.java b/utils/flare-utils/src/main/java/datadog/flare/TracerFlareManager.java new file mode 100644 index 00000000000..b2928d0cdc2 --- /dev/null +++ b/utils/flare-utils/src/main/java/datadog/flare/TracerFlareManager.java @@ -0,0 +1,200 @@ +package datadog.flare; + +import datadog.trace.api.Config; +import datadog.trace.api.flare.TracerFlare; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Map; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; +import java.util.zip.ZipOutputStream; +import javax.management.InstanceAlreadyExistsException; +import javax.management.MBeanRegistrationException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MBean implementation for managing and accessing tracer flare data. + * + *

This class provides JMX operations to list flare data sources and retrieve flare data either + * for individual sources or a complete flare archive. See {@link TracerFlareManagerMBean} for + * documentation on the exposed operations. + */ +public class TracerFlareManager implements TracerFlareManagerMBean { + private static final Logger LOGGER = LoggerFactory.getLogger(TracerFlareManager.class); + + private final TracerFlareService flareService; + protected ObjectName mbeanName; + + public TracerFlareManager(TracerFlareService flareService) { + this.flareService = flareService; + } + + @Override + public String generateFullFlareZip() throws IOException { + TracerFlare.prepareForFlare(); + + long currentMillis = System.currentTimeMillis(); + boolean dumpThreads = Config.get().isTriageEnabled() || LOGGER.isDebugEnabled(); + byte[] zipBytes = flareService.buildFlareZip(currentMillis, currentMillis, dumpThreads); + return Base64.getEncoder().encodeToString(zipBytes); + } + + @Override + public String listFlareFiles() throws IOException { + TracerFlare.prepareForFlare(); + + StringBuilder result = new StringBuilder(); + + for (Map.Entry entry : TracerFlareService.BUILT_IN_SOURCES.entrySet()) { + String sourceName = entry.getKey(); + String[] files = entry.getValue(); + + for (String filename : files) { + result.append(sourceName).append(" ").append(filename).append("\n"); + } + } + + for (TracerFlare.Reporter reporter : TracerFlare.getReporters()) { + try (ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + ZipOutputStream zip = new ZipOutputStream(bytes)) { + reporter.addReportToFlare(zip); + zip.finish(); + + try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes.toByteArray()); + ZipInputStream zis = new ZipInputStream(bais)) { + ZipEntry entry; + while ((entry = zis.getNextEntry()) != null) { + result + .append(reporter.getClass().getName()) + .append(" ") + .append(entry.getName()) + .append("\n"); + zis.closeEntry(); + } + } + } catch (IOException e) { + LOGGER.debug("Failed to inspect reporter {}", reporter.getClass().getName(), e); + } + } + + return result.toString(); + } + + @Override + public String getFlareFile(String sourceName, String filename) throws IOException { + final byte[] zipBytes; + if (isBuiltInSource(sourceName)) { + zipBytes = flareService.getBuiltInSourceZip(sourceName); + } else { + zipBytes = getReporterFile(sourceName); + } + return extractFileFromZip(zipBytes, filename); + } + + private boolean isBuiltInSource(String sourceName) { + return TracerFlareService.BUILT_IN_SOURCES.containsKey(sourceName); + } + + /** + * Generates flare data for a specific reporter. + * + *

The reporter's data is generated as a ZIP file, and the specified filename is extracted. If + * the file is text, it is returned as plain text; if binary, it is returned base64-encoded. + * + * @param reporterClassName the fully qualified class name of the reporter + * @return the zip file containing the reporter's content + * @throws IOException if an error occurs while generating the flare + */ + private byte[] getReporterFile(String reporterClassName) throws IOException { + TracerFlare.Reporter reporter = TracerFlare.getReporter(reporterClassName); + if (reporter == null) { + throw new IOException("Error: Reporter not found: " + reporterClassName); + } + + reporter.prepareForFlare(); + + try (ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + ZipOutputStream zip = new ZipOutputStream(bytes)) { + reporter.addReportToFlare(zip); + zip.finish(); + return bytes.toByteArray(); + } + } + + /** + * Extracts a specific file from a ZIP archive. + * + *

Searches through the ZIP entries for the specified filename and returns its content. If the + * file name ends in ".txt", it is returned as plain text; if binary, it is returned + * base64-encoded. + * + * @param zipBytes the ZIP file bytes + * @param filename the name of the file to extract + * @return the file content (plain text or base64-encoded binary) + * @throws IOException if an error occurs while reading the ZIP + */ + private String extractFileFromZip(byte[] zipBytes, String filename) throws IOException { + try (ByteArrayInputStream bais = new ByteArrayInputStream(zipBytes); + ZipInputStream zis = new ZipInputStream(bais)) { + ZipEntry entry; + while ((entry = zis.getNextEntry()) != null) { + if (entry.getName().equals(filename)) { + ByteArrayOutputStream content = new ByteArrayOutputStream(); + byte[] buffer = new byte[8192]; + int bytesRead; + while ((bytesRead = zis.read(buffer)) != -1) { + content.write(buffer, 0, bytesRead); + } + zis.closeEntry(); + + byte[] contentBytes = content.toByteArray(); + if (entry.getName().endsWith(".txt")) { + return new String(contentBytes, StandardCharsets.UTF_8); + } else { + return Base64.getEncoder().encodeToString(contentBytes); + } + } + zis.closeEntry(); + } + + throw new IOException("Failed to extract file: " + filename); + } + } + + void registerMBean() { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + + try { + mbeanName = new ObjectName("datadog.flare:type=TracerFlare"); + mbs.registerMBean(this, mbeanName); + LOGGER.info("Registered TracerFlare MBean at {}", mbeanName); + } catch (MalformedObjectNameException + | InstanceAlreadyExistsException + | MBeanRegistrationException + | NotCompliantMBeanException e) { + LOGGER.warn("Failed to register TracerFlare MBean", e); + mbeanName = null; + } + } + + void unregisterMBean() { + if (mbeanName != null) { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + try { + mbs.unregisterMBean(mbeanName); + LOGGER.debug("Unregistered TracerFlare MBean"); + } catch (Exception e) { + LOGGER.warn("Failed to unregister TracerFlare MBean", e); + } + } + } +} diff --git a/utils/flare-utils/src/main/java/datadog/flare/TracerFlareManagerMBean.java b/utils/flare-utils/src/main/java/datadog/flare/TracerFlareManagerMBean.java new file mode 100644 index 00000000000..010bb7df04b --- /dev/null +++ b/utils/flare-utils/src/main/java/datadog/flare/TracerFlareManagerMBean.java @@ -0,0 +1,53 @@ +package datadog.flare; + +import java.io.IOException; + +/** + * MBean interface for managing and accessing tracer flare data. + * + *

This interface provides JMX operations to inspect flare data sources and generate flare data + * either for individual sources or as a complete ZIP archive. Sources include both registered + * reporters and built-in data (config, runtime, flare prelude, etc.). + */ +public interface TracerFlareManagerMBean { + /** + * Lists all available flare files from all sources. + * + *

Returns a newline-separated string where each line is formatted as "<source> + * <file>". This format makes it easy to pass the source and filename to {@link + * #getFlareFile(String, String)}. + * + *

Example output: + * + *

+   * config initial_config.txt
+   * ...
+   * datadog.trace.agent.core.CoreTracer tracer_health.txt
+   * ...
+   * 
+ * + * @return newline-separated string listing all available files and their source name + * @throws IOException if an error occurs + */ + String listFlareFiles() throws IOException; + + /** + * Returns a specific flare file by source name and filename. + * + *

If the file is text, it is returned as plain text; if binary, it is returned base64-encoded. + * + * @param sourceName the name of the source (reporter class name or built-in source name) + * @param filename the name of the file to retrieve + * @return the file content (plain text or base64-encoded binary) + * @throws IOException if an error occurs while generating or extracting the data + */ + String getFlareFile(String sourceName, String filename) throws IOException; + + /** + * Generates a complete tracer flare as a ZIP file. + * + * @return base64-encoded ZIP file containing the complete flare data + * @throws IOException if an error occurs while generating the flare ZIP + */ + String generateFullFlareZip() throws IOException; +} diff --git a/utils/flare-utils/src/main/java/datadog/flare/TracerFlarePoller.java b/utils/flare-utils/src/main/java/datadog/flare/TracerFlarePoller.java index 49755cf24d0..b695f0112c5 100644 --- a/utils/flare-utils/src/main/java/datadog/flare/TracerFlarePoller.java +++ b/utils/flare-utils/src/main/java/datadog/flare/TracerFlarePoller.java @@ -54,6 +54,9 @@ private void doStop() { if (null != stopSubmitter) { stopSubmitter.run(); } + if (null != tracerFlareService) { + tracerFlareService.close(); + } } final class Preparer implements ProductListener { diff --git a/utils/flare-utils/src/main/java/datadog/flare/TracerFlareService.java b/utils/flare-utils/src/main/java/datadog/flare/TracerFlareService.java index dcb097287a3..e8b65f513d7 100644 --- a/utils/flare-utils/src/main/java/datadog/flare/TracerFlareService.java +++ b/utils/flare-utils/src/main/java/datadog/flare/TracerFlareService.java @@ -24,6 +24,8 @@ import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.zip.ZipOutputStream; import okhttp3.HttpUrl; @@ -49,11 +51,23 @@ final class TracerFlareService { private static final int MAX_LOGFILE_SIZE_BYTES = MAX_LOGFILE_SIZE_MB << 20; + static final Map BUILT_IN_SOURCES = new HashMap<>(); + + static { + BUILT_IN_SOURCES.put("prelude", new String[] {"flare_info.txt", "tracer_version.txt"}); + BUILT_IN_SOURCES.put("config", new String[] {"initial_config.txt"}); + BUILT_IN_SOURCES.put( + "runtime", + new String[] {"jvm_args.txt", "classpath.txt", "library_path.txt", "boot_classpath.txt"}); + BUILT_IN_SOURCES.put("threads", new String[] {"threads.txt"}); + } + private final AgentTaskScheduler scheduler = new AgentTaskScheduler(TRACER_FLARE); private final Config config; private final OkHttpClient okHttpClient; private final HttpUrl flareUrl; + private final TracerFlareManager jmxManager; private boolean logLevelOverridden; private volatile long flareStartMillis; @@ -65,9 +79,22 @@ final class TracerFlareService { this.okHttpClient = okHttpClient; this.flareUrl = agentUrl.newBuilder().addPathSegments(FLARE_ENDPOINT).build(); + if (config.isTelemetryJmxEnabled()) { + jmxManager = new TracerFlareManager(this); + jmxManager.registerMBean(); + } else { + jmxManager = null; + } + applyTriageReportTrigger(config.getTriageReportTrigger()); } + public void close() { + if (jmxManager != null) { + jmxManager.unregisterMBean(); + } + } + private void applyTriageReportTrigger(String triageTrigger) { if (null != triageTrigger && !triageTrigger.isEmpty()) { long delay = TimeUtils.parseSimpleDelay(triageTrigger); @@ -194,11 +221,13 @@ private String getFlareName(long endMillis) { return REPORT_PREFIX + config.getRuntimeId() + "-" + endMillis + ".zip"; } - private byte[] buildFlareZip(long startMillis, long endMillis, boolean dumpThreads) + public byte[] buildFlareZip(long startMillis, long endMillis, boolean dumpThreads) throws IOException { try (ByteArrayOutputStream bytes = new ByteArrayOutputStream(); ZipOutputStream zip = new ZipOutputStream(bytes)) { + // Make sure to update BUILT_IN_SOURCES and getBuiltInSourceZip if this list of functions + // changes addPrelude(zip, startMillis, endMillis); addConfig(zip); addRuntime(zip); @@ -212,17 +241,47 @@ private byte[] buildFlareZip(long startMillis, long endMillis, boolean dumpThrea } } + /** Generates a ZIP file for JMX fetch for a built-in source. */ + byte[] getBuiltInSourceZip(String sourceName) throws IOException { + try (ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + ZipOutputStream zip = new ZipOutputStream(bytes)) { + + switch (sourceName) { + case "prelude": + addPrelude(zip, flareStartMillis, System.currentTimeMillis()); + break; + case "config": + addConfig(zip); + break; + case "runtime": + addRuntime(zip); + break; + case "threads": + addThreadDump(zip); + break; + default: + throw new IOException("Unknown source name: " + sourceName); + } + + zip.finish(); + return bytes.toByteArray(); + } + } + private void addPrelude(ZipOutputStream zip, long startMillis, long endMillis) throws IOException { + // Make sure to update BUILT_IN_SOURCES if the files change here. TracerFlare.addText(zip, "flare_info.txt", flareInfo(startMillis, endMillis)); TracerFlare.addText(zip, "tracer_version.txt", VERSION); } private void addConfig(ZipOutputStream zip) throws IOException { + // Make sure to update BUILT_IN_SOURCES if the files change here. TracerFlare.addText(zip, "initial_config.txt", config.toString()); } private void addRuntime(ZipOutputStream zip) throws IOException { + // Make sure to update BUILT_IN_SOURCES if the files change here. try { RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean(); TracerFlare.addText(zip, "jvm_args.txt", String.join(" ", runtimeMXBean.getInputArguments())); diff --git a/utils/flare-utils/src/test/groovy/datadog/flare/TracerFlareJmxTest.groovy b/utils/flare-utils/src/test/groovy/datadog/flare/TracerFlareJmxTest.groovy new file mode 100644 index 00000000000..e675fb8ccb2 --- /dev/null +++ b/utils/flare-utils/src/test/groovy/datadog/flare/TracerFlareJmxTest.groovy @@ -0,0 +1,73 @@ +package datadog.flare + +import static datadog.trace.api.config.GeneralConfig.TELEMETRY_JMX_ENABLED + +import datadog.trace.api.Config +import datadog.trace.test.util.DDSpecification +import okhttp3.HttpUrl +import spock.lang.Timeout + +import javax.management.MBeanServer +import javax.management.ObjectName +import java.lang.management.ManagementFactory + +@Timeout(1) +class TracerFlareJmxTest extends DDSpecification { + static final ObjectName MBEAN_NAME = new ObjectName("datadog.flare:type=TracerFlare") + + final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer() + + TracerFlareService tracerFlareService + + def cleanup() { + if (tracerFlareService != null) { + tracerFlareService.close() + } + } + + private void createTracerFlareService() { + tracerFlareService = new TracerFlareService( + Config.get(), + null, // okHttpClient - not needed for JMX test + HttpUrl.get("http://localhost:8126") + ) + } + + def "TracerFlare MBean is registered when telemetry JMX is enabled"() { + given: + injectSysConfig(TELEMETRY_JMX_ENABLED, "true") + + when: + createTracerFlareService() + + then: + mbs.isRegistered(MBEAN_NAME) + } + + def "TracerFlare MBean is not registered when telemetry JMX is disabled"() { + given: + injectSysConfig(TELEMETRY_JMX_ENABLED, "false") + + when: + createTracerFlareService() + + then: + !mbs.isRegistered(MBEAN_NAME) + } + + def "TracerFlare MBean operations work when JMX is enabled"() { + given: + injectSysConfig(TELEMETRY_JMX_ENABLED, "true") + createTracerFlareService() + + when: + def fileList = mbs.invoke(MBEAN_NAME, "listFlareFiles", null, null) as String + + then: + mbs.isRegistered(MBEAN_NAME) + fileList != null + fileList.contains("flare_info.txt") + fileList.contains("tracer_version.txt") + fileList.contains("initial_config.txt") + } +} \ No newline at end of file