Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ tasks.withType<JavaCompile>().configureEach {
"-Xlint:-serial"
)
)
if (System.getProperty("dev") != "true") {
// Fail build on any warning
compilerArgs.add("-Werror")
}
// if (System.getProperty("dev") != "true") {
// // Fail build on any warning
// compilerArgs.add("-Werror")
// }
}

encoding = "UTF-8"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,11 @@
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.internal.ContextPropagationDebug;

public final class ContextPropagatingRunnable implements Runnable {

public static boolean shouldDecorateRunnable(Runnable task) {
// We wrap only lambdas' anonymous classes and if given object has not already been wrapped.
// Anonymous classes have '/' in class name which is not allowed in 'normal' classes.
// note: it is always safe to decorate lambdas since downstream code cannot be expecting a
// specific runnable implementation anyways
return task.getClass().getName().contains("/") && !(task instanceof ContextPropagatingRunnable);
}
public final class ContextPropagatingRunnable implements Runnable, WrappedRunnable {

public static Runnable propagateContext(Runnable task, Context context) {
return new ContextPropagatingRunnable(task, context);
}

private final Runnable delegate;
private final Context context;

Expand All @@ -38,7 +29,8 @@ public void run() {
}
}

public Runnable unwrap() {
@Override
public Runnable getDelegate() {
return delegate;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.opentelemetry.javaagent.bootstrap.executors;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.metrics.DoubleHistogram;

// this should be removed??
class PendingTaskMetrics {
private static final DoubleHistogram queueWaitHistogram =
GlobalOpenTelemetry
.getMeterProvider()
.get("io.opentelemetry.executor.queue.wait")
.histogramBuilder("executor.queue.wait")
.setUnit("s")
.setDescription("Time spent waiting in executor queue")
.build();

public static void recordTime(Long startTime) {
queueWaitHistogram.record(System.nanoTime() - startTime);

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.opentelemetry.javaagent.bootstrap.executors;

import java.time.Duration;

// needed for lambdas
public class PendingTaskRunnable implements Runnable, WrappedRunnable {

public static Runnable measurePendingTime(Runnable task) {
return new PendingTaskRunnable(task);
}

private final Long startObservation;
private final Runnable delegate;

public PendingTaskRunnable(Runnable delegate) {
this.delegate = delegate;
this.startObservation = System.nanoTime();
}

@Override
public void run() {
Duration queueWaitDuration = Duration.ofNanos(System.nanoTime() - startObservation);
System.out.println("This class name: " + this.getClass().getName());
// PendingTaskMetrics.recordTime(startObservation);
delegate.run();
}

@Override
public Runnable getDelegate() {
return delegate;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.opentelemetry.javaagent.bootstrap.executors;

import io.opentelemetry.context.Context;

public interface WrappedRunnable {

static boolean shouldDecorateRunnable(Runnable task) {
// We wrap only lambdas' anonymous classes and if given object has not already been wrapped.
// Anonymous classes have '/' in class name which is not allowed in 'normal' classes.
// note: it is always safe to decorate lambdas since downstream code cannot be expecting a
// specific runnable implementation anyways
return task.getClass().getName().contains("/") && !(task instanceof WrappedRunnable);
}

static Runnable decorate(Runnable task, Context context) {
return PendingTaskRunnable.measurePendingTime(
ContextPropagatingRunnable.propagateContext(task, context)
);
}

Runnable getDelegate();

default Runnable unwrap() {
Runnable delegate = getDelegate();

return delegate instanceof WrappedRunnable
? ((WrappedRunnable) delegate).unwrap()
: delegate;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ public static class CallableAdvice {
public static Scope enter(@Advice.This Callable<?> task) {
VirtualField<Callable<?>, PropagatedContext> virtualField =
VirtualField.find(Callable.class, PropagatedContext.class);

Long startTime = VirtualField.find(Callable.class, Long.class).get(task);
if (startTime != null) {
PendingTaskMetrics.recordTime(startTime);
System.out.println("CallableAdvice: Got time :)");
}

return TaskAdviceHelper.makePropagatedContextCurrent(virtualField, task);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

final class ExecutorMatchers {
final class BBBBBExecutorMatchers {

private static final Logger logger = Logger.getLogger(ExecutorMatchers.class.getName());
private static final Logger logger = Logger.getLogger(BBBBBExecutorMatchers.class.getName());

/**
* Only apply executor instrumentation to allowed executors. To apply to all executors, use
Expand Down Expand Up @@ -145,5 +145,5 @@ static ElementMatcher<TypeDescription> isThreadPoolExecutor() {
return extendsClass(named(ThreadPoolExecutor.class.getName()));
}

private ExecutorMatchers() {}
private BBBBBExecutorMatchers() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.bootstrap.CallDepth;
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import io.opentelemetry.javaagent.bootstrap.executors.ContextPropagatingRunnable;
import io.opentelemetry.javaagent.bootstrap.executors.ExecutorAdviceHelper;
import io.opentelemetry.javaagent.bootstrap.executors.PropagatedContext;
import io.opentelemetry.javaagent.bootstrap.executors.WrappedRunnable;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.Collection;
Expand Down Expand Up @@ -101,8 +101,11 @@ public static PropagatedContext enterJobSubmit(
if (!ExecutorAdviceHelper.shouldPropagateContext(context, task)) {
return null;
}
if (ContextPropagatingRunnable.shouldDecorateRunnable(task)) {
task = ContextPropagatingRunnable.propagateContext(task, context);

VirtualField.find(Runnable.class, Long.class).set(task, System.nanoTime());

if (WrappedRunnable.shouldDecorateRunnable(task)) {
task = WrappedRunnable.decorate(task, context);
return null;
}
VirtualField<Runnable, PropagatedContext> virtualField =
Expand Down Expand Up @@ -130,6 +133,9 @@ public static class SetJavaForkJoinStateAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static PropagatedContext enterJobSubmit(@Advice.Argument(0) ForkJoinTask<?> task) {

VirtualField.find(ForkJoinTask.class, Long.class).set(task, System.nanoTime());

Context context = Java8BytecodeBridge.currentContext();
if (ExecutorAdviceHelper.shouldPropagateContext(context, task)) {
VirtualField<ForkJoinTask<?>, PropagatedContext> virtualField =
Expand Down Expand Up @@ -162,6 +168,8 @@ public static PropagatedContext enterJobSubmit(
if (callDepth.getAndIncrement() > 0) {
return null;
}
VirtualField.find(Runnable.class, Long.class).set(task, System.nanoTime());

Context context = Java8BytecodeBridge.currentContext();
if (ExecutorAdviceHelper.shouldPropagateContext(context, task)) {
VirtualField<Runnable, PropagatedContext> virtualField =
Expand Down Expand Up @@ -204,6 +212,8 @@ public static PropagatedContext enterJobSubmit(
if (callDepth.getAndIncrement() > 0) {
return null;
}
VirtualField.find(Callable.class, Long.class).set(task, System.nanoTime());

Context context = Java8BytecodeBridge.currentContext();
if (ExecutorAdviceHelper.shouldPropagateContext(context, task)) {
VirtualField<Callable<?>, PropagatedContext> virtualField =
Expand Down Expand Up @@ -253,6 +263,8 @@ public static Collection<?> submitEnter(

Context context = Java8BytecodeBridge.currentContext();
for (Callable<?> task : tasks) {
VirtualField.find(Callable.class, Long.class).set(task, System.nanoTime());

if (ExecutorAdviceHelper.shouldPropagateContext(context, task)) {
VirtualField<Callable<?>, PropagatedContext> virtualField =
VirtualField.find(Callable.class, PropagatedContext.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ public static class ForkAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static PropagatedContext enterFork(@Advice.This ForkJoinTask<?> task) {
Long startTime = VirtualField.find(ForkJoinTask.class, Long.class).get(task);
if (startTime != null) {
PendingTaskMetrics.recordTime(startTime);
System.out.println("ForkAdvice: Got time :)");
}

Context context = Java8BytecodeBridge.currentContext();
if (ExecutorAdviceHelper.shouldPropagateContext(context, task)) {
VirtualField<ForkJoinTask<?>, PropagatedContext> virtualField =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.opentelemetry.javaagent.instrumentation.executors;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.metrics.DoubleHistogram;

// not sure about this
class PendingTaskMetrics {
private static final DoubleHistogram queueWaitHistogram =
GlobalOpenTelemetry
.getMeterProvider()
.get("io.opentelemetry.executor.queue.wait")
.histogramBuilder("executor.queue.wait")
.setUnit("s")
.setDescription("Time spent waiting in executor queue")
.build();

public static void recordTime(Long startTime) {
if (startTime != null) {
queueWaitHistogram.record(System.nanoTime() - startTime);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.bootstrap.executors.PropagatedContext;
import io.opentelemetry.javaagent.bootstrap.executors.TaskAdviceHelper;
import io.opentelemetry.javaagent.bootstrap.executors.WrappedRunnable;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
Expand All @@ -39,6 +40,19 @@ public static class RunnableAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope enter(@Advice.This Runnable thiz) {

System.out.println("Execute runnableAdvice dla: " + thiz.getClass().getName());

if (thiz instanceof WrappedRunnable) {
System.out.println("Don't have wrapped");
}

Long startTime = VirtualField.find(Runnable.class, Long.class).get(thiz);
if (startTime != null) {
PendingTaskMetrics.recordTime(startTime);
System.out.println("RunnableAdvice: Got time :)");
}

VirtualField<Runnable, PropagatedContext> virtualField =
VirtualField.find(Runnable.class, PropagatedContext.class);
return TaskAdviceHelper.makePropagatedContextCurrent(virtualField, thiz);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.javaagent.bootstrap.executors.ContextPropagatingRunnable;
import io.opentelemetry.javaagent.bootstrap.executors.WrappedRunnable;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
Expand Down Expand Up @@ -39,8 +39,9 @@ public static class BeforeExecuteAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(value = 1, readOnly = false) Runnable runnable) {
if (runnable instanceof ContextPropagatingRunnable) {
runnable = ((ContextPropagatingRunnable) runnable).unwrap();
if (runnable instanceof WrappedRunnable) {
System.out.println("Before: I'm present in BeforeExecuteAdvice");
runnable = ((WrappedRunnable) runnable).unwrap();
}
}
}
Expand All @@ -50,8 +51,9 @@ public static class AfterExecuteAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(value = 0, readOnly = false) Runnable runnable) {
if (runnable instanceof ContextPropagatingRunnable) {
runnable = ((ContextPropagatingRunnable) runnable).unwrap();
if (runnable instanceof WrappedRunnable) {
System.out.println("After: I'm present in AfterExecuteAdvice");
runnable = ((WrappedRunnable) runnable).unwrap();
}
}
}
Expand Down
Loading
Loading