Skip to content

Commit

Permalink
WIP Custom sampler works
Browse files Browse the repository at this point in the history
  • Loading branch information
stoty committed Nov 20, 2023
1 parent 12b9664 commit bc2cb80
Show file tree
Hide file tree
Showing 10 changed files with 331 additions and 256 deletions.
2 changes: 1 addition & 1 deletion bin/phoenix_sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import phoenix_utils

phoenix_utils.setPath()
phoenix_utils.set_tracing()
phoenix_utils.set_sandbox_tracing()

base_dir = os.path.join(phoenix_utils.current_dir, '..')
phoenix_target_dir = os.path.join(base_dir, 'phoenix-core', 'target')
Expand Down
37 changes: 30 additions & 7 deletions bin/phoenix_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,14 @@ def setPath():
LOGGING_JAR_PATTERN3 = "log4j-1.2-api*.jar"
SQLLINE_WITH_DEPS_PATTERN = "sqlline-*-jar-with-dependencies.jar"
OPENTELEMETRY_AGENT_PATTERN = "opentelemetry-javaagent*.jar"
OPENTELEMETRY_AGENT_EXTENSION_PATTERN = "phoenix-opentelemetry-trace-sampler-*[!s].jar"


OVERRIDE_SLF4J_BACKEND = "OVERRIDE_SLF4J_BACKEND_JAR_LOCATION"
OVERRIDE_LOGGING = "OVERRIDE_LOGGING_JAR_LOCATION"
OVERRIDE_SQLLINE = "OVERRIDE_SQLLINE_JAR_LOCATION"
OVERRIDE_OPENTELEMETRY_AGENT = "OVERRIDE_OPENTELEMETRY_AGENT_JAR_LOCATION"
OVERRIDE_OPENTELEMETRY_AGENT_EXTENSION = "OVERRIDE_OPENTELEMETRY_AGENT_EXTENSION_JAR_LOCATION"

# Backward support old env variable PHOENIX_LIB_DIR replaced by PHOENIX_CLASS_PATH
global phoenix_class_path
Expand Down Expand Up @@ -160,14 +163,14 @@ def setPath():
testjar = find(PHOENIX_TESTS_JAR_PATTERN, phoenix_class_path)

global phoenix_traceserver_jar
phoenix_traceserver_jar = find(PHOENIX_TRACESERVER_JAR_PATTERN, os.path.join(current_dir, "..", "phoenix-tracing-webapp", "target", "*"))
phoenix_traceserver_jar = find(PHOENIX_TRACESERVER_JAR_PATTERN, os.path.join(current_dir, "..", "phoenix-tracing-webapp", "target"))
if phoenix_traceserver_jar == "":
phoenix_traceserver_jar = findFileInPathWithoutRecursion(PHOENIX_TRACESERVER_JAR_PATTERN, os.path.join(current_dir, "..", "lib"))
if phoenix_traceserver_jar == "":
phoenix_traceserver_jar = findFileInPathWithoutRecursion(PHOENIX_TRACESERVER_JAR_PATTERN, os.path.join(current_dir, ".."))

global phoenix_pherf_jar
phoenix_pherf_jar = find(PHOENIX_PHERF_JAR_PATTERN, os.path.join(current_dir, "..", "phoenix-pherf", "target", "*"))
phoenix_pherf_jar = find(PHOENIX_PHERF_JAR_PATTERN, os.path.join(current_dir, "..", "phoenix-pherf", "target"))
if phoenix_pherf_jar == "":
phoenix_pherf_jar = findFileInPathWithoutRecursion(PHOENIX_PHERF_JAR_PATTERN, os.path.join(current_dir, "..", "lib"))
if phoenix_pherf_jar == "":
Expand All @@ -186,18 +189,38 @@ def setPath():
global logging_jar
logging_jar = os.environ.get(OVERRIDE_LOGGING)
if logging_jar is None or logging_jar == "":
logging_jar = findFileInPathWithoutRecursion(LOGGING_JAR_PATTERN, os.path.join(current_dir, "..","lib"))
logging_jar += ":"+findFileInPathWithoutRecursion(LOGGING_JAR_PATTERN2, os.path.join(current_dir, "..","lib"))
logging_jar += ":"+findFileInPathWithoutRecursion(LOGGING_JAR_PATTERN3, os.path.join(current_dir, "..","lib"))
logging_jar = findFileInPathWithoutRecursion(LOGGING_JAR_PATTERN, os.path.join(current_dir, "..", "lib"))
logging_jar += ":"+findFileInPathWithoutRecursion(LOGGING_JAR_PATTERN2, os.path.join(current_dir, "..", "lib"))
logging_jar += ":"+findFileInPathWithoutRecursion(LOGGING_JAR_PATTERN3, os.path.join(current_dir, "..", "lib"))

