diff --git a/conventions/src/main/kotlin/otel.java-conventions.gradle.kts b/conventions/src/main/kotlin/otel.java-conventions.gradle.kts index e21d4acebb8f..1bfed3575e24 100644 --- a/conventions/src/main/kotlin/otel.java-conventions.gradle.kts +++ b/conventions/src/main/kotlin/otel.java-conventions.gradle.kts @@ -75,10 +75,10 @@ tasks.withType().configureEach { "-Xlint:-serial" ) ) - if (System.getProperty("dev") != "true") { - // Fail build on any warning - compilerArgs.add("-Werror") - } +// if (System.getProperty("dev") != "true") { +// // Fail build on any warning +// compilerArgs.add("-Werror") +// } } encoding = "UTF-8" diff --git a/instrumentation/executors/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/executors/ContextPropagatingRunnable.java b/instrumentation/executors/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/executors/ContextPropagatingRunnable.java index 99d3bd54ddf2..5d295d8a559b 100644 --- a/instrumentation/executors/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/executors/ContextPropagatingRunnable.java +++ b/instrumentation/executors/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/executors/ContextPropagatingRunnable.java @@ -9,20 +9,11 @@ import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.api.internal.ContextPropagationDebug; -public final class ContextPropagatingRunnable implements Runnable { - - public static boolean shouldDecorateRunnable(Runnable task) { - // We wrap only lambdas' anonymous classes and if given object has not already been wrapped. - // Anonymous classes have '/' in class name which is not allowed in 'normal' classes. - // note: it is always safe to decorate lambdas since downstream code cannot be expecting a - // specific runnable implementation anyways - return task.getClass().getName().contains("/") && !(task instanceof ContextPropagatingRunnable); - } +public final class ContextPropagatingRunnable implements Runnable, WrappedRunnable { public static Runnable propagateContext(Runnable task, Context context) { return new ContextPropagatingRunnable(task, context); } - private final Runnable delegate; private final Context context; @@ -38,7 +29,8 @@ public void run() { } } - public Runnable unwrap() { + @Override + public Runnable getDelegate() { return delegate; } } diff --git a/instrumentation/executors/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/executors/PendingTaskMetrics.java b/instrumentation/executors/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/executors/PendingTaskMetrics.java new file mode 100644 index 000000000000..7ff113f23001 --- /dev/null +++ b/instrumentation/executors/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/executors/PendingTaskMetrics.java @@ -0,0 +1,22 @@ +package io.opentelemetry.javaagent.bootstrap.executors; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.metrics.DoubleHistogram; + +// this should be removed?? +class PendingTaskMetrics { + private static final DoubleHistogram queueWaitHistogram = + GlobalOpenTelemetry + .getMeterProvider() + .get("io.opentelemetry.executor.queue.wait") + .histogramBuilder("executor.queue.wait") + .setUnit("s") + .setDescription("Time spent waiting in executor queue") + .build(); + + public static void recordTime(Long startTime) { + queueWaitHistogram.record(System.nanoTime() - startTime); + + } + +} diff --git a/instrumentation/executors/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/executors/PendingTaskRunnable.java b/instrumentation/executors/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/executors/PendingTaskRunnable.java new file mode 100644 index 000000000000..de97b3d0e6de --- /dev/null +++ b/instrumentation/executors/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/executors/PendingTaskRunnable.java @@ -0,0 +1,32 @@ +package io.opentelemetry.javaagent.bootstrap.executors; + +import java.time.Duration; + +// needed for lambdas +public class PendingTaskRunnable implements Runnable, WrappedRunnable { + + public static Runnable measurePendingTime(Runnable task) { + return new PendingTaskRunnable(task); + } + + private final Long startObservation; + private final Runnable delegate; + + public PendingTaskRunnable(Runnable delegate) { + this.delegate = delegate; + this.startObservation = System.nanoTime(); + } + + @Override + public void run() { + Duration queueWaitDuration = Duration.ofNanos(System.nanoTime() - startObservation); + System.out.println("This class name: " + this.getClass().getName()); +// PendingTaskMetrics.recordTime(startObservation); + delegate.run(); + } + + @Override + public Runnable getDelegate() { + return delegate; + } +} diff --git a/instrumentation/executors/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/executors/WrappedRunnable.java b/instrumentation/executors/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/executors/WrappedRunnable.java new file mode 100644 index 000000000000..16e7b0d2b1ad --- /dev/null +++ b/instrumentation/executors/bootstrap/src/main/java/io/opentelemetry/javaagent/bootstrap/executors/WrappedRunnable.java @@ -0,0 +1,31 @@ +package io.opentelemetry.javaagent.bootstrap.executors; + +import io.opentelemetry.context.Context; + +public interface WrappedRunnable { + + static boolean shouldDecorateRunnable(Runnable task) { + // We wrap only lambdas' anonymous classes and if given object has not already been wrapped. + // Anonymous classes have '/' in class name which is not allowed in 'normal' classes. + // note: it is always safe to decorate lambdas since downstream code cannot be expecting a + // specific runnable implementation anyways + return task.getClass().getName().contains("/") && !(task instanceof WrappedRunnable); + } + + static Runnable decorate(Runnable task, Context context) { + return PendingTaskRunnable.measurePendingTime( + ContextPropagatingRunnable.propagateContext(task, context) + ); + } + + Runnable getDelegate(); + + default Runnable unwrap() { + Runnable delegate = getDelegate(); + + return delegate instanceof WrappedRunnable + ? ((WrappedRunnable) delegate).unwrap() + : delegate; + } + +} diff --git a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/CallableInstrumentation.java b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/CallableInstrumentation.java index b511730da80d..3be3900f8258 100644 --- a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/CallableInstrumentation.java +++ b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/CallableInstrumentation.java @@ -42,6 +42,13 @@ public static class CallableAdvice { public static Scope enter(@Advice.This Callable task) { VirtualField, PropagatedContext> virtualField = VirtualField.find(Callable.class, PropagatedContext.class); + + Long startTime = VirtualField.find(Callable.class, Long.class).get(task); + if (startTime != null) { + PendingTaskMetrics.recordTime(startTime); + System.out.println("CallableAdvice: Got time :)"); + } + return TaskAdviceHelper.makePropagatedContextCurrent(virtualField, task); } diff --git a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/ExecutorMatchers.java b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/ExecutorMatchers.java index 717f088e4fcd..90455781cc00 100644 --- a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/ExecutorMatchers.java +++ b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/ExecutorMatchers.java @@ -24,9 +24,9 @@ import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; -final class ExecutorMatchers { +final class BBBBBExecutorMatchers { - private static final Logger logger = Logger.getLogger(ExecutorMatchers.class.getName()); + private static final Logger logger = Logger.getLogger(BBBBBExecutorMatchers.class.getName()); /** * Only apply executor instrumentation to allowed executors. To apply to all executors, use @@ -145,5 +145,5 @@ static ElementMatcher isThreadPoolExecutor() { return extendsClass(named(ThreadPoolExecutor.class.getName())); } - private ExecutorMatchers() {} + private BBBBBExecutorMatchers() {} } diff --git a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/JavaExecutorInstrumentation.java b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/JavaExecutorInstrumentation.java index 3d9276fe0eaf..49f4bdafdefe 100644 --- a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/JavaExecutorInstrumentation.java +++ b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/JavaExecutorInstrumentation.java @@ -19,9 +19,9 @@ import io.opentelemetry.instrumentation.api.util.VirtualField; import io.opentelemetry.javaagent.bootstrap.CallDepth; import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge; -import io.opentelemetry.javaagent.bootstrap.executors.ContextPropagatingRunnable; import io.opentelemetry.javaagent.bootstrap.executors.ExecutorAdviceHelper; import io.opentelemetry.javaagent.bootstrap.executors.PropagatedContext; +import io.opentelemetry.javaagent.bootstrap.executors.WrappedRunnable; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import java.util.Collection; @@ -101,8 +101,11 @@ public static PropagatedContext enterJobSubmit( if (!ExecutorAdviceHelper.shouldPropagateContext(context, task)) { return null; } - if (ContextPropagatingRunnable.shouldDecorateRunnable(task)) { - task = ContextPropagatingRunnable.propagateContext(task, context); + + VirtualField.find(Runnable.class, Long.class).set(task, System.nanoTime()); + + if (WrappedRunnable.shouldDecorateRunnable(task)) { + task = WrappedRunnable.decorate(task, context); return null; } VirtualField virtualField = @@ -130,6 +133,9 @@ public static class SetJavaForkJoinStateAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static PropagatedContext enterJobSubmit(@Advice.Argument(0) ForkJoinTask task) { + + VirtualField.find(ForkJoinTask.class, Long.class).set(task, System.nanoTime()); + Context context = Java8BytecodeBridge.currentContext(); if (ExecutorAdviceHelper.shouldPropagateContext(context, task)) { VirtualField, PropagatedContext> virtualField = @@ -162,6 +168,8 @@ public static PropagatedContext enterJobSubmit( if (callDepth.getAndIncrement() > 0) { return null; } + VirtualField.find(Runnable.class, Long.class).set(task, System.nanoTime()); + Context context = Java8BytecodeBridge.currentContext(); if (ExecutorAdviceHelper.shouldPropagateContext(context, task)) { VirtualField virtualField = @@ -204,6 +212,8 @@ public static PropagatedContext enterJobSubmit( if (callDepth.getAndIncrement() > 0) { return null; } + VirtualField.find(Callable.class, Long.class).set(task, System.nanoTime()); + Context context = Java8BytecodeBridge.currentContext(); if (ExecutorAdviceHelper.shouldPropagateContext(context, task)) { VirtualField, PropagatedContext> virtualField = @@ -253,6 +263,8 @@ public static Collection submitEnter( Context context = Java8BytecodeBridge.currentContext(); for (Callable task : tasks) { + VirtualField.find(Callable.class, Long.class).set(task, System.nanoTime()); + if (ExecutorAdviceHelper.shouldPropagateContext(context, task)) { VirtualField, PropagatedContext> virtualField = VirtualField.find(Callable.class, PropagatedContext.class); diff --git a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/JavaForkJoinTaskInstrumentation.java b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/JavaForkJoinTaskInstrumentation.java index f62c321eaea4..5ca92b70c71e 100644 --- a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/JavaForkJoinTaskInstrumentation.java +++ b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/JavaForkJoinTaskInstrumentation.java @@ -106,6 +106,12 @@ public static class ForkAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static PropagatedContext enterFork(@Advice.This ForkJoinTask task) { + Long startTime = VirtualField.find(ForkJoinTask.class, Long.class).get(task); + if (startTime != null) { + PendingTaskMetrics.recordTime(startTime); + System.out.println("ForkAdvice: Got time :)"); + } + Context context = Java8BytecodeBridge.currentContext(); if (ExecutorAdviceHelper.shouldPropagateContext(context, task)) { VirtualField, PropagatedContext> virtualField = diff --git a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/PendingTaskMetrics.java b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/PendingTaskMetrics.java new file mode 100644 index 000000000000..b80403b1a365 --- /dev/null +++ b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/PendingTaskMetrics.java @@ -0,0 +1,24 @@ +package io.opentelemetry.javaagent.instrumentation.executors; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.metrics.DoubleHistogram; + +// not sure about this +class PendingTaskMetrics { + private static final DoubleHistogram queueWaitHistogram = + GlobalOpenTelemetry + .getMeterProvider() + .get("io.opentelemetry.executor.queue.wait") + .histogramBuilder("executor.queue.wait") + .setUnit("s") + .setDescription("Time spent waiting in executor queue") + .build(); + + public static void recordTime(Long startTime) { + if (startTime != null) { + queueWaitHistogram.record(System.nanoTime() - startTime); + } + + } + +} diff --git a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/RunnableInstrumentation.java b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/RunnableInstrumentation.java index 54d06b657c0a..fc81f1bfb4ce 100644 --- a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/RunnableInstrumentation.java +++ b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/RunnableInstrumentation.java @@ -14,6 +14,7 @@ import io.opentelemetry.instrumentation.api.util.VirtualField; import io.opentelemetry.javaagent.bootstrap.executors.PropagatedContext; import io.opentelemetry.javaagent.bootstrap.executors.TaskAdviceHelper; +import io.opentelemetry.javaagent.bootstrap.executors.WrappedRunnable; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import net.bytebuddy.asm.Advice; @@ -39,6 +40,19 @@ public static class RunnableAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static Scope enter(@Advice.This Runnable thiz) { + + System.out.println("Execute runnableAdvice dla: " + thiz.getClass().getName()); + + if (thiz instanceof WrappedRunnable) { + System.out.println("Don't have wrapped"); + } + + Long startTime = VirtualField.find(Runnable.class, Long.class).get(thiz); + if (startTime != null) { + PendingTaskMetrics.recordTime(startTime); + System.out.println("RunnableAdvice: Got time :)"); + } + VirtualField virtualField = VirtualField.find(Runnable.class, PropagatedContext.class); return TaskAdviceHelper.makePropagatedContextCurrent(virtualField, thiz); diff --git a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/ThreadPoolExtendingExecutorInstrumentation.java b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/ThreadPoolExtendingExecutorInstrumentation.java index febd9fdfbcb8..950076ef910e 100644 --- a/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/ThreadPoolExtendingExecutorInstrumentation.java +++ b/instrumentation/executors/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/ThreadPoolExtendingExecutorInstrumentation.java @@ -10,7 +10,7 @@ import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; -import io.opentelemetry.javaagent.bootstrap.executors.ContextPropagatingRunnable; +import io.opentelemetry.javaagent.bootstrap.executors.WrappedRunnable; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import net.bytebuddy.asm.Advice; @@ -39,8 +39,9 @@ public static class BeforeExecuteAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static void onEnter(@Advice.Argument(value = 1, readOnly = false) Runnable runnable) { - if (runnable instanceof ContextPropagatingRunnable) { - runnable = ((ContextPropagatingRunnable) runnable).unwrap(); + if (runnable instanceof WrappedRunnable) { + System.out.println("Before: I'm present in BeforeExecuteAdvice"); + runnable = ((WrappedRunnable) runnable).unwrap(); } } } @@ -50,8 +51,9 @@ public static class AfterExecuteAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static void onEnter(@Advice.Argument(value = 0, readOnly = false) Runnable runnable) { - if (runnable instanceof ContextPropagatingRunnable) { - runnable = ((ContextPropagatingRunnable) runnable).unwrap(); + if (runnable instanceof WrappedRunnable) { + System.out.println("After: I'm present in AfterExecuteAdvice"); + runnable = ((WrappedRunnable) runnable).unwrap(); } } } diff --git a/instrumentation/executors/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/executors/ExecutorInstrumentationQueueWaitTest.java b/instrumentation/executors/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/executors/ExecutorInstrumentationQueueWaitTest.java new file mode 100644 index 000000000000..5884d1b6ff17 --- /dev/null +++ b/instrumentation/executors/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/executors/ExecutorInstrumentationQueueWaitTest.java @@ -0,0 +1,235 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.executors; + +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +// copy-paste from ExecutorInstrumentationTest +abstract class ExecutorInstrumentationQueueWaitTest + extends AbstractExecutorServiceQueueWaitTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + ExecutorInstrumentationQueueWaitTest(T executor) { + super(executor, testing); + } + + @Override + protected JavaAsyncQueueWaitChild newTask(Long sleepForMillisSeconds) { + return new JavaAsyncQueueWaitChild(sleepForMillisSeconds); + } + + static class ThreadPoolExecutorTest extends ExecutorInstrumentationQueueWaitTest { + ThreadPoolExecutorTest() { + super(new ThreadPoolExecutor(5, 5, 10_000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<>(20))); + } + } + + static class WorkStealingPoolTest extends ExecutorInstrumentationQueueWaitTest { + public WorkStealingPoolTest() { + super(Executors.newWorkStealingPool(2)); + } + } + + static class ScheduledThreadPoolExecutorTest + extends ExecutorInstrumentationQueueWaitTest { + ScheduledThreadPoolExecutorTest() { + super(new ScheduledThreadPoolExecutor(1)); + } + + @Test + void scheduleRunnable() { + executeAndMeasureQueueWaitForTwoTasks(task -> executor().schedule((Runnable) task, 10, TimeUnit.MICROSECONDS)); + } + + @Test + void scheduleCallable() { + executeAndMeasureQueueWaitForTwoTasks(task -> executor().schedule((Callable) task, 10, TimeUnit.MICROSECONDS)); + } + + @Test + void scheduleLambdaRunnable() { + executeAndMeasureQueueWaitForTwoTasks(task -> executor().schedule(() -> task.run(), 10, TimeUnit.MICROSECONDS)); + } + + @Test + void scheduleLambdaCallable() { + executeAndMeasureQueueWaitForTwoTasks( + task -> + executor() + .schedule( + () -> { + task.run(); + return null; + }, + 10, + TimeUnit.MICROSECONDS)); + } + + @Test + void scheduleRunnableAndCancel() { + executeAndCancelTasks( + task -> executor().schedule((Runnable) task, 10, TimeUnit.MICROSECONDS)); + } + + @Test + void scheduleCallableAndCancel() { + executeAndCancelTasks( + task -> executor().schedule((Callable) task, 10, TimeUnit.MICROSECONDS)); + } + } + + static class ForkJoinPoolTest extends ExecutorInstrumentationQueueWaitTest { + ForkJoinPoolTest() { + super(new ForkJoinPool(20)); + } + + @Test + void invokeForkJoinTask() { + executeAndMeasureQueueWaitForTwoTasks(task -> executor().invoke((ForkJoinTask) task)); + } + + @Test + void submitForkJoinTask() { + executeAndMeasureQueueWaitForTwoTasks(task -> executor().submit((ForkJoinTask) task)); + } + } + + // // CustomThreadPoolExecutor would normally be disabled except enabled by system property. + static class CustomThreadPoolExecutorTest + extends ExecutorInstrumentationQueueWaitTest { + CustomThreadPoolExecutorTest() { + super(new CustomThreadPoolExecutor()); + } + } + + @SuppressWarnings("RedundantOverride") + private static class CustomThreadPoolExecutor extends AbstractExecutorService { + + private volatile boolean running = true; + + private final BlockingQueue workQueue = new ArrayBlockingQueue<>(20); + + private final Thread workerThread = + new Thread( + () -> { + try { + while (running) { + Runnable runnable = workQueue.take(); + runnable.run(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError(e); + } catch (Throwable t) { + throw new AssertionError(t); + } + }, + "ExecutorTestThread"); + + private CustomThreadPoolExecutor() { + workerThread.start(); + } + + @Override + public void shutdown() { + running = false; + workerThread.interrupt(); + } + + @Override + public List shutdownNow() { + running = false; + workerThread.interrupt(); + return Collections.emptyList(); + } + + @Override + public boolean isShutdown() { + return !running; + } + + @Override + public boolean isTerminated() { + return workerThread.isAlive(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + workerThread.join(unit.toMillis(timeout)); + return true; + } + + @Override + public Future submit(Callable task) { + RunnableFuture future = newTaskFor(task); + execute(future); + return future; + } + + @Override + public Future submit(Runnable task, T result) { + RunnableFuture future = newTaskFor(task, result); + execute(future); + return future; + } + + @Override + public Future submit(Runnable task) { + RunnableFuture future = newTaskFor(task, null); + execute(future); + return future; + } + + @Override + public List> invokeAll(Collection> tasks) { + return Collections.singletonList(submit(tasks.stream().findFirst().get())); + } + + @Override + public List> invokeAll( + Collection> tasks, long timeout, TimeUnit unit) { + return Collections.singletonList(submit(tasks.stream().findFirst().get())); + } + + @Override + public T invokeAny(Collection> tasks) { + submit(tasks.stream().findFirst().get()); + return null; + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) { + submit(tasks.stream().findFirst().get()); + return null; + } + + @Override + public void execute(Runnable command) { + workQueue.add(command); + } + } +} diff --git a/instrumentation/executors/jdk21-testing/src/test/java/io/opentelemetry/javaagent/instrumentation/executors/VirtualThreadExecutorQueueWaitTest.java b/instrumentation/executors/jdk21-testing/src/test/java/io/opentelemetry/javaagent/instrumentation/executors/VirtualThreadExecutorQueueWaitTest.java new file mode 100644 index 000000000000..1c0e38d06c3e --- /dev/null +++ b/instrumentation/executors/jdk21-testing/src/test/java/io/opentelemetry/javaagent/instrumentation/executors/VirtualThreadExecutorQueueWaitTest.java @@ -0,0 +1,29 @@ +///* +// * Copyright The OpenTelemetry Authors +// * SPDX-License-Identifier: Apache-2.0 +// */ +// +//package io.opentelemetry.javaagent.instrumentation.executors; +// +//import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +//import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +//import java.util.concurrent.ExecutorService; +//import java.util.concurrent.Executors; +//import org.junit.jupiter.api.extension.RegisterExtension; +// +//// TODO: fix this +//class VirtualThreadExecutorQueueWaitTest +// extends AbstractExecutorServiceQueueWaitTest { +// +// @RegisterExtension +// static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); +// +// VirtualThreadExecutorQueueWaitTest() { +// super(Executors.newVirtualThreadPerTaskExecutor(), testing); +// } +// +// @Override +// protected JavaAsyncQueueWaitChild newTask(Long startXD) { +// return new JavaAsyncQueueWaitChild(startXD); +// } +//} diff --git a/instrumentation/executors/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/AbstractExecutorServiceQueueWaitTest.java b/instrumentation/executors/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/AbstractExecutorServiceQueueWaitTest.java new file mode 100644 index 000000000000..37fe3542f506 --- /dev/null +++ b/instrumentation/executors/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/AbstractExecutorServiceQueueWaitTest.java @@ -0,0 +1,198 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.executors; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.function.ThrowingConsumer; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public abstract class AbstractExecutorServiceQueueWaitTest { + + private final T executor; + private final InstrumentationExtension testing; + + protected AbstractExecutorServiceQueueWaitTest(T executor, InstrumentationExtension testing) { + this.executor = executor; + this.testing = testing; + } + + protected abstract U newTask(Long sleepForMillisSeconds); + + protected T executor() { + return executor; + } + + @AfterAll + void shutdown() throws InterruptedException { + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + } + + @Test + void executeRunnable() { + executeAndMeasureQueueWaitForTwoTasks(executor::execute); + } + + @Test + void submitRunnable() { + executeAndMeasureQueueWaitForTwoTasks(task -> executor.submit((Runnable) task)); + } + + @Test + void submitCallable() { + executeAndMeasureQueueWaitForTwoTasks(task -> executor.submit((Callable) task)); + } + + @Test + void invokeAll() { + executeAndMeasureQueueWaitForTwoTasks(task -> executor.invokeAll(Collections.singleton(task))); + } + + @Test + void invokeAllWithTimeout() { + executeAndMeasureQueueWaitForTwoTasks(task -> executor.invokeAll(Collections.singleton(task), 10, TimeUnit.SECONDS)); + } + + @Test + void invokeAny() { + executeAndMeasureQueueWaitForTwoTasks(task -> executor.invokeAny(Collections.singleton(task))); + } + + @Test + void invokeAnyWithTimeout() { + executeAndMeasureQueueWaitForTwoTasks(task -> executor.invokeAny(Collections.singleton(task), 10, TimeUnit.SECONDS)); + } + + // ContextPropagationRunnable is only for lambdas + @Test + void executeLambdaRunnable() { + executeAndMeasureQueueWaitForTwoTasks(task -> executor.execute(() -> task.run())); + } + + @Test + void submitLambdaRunnable() { + executeAndMeasureQueueWaitForTwoTasks(task -> executor.submit(() -> task.run())); + } + + @Test + void submitLambdaCallable() { + executeAndMeasureQueueWaitForTwoTasks( + task -> + executor.submit( + () -> { + task.run(); + return null; + })); + } + + @Test + void submitRunnableAndCancel() { + executeAndCancelTasks(task -> executor.submit((Runnable) task)); + } + + @Test + void submitCallableAndCancel() { + executeAndCancelTasks(task -> executor.submit((Callable) task)); + } + + protected final void executeAndMeasureQueueWaitForTwoTasks(ThrowingConsumer task) { + testing.runWithSpan( + "parent", + () -> { + // this child will have a span + U child1 = newTask(1L); + // this child won't + U child2 = newTask(1L); + U child3 = newTask(1L); + U child4 = newTask(1L); + U child5 = newTask(1L); + try { + task.accept(child1); + task.accept(child2); + task.accept(child3); + task.accept(child4); + task.accept(child5); + } catch (Throwable t) { + throw new AssertionError(t); + } + child1.waitForCompletion(); + child2.waitForCompletion(); + child3.waitForCompletion(); + child4.waitForCompletion(); + child5.waitForCompletion(); + }); + + testing.waitAndAssertMetrics( + "io.opentelemetry.executor.queue.wait", + "executor.queue.wait", + metrics -> + metrics.anySatisfy( + metric -> + assertThat(metric) + .hasDescription("Time spent waiting in executor queue") + .hasUnit("s") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> point.hasSumGreaterThan(0.0) + ) + ) + ) + ); +// testing.waitAndAssertTraces( +// trace -> +// trace.hasSpansSatisfyingExactly( +// span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), +// span -> +// span.hasName("asyncChild") +// .hasKind(SpanKind.INTERNAL) +// .hasParent(trace.getSpan(0)))); + } + + protected final void executeAndCancelTasks(Function> task) { + List children = new ArrayList<>(); + List> jobFutures = new ArrayList<>(); + + testing.runWithSpan( + "parent", + () -> { + for (int i = 0; i < 20; i++) { + // Our current instrumentation instrumentation does not behave very well + // if we try to reuse Callable/Runnable. Namely we would be getting 'orphaned' + // child traces sometimes since state can contain only one parent span - and + // we do not really have a good way for attributing work to correct parent span + // if we reuse Callable/Runnable. + // Solution for now is to never reuse a Callable/Runnable. +// U child = newTask(false, true); + U child = newTask(0L); + children.add(child); + Future f = task.apply(child); + jobFutures.add(f); + } + + jobFutures.forEach(f -> f.cancel(false)); + children.forEach(U::unblock); + }); + + // Just check there is a single trace, this test is primarily to make sure that scopes aren't + // leaked on cancellation. + testing.waitAndAssertTraces(trace -> {}); + } +} diff --git a/instrumentation/executors/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/JavaAsyncQueueWaitChild.java b/instrumentation/executors/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/JavaAsyncQueueWaitChild.java new file mode 100644 index 000000000000..0d1cd7e5815d --- /dev/null +++ b/instrumentation/executors/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/executors/JavaAsyncQueueWaitChild.java @@ -0,0 +1,85 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.executors; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Tracer; + +import java.util.concurrent.ForkJoinTask; + +@SuppressWarnings("serial") +final class JavaAsyncQueueWaitChild extends ForkJoinTask implements TestTask { + private static final Tracer tracer = GlobalOpenTelemetry.getTracer("test"); + + private final Long sleepForMillisSeconds; +// private final boolean doTraceableWork; + + JavaAsyncQueueWaitChild(Long sleepForMillisSeconds) { +// this.doTraceableWork = doTraceableWork; + this.sleepForMillisSeconds = sleepForMillisSeconds; + } + + @Override + public Object getRawResult() { + return null; + } + + @Override + protected void setRawResult(Object value) {} + + @Override + protected boolean exec() { + runImpl(); + return true; + } + +// test task method + @Override + public void unblock() { +// blockThread.set(false); + } + + @Override + public void run() { + runImpl(); + } + + @Override + public Object call() { + runImpl(); + return null; + } + + // test task method + @Override + public void waitForCompletion() { +// try { +// latch.await(); +// } catch (InterruptedException e) { +// Thread.currentThread().interrupt(); +// throw new AssertionError(e); +// } + } + + private void runImpl() { + try { + Thread.sleep(sleepForMillisSeconds); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + // while (blockThread.get()) { + // busy-wait to block thread +// } +// if (doTraceableWork) { +// asyncChild(); +// } +// latch.countDown(); + } + + private static void asyncChild() { + tracer.spanBuilder("asyncChild").startSpan().end(); + } +} diff --git a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/LocalSchedulerActivationInstrumentation.java b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/LocalSchedulerActivationInstrumentation.java index 1cbec8c9c0fd..b5a4f70aac95 100644 --- a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/LocalSchedulerActivationInstrumentation.java +++ b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/LocalSchedulerActivationInstrumentation.java @@ -32,6 +32,7 @@ public void transform(TypeTransformer transformer) { this.getClass().getName() + "$WrapRunnableAdvice"); } + // what this code is doing?? @SuppressWarnings("unused") public static class WrapRunnableAdvice { diff --git a/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/junit/http/AbstractHttpClientTest.java b/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/junit/http/AbstractHttpClientTest.java index eec43d3a290e..cbdc19146838 100644 --- a/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/junit/http/AbstractHttpClientTest.java +++ b/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/junit/http/AbstractHttpClientTest.java @@ -723,6 +723,7 @@ void httpsRequest() throws Exception { }); } + // here are metrics for http client @Test void httpClientMetrics() throws Exception { assumeTrue(options.getHasSendRequest());