diff --git a/README.md b/README.md index 64158a9..410a47c 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,19 @@ # OpenTracing Reactor Instrumentation -OpenTracing instrumentation for Reactor. This instrumentation library is based on [spring-cloud-sleuth's reactor instrumentation](https://github.com/spring-cloud/spring-cloud-sleuth/tree/master/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/reactor). +OpenTracing instrumentation for Reactor. +Basically a copy of what was done in https://github.com/rsocket/rsocket-rpc-java/tree/master/rsocket-rpc-core/src/main/java/io/rsocket/rpc/tracing -## OpenTracing Agents -When using a runtime agent like [java-specialagent](https://github.com/opentracing-contrib/java-specialagent) `TracedSubscriber`s will be automatically added using `Hook.onEachOperator` and `Hooks.onLastOperator`. +Allows wrapping any Mono or Flux in a span that starts on subscription and ends on complete, cancel or error signal. +Reference to an active span is stored in subscription context during subscribe() invocation and the next span upstream +will use the current one as a parent regardless of what thread its subscribe() method is called on. +Thus, this library is compatible with other integrations like io.opentracing.contrib.spring.web.client.TracingWebClientBeanPostProcessor +which look for the enclosing span in Context at subscribe time. +When no span is found in Context, falls back to Tracer.activeSpan(). -Refer to the agent documentation for how to include this library as an instrumentation plugin. - -## Non-Agent Configuration -When not using any of the OpenTracing Agents the `Hooks` must be added directly: +Usage example (see TracedSubscriberTest for more): ```java -Hooks.onEachOperator(TracedSubscriber.asOperator(tracer)); -Hooks.onLastOperator(TracedSubscriber.asOperator(tracer)); + myFlux.transform(Tracing.trace(tracer, "span name", "span kind", myDecorator)) + +``` -... -``` \ No newline at end of file +Supports pluggable decorator that can augment span with tags, logs, etc. upon creation as well as upon traced publisher's termination. \ No newline at end of file diff --git a/pom.xml b/pom.xml index 9e9a622..d293d80 100644 --- a/pom.xml +++ b/pom.xml @@ -77,7 +77,8 @@ 0.8.2 4.12 2.22.1 - 3.2.3.RELEASE + 3.4.1 + 0.10.3 @@ -99,6 +100,12 @@ ${version.reactor-core} provided + + io.vavr + vavr + ${version.vavr} + provided + net.bytebuddy byte-buddy @@ -129,6 +136,26 @@ ${version.awaitility} test + + org.assertj + assertj-core + 3.19.0 + test + + + org.slf4j + slf4j-api + 1.7.30 + test + + + org.slf4j + slf4j-simple + 1.7.30 + test + + + diff --git a/src/main/java/io/opentracing/contrib/reactor/SpanDecorator.java b/src/main/java/io/opentracing/contrib/reactor/SpanDecorator.java new file mode 100644 index 0000000..2d4e483 --- /dev/null +++ b/src/main/java/io/opentracing/contrib/reactor/SpanDecorator.java @@ -0,0 +1,22 @@ +package io.opentracing.contrib.reactor; + +import io.opentracing.Span; +import io.opentracing.Tracer.SpanBuilder; +import io.vavr.control.Try; +import reactor.core.publisher.SignalType; +import reactor.util.context.Context; + +public interface SpanDecorator { + /** + * You can decorate a new span with tags or other bells and whistles based on some values + * passed from downstream via Context, for example + * It's a good idea to return the same SpanBuilder as was passed to this callback + */ + SpanBuilder onCreate(Context ctx, SpanBuilder builder); + + /** + * Similar callback for adding more data to Span upon termination of the Publisher + * Also a good idea to return the same Span that callback was passed as argument + */ + Span onFinish(Try result, Span span); +} diff --git a/src/main/java/io/opentracing/contrib/reactor/TracedSubscriber.java b/src/main/java/io/opentracing/contrib/reactor/TracedSubscriber.java deleted file mode 100644 index 4b7df5f..0000000 --- a/src/main/java/io/opentracing/contrib/reactor/TracedSubscriber.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Copyright 2018 The OpenTracing Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package io.opentracing.contrib.reactor; - -import java.util.function.BiFunction; -import java.util.function.Function; - -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import io.opentracing.Scope; -import io.opentracing.Span; -import io.opentracing.Tracer; -import reactor.core.CoreSubscriber; -import reactor.core.Fuseable; -import reactor.core.publisher.Operators; -import reactor.util.context.Context; - -/** - * Based on Spring Sleuth's Reactor instrumentation. - * A trace representation of the {@link Subscriber} - * - * @author Jose Montoya - */ -public class TracedSubscriber implements SpanSubscription { - private final Span span; - private final Subscriber subscriber; - private final Context context; - private final Tracer tracer; - private Subscription subscription; - - public TracedSubscriber(Subscriber subscriber, Context ctx, Tracer tracer) { - this.subscriber = subscriber; - this.tracer = tracer; - this.span = ctx != null ? - ctx.getOrDefault(Span.class, this.tracer.activeSpan()) : null; - - this.context = ctx != null && this.span != null ? - ctx.put(Span.class, this.span) : ctx != null ? - ctx : Context.empty(); - } - - - @Override - public void onSubscribe(Subscription subscription) { - this.subscription = subscription; - withActiveSpan(() -> subscriber.onSubscribe(this)); - } - - @Override - public void request(long n) { - withActiveSpan(() -> subscription.request(n)); - } - - @Override - public void onNext(T o) { - withActiveSpan(() -> subscriber.onNext(o)); - } - - @Override - public void cancel() { - withActiveSpan(() -> subscription.cancel()); - } - - @Override - public void onError(Throwable throwable) { - withActiveSpan(() -> subscriber.onError(throwable)); - } - - @Override - public void onComplete() { - withActiveSpan(() -> subscriber.onComplete()); - } - - @Override - public Context currentContext() { - return context; - } - - private void withActiveSpan(Runnable runnable) { - if (span != null) - try (Scope inScope = tracer.scopeManager().activate(span)) { - runnable.run(); - } - else - runnable.run(); - } - - - /** - * Based on Spring Sleuth's Reactor instrumentation. - *

