Commit 165bb42

compiles with opentelemetry API

stoty committed Oct 26, 2023
1 parent ba32f31 commit 165bb42

Showing 26 changed files with 1,204 additions and 1,053 deletions.
phoenix-core/pom.xml (12 additions & 4 deletions)
@@ -500,10 +500,6 @@
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.apache.htrace</groupId>
-      <artifactId>htrace-core</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
@@ -569,6 +565,18 @@
       <groupId>org.hdrhistogram</groupId>
       <artifactId>HdrHistogram</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-context</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-semconv</artifactId>
+    </dependency>
 
     <!-- Other test dependencies -->
     <dependency>
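Note on the new dependencies: opentelemetry-api provides Tracer and Span, opentelemetry-context provides Context/Scope propagation, and opentelemetry-semconv provides standard attribute names. A minimal usage sketch against the API-only artifacts (illustrative, not code from this commit; with no SDK on the classpath the tracer is a no-op):

    import io.opentelemetry.api.GlobalOpenTelemetry;
    import io.opentelemetry.api.trace.Span;
    import io.opentelemetry.api.trace.Tracer;
    import io.opentelemetry.context.Scope;

    public class ApiOnlyExample {
        public static void main(String[] args) {
            // API-only classpath: this resolves to a no-op tracer unless an
            // SDK (or the OpenTelemetry java agent) registers a real one.
            Tracer tracer = GlobalOpenTelemetry.getTracer("org.apache.phoenix");
            Span span = tracer.spanBuilder("example-operation").startSpan();
            try (Scope ignored = span.makeCurrent()) {
                // traced work would go here
            } finally {
                span.end();
            }
        }
    }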
@@ -29,13 +29,9 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.htrace.Span;
-import org.apache.htrace.Trace;
-import org.apache.htrace.impl.MilliSpan;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
 import org.apache.phoenix.jdbc.DelegateConnection;
-import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.trace.util.Tracing.Frequency;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.junit.After;
@@ -32,8 +32,6 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.htrace.*;
-import org.apache.htrace.impl.ProbabilitySampler;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.end2end.ParallelStatsDisabledTest;
 import org.apache.phoenix.jdbc.PhoenixConnection;
phoenix-core/src/main/antlr3/PhoenixSQL.g (2 additions & 2 deletions)
@@ -212,7 +212,7 @@ import org.apache.phoenix.schema.types.PUnsignedTime;
 import org.apache.phoenix.schema.types.PUnsignedTimestamp;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.parse.LikeParseNode.LikeType;
-import org.apache.phoenix.trace.util.Tracing;
+import org.apache.phoenix.trace.TraceUtil;
 import org.apache.phoenix.parse.AddJarsStatement;
 import org.apache.phoenix.parse.ExplainType;
 }
@@ -656,7 +656,7 @@ alter_index_node returns [AlterIndexStatement ret]
 // Parse a trace statement.
 trace_node returns [TraceStatement ret]
     : TRACE ((flag = ON ( WITH SAMPLING s = sampling_rate)?) | flag = OFF)
-        {ret = factory.trace(Tracing.isTraceOn(flag.getText()), s == null ? Tracing.isTraceOn(flag.getText()) ? 1.0 : 0.0 : (((BigDecimal)s.getValue())).doubleValue());}
+        {ret = factory.trace(TraceUtil.isTraceOn(flag.getText()), s == null ? TraceUtil.isTraceOn(flag.getText()) ? 1.0 : 0.0 : (((BigDecimal)s.getValue())).doubleValue());}
     ;
 
 // Parse a create function statement.
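Reading the rewritten grammar action: TRACE ON defaults the sampling rate to 1.0, TRACE OFF to 0.0, and an explicit WITH SAMPLING literal overrides the default. For example (statement syntax per the trace_node rule above):

    TRACE ON;                    -- sampling rate 1.0
    TRACE ON WITH SAMPLING 0.5;  -- sampling rate 0.5
    TRACE OFF;                   -- sampling rate 0.0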
@@ -29,10 +29,7 @@
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.htrace.Sampler;
-import org.apache.htrace.TraceScope;
-import org.apache.phoenix.compile.ExplainPlanAttributes
-        .ExplainPlanAttributesBuilder;
+import org.apache.phoenix.compile.ExplainPlanAttributes.ExplainPlanAttributesBuilder;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
@@ -127,11 +124,11 @@ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throw
 
     @Override
     public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
-        final PhoenixConnection conn = stmt.getConnection();
-        if (conn.getTraceScope() == null && !traceStatement.isTraceOn()) {
+        // final PhoenixConnection conn = stmt.getConnection();
+        // if (conn.getTraceScope() == null && !traceStatement.isTraceOn()) {
         return ResultIterator.EMPTY_ITERATOR;
-        }
-        return new TraceQueryResultIterator(conn);
+        // }
+        // return new TraceQueryResultIterator(conn);
     }
 
     @Override
@@ -258,52 +255,53 @@ public void close() throws SQLException {
 
         @Override
         public Tuple next() throws SQLException {
-            if(!first) return null;
-            TraceScope traceScope = conn.getTraceScope();
-            if (traceStatement.isTraceOn()) {
-                conn.setSampler(Tracing.getConfiguredSampler(traceStatement));
-                if (conn.getSampler() == Sampler.NEVER) {
-                    closeTraceScope(conn);
-                }
-                if (traceScope == null && !conn.getSampler().equals(Sampler.NEVER)) {
-                    traceScope = Tracing.startNewSpan(conn, "Enabling trace");
-                    if (traceScope.getSpan() != null) {
-                        conn.setTraceScope(traceScope);
-                    } else {
-                        closeTraceScope(conn);
-                    }
-                }
-            } else {
-                closeTraceScope(conn);
-                conn.setSampler(Sampler.NEVER);
-            }
-            if (traceScope == null || traceScope.getSpan() == null) return null;
-            first = false;
-            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-            ParseNodeFactory factory = new ParseNodeFactory();
-            LiteralParseNode literal =
-                    factory.literal(traceScope.getSpan().getTraceId());
-            LiteralExpression expression =
-                    LiteralExpression.newConstant(literal.getValue(), PLong.INSTANCE,
-                        Determinism.ALWAYS);
-            expression.evaluate(null, ptr);
-            byte[] rowKey = ByteUtil.copyKeyBytesIfNecessary(ptr);
-            Cell cell =
-                    PhoenixKeyValueUtil
-                            .newKeyValue(rowKey, HConstants.EMPTY_BYTE_ARRAY,
-                                HConstants.EMPTY_BYTE_ARRAY,
-                                EnvironmentEdgeManager.currentTimeMillis(),
-                                HConstants.EMPTY_BYTE_ARRAY);
-            List<Cell> cells = new ArrayList<Cell>(1);
-            cells.add(cell);
-            return new ResultTuple(Result.create(cells));
+            return null;
+            // if(!first) return null;
+            // TraceScope traceScope = conn.getTraceScope();
+            // if (traceStatement.isTraceOn()) {
+            //     conn.setSampler(Tracing.getConfiguredSampler(traceStatement));
+            //     if (conn.getSampler() == Sampler.NEVER) {
+            //         closeTraceScope(conn);
+            //     }
+            //     if (traceScope == null && !conn.getSampler().equals(Sampler.NEVER)) {
+            //         traceScope = Tracing.startNewSpan(conn, "Enabling trace");
+            //         if (traceScope.getSpan() != null) {
+            //             conn.setTraceScope(traceScope);
+            //         } else {
+            //             closeTraceScope(conn);
+            //         }
+            //     }
+            // } else {
+            //     closeTraceScope(conn);
+            //     conn.setSampler(Sampler.NEVER);
+            // }
+            // if (traceScope == null || traceScope.getSpan() == null) return null;
+            // first = false;
+            // ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+            // ParseNodeFactory factory = new ParseNodeFactory();
+            // LiteralParseNode literal =
+            //         factory.literal(traceScope.getSpan().getTraceId());
+            // LiteralExpression expression =
+            //         LiteralExpression.newConstant(literal.getValue(), PLong.INSTANCE,
+            //             Determinism.ALWAYS);
+            // expression.evaluate(null, ptr);
+            // byte[] rowKey = ByteUtil.copyKeyBytesIfNecessary(ptr);
+            // Cell cell =
+            //         PhoenixKeyValueUtil
+            //                 .newKeyValue(rowKey, HConstants.EMPTY_BYTE_ARRAY,
+            //                     HConstants.EMPTY_BYTE_ARRAY,
+            //                     EnvironmentEdgeManager.currentTimeMillis(),
+            //                     HConstants.EMPTY_BYTE_ARRAY);
+            // List<Cell> cells = new ArrayList<Cell>(1);
+            // cells.add(cell);
+            // return new ResultTuple(Result.create(cells));
         }
 
         private void closeTraceScope(final PhoenixConnection conn) {
-            if(conn.getTraceScope()!=null) {
-                conn.getTraceScope().close();
-                conn.setTraceScope(null);
-            }
+            // if(conn.getTraceScope()!=null) {
+            //     conn.getTraceScope().close();
+            //     conn.setTraceScope(null);
+            // }
        }
 
        @Override
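A likely reason the whole body is commented out rather than ported: the old code surfaced the HTrace trace id, a long, through a PLong column, while an OpenTelemetry trace id is a 16-byte value exposed as a 32-character hex String, so the row-key encoding above cannot carry over directly. The OpenTelemetry-side lookup, for comparison (an illustrative line, not code from this commit):

    // OpenTelemetry exposes the trace id of the active span as a hex String,
    // not as the long that HTrace's Span.getTraceId() returned.
    String traceId = Span.current().getSpanContext().getTraceId();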
@@ -46,8 +46,6 @@
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.htrace.Span;
-import org.apache.htrace.Trace;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.filter.PagingFilter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
@@ -58,9 +56,14 @@
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.types.PUnsignedTinyint;
+import org.apache.phoenix.trace.TraceUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.context.Scope;
+
 import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForFilter;
 
 abstract public class BaseScannerRegionObserver implements RegionObserver {
@@ -300,8 +303,10 @@ private void overrideDelegate() throws IOException {
         // and region servers to crash. See https://issues.apache.org/jira/browse/PHOENIX-1596
         // TraceScope can't be used here because closing the scope will end up calling
         // currentSpan.stop() and that should happen only when we are closing the scanner.
-        final Span savedSpan = Trace.currentSpan();
-        final Span child = Trace.startSpan(SCANNER_OPENED_TRACE_INFO, savedSpan).getSpan();
+        //FIXME I don't think the above is true for OpenTelemetry.
+        //Just use the standard pattern, and see if it works.
+        Span span = TraceUtil.createSpan(SCANNER_OPENED_TRACE_INFO);
+        Scope scope = span.makeCurrent();
         try {
             RegionScanner scanner = doPostScannerOpen(c, scan, delegate);
             scanner = new DelegateRegionScanner(scanner) {
@@ -312,25 +317,20 @@ public void close() throws IOException {
                     try {
                         delegate.close();
                     } finally {
-                        if (child != null) {
-                            child.stop();
-                        }
+                        span.end();
                     }
                 }
             };
             this.delegate = scanner;
             wasOverriden = true;
             success = true;
         } catch (Throwable t) {
+            span.setStatus(StatusCode.ERROR);
+            span.recordException(t);
             ServerUtil.throwIOException(c.getEnvironment().getRegionInfo().getRegionNameAsString(), t);
         } finally {
-            try {
-                if (!success && child != null) {
-                    child.stop();
-                }
-            } finally {
-                Trace.continueSpan(savedSpan);
-            }
+            scope.close();
+            span.end();
         }
     }
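For reference, the "standard pattern" the FIXME alludes to is usually written with try-with-resources for the Scope, while the Span itself is ended in a finally block. A generic sketch (tracer and doWork are placeholders, not code from this commit):

    Span span = tracer.spanBuilder("operation").startSpan();
    try (Scope ignored = span.makeCurrent()) {
        doWork();                          // traced work
    } catch (Throwable t) {
        span.setStatus(StatusCode.ERROR);  // mark the span as failed
        span.recordException(t);           // attach the exception as an event
        throw t;
    } finally {
        span.end();                        // end the span exactly once
    }

Note that in the hunk above span.end() is reachable both from the wrapped scanner's close() and from the method's own finally block; the OpenTelemetry API specifies that calls to end() after the first are ignored, so the earlier of the two ends the span.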
@@ -34,7 +34,6 @@
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.htrace.TraceScope;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.ExplainPlanAttributes;
@@ -79,6 +78,7 @@
 import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
 import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableSet;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.trace.TraceUtil;
 import org.apache.phoenix.trace.TracingIterator;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.ByteUtil;
@@ -89,6 +89,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.opentelemetry.api.trace.Span;
+
 
 
 /**
@@ -363,13 +365,10 @@ public final ResultIterator iterator(final Map<ImmutableBytesPtr,ServerCache> ca
                 "Iterator for table " + context.getCurrentTable().getTable().getName() + " ready: " + iterator, connection));
         }
 
-        // wrap the iterator so we start/end tracing as we expect
-        if (Tracing.isTracing()) {
-            TraceScope scope = Tracing.startNewSpan(context.getConnection(),
-                "Creating basic query for " + getPlanSteps(iterator));
-            if (scope.getSpan() != null) return new TracingIterator(scope, iterator);
-        }
-        return iterator;
+        //FIXME where to set the root span attributes ?
+        //FIXME add the plan as an attribute, to avoid cost when not recording
+        Span span = TraceUtil.createSpan("Creating basic query for " + getPlanSteps(iterator));
+        return new TracingIterator(span, iterator);
     }
 
     private void serializeIndexMaintainerIntoScan(Scan scan, PTable dataTable) throws SQLException {
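On the second FIXME above: the OpenTelemetry API provides Span.isRecording() for exactly this cost concern, so an expensive attribute such as the rendered plan can be computed only when the span is actually recording. A hypothetical helper (the attribute name and the List wiring are illustrative, not from this commit):

    import io.opentelemetry.api.common.AttributeKey;
    import io.opentelemetry.api.trace.Span;
    import java.util.List;

    final class PlanAttributes {
        private static final AttributeKey<String> QUERY_PLAN =
                AttributeKey.stringKey("phoenix.query.plan"); // illustrative key name

        // Only pay for rendering the plan when someone is sampling this trace.
        static void addPlanAttribute(Span span, List<String> planSteps) {
            if (span.isRecording()) {
                span.setAttribute(QUERY_PLAN, String.join("\n", planSteps));
            }
        }
    }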