global opentelemetry_agent_jar
opentelemetry_agent_jar = os.environ.get(OVERRIDE_OPENTELEMETRY_AGENT)
if opentelemetry_agent_jar is None or opentelemetry_agent_jar == "":
opentelemetry_agent_jar = findFileInPathWithoutRecursion(OPENTELEMETRY_AGENT_PATTERN, os.path.join(current_dir, "..","lib/tracing"))

opentelemetry_agent_jar = findFileInPathWithoutRecursion(OPENTELEMETRY_AGENT_PATTERN, os.path.join(current_dir, "..", "lib/tracing"))

global opentelemetry_agent_extension_jar
opentelemetry_agent_extension_jar = os.environ.get(OVERRIDE_OPENTELEMETRY_AGENT_EXTENSION)
if opentelemetry_agent_extension_jar is None or opentelemetry_agent_extension_jar == "":
opentelemetry_agent_extension_jar = findFileInPathWithoutRecursion(OPENTELEMETRY_AGENT_EXTENSION_PATTERN, os.path.join(current_dir, "..", "lib/tracing"))
if opentelemetry_agent_extension_jar is None or opentelemetry_agent_extension_jar == "":
opentelemetry_agent_extension_jar = findFileInPathWithoutRecursion(OPENTELEMETRY_AGENT_EXTENSION_PATTERN, os.path.join(current_dir, "..", "phoenix-opentelemetry-trace-sampler", "target"))
return ""

def set_tracing():
global phoenix_trace_opts
phoenix_trace_opts = os.environ.get("PHOENIX_TRACE_OPTS")
#FIXME detect the OTEL Agent extension Jar
if phoenix_trace_opts is None or phoenix_trace_opts == "":
phoenix_trace_opts = " -Dotel.metrics.exporter=none -Dotel.instrumentation.jdbc.enabled=false -Dotel.javaagent.extensions="+opentelemetry_agent_extension_jar+" -Dotel.traces.sampler=phoenix_hintable_sampler -Dotel.traces.sampler.arg=0.0 "
# -Dotel.traces.exporter=phoenix_hintable_sampler
# -Dotel.javaagent.extensions=build/libs/opentelemetry-java-instrumentation-extension-demo-1.0-all.jar
# -Dotel.traces.exporter=logging
# -Dotel.instrumentation.jdbc.enabled=false
# -Dotel.javaagent.debug=true
print("TRACE_OPTS="+phoenix_trace_opts)
return ""

