Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ private static void run(final StatsDClientManager statsDClientManager, final Con
.refreshBeansPeriod(refreshBeansPeriod)
.globalTags(globalTags)
.reporter(reporter)
.jmxfetchTelemetry(config.isTelemetryJmxEnabled())
.connectionFactory(new AgentConnectionFactory());

if (config.isJmxFetchMultipleRuntimeServicesEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public final class GeneralConfig {
public static final String TELEMETRY_DEPENDENCY_RESOLUTION_QUEUE_SIZE =
"telemetry.dependency-resolution.queue.size";
public static final String TELEMETRY_DEBUG_REQUESTS_ENABLED = "telemetry.debug.requests.enabled";
public static final String TELEMETRY_JMX_ENABLED = "telemetry.jmx.enabled";
public static final String AGENTLESS_LOG_SUBMISSION_ENABLED = "agentless.log.submission.enabled";
public static final String AGENTLESS_LOG_SUBMISSION_QUEUE_SIZE =
"agentless.log.submission.queue.size";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@
import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;
import com.squareup.moshi.Types;
import datadog.trace.api.flare.TracerFlare;
import datadog.trace.core.DDSpan;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.zip.ZipOutputStream;

public class TraceDumpJsonExporter implements Writer {

Expand All @@ -17,11 +14,9 @@ public class TraceDumpJsonExporter implements Writer {
.add(DDSpanJsonAdapter.buildFactory(false))
.build()
.adapter(Types.newParameterizedType(Collection.class, DDSpan.class));
private StringBuilder dumpText;
private ZipOutputStream zip;
private final StringBuilder dumpText;

public TraceDumpJsonExporter(ZipOutputStream zip) {
this.zip = zip;
public TraceDumpJsonExporter() {
dumpText = new StringBuilder();
}

Expand All @@ -32,7 +27,8 @@ public void write(final Collection<DDSpan> trace) {

@Override
public void write(List<DDSpan> trace) {
// Do nothing
Collection<DDSpan> collectionTrace = trace;
write(collectionTrace);
}

@Override
Expand All @@ -42,14 +38,14 @@ public void start() {

@Override
public boolean flush() {
try {
TracerFlare.addText(zip, "pending_traces.txt", dumpText.toString());
} catch (IOException e) {
// do nothing
}
// do nothing
return true;
}

public String getDumpJson() {
return dumpText.toString();
}

@Override
public void close() {
// do nothing
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,29 @@
package datadog.trace.core;

import static java.util.Comparator.comparingLong;

import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.trace.api.Config;
import datadog.trace.api.config.TracerConfig;
import datadog.trace.api.flare.TracerFlare;
import datadog.trace.common.writer.TraceDumpJsonExporter;
import datadog.trace.core.monitor.HealthMetrics;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LongRunningTracesTracker implements TracerFlare.Reporter {
private static final Logger LOGGER = LoggerFactory.getLogger(LongRunningTracesTracker.class);
private static final int MAX_DUMPED_TRACES = 50;
private static final Comparator<PendingTrace> TRACE_BY_START_TIME =
comparingLong(PendingTrace::getRunningTraceStartTime);

public class LongRunningTracesTracker {
private final DDAgentFeaturesDiscovery features;
private final HealthMetrics healthMetrics;
private long lastFlushMilli = 0;
Expand All @@ -21,6 +36,7 @@ public class LongRunningTracesTracker {
private int dropped = 0;
private int write = 0;
private int expired = 0;
private int droppedSampling = 0;

public static final int NOT_TRACKED = -1;
public static final int UNDEFINED = 0;
Expand All @@ -41,6 +57,18 @@ public LongRunningTracesTracker(
(int) TimeUnit.SECONDS.toMillis(config.getLongRunningTraceFlushInterval());
this.features = sharedCommunicationObjects.featuresDiscovery(config);
this.healthMetrics = healthMetrics;

if (!features.supportsLongRunning()) {
LOGGER.warn(
"Long running trace tracking is enabled via {}, however the Datadog Agent version {} does not support receiving long running traces. "
+ "Long running traces will be tracked locally in memory (up to {} traces) but will NOT be automatically reported to the agent. "
+ "Long running traces are included in tracer flares.",
"dd." + TracerConfig.TRACE_LONG_RUNNING_ENABLED,
features.getVersion() != null ? features.getVersion() : "unknown",
maxTrackedTraces);
}

TracerFlare.addReporter(this);
}

public boolean add(PendingTraceBuffer.Element element) {
Expand All @@ -56,7 +84,7 @@ public boolean add(PendingTraceBuffer.Element element) {
return true;
}

private void addTrace(PendingTrace trace) {
private synchronized void addTrace(PendingTrace trace) {
if (trace.empty()) {
return;
}
Expand All @@ -67,7 +95,7 @@ private void addTrace(PendingTrace trace) {
traceArray.add(trace);
}

public void flushAndCompact(long nowMilli) {
public synchronized void flushAndCompact(long nowMilli) {
if (nowMilli < lastFlushMilli + TimeUnit.SECONDS.toMillis(1)) {
return;
}
Expand All @@ -78,7 +106,7 @@ public void flushAndCompact(long nowMilli) {
cleanSlot(i);
continue;
}
if (trace.empty() || !features.supportsLongRunning()) {
if (trace.empty()) {
trace.compareAndSetLongRunningState(WRITE_RUNNING_SPANS, NOT_TRACKED);
cleanSlot(i);
continue;
Expand All @@ -92,12 +120,15 @@ public void flushAndCompact(long nowMilli) {
if (shouldFlush(nowMilli, trace)) {
if (negativeOrNullPriority(trace)) {
trace.compareAndSetLongRunningState(TRACKED, NOT_TRACKED);
droppedSampling++;
cleanSlot(i);
continue;
}
trace.compareAndSetLongRunningState(TRACKED, WRITE_RUNNING_SPANS);
write++;
trace.write();
if (features.supportsLongRunning()) {
trace.compareAndSetLongRunningState(TRACKED, WRITE_RUNNING_SPANS);
write++;
trace.write();
}
}
i++;
}
Expand Down Expand Up @@ -134,9 +165,28 @@ private boolean negativeOrNullPriority(PendingTrace trace) {
}

private void flushStats() {
healthMetrics.onLongRunningUpdate(dropped, write, expired);
healthMetrics.onLongRunningUpdate(dropped, write, expired, droppedSampling);
dropped = 0;
write = 0;
expired = 0;
droppedSampling = 0;
}

public synchronized String getTracesAsJson() {
try (TraceDumpJsonExporter writer = new TraceDumpJsonExporter()) {
List<PendingTrace> traces = new ArrayList<>(traceArray);
traces.sort(TRACE_BY_START_TIME);

int limit = Math.min(traces.size(), MAX_DUMPED_TRACES);
for (int i = 0; i < limit; i++) {
writer.write(traces.get(i).getSpans());
}
return writer.getDumpJson();
}
}

@Override
public void addReportToFlare(ZipOutputStream zip) throws IOException {
TracerFlare.addText(zip, "long_running_traces.txt", getTracesAsJson());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public interface Element {
}

private static class DelayingPendingTraceBuffer extends PendingTraceBuffer {
private static final Logger LOGGER = LoggerFactory.getLogger(DelayingPendingTraceBuffer.class);
private static final long FORCE_SEND_DELAY_MS = TimeUnit.SECONDS.toMillis(5);
private static final long SEND_DELAY_NS = TimeUnit.MILLISECONDS.toNanos(500);
private static final long SLEEP_TIME_MS = 100;
Expand All @@ -64,13 +65,31 @@ private static class DelayingPendingTraceBuffer extends PendingTraceBuffer {
private final MpscBlockingConsumerArrayQueue<Element> queue;
private final Thread worker;
private final TimeSource timeSource;
private final HealthMetrics healthMetrics;

private volatile boolean closed = false;
private final AtomicInteger flushCounter = new AtomicInteger(0);
private final AtomicInteger dumpCounter = new AtomicInteger(0);

private final LongRunningTracesTracker runningTracesTracker;

private void dumpTraces() {
if (worker.isAlive()) {
int count = dumpCounter.get();
int loop = 1;
boolean signaled = queue.offer(DUMP_ELEMENT);
while (!closed && !signaled) {
yieldOrSleep(loop++);
signaled = queue.offer(DUMP_ELEMENT);
}
int newCount = dumpCounter.get();
while (!closed && count >= newCount) {
yieldOrSleep(loop++);
newCount = dumpCounter.get();
}
}
}

public boolean longRunningSpansEnabled() {
return runningTracesTracker != null;
}
Expand All @@ -85,6 +104,7 @@ public void enqueue(Element pendingTrace) {
if (!pendingTrace.writeOnBufferFull()) {
return;
}
healthMetrics.onPendingWriteAround();
pendingTrace.write();
}
}
Expand Down Expand Up @@ -136,6 +156,18 @@ public void flush() {
}
}

private String getDumpJson() {
try (TraceDumpJsonExporter writer = new TraceDumpJsonExporter()) {
for (Element e : DumpDrain.DUMP_DRAIN.collectTraces()) {
if (e instanceof PendingTrace) {
PendingTrace trace = (PendingTrace) e;
writer.write(trace.getSpans());
}
}
return writer.getDumpJson();
}
}

private static final class WriteDrain implements MessagePassingQueue.Consumer<Element> {
private static final WriteDrain WRITE_DRAIN = new WriteDrain();

Expand Down Expand Up @@ -286,6 +318,17 @@ public void run() {
}
}

@Override
public String getTracesAsJson() throws IOException {
dumpTraces();
String json = getDumpJson();
if (json.isEmpty()) {
return "[]";
} else {
return json;
}
}

public DelayingPendingTraceBuffer(
int bufferSize,
TimeSource timeSource,
Expand All @@ -295,6 +338,7 @@ public DelayingPendingTraceBuffer(
this.queue = new MpscBlockingConsumerArrayQueue<>(bufferSize);
this.worker = newAgentThread(TRACE_MONITOR, new Worker());
this.timeSource = timeSource;
this.healthMetrics = healthMetrics;
boolean runningSpansEnabled = config.isLongRunningTraceEnabled();
this.runningTracesTracker =
runningSpansEnabled
Expand All @@ -321,6 +365,11 @@ public void enqueue(Element pendingTrace) {
log.debug(
"PendingTrace enqueued but won't be reported. Root span: {}", pendingTrace.getRootSpan());
}

@Override
public String getTracesAsJson() {
return "[]";
}
}

public static PendingTraceBuffer delaying(
Expand All @@ -345,6 +394,8 @@ public static PendingTraceBuffer discarding() {

public abstract void enqueue(Element pendingTrace);

public abstract String getTracesAsJson() throws IOException;

private static class TracerDump implements TracerFlare.Reporter {
private final DelayingPendingTraceBuffer buffer;

Expand All @@ -354,32 +405,12 @@ private TracerDump(DelayingPendingTraceBuffer buffer) {

@Override
public void prepareForFlare() {
if (buffer.worker.isAlive()) {
int count = buffer.dumpCounter.get();
int loop = 1;
boolean signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DUMP_ELEMENT);
while (!buffer.closed && !signaled) {
buffer.yieldOrSleep(loop++);
signaled = buffer.queue.offer(DelayingPendingTraceBuffer.DUMP_ELEMENT);
}
int newCount = buffer.dumpCounter.get();
while (!buffer.closed && count >= newCount) {
buffer.yieldOrSleep(loop++);
newCount = buffer.dumpCounter.get();
}
}
buffer.dumpTraces();
}

@Override
public void addReportToFlare(ZipOutputStream zip) throws IOException {
TraceDumpJsonExporter writer = new TraceDumpJsonExporter(zip);
for (Element e : DelayingPendingTraceBuffer.DumpDrain.DUMP_DRAIN.collectTraces()) {
if (e instanceof PendingTrace) {
PendingTrace trace = (PendingTrace) e;
writer.write(trace.getSpans());
}
}
writer.flush();
TracerFlare.addText(zip, "pending_traces.txt", buffer.getDumpJson());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ public void onSend(
public void onFailedSend(
final int traceCount, final int sizeInBytes, final RemoteApi.Response response) {}

public void onLongRunningUpdate(final int dropped, final int write, final int expired) {}
public void onPendingWriteAround() {}

public void onLongRunningUpdate(
final int dropped, final int write, final int expired, final int droppedSampling) {}

/**
* Report that a trace has been used to compute client stats.
Expand Down
Loading