Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
stoty committed Nov 15, 2023
1 parent e9001ff commit a7cec6c
Show file tree
Hide file tree
Showing 12 changed files with 207 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ private void overrideDelegate() throws IOException {
// currentSpan.stop() and that should happen only when we are closing the scanner.
//FIXME I don't think the above is true for OpenTelemetry.
//Just use the standard pattern, and see if it works.
Span span = TraceUtil.createSpan(SCANNER_OPENED_TRACE_INFO);
Span span = TraceUtil.createServerSideSpan(SCANNER_OPENED_TRACE_INFO);
try (Scope scope = span.makeCurrent();){
RegionScanner scanner = doPostScannerOpen(c, scan, delegate);
scanner = new DelegateRegionScanner(scanner) {
Expand All @@ -317,8 +317,6 @@ public void close() throws IOException {
delegate.close();
} finally {
span.end();
//FIXME what happens it close is called from a different thread ?
scope.close();
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1219,7 +1219,7 @@ private void sendBatch(Map<TableRef, MultiRowMutationState> commitBatch, long[]
Map<TableInfo, List<Mutation>> physicalTableMutationMap = Maps.newLinkedHashMap();

// add tracing for this operation
Span span = TraceUtil.createSpan("Committing mutations to tables");
Span span = TraceUtil.createSpan(connection, "Committing mutations to tables");
try (Scope ignored = span.makeCurrent()) {
ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
for (Map.Entry<TableRef, MultiRowMutationState> entry : commitBatch.entrySet()) {
Expand Down Expand Up @@ -1331,7 +1331,7 @@ private void sendMutations(Iterator<Entry<TableInfo, List<Mutation>>> mutationsI
boolean shouldRetryIndexedMutation = false;
IndexWriteException iwe = null;
do {
Span span = TraceUtil.createSpan("Writing mutation batch for table: " + Bytes.toString(htableName));
Span span = TraceUtil.createSpan(connection, "Writing mutation batch for table: " + Bytes.toString(htableName));
try (Scope scope = span.makeCurrent()) {
TableRef origTableRef = tableInfo.getOrigTableRef();
PTable table = origTableRef.getTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -973,7 +973,7 @@ private void preparePreIndexMutations(BatchMutateContext context,
PhoenixIndexMetaData indexMetaData) throws Throwable {
List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
// get the current span, or just use a null-span to avoid a bunch of if statements
Span span = TraceUtil.createSpan("Starting to build index updates");
Span span = TraceUtil.createServerSideSpan("Starting to build index updates");
try (Scope ignored = span.makeCurrent()) {
span.addEvent("Built index updates, doing preStep");
// The rest of this method is for handling global index updates
Expand Down Expand Up @@ -1353,7 +1353,7 @@ private void doIndexWritesWithExceptions(BatchMutateContext context, boolean pos
return;
}

Span span = TraceUtil.createSpan("Completing " + (post ? "post" : "pre") + " index writes");
Span span = TraceUtil.createServerSideSpan("Completing " + (post ? "post" : "pre") + " index writes");
try (Scope ignored = span.makeCurrent()) {
span.addEvent("Actually doing " + (post ? "post" : "pre") + " index update for first time");
if (post) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnviro
}

// get the current span, or just use a null-span to avoid a bunch of if statements
Span span = TraceUtil.createSpan("Starting to build index updates");
Span span = TraceUtil.createServerSideSpan("Starting to build index updates");
try (Scope ignored = span.makeCurrent()) {
long start = EnvironmentEdgeManager.currentTimeMillis();

Expand Down Expand Up @@ -608,7 +608,7 @@ private void doPostWithExceptions(ObserverContext<RegionCoprocessorEnvironment>
return;
}

Span span = TraceUtil.createSpan("Completing index writes");
Span span = TraceUtil.createServerSideSpan("Completing index writes");
try (Scope ignored = span.makeCurrent()) {
long start = EnvironmentEdgeManager.currentTimeMillis();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public RowLock lockRow(ImmutableBytesPtr rowKey, int waitDuration) throws IOExce
RowLockImpl result = null;

boolean success = false;
Span span = TraceUtil.createSpan("LockManager.getRowLock");
Span span = TraceUtil.createServerSideSpan("LockManager.getRowLock");
try (Scope ignored = span.makeCurrent()){
span.addEvent("Getting a lock");
// Keep trying until we have a lock or error out.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
setBatchMutateContext(c, context);

Collection<Pair<Mutation, byte[]>> indexUpdates = null;
Span span = TraceUtil.createSpan("Starting to build index updates");
Span span = TraceUtil.createServerSideSpan("Starting to build index updates");
try (Scope ignored = span.makeCurrent()) {
RegionCoprocessorEnvironment env = c.getEnvironment();
PhoenixTransactionContext txnContext = indexMetaData.getTransactionContext();
Expand Down Expand Up @@ -204,7 +204,7 @@ public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnviro
if (context == null || context.indexUpdates == null) {
return;
}
Span span = TraceUtil.createSpan("Starting to write index updates");
Span span = TraceUtil.createServerSideSpan("Starting to write index updates");
try (Scope scope = span.makeCurrent()) {
if (success) { // if miniBatchOp was successfully written, write index updates
if (!context.indexUpdates.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ synchronized public void close() throws SQLException {
@Override
public void commit() throws SQLException {
checkOpen();
Span span = TraceUtil.createSpan("committing mutations");
Span span = TraceUtil.createSpan(this, "committing mutations");
try (Scope ignored = span.makeCurrent()) {
mutationState.commit();
span.setStatus(StatusCode.OK);
Expand Down Expand Up @@ -1071,7 +1071,7 @@ public void releaseSavepoint(Savepoint savepoint) throws SQLException {
public void rollback() throws SQLException {
if (!mutationState.isEmpty()) {
checkOpen();
Span span = TraceUtil.createSpan("rolling back");
Span span = TraceUtil.createSpan(this, "rolling back");
try (Scope scope = span.makeCurrent()) {
mutationState.rollback();
span.setStatus(StatusCode.OK);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ private boolean match(String str, String pattern) throws SQLException {
public ResultSet getColumns(String catalog, String schemaPattern, String tableNamePattern,
String columnNamePattern) throws SQLException {
List<Tuple> tuples = Lists.newArrayListWithExpectedSize(10);
Span span = TraceUtil.createSpan("PhoenixDataBaseMetaData.getColumns() collrcing data");
Span span = TraceUtil.createSpan(connection, "PhoenixDataBaseMetaData.getColumns() collecting data");
try (Scope ignored = span.makeCurrent()){
boolean isTenantSpecificConnection = connection.getTenantId() != null;
// Allow a "." in columnNamePattern for column family match
Expand Down Expand Up @@ -1185,7 +1185,7 @@ public ResultSet getPrimaryKeys(String catalog, String schemaName, String tableN
return getEmptyResultSet();
}
List<Tuple> tuples = Lists.newArrayListWithExpectedSize(10);
Span span = TraceUtil.createSpan("PhoenixDataBaseMetaData.getPrimaryKeys() collecting data");
Span span = TraceUtil.createSpan(connection, "PhoenixDataBaseMetaData.getPrimaryKeys() collecting data");
try (Scope ignored = span.makeCurrent()) {
String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
PTable table = PhoenixRuntime.getTableNoCache(connection, fullTableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ void executeForBatch() throws SQLException {
SQLExceptionCode.EXECUTE_BATCH_FOR_STMT_WITH_RESULT_SET)
.build().buildException();
}
executeMutation(statement, createAuditQueryLogger(statement, query));
executeMutation(statement, createAuditQueryLogger(statement, query), query);
}

@Override
Expand All @@ -178,7 +178,7 @@ public boolean execute() throws SQLException {
.build().buildException();
}
if (statement.getOperation().isMutation()) {
executeMutation(statement, createAuditQueryLogger(statement,query));
executeMutation(statement, createAuditQueryLogger(statement,query), query);
return false;
}
executeQuery(statement, createQueryLogger(statement,query));
Expand All @@ -205,7 +205,7 @@ public int executeUpdate() throws SQLException {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
.build().buildException();
}
return executeMutation(statement, createAuditQueryLogger(statement,query));
return executeMutation(statement, createAuditQueryLogger(statement,query), query);
}

public QueryPlan optimizeQuery() throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import static org.apache.phoenix.monitoring.MetricType.UPSERT_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.UPSERT_SQL_QUERY_TIME;
import static org.apache.phoenix.monitoring.MetricType.UPSERT_SUCCESS_SQL_COUNTER;
import static org.apache.phoenix.trace.PhoenixSemanticAttributes.DB_STATEMENT;


import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -232,6 +234,7 @@
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.thirdparty.com.google.common.math.IntMath;
import org.apache.phoenix.thirdparty.com.google.common.base.Strings;

/**
*
* JDBC Statement implementation of Phoenix.
Expand Down Expand Up @@ -351,22 +354,20 @@ private PhoenixResultSet executeQuery(final CompilableStatement stmt,
String tableName = null;
PhoenixResultSet rs = null;
clearResultSet();
lastQuerySpan = TraceUtil.createSpan("executing query " + stmt);
lastQuerySpan = TraceUtil.createSpan(connection, "Executing Query");
lastQuerySpan.setAttribute(DB_STATEMENT, stmt.toString());
try (Scope ignored = lastQuerySpan.makeCurrent()) {
PhoenixConnection conn = getConnection();
lastQuerySpan.addEvent("Opened connection");
conn.checkOpen();
lastQuerySpan.addEvent("Ran checkOpen");

if (conn.getQueryServices().isUpgradeRequired() && !conn
.isRunningUpgrade()
&& stmt.getOperation() != Operation.UPGRADE) {
throw new UpgradeRequiredException();
}
lastQuerySpan.addEvent("After upgrade check");

QueryPlan plan;
Span compileSpan = TraceUtil.createSpan("Compiling and optimizing plan for " + stmt);
Span compileSpan = TraceUtil.createSpan(connection, "Compiling and optimizing plan for " + stmt);
try (Scope compileScope = compileSpan.makeCurrent()) {
plan =
stmt.compilePlan(PhoenixStatement.this,
Expand All @@ -382,7 +383,6 @@ private PhoenixResultSet executeQuery(final CompilableStatement stmt,
} finally {
compileSpan.end();
}
lastQuerySpan.addEvent("After compile and optimize Plan");

// Send mutations to hbase, so they are visible to subsequent reads.
// Use original plan for data table so that data and immutable indexes will be sent
Expand All @@ -396,15 +396,12 @@ private PhoenixResultSet executeQuery(final CompilableStatement stmt,
tableName = plan.getTableRef().getTable().getPhysicalName()
.toString();
}
lastQuerySpan.addEvent("Got tableName");

if (plan.getContext().getScanRanges().isPointLookup()) {
pointLookup = true;
}
Iterator<TableRef> tableRefs = plan.getSourceRefs().iterator();
lastQuerySpan.addEvent("Got tableRefs");
connection.getMutationState().sendUncommitted(tableRefs);
lastQuerySpan.addEvent("After sendUncommitted");

// this will create its own trace internally, so we don't wrap this
// whole thing in tracing
Expand All @@ -424,16 +421,13 @@ private PhoenixResultSet executeQuery(final CompilableStatement stmt,
context.getScan().toString() :
null);
}
lastQuerySpan.addEvent("After queryLogger log");

context.getOverallQueryMetrics().startQuery();
lastQuerySpan.addEvent("After metrics startQuery");

rs =
newResultSet(resultIterator, plan.getProjector(),
plan.getContext());
// newResultset sets lastResultset
lastQuerySpan.addEvent("After resultset created");
setLastQueryPlan(plan);
setLastUpdateCount(NO_UPDATE);
setLastUpdateTable(tableName == null ? TABLE_UNKNOWN : tableName);
Expand All @@ -442,7 +436,6 @@ private PhoenixResultSet executeQuery(final CompilableStatement stmt,
if (connection.getAutoCommit() && !noCommit) {
connection.commit();
}
lastQuerySpan.addEvent("After conn.commit");

connection.incrementStatementExecutionCounter();
success = true;
Expand All @@ -457,7 +450,7 @@ private PhoenixResultSet executeQuery(final CompilableStatement stmt,
}
boolean wasUpdated;
try (Scope ignored = lastQuerySpan.makeCurrent()) {
lastQuerySpan.addEvent("Trying to reloading table " + e.getTableName() + " data from server");
lastQuerySpan.addEvent("Trying to reload table " + e.getTableName() + " data from server");
wasUpdated = new MetaDataClient(connection)
.updateCache(connection.getTenantId(),
e.getSchemaName(), e.getTableName(), true)
Expand Down Expand Up @@ -568,11 +561,11 @@ public String getTargetForAudit(CompilableStatement stmt) {
}


protected int executeMutation(final CompilableStatement stmt, final AuditQueryLogger queryLogger) throws SQLException {
return executeMutation(stmt, true, queryLogger);
protected int executeMutation(final CompilableStatement stmt, final AuditQueryLogger queryLogger, String originalSQL) throws SQLException {
return executeMutation(stmt, true, queryLogger, originalSQL);
}

private int executeMutation(final CompilableStatement stmt, final boolean doRetryOnMetaNotFoundError, final AuditQueryLogger queryLogger) throws SQLException {
private int executeMutation(final CompilableStatement stmt, final boolean doRetryOnMetaNotFoundError, final AuditQueryLogger queryLogger, String originalSQL) throws SQLException {
if (connection.isReadOnly()) {
throw new SQLExceptionInfo.Builder(
SQLExceptionCode.READ_ONLY_CONNECTION).
Expand All @@ -593,7 +586,10 @@ public Integer call() throws SQLException {
MutationPlan plan = null;
final long startExecuteMutationTime = EnvironmentEdgeManager.currentTimeMillis();
clearResultSet();
Span span = TraceUtil.createSpan("execute Mutation " + stmt);
// TODO for queries we use re-constructed SQLs. We don't have code to do that
// for DLMs, so we just the original
Span span = TraceUtil.createSpan(connection, "Executing Mutation");
span.setAttribute(DB_STATEMENT, originalSQL);
try (Scope scope = span.makeCurrent()) {
PhoenixConnection conn = getConnection();
if (conn.getQueryServices().isUpgradeRequired() && !conn.isRunningUpgrade()
Expand Down Expand Up @@ -653,7 +649,7 @@ public Integer call() throws SQLException {
span.addEvent("Reloading table " + e.getTableName() + " data from server");
if (new MetaDataClient(connection).updateCache(connection.getTenantId(),
e.getSchemaName(), e.getTableName(), true).wasUpdated()) {
return executeMutation(stmt, false, queryLogger);
return executeMutation(stmt, false, queryLogger, originalSQL);
}
}
}
Expand Down Expand Up @@ -2280,7 +2276,7 @@ public int executeUpdate(String sql) throws SQLException {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
.build().buildException();
}
int updateCount = executeMutation(stmt, createAuditQueryLogger(stmt, sql));
int updateCount = executeMutation(stmt, createAuditQueryLogger(stmt, sql), sql);
flushIfNecessary();
return updateCount;
}
Expand All @@ -2299,7 +2295,7 @@ public boolean execute(String sql) throws SQLException {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
.build().buildException();
}
executeMutation(stmt, createAuditQueryLogger(stmt, sql));
executeMutation(stmt, createAuditQueryLogger(stmt, sql), sql);
flushIfNecessary();
return false;
}
Expand Down
Loading

0 comments on commit a7cec6c

Please sign in to comment.