def set_sandbox_tracing():
global phoenix_trace_opts
phoenix_trace_opts = os.environ.get("PHOENIX_TRACE_OPTS")
if phoenix_trace_opts is None or phoenix_trace_opts == "":
Expand Down
4 changes: 4 additions & 0 deletions phoenix-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-client-embedded-${hbase.suffix}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-opentelemetry-trace-sampler</artifactId>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-pherf</artifactId>
Expand Down
9 changes: 9 additions & 0 deletions phoenix-assembly/src/build/components/all-common-jars.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,14 @@
<include>phoenix-pherf.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.basedir}/../phoenix-opentelemetry-trace-sampler/target</directory>
<!-- Should we copy this to the root dir like the other phoenix JARs ? -->
<outputDirectory>/lib/tracing/</outputDirectory>
<includes>
<!-- Would a versionless sampler JAR makes sense ? -->
<include>phoenix-opentelemetry-trace-sampler-${project.version}.jar</include>
</includes>
</fileSet>
</fileSets>
</component>
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PChar;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.ByteUtil;
Expand All @@ -75,21 +76,24 @@ public class TraceQueryPlan implements QueryPlan {
private StatementContext context = null;
private boolean first = true;

// 8 bytes represented by 16 hex characters
private static int SPAN_ID_CHAR_LENGTH=16;

private static final RowProjector TRACE_PROJECTOR;
static {
List<ExpressionProjector> projectedColumns = new ArrayList<ExpressionProjector>();
PName colName = PNameFactory.newName(MetricInfo.TRACE.columnName);
PColumn column =
new PColumnImpl(PNameFactory.newName(MetricInfo.TRACE.columnName), null,
PLong.INSTANCE, null, null, false, 0, SortOrder.getDefault(), 0, null,
PChar.INSTANCE, SPAN_ID_CHAR_LENGTH, null, false, 0, SortOrder.getDefault(), 0, null,
false, null, false, false, colName.getBytes(), HConstants.LATEST_TIMESTAMP);
List<PColumn> columns = new ArrayList<PColumn>();
columns.add(column);
Expression expression =
new RowKeyColumnExpression(column, new RowKeyValueAccessor(columns, 0));
projectedColumns.add(new ExpressionProjector(MetricInfo.TRACE.columnName, MetricInfo.TRACE.columnName, "", expression,
true));
int estimatedByteSize = SizedUtil.KEY_VALUE_SIZE + PLong.INSTANCE.getByteSize();
int estimatedByteSize = SizedUtil.KEY_VALUE_SIZE + SPAN_ID_CHAR_LENGTH;
TRACE_PROJECTOR = new RowProjector(projectedColumns, estimatedByteSize, false);
}

Expand Down Expand Up @@ -127,15 +131,15 @@ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throw
@Override
public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
final PhoenixConnection conn = stmt.getConnection();
if (conn.getTraceSpan() == null && !traceStatement.isTraceOn()) {
if (conn.getManualTraceSpanId() == null && !traceStatement.isTraceOn()) {
return ResultIterator.EMPTY_ITERATOR;
}
return new TraceQueryResultIterator(conn);
}

@Override
public long getEstimatedSize() {
return PLong.INSTANCE.getByteSize();
return SPAN_ID_CHAR_LENGTH;
}

@Override
Expand Down Expand Up @@ -266,13 +270,13 @@ public Tuple next() throws SQLException {
conn.endManualTraceSpan();
}
first = false;
String traceSpanId = connection.getManualTraceSpanId();
String traceSpanId = conn.getManualTraceSpanId();
if (traceSpanId != null) {
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
ParseNodeFactory factory = new ParseNodeFactory();
LiteralParseNode literal = factory.literal(traceSpanId);
LiteralExpression expression =
LiteralExpression.newConstant(literal.getValue(), PLong.INSTANCE,
LiteralExpression.newConstant(literal.getValue(), PChar.INSTANCE,
Determinism.ALWAYS);
expression.evaluate(null, ptr);
byte[] rowKey = ByteUtil.copyKeyBytesIfNecessary(ptr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import java.sql.Statement;
import java.sql.Struct;
import java.text.Format;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -67,7 +66,6 @@

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.phoenix.call.CallRunner;
import org.apache.phoenix.exception.FailoverSQLException;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
Expand Down Expand Up @@ -139,7 +137,7 @@
import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableMap.Builder;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.trace.NullScope;
import org.apache.phoenix.trace.TraceUtil;
/**
*
Expand Down Expand Up @@ -171,7 +169,7 @@ public class PhoenixConnection implements MetaDataMutated, SQLCloseable, Phoenix
private final String timePattern;
private final String timestampPattern;
private int statementExecutionCounter;
private Span manualTraceSpan = TraceUtil.cr;
private Span manualTraceSpan;
private volatile boolean isClosed = false;
private volatile boolean isClosing = false;
private boolean readOnly = false;
Expand Down Expand Up @@ -1340,14 +1338,6 @@ public void incrementStatementExecutionCounter() {
}
}

public Span getTraceSpan() {
return manualTraceSpan;
}

public void setTraceSpan(Span traceSpan) {
this.manualTraceSpan = traceSpan;
}

@Override
public Map<String, Map<MetricType, Long>> getMutationMetrics() {
return mutationState.getMutationMetricQueue().aggregate();
Expand Down Expand Up @@ -1483,10 +1473,44 @@ public void setActivityLogger(ConnectionActivityLogger connectionActivityLogger)
this.connectionActivityLogger = connectionActivityLogger;
}

public void startManualTraceSpan() {
public synchronized void startManualTraceSpan() {
if (manualTraceSpan != null) {
// TODO What to do if we try turn on tracing for a connection that is already on ?
// For now we just ignore it, but it may be better to throw an exception ?
return;
}
manualTraceSpan = TraceUtil.createSpan(this, "PhoenixConnection manual trace", true);
}

public synchronized void endManualTraceSpan() {
if (manualTraceSpan == null) {
// TODO What to do if we try turn off tracing for a connection that is already off ?
// For now we just ignore it, but it may be better to throw an exception ?
return;
}
manualTraceSpan = TraceUtil.createSpan(null, dateFormatTimeZoneId, true);
// FIXME: If we still have traced queries running (i.e. open ResultSets), then events after
// this will be parent less, or at least those spans will be truncated.
// I don't see a way to avoid that, though.
try {
manualTraceSpan.end();
} finally {
manualTraceSpan = null;
}
}

public Scope makeCurrent() {
if (manualTraceSpan == null) {
return NullScope.INSTANCE;
} else {
return manualTraceSpan.makeCurrent();
}
}

public String getManualTraceSpanId() {
if (manualTraceSpan == null) {
return null;
} else {
return manualTraceSpan.getSpanContext().getSpanId();
}
}
}
Loading

0 comments on commit bc2cb80

Please sign in to comment.