Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
# OpenTracing Reactor Instrumentation
OpenTracing instrumentation for Reactor. This instrumentation library is based on [spring-cloud-sleuth's reactor instrumentation](https://github.com/spring-cloud/spring-cloud-sleuth/tree/master/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/reactor).
OpenTracing instrumentation for Reactor.
Basically a copy of what was done in https://github.com/rsocket/rsocket-rpc-java/tree/master/rsocket-rpc-core/src/main/java/io/rsocket/rpc/tracing

## OpenTracing Agents
When using a runtime agent like [java-specialagent](https://github.com/opentracing-contrib/java-specialagent) `TracedSubscriber`s will be automatically added using `Hook.onEachOperator` and `Hooks.onLastOperator`.
Allows wrapping any Mono or Flux in a span that starts on subscription and ends on complete, cancel or error signal.
Reference to an active span is stored in subscription context during subscribe() invocation and the next span upstream
will use the current one as a parent regardless of what thread its subscribe() method is called on.
Thus, this library is compatible with other integrations like io.opentracing.contrib.spring.web.client.TracingWebClientBeanPostProcessor
which look for the enclosing span in Context at subscribe time.
When no span is found in Context, falls back to Tracer.activeSpan().

Refer to the agent documentation for how to include this library as an instrumentation plugin.

## Non-Agent Configuration
When not using any of the OpenTracing Agents the `Hooks` must be added directly:
Usage example (see TracedSubscriberTest for more):

```java
Hooks.onEachOperator(TracedSubscriber.asOperator(tracer));
Hooks.onLastOperator(TracedSubscriber.asOperator(tracer));
myFlux.transform(Tracing.trace(tracer, "span name", "span kind", myDecorator))

```

...
```
Supports pluggable decorator that can augment span with tags, logs, etc. upon creation as well as upon traced publisher's termination.
29 changes: 28 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@
<version.jacoco-maven-plugin>0.8.2</version.jacoco-maven-plugin>
<version.junit>4.12</version.junit>
<version.maven-surefire-plugin>2.22.1</version.maven-surefire-plugin>
<version.reactor-core>3.2.3.RELEASE</version.reactor-core>
<version.reactor-core>3.4.1</version.reactor-core>
<version.vavr>0.10.3</version.vavr>
</properties>

<dependencies>
Expand All @@ -99,6 +100,12 @@
<version>${version.reactor-core}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.vavr</groupId>
<artifactId>vavr</artifactId>
<version>${version.vavr}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
Expand Down Expand Up @@ -129,6 +136,26 @@
<version>${version.awaitility}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.19.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
<scope>test</scope>
</dependency>


</dependencies>

<build>
Expand Down
22 changes: 22 additions & 0 deletions src/main/java/io/opentracing/contrib/reactor/SpanDecorator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.opentracing.contrib.reactor;

import io.opentracing.Span;
import io.opentracing.Tracer.SpanBuilder;
import io.vavr.control.Try;
import reactor.core.publisher.SignalType;
import reactor.util.context.Context;

public interface SpanDecorator {
/**
* You can decorate a new span with tags or other bells and whistles based on some values
* passed from downstream via Context, for example
* It's a good idea to return the same SpanBuilder as was passed to this callback
*/
SpanBuilder onCreate(Context ctx, SpanBuilder builder);

/**
* Similar callback for adding more data to Span upon termination of the Publisher
* Also a good idea to return the same Span that callback was passed as argument
*/
Span onFinish(Try<SignalType> result, Span span);
}
128 changes: 0 additions & 128 deletions src/main/java/io/opentracing/contrib/reactor/TracedSubscriber.java

This file was deleted.

26 changes: 26 additions & 0 deletions src/main/java/io/opentracing/contrib/reactor/Tracing.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.opentracing.contrib.reactor;

import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Operators;

import java.util.function.Function;

/**
* @see https://github.com/rsocket/rsocket-rpc-java/tree/master/rsocket-rpc-core/src/main/java/io/rsocket/rpc/tracing
*/
public class Tracing {
public static <T> Function<? super Publisher<T>, ? extends Publisher<T>> trace(
Tracer tracer, String spanName, SpanDecorator decorator
) {
return trace(tracer, spanName, Tags.SPAN_KIND_CLIENT, decorator);
}

public static <T> Function<? super Publisher<T>, ? extends Publisher<T>> trace(
Tracer tracer, String spanName, String spanKind, SpanDecorator decorator
) {
return Operators.lift((scannable, subscriber) ->
new TracingSubscriber<T>(subscriber, tracer, spanName, spanKind, decorator));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package io.opentracing.contrib.reactor;

import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.Tracer.SpanBuilder;
import io.opentracing.tag.Tags;
import io.vavr.control.Try;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.SignalType;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

import java.util.concurrent.atomic.AtomicBoolean;

public class TracingSubscriber<T> implements SpanSubscription<T> {
private final CoreSubscriber<? super T> actual;
private final Span span;
private final Context context;
private final SpanDecorator decorator;
private final AtomicBoolean finished;

private volatile Subscription subscription;

public TracingSubscriber(CoreSubscriber<? super T> actual, Tracer tracer, String spanName, String spanKind,
@Nullable SpanDecorator decorator) {
this.actual = actual;
this.decorator = decorator;
this.finished = new AtomicBoolean(false);
Context context = actual.currentContext();

Span parent = context.<Span>getOrEmpty(Span.class).orElseGet(tracer::activeSpan);

SpanBuilder spanBuilder = tracer.buildSpan(spanName)
.asChildOf(parent)
.withTag(Tags.SPAN_KIND.getKey(), spanKind);

this.span = (decorator == null ? spanBuilder : decorator.onCreate(context, spanBuilder)).start();
this.context = context.put(Span.class, span);
}

@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;

actual.onSubscribe(this);
}

@Override
public void request(long n) {
subscription.request(n);
}

@Override
public void onNext(T o) {
actual.onNext(o);
}

@Override
public void cancel() {
finishSpan(Try.success(SignalType.CANCEL));
subscription.cancel();
}

@Override
public void onError(Throwable t) {
finishSpan(Try.failure(t));
actual.onError(t);
}

@Override
public void onComplete() {
finishSpan(Try.success(SignalType.ON_COMPLETE));
actual.onComplete();
}

@Override
public Context currentContext() {
return context;
}

private void finishSpan(Try<SignalType> result) {
if (finished.compareAndSet(false, true)) {
(decorator == null ? span : decorator.onFinish(result, span)).finish();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
@NonNullApi
package io.opentracing.contrib.reactor;

import reactor.util.annotation.NonNullApi;
Loading