- * Return a span operator pointcut given a {@link Tracer}. This can be used in reactor - * via {@link reactor.core.publisher.Flux#transform(Function)}, {@link - * reactor.core.publisher.Mono#transform(Function)}, {@link - * reactor.core.publisher.Hooks#onEachOperator(Function)} or {@link - * reactor.core.publisher.Hooks#onLastOperator(Function)}. The Span operator - * pointcut will pass the Scope of the Span without ever creating any new spans. - * - * @param an arbitrary type that is left unchanged by the span operator - * @return a new span operator pointcut - */ - public static Function, ? extends Publisher> asOperator(Tracer tracer) { - return Operators.liftPublisher(new BiFunction, CoreSubscriber>() { - @Override - public CoreSubscriber apply(Publisher publisher, CoreSubscriber sub) { - // if Flux/Mono #just, #empty, #error - if (publisher instanceof Fuseable.ScalarCallable) { - return sub; - } - - return new TracedSubscriber<>(sub, sub.currentContext(), tracer); - } - }); - } -} diff --git a/src/main/java/io/opentracing/contrib/reactor/Tracing.java b/src/main/java/io/opentracing/contrib/reactor/Tracing.java new file mode 100644 index 0000000..8a5cc67 --- /dev/null +++ b/src/main/java/io/opentracing/contrib/reactor/Tracing.java @@ -0,0 +1,26 @@ +package io.opentracing.contrib.reactor; + +import io.opentracing.Tracer; +import io.opentracing.tag.Tags; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Operators; + +import java.util.function.Function; + +/** + * @see https://github.com/rsocket/rsocket-rpc-java/tree/master/rsocket-rpc-core/src/main/java/io/rsocket/rpc/tracing + */ +public class Tracing { + public static Function, ? extends Publisher> trace( + Tracer tracer, String spanName, SpanDecorator decorator + ) { + return trace(tracer, spanName, Tags.SPAN_KIND_CLIENT, decorator); + } + + public static Function, ? extends Publisher> trace( + Tracer tracer, String spanName, String spanKind, SpanDecorator decorator + ) { + return Operators.lift((scannable, subscriber) -> + new TracingSubscriber(subscriber, tracer, spanName, spanKind, decorator)); + } +} diff --git a/src/main/java/io/opentracing/contrib/reactor/TracingSubscriber.java b/src/main/java/io/opentracing/contrib/reactor/TracingSubscriber.java new file mode 100644 index 0000000..0efe1d3 --- /dev/null +++ b/src/main/java/io/opentracing/contrib/reactor/TracingSubscriber.java @@ -0,0 +1,87 @@ +package io.opentracing.contrib.reactor; + +import io.opentracing.Span; +import io.opentracing.Tracer; +import io.opentracing.Tracer.SpanBuilder; +import io.opentracing.tag.Tags; +import io.vavr.control.Try; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; +import reactor.core.publisher.SignalType; +import reactor.util.annotation.Nullable; +import reactor.util.context.Context; + +import java.util.concurrent.atomic.AtomicBoolean; + +public class TracingSubscriber implements SpanSubscription { + private final CoreSubscriber actual; + private final Span span; + private final Context context; + private final SpanDecorator decorator; + private final AtomicBoolean finished; + + private volatile Subscription subscription; + + public TracingSubscriber(CoreSubscriber actual, Tracer tracer, String spanName, String spanKind, + @Nullable SpanDecorator decorator) { + this.actual = actual; + this.decorator = decorator; + this.finished = new AtomicBoolean(false); + Context context = actual.currentContext(); + + Span parent = context.getOrEmpty(Span.class).orElseGet(tracer::activeSpan); + + SpanBuilder spanBuilder = tracer.buildSpan(spanName) + .asChildOf(parent) + .withTag(Tags.SPAN_KIND.getKey(), spanKind); + + this.span = (decorator == null ? spanBuilder : decorator.onCreate(context, spanBuilder)).start(); + this.context = context.put(Span.class, span); + } + + @Override + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + + actual.onSubscribe(this); + } + + @Override + public void request(long n) { + subscription.request(n); + } + + @Override + public void onNext(T o) { + actual.onNext(o); + } + + @Override + public void cancel() { + finishSpan(Try.success(SignalType.CANCEL)); + subscription.cancel(); + } + + @Override + public void onError(Throwable t) { + finishSpan(Try.failure(t)); + actual.onError(t); + } + + @Override + public void onComplete() { + finishSpan(Try.success(SignalType.ON_COMPLETE)); + actual.onComplete(); + } + + @Override + public Context currentContext() { + return context; + } + + private void finishSpan(Try result) { + if (finished.compareAndSet(false, true)) { + (decorator == null ? span : decorator.onFinish(result, span)).finish(); + } + } +} \ No newline at end of file diff --git a/src/main/java/io/opentracing/contrib/reactor/package-info.java b/src/main/java/io/opentracing/contrib/reactor/package-info.java new file mode 100644 index 0000000..00f7ce1 --- /dev/null +++ b/src/main/java/io/opentracing/contrib/reactor/package-info.java @@ -0,0 +1,4 @@ +@NonNullApi +package io.opentracing.contrib.reactor; + +import reactor.util.annotation.NonNullApi; diff --git a/src/test/java/io/opentracing/contrib/reactor/TracedSubscriberTest.java b/src/test/java/io/opentracing/contrib/reactor/TracedSubscriberTest.java index ad43be7..d4ae9f7 100644 --- a/src/test/java/io/opentracing/contrib/reactor/TracedSubscriberTest.java +++ b/src/test/java/io/opentracing/contrib/reactor/TracedSubscriberTest.java @@ -13,290 +13,201 @@ */ package io.opentracing.contrib.reactor; -import static org.junit.Assert.*; -import static reactor.core.scheduler.Schedulers.elastic; - -import java.util.concurrent.atomic.AtomicReference; - -import org.awaitility.Awaitility; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscription; - import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.Tracer; +import io.opentracing.Tracer.SpanBuilder; import io.opentracing.mock.MockSpan; import io.opentracing.mock.MockTracer; import io.opentracing.util.ThreadLocalScopeManager; -import reactor.core.publisher.BaseSubscriber; +import io.vavr.Tuple; +import io.vavr.collection.HashMap; +import io.vavr.collection.List; +import io.vavr.collection.Stream; +import io.vavr.control.Try; +import org.assertj.core.api.InstanceOfAssertFactories; +import org.junit.Test; +import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; -import reactor.core.publisher.Hooks; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; - -/** - * Based on Spring Sleuth's Reactor instrumentation - * - * @author Jose Montoya - */ -public class TracedSubscriberTest { - protected static final MockTracer tracer = new MockTracer(new ThreadLocalScopeManager()); - - @BeforeClass - public static void beforeClass() { - Hooks.onEachOperator(TracedSubscriber.asOperator(tracer)); - Hooks.onLastOperator(TracedSubscriber.asOperator(tracer)); - } - - @Test - public void should_pass_tracing_info_when_using_reactor() { - MockSpan span = tracer.buildSpan("foo").start(); - final AtomicReference spanInOperation = new AtomicReference<>(); - Publisher traced = Flux.just(1, 2, 3); - - try (Scope scope = tracer.scopeManager().activate(span)) { - Flux.from(traced) - .map(d -> d + 1) - .map(d -> d + 1) - .map((d) -> { - spanInOperation.set((MockSpan) tracer.activeSpan()); - return d + 1; - }) - .map(d -> d + 1) - .subscribe(System.out::println); - } finally { - span.finish(); - } - - assertNull(tracer.activeSpan()); - assertEquals(spanInOperation.get().context().traceId(), span.context().traceId()); - } - - @Test - public void should_support_reactor_fusion_optimization() { - MockSpan span = tracer.buildSpan("foo").start(); - final AtomicReference spanInOperation = new AtomicReference<>(); - - try (Scope scope = tracer.scopeManager().activate(span)) { - Mono.just(1) - .flatMap(d -> Flux.just(d + 1) - .collectList() - .map(p -> p.get(0))) - .map(d -> d + 1) - .map((d) -> { - spanInOperation.set((MockSpan) tracer.activeSpan()); - return d + 1; - }) - .map(d -> d + 1) - .subscribe(System.out::println); - } finally { - span.finish(); - } - - assertNull(tracer.activeSpan()); - assertEquals(spanInOperation.get().context().traceId(), span.context().traceId()); - } - - @Test - public void should_not_trace_scalar_flows() { - MockSpan span = tracer.buildSpan("foo").start(); - final AtomicReference spanInOperation = new AtomicReference<>(); - - try (Scope scope = tracer.scopeManager().activate(span)) { - Mono.just(1).subscribe(new BaseSubscriber() { - @Override - protected void hookOnSubscribe(Subscription subscription) { - spanInOperation.set(subscription); - } - }); - - assertNotNull(tracer.activeSpan()); - assertFalse(spanInOperation.get() instanceof TracedSubscriber); - - Mono.error(new Exception()) - .subscribe(new BaseSubscriber() { - @Override - protected void hookOnSubscribe(Subscription subscription) { - spanInOperation.set(subscription); - } - - @Override - protected void hookOnError(Throwable throwable) { - } - }); - - assertNotNull(tracer.activeSpan()); - assertFalse(spanInOperation.get() instanceof TracedSubscriber); - - Mono.empty() - .subscribe(new BaseSubscriber() { - @Override - protected void hookOnSubscribe(Subscription subscription) { - spanInOperation.set(subscription); - } - }); - - assertNotNull(tracer.activeSpan()); - assertFalse(spanInOperation.get() instanceof TracedSubscriber); - } finally { - span.finish(); - } - - assertNull(tracer.activeSpan()); - } - - @Test - public void should_pass_tracing_info_when_using_reactor_async() { - MockSpan span = tracer.buildSpan("foo").start(); - final AtomicReference spanInOperation = new AtomicReference<>(); - - try (Scope scope = tracer.scopeManager().activate(span)) { - Flux.just(1, 2, 3).publishOn(Schedulers.single()).log("reactor.1") - .map(d -> d + 1).map(d -> d + 1).publishOn(Schedulers.newSingle("secondThread")).log("reactor.2") - .map((d) -> { - spanInOperation.set((MockSpan) tracer.activeSpan()); - return d + 1; - }).map(d -> d + 1).blockLast(); - - Awaitility.await().untilAsserted(() -> { - assertEquals(spanInOperation.get().context().traceId(), span.context().traceId()); - }); - - assertEquals(tracer.activeSpan(), span); - } finally { - span.finish(); - } - - assertNull(tracer.activeSpan()); - MockSpan foo2 = tracer.buildSpan("foo").start(); - - try (Scope ws = tracer.scopeManager().activate(foo2)) { - Flux.just(1, 2, 3).publishOn(Schedulers.single()).log("reactor.").map(d -> d + 1).map(d -> d + 1).map((d) -> { - spanInOperation.set((MockSpan) tracer.activeSpan()); - return d + 1; - }).map(d -> d + 1).blockLast(); - - assertEquals(tracer.activeSpan(), foo2); - // parent cause there's an async span in the meantime - assertEquals(spanInOperation.get().context().traceId(), foo2.context().traceId()); - } finally { - foo2.finish(); - } - - assertNull(tracer.activeSpan()); - } - - @Test - public void checkSequenceOfOperations() { - MockSpan parentSpan = tracer.buildSpan("foo").start(); - - try (Scope scope = tracer.scopeManager().activate(parentSpan)) { - final Long traceId = Mono.fromCallable(tracer::activeSpan) - .map(span -> ((MockSpan) span).context().traceId()) - .block(); - assertNotNull(traceId); - - final Long secondTraceId = Mono.fromCallable(tracer::activeSpan) - .map(span -> ((MockSpan) span).context().traceId()) - .block(); - assertEquals(secondTraceId, traceId); // different trace ids here - } finally { - parentSpan.finish(); - } - } - - @Test - public void checkTraceIdDuringZipOperation() { - MockSpan initSpan = tracer.buildSpan("foo").start(); - final AtomicReference spanInOperation = new AtomicReference<>(); - final AtomicReference spanInZipOperation = new AtomicReference<>(); - - try (Scope ws = tracer.scopeManager().activate(initSpan)) { - Mono.fromCallable(tracer::activeSpan) - .map(span -> ((MockSpan) span).context().traceId()) - .doOnNext(spanInOperation::set) - .zipWith( - Mono.fromCallable(tracer::activeSpan) - .map(span -> ((MockSpan) span).context().traceId()) - .doOnNext(spanInZipOperation::set)) - .block(); - } finally { - initSpan.finish(); - } - - assertEquals((long) spanInZipOperation.get(), initSpan.context().traceId()); // ok here - assertEquals((long) spanInOperation.get(), initSpan.context().traceId()); // Expecting to have value: <1L> but did not. - } - - // #646 - @Test - public void should_work_for_mono_just_with_flat_map() { - MockSpan initSpan = tracer.buildSpan("foo").start(); - - try (Scope ws = tracer.scopeManager().activate(initSpan)) { - Mono.just("value1") - .flatMap(request -> Mono.just("value2") - .then(Mono.just("foo"))) - .map(a -> "qwe") - .block(); - } finally { - initSpan.finish(); - } - } - - // #1030 - @Test - public void checkTraceIdFromSubscriberContext() { - MockSpan initSpan = tracer.buildSpan("foo").start(); - final AtomicReference spanInSubscriberContext = new AtomicReference<>(); - - try (Scope ws = tracer.scopeManager().activate(initSpan)) { - Mono.subscriberContext() - .map(context -> ((MockSpan) tracer.activeSpan()).context().spanId()) - .doOnNext(spanInSubscriberContext::set).block(); - } finally { - initSpan.finish(); - } - - assertEquals((long) spanInSubscriberContext.get(), initSpan.context().spanId()); // ok here - } - - @Test - public void activeSpanShouldBeAccessibleInOnCompleteCallback() { - MockSpan initSpan = tracer.buildSpan("foo").start(); - - try (Scope ws = tracer.scopeManager().activate(initSpan)) { - Flux.range(1, 5) - .flatMap(i -> Mono.fromCallable(() -> i * 2).subscribeOn(elastic())) - .doOnComplete(() -> assertNotNull(tracer.activeSpan())) - .then() - .block(); - } finally { - initSpan.finish(); - } - } - - @Test - public void activeSpanShouldBeAccessibleInOnErrorCallback() { - MockSpan initSpan = tracer.buildSpan("foo").start(); - - try (Scope ws = tracer.scopeManager().activate(initSpan)) { - Mono.error(RuntimeException::new) - .subscribeOn(elastic()) - .doOnError(e -> assertNotNull(tracer.activeSpan())) - .onErrorResume(RuntimeException.class, e -> Mono.just("fallback")) - .block(); - } finally { - initSpan.finish(); - } - } +import reactor.core.publisher.SignalType; +import reactor.util.Logger; +import reactor.util.Loggers; +import reactor.util.context.Context; + +import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.logging.Level; - @AfterClass - public static void cleanup() { - Hooks.resetOnLastOperator(); - Hooks.resetOnEachOperator(); - Schedulers.resetFactory(); - } +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.junit.Assert.assertNull; +import static reactor.core.publisher.SignalType.ON_COMPLETE; +import static reactor.core.publisher.SignalType.ON_SUBSCRIBE; +import static reactor.core.scheduler.Schedulers.boundedElastic; +public class TracedSubscriberTest { + protected static final MockTracer tracer = new MockTracer(new ThreadLocalScopeManager()); + + static final Logger log = Loggers.getLogger(TracedSubscriberTest.class.getSimpleName()); + + + static Function, ? extends Publisher> traceWithDefaultDecorator( + Tracer tracer, String spanName + ) { + return Tracing.trace(tracer, spanName, DECORATOR_INSTANCE); + } + + static Mono logCtxVars(String prefix, Mono source) { + return source.transformDeferredContextual( + (m, ctx) -> m.log(prefix + "[" + Stream.ofAll(ctx.stream()).toMap(Tuple::fromEntry) + "]", + Level.INFO, ON_SUBSCRIBE, ON_COMPLETE)); + } + + + private static class DefaultSpanDecorator implements SpanDecorator { + protected static final String RESULT_KEY = "result"; + + @Override + public SpanBuilder onCreate(Context ctx, SpanBuilder builder) { + return builder.withTag("someTag", ctx.getOrDefault("someTag", "tag")); + } + + @Override + public Span onFinish(Try result, Span span) { + log.info("Finishing span: {} with {}", span, result); + + return span.log(HashMap.of(RESULT_KEY, result.fold(Throwable::toString, SignalType::toString)) + .toJavaMap()); + } + } + + private static final DefaultSpanDecorator DECORATOR_INSTANCE = new DefaultSpanDecorator(); + + + @Test + public void should_pass_tracing_info_when_using_reactor() { + MockSpan span = tracer.buildSpan("foo").start(); + final AtomicReference nested1 = new AtomicReference<>(); + final AtomicReference> nested2 = new AtomicReference<>(List.of()); + final AtomicReference nested3 = new AtomicReference<>(); + + // pointless to ask ScopeManager about active span most of the time because onNext signals + // are emitted on different threads from the one subscribe() was called on (and span was started) + try (Scope scope = tracer.scopeManager().activate(span)) { + Flux.just(1, 2, 3) + .transformDeferredContextual((f, ctx) -> { + nested3.set(ctx.getOrDefault(Span.class, null)); + return f; + }) + .delayElements(Duration.ofMillis(50)) + .transform(TracedSubscriberTest.traceWithDefaultDecorator(tracer, "nested3")) + .log("source") + .map(d -> + Mono.just(d + 1) + .as(m -> logCtxVars("nested2", m)) + .transformDeferredContextual((m, ctx) -> { + nested2.getAndUpdate( + list -> list.append(ctx.getOrDefault(Span.class, null))); + return m; + }) + .transform(TracedSubscriberTest.traceWithDefaultDecorator(tracer, "nested2")) + .contextWrite(ctx -> ctx.put("someTag", "tag" + d)) + .subscribeOn(boundedElastic())) + .concatMap(m -> m) + .map(d -> d + 1) + .collectList() + .transformDeferredContextual((m, ctx) -> { + nested1.set(ctx.getOrDefault(Span.class, null)); + return m; + }) + .transform(TracedSubscriberTest.>traceWithDefaultDecorator(tracer, "nested1")) + .log("result") + .block(Duration.ofSeconds(1)); + // in order for the spans created by Tracing.trace to be children of the outermost one that we + // create explicitly, outermost subscription must happen on the same thread that it was activated on + } finally { + span.finish(); + } + + assertNull(tracer.activeSpan()); + + assertThat(nested1.get()) + .isNotNull() + .satisfies(s -> { + assertThat(s.operationName()).isEqualTo("nested1"); + assertThat(s.parentId()).isEqualTo(span.context().spanId()); + assertThat(s.context().traceId()).isEqualTo(span.context().traceId()); + assertThat(s.tags().get("someTag")).isEqualTo("tag"); + assertThat(Stream.ofAll(s.logEntries()) + .map(le -> le.fields().get(DefaultSpanDecorator.RESULT_KEY)) + .filter(Objects::nonNull) + .asJava()) + .asList() + .hasSize(1) + .first() + .asInstanceOf(InstanceOfAssertFactories.type(String.class)) + .isEqualTo(ON_COMPLETE.toString()); + }); + + assertThat(nested2.get()) + .isNotNull() + .extracting(List::asJava) + .asList() + .hasSize(3) + .allSatisfy(s -> + assertThat(s) + .isNotNull() + .asInstanceOf(InstanceOfAssertFactories.type(MockSpan.class)) + .extracting(MockSpan::parentId) + .isEqualTo(nested1.get().context().spanId())); + + assertThat(nested2.get().map(s -> s.tags().get("someTag"))) + .extracting(List::asJava) + .asList() + .containsExactly("tag1", "tag2", "tag3"); + + assertThat(nested3.get()) + .isNotNull() + .extracting(MockSpan::parentId) + .isEqualTo(nested1.get().context().spanId()); + } + + @Test + public void should_work_with_errors_too() { + final AtomicReference spanRef = new AtomicReference<>(); + + Throwable t = Mono + .error(new RuntimeException("surprise!")) + .transformDeferredContextual((f, ctx) -> { + spanRef.set(ctx.getOrDefault(Span.class, null)); + return f; + }) + .delaySubscription(Duration.ofMillis(20)) + .transform(TracedSubscriberTest.traceWithDefaultDecorator(tracer, "trace")) + .as(m -> Try.of(m::block).failed().get()); + + assertThat(spanRef.get()) + .isNotNull() + .satisfies(s -> { + long durationMicros = s.finishMicros() - s.startMicros(); + log.info("'{}' span duration: {} mcs", s.operationName(), durationMicros); + assertThat(durationMicros) + .isGreaterThanOrEqualTo(TimeUnit.MILLISECONDS.toMicros(20)); + }) + .extracting(MockSpan::logEntries) + .satisfies(list -> + assertThat(Stream.ofAll(list) + .map(le -> le.fields().get(DefaultSpanDecorator.RESULT_KEY)) + .filter(Objects::nonNull) + .asJava()) + .asList() + .hasSize(1) + .first() + .asInstanceOf(InstanceOfAssertFactories.type(String.class)) + .isEqualTo(t.toString()) + ); + } }