Skip to content

Commit

Permalink
cleanups and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
stoty committed Nov 20, 2023
1 parent 3b231d9 commit 1c8efd3
Show file tree
Hide file tree
Showing 25 changed files with 127 additions and 374 deletions.
17 changes: 13 additions & 4 deletions bin/phoenix_sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,17 @@
import sys
import phoenix_utils

# Since sandbox is used exclusively for development and debugging, it is easiest to
# unconditionally enable tracing
def set_sandbox_tracing():
    """Initialise the module-level ``sandbox_trace_opts`` JVM option string.

    The value of the ``PHOENIX_TRACE_OPTS`` environment variable is used when
    it is set to a non-empty string.  Otherwise a default is built that loads
    the Java agent jar referenced by ``phoenix_utils.opentelemetry_agent_jar``
    and turns off the metrics exporter and the JDBC instrumentation.

    Returns:
        Always the empty string; the result is published through the
        ``sandbox_trace_opts`` global.
    """
    global sandbox_trace_opts
    sandbox_trace_opts = os.environ.get("PHOENIX_TRACE_OPTS")
    # Unset and empty both select the default.  The check must look at the
    # value read above: this module defines no ``phoenix_trace_opts`` name, so
    # comparing against it raised NameError whenever the variable was set.
    if not sandbox_trace_opts:
        sandbox_trace_opts = (
            " -javaagent:" + phoenix_utils.opentelemetry_agent_jar
            + " -Dotel.metrics.exporter=none -Dotel.instrumentation.jdbc.enabled=false"
        )
    return ""

phoenix_utils.setPath()
phoenix_utils.set_sandbox_tracing()
set_sandbox_tracing()

base_dir = os.path.join(phoenix_utils.current_dir, '..')
phoenix_target_dir = os.path.join(base_dir, 'phoenix-core', 'target')
Expand All @@ -45,10 +54,10 @@
with open(cp_file_path, 'r') as cp_file:
cp_components.append(cp_file.read())

java_cmd = ("java -javaagent:%s -Dlog4j2.configurationFile=file:%s " +
' ' + phoenix_utils.phoenix_trace_opts + ' -Dotel.service.name="phoenix-sandbox" ' +
java_cmd = ("java -Dlog4j2.configurationFile=file:%s " +
' ' + sandbox_trace_opts + ' -Dotel.service.name="phoenix-sandbox" ' +
"-cp %s org.apache.phoenix.Sandbox") % (
phoenix_utils.opentelemetry_agent_jar, logging_config, ":".join(cp_components))
logging_config, ":".join(cp_components))

proc = subprocess.Popen(java_cmd, shell=True)
try:
Expand Down
30 changes: 11 additions & 19 deletions bin/phoenix_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ def setPath():
OPENTELEMETRY_AGENT_PATTERN = "opentelemetry-javaagent*.jar"
OPENTELEMETRY_AGENT_EXTENSION_PATTERN = "phoenix-opentelemetry-trace-sampler-*[!s].jar"


OVERRIDE_SLF4J_BACKEND = "OVERRIDE_SLF4J_BACKEND_JAR_LOCATION"
OVERRIDE_LOGGING = "OVERRIDE_LOGGING_JAR_LOCATION"
OVERRIDE_SQLLINE = "OVERRIDE_SQLLINE_JAR_LOCATION"
Expand Down Expand Up @@ -163,14 +162,14 @@ def setPath():
testjar = find(PHOENIX_TESTS_JAR_PATTERN, phoenix_class_path)

global phoenix_traceserver_jar
phoenix_traceserver_jar = find(PHOENIX_TRACESERVER_JAR_PATTERN, os.path.join(current_dir, "..", "phoenix-tracing-webapp", "target"))
phoenix_traceserver_jar = find(PHOENIX_TRACESERVER_JAR_PATTERN, os.path.join(current_dir, "..", "phoenix-tracing-webapp", "target", "*"))
if phoenix_traceserver_jar == "":
phoenix_traceserver_jar = findFileInPathWithoutRecursion(PHOENIX_TRACESERVER_JAR_PATTERN, os.path.join(current_dir, "..", "lib"))
if phoenix_traceserver_jar == "":
phoenix_traceserver_jar = findFileInPathWithoutRecursion(PHOENIX_TRACESERVER_JAR_PATTERN, os.path.join(current_dir, ".."))

global phoenix_pherf_jar
phoenix_pherf_jar = find(PHOENIX_PHERF_JAR_PATTERN, os.path.join(current_dir, "..", "phoenix-pherf", "target"))
phoenix_pherf_jar = find(PHOENIX_PHERF_JAR_PATTERN, os.path.join(current_dir, "..", "phoenix-pherf", "target", "*"))
if phoenix_pherf_jar == "":
phoenix_pherf_jar = findFileInPathWithoutRecursion(PHOENIX_PHERF_JAR_PATTERN, os.path.join(current_dir, "..", "lib"))
if phoenix_pherf_jar == "":
Expand Down Expand Up @@ -206,28 +205,16 @@ def setPath():
opentelemetry_agent_extension_jar = findFileInPathWithoutRecursion(OPENTELEMETRY_AGENT_EXTENSION_PATTERN, os.path.join(current_dir, "..", "phoenix-opentelemetry-trace-sampler", "target"))
return ""


def set_tracing():
global phoenix_trace_opts
phoenix_trace_opts = os.environ.get("PHOENIX_TRACE_OPTS")
#FIXME detect the OTEL Agent extension Jar
if phoenix_trace_opts is None or phoenix_trace_opts == "":
phoenix_trace_opts = " -Dotel.metrics.exporter=none -Dotel.instrumentation.jdbc.enabled=false -Dotel.javaagent.extensions="+opentelemetry_agent_extension_jar+" -Dotel.traces.sampler=phoenix_hintable_sampler -Dotel.traces.sampler.arg=0.0 "
# -Dotel.traces.exporter=phoenix_hintable_sampler
# -Dotel.javaagent.extensions=build/libs/opentelemetry-java-instrumentation-extension-demo-1.0-all.jar
# -Dotel.traces.exporter=logging
# -Dotel.instrumentation.jdbc.enabled=false
# -Dotel.javaagent.debug=true
print("TRACE_OPTS="+phoenix_trace_opts)
phoenix_trace_opts = " -javaagent:" + opentelemetry_agent_jar + \
" -Dotel.javaagent.extensions=" + opentelemetry_agent_extension_jar + \
" -Dotel.metrics.exporter=none -Dotel.instrumentation.jdbc.enabled=false -Dotel.traces.sampler=phoenix_hintable_sampler "
return ""

def set_sandbox_tracing():
global phoenix_trace_opts
phoenix_trace_opts = os.environ.get("PHOENIX_TRACE_OPTS")
if phoenix_trace_opts is None or phoenix_trace_opts == "":
phoenix_trace_opts = " -Dotel.metrics.exporter=none -Dotel.instrumentation.jdbc.enabled=false"
#-Dotel.traces.exporter=logging
# -Dotel.instrumentation.jdbc.enabled=false
return ""

def shell_quote(args):
"""
Expand All @@ -250,6 +237,9 @@ def common_sqlline_args(parser):
parser.add_argument('-fc', '--fastconnect',
help='Fetch all schemas on initial connection',
action="store_true")
parser.add_argument('--trace', help='Load and set up Opentelemetry agent',
action="store_true")
parser.add_argument('--traceratio', help='Default trace ratio')

if __name__ == "__main__":
setPath()
Expand All @@ -265,3 +255,5 @@ def common_sqlline_args(parser):
print("slf4j_backend_jar:", slf4j_backend_jar)
print("logging_jar:", logging_jar)
print("opentelemetry_agent_jar:", opentelemetry_agent_jar)
print("opentelemetry_agent_extension_jar:", opentelemetry_agent_extension_jar)

20 changes: 11 additions & 9 deletions bin/sqlline.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,23 +124,25 @@ def kill_child():
else:
disable_jna = ""

java_cmd = java + ' -javaagent:' + phoenix_utils.opentelemetry_agent_jar + ' $PHOENIX_OPTS ' + \
x = ( 'a' 'b')

java_cmd = java + ' $PHOENIX_OPTS ' + \
' -cp "' + phoenix_utils.hbase_conf_dir + os.pathsep + \
phoenix_utils.hadoop_conf + os.pathsep + \
phoenix_utils.sqlline_with_deps_jar + os.pathsep + \
phoenix_utils.slf4j_backend_jar + os.pathsep + \
phoenix_utils.logging_jar + os.pathsep + \
phoenix_utils.phoenix_client_embedded_jar + \
'" -Dlog4j2.configurationFile=file:' + os.path.join(phoenix_utils.current_dir, "log4j2.properties") + \
' ' + phoenix_utils.phoenix_trace_opts + ' -Dotel.service.name="phoenix-sqlline" ' + \
disable_jna + \
" sqlline.SqlLine -d org.apache.phoenix.jdbc.PhoenixDriver" + \
(not args.noconnect and " -u " + phoenix_utils.shell_quote([jdbc_url]) or "") + \
" -n none -p none --color=" + \
(args.color and "true" or "false") + \
" --fastConnect=" + (args.fastconnect and "true" or "false") + \
" --verbose=" + (args.verbose and "true" or "false") + \
" --incremental=false --isolation=TRANSACTION_READ_COMMITTED " + sqlfile
((phoenix_utils.phoenix_trace_opts + ' -Dotel.service.name="phoenix-sqlline" ') if args.trace else "" ) + \
("" if args.traceratio is None else "-Dotel.traces.sampler.arg=" + args.traceratio) + \
" sqlline.SqlLine -d org.apache.phoenix.jdbc.PhoenixDriver " + \
("" if args.noconnect else (" -u " + phoenix_utils.shell_quote([jdbc_url]))) + \
" -n none -p none --color=" + ("true" if args.color else "false") + \
" --fastConnect=" + ("true" if args.fastconnect else "false") + \
" --verbose=" + ( "true" if args.verbose else "false") + \
" --incremental=false --isolation=TRANSACTION_READ_COMMITTED " + sqlfile

if args.verbose_command:
print("Executing java command: " + java_cmd)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.jdbc.DelegateConnection;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.junit.After;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,11 @@
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PChar;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.SizedUtil;

import io.opentelemetry.api.trace.Span;

public class TraceQueryPlan implements QueryPlan {

private TraceStatement traceStatement = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.phoenix.coprocessor;

import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForFilter;

import java.io.IOException;
import java.util.List;

Expand Down Expand Up @@ -61,11 +63,8 @@
import org.apache.phoenix.util.ServerUtil;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;

import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForFilter;

abstract public class BaseScannerRegionObserver implements RegionObserver {

public static final String AGGREGATORS = "_Aggs";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@
import org.apache.phoenix.schema.types.PTinyint;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.EncodedColumnsUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.trace.TraceUtil;
import org.apache.phoenix.trace.TracingIterator;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.IndexUtil;
Expand All @@ -85,9 +84,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;


/**
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@
import org.apache.phoenix.schema.types.PTimestamp;
import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
import org.apache.phoenix.trace.TraceUtil;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.transaction.PhoenixTransactionContext;
import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
import org.apache.phoenix.transaction.TransactionFactory;
Expand Down Expand Up @@ -1211,10 +1210,6 @@ private void send(Iterator<TableRef> tableRefIterator) throws SQLException {
}

private void sendBatch(Map<TableRef, MultiRowMutationState> commitBatch, long[] serverTimeStamps, boolean sendAll) throws SQLException {
if (commitBatch.isEmpty()) {
return;
}

int i = 0;
Map<TableInfo, List<Mutation>> physicalTableMutationMap = Maps.newLinkedHashMap();

Expand Down Expand Up @@ -1300,6 +1295,7 @@ private void sendBatch(Map<TableRef, MultiRowMutationState> commitBatch, long[]
span.setStatus(StatusCode.OK);
} catch (SQLException e) {
TraceUtil.setError(span, e);
throw e;
} finally {
span.end();
}
Expand Down Expand Up @@ -1427,16 +1423,16 @@ public List<Mutation> getMutationList() {
// applied batches from entire list, also we can set
// REPLAY_ONLY_INDEX_WRITES for first batch
// only in case of 1121 SQLException
itrListMutation.remove();
itrListMutation.remove();

batchCount++;
if (LOGGER.isDebugEnabled())
LOGGER.debug("Sent batch of " + mutationBatch.size() + " for "
+ Bytes.toString(htableName));
}
shouldRetry = false;
numFailedMutations = 0;

// Remove batches as we process them
removeMutations(this.mutationsMap, origTableRef);
if (tableInfo.isDataTable()) {
Expand All @@ -1457,13 +1453,13 @@ public List<Mutation> getMutationList() {
// Swallow this exception once, as it's possible that we split after sending the index
// metadata
// and one of the region servers doesn't have it. This will cause it to have it the next
// go around.
// go around.
// If it fails again, we don't retry.
String msg = "Swallowing exception and retrying after clearing meta cache on connection. "
+ inferredE;
LOGGER.warn(LogUtil.addCustomAnnotations(msg, connection));
connection.getQueryServices().clearTableRegionCache(TableName.valueOf(htableName));

// The HTRace implementation started a new child span here.
// Now we're just adding an event to the same span.
span.addEvent(msg);
Expand Down Expand Up @@ -1496,7 +1492,7 @@ public List<Mutation> getMutationList() {
int[] uncommittedStatementIndexes = getUncommittedStatementIndexes();
sqlE = new CommitException(e, uncommittedStatementIndexes, serverTimestamp);
numFailedMutations = uncommittedStatementIndexes.length;
GLOBAL_MUTATION_BATCH_FAILED_COUNT.update(numFailedMutations);

if (isVerifiedPhase) {
numFailedPhase3Mutations = numFailedMutations;
GLOBAL_MUTATION_INDEX_COMMIT_FAILURE_COUNT.update(numFailedPhase3Mutations);
Expand All @@ -1511,7 +1507,7 @@ public List<Mutation> getMutationList() {
htableNameStr, numFailedMutations,
table.isTransactional());
}

MutationMetric committedMutationsMetric =
getCommittedMutationsMetric(
totalMutationBytesObject,
Expand All @@ -1523,7 +1519,7 @@ public List<Mutation> getMutationList() {
// Combine failure mutation metrics with committed ones for the final picture
committedMutationsMetric.combineMetric(failureMutationMetrics);
mutationMetricQueue.addMetricsForTable(htableNameStr, committedMutationsMetric);

if (allUpsertsMutations ^ allDeletesMutations) {
//success cases are updated for both cases autoCommit=true and conn.commit explicit
if (areAllBatchesSuccessful){
Expand All @@ -1548,7 +1544,7 @@ public List<Mutation> getMutationList() {
latency, allUpsertsMutations);
}
resetAllMutationState();

try {
if (cache != null) cache.close();
} finally {
Expand All @@ -1566,7 +1562,6 @@ public List<Mutation> getMutationList() {
}
} finally {
span.end();

}
} while (shouldRetry && retryCount++ < 1);
}
Expand All @@ -1584,10 +1579,12 @@ public static MutationMetricQueue.MutationMetric updateMutationBatchFailureMetri
String tableName,
long numFailedMutations,
boolean isTransactional) {

if (failedMutationBatch == null || failedMutationBatch.isEmpty() ||
Strings.isNullOrEmpty(tableName)) {
return MutationMetricQueue.MutationMetric.EMPTY_METRIC;
}

long numUpsertMutationsInBatch = 0L;
long numDeleteMutationsInBatch = 0L;

Expand All @@ -1598,12 +1595,25 @@ public static MutationMetricQueue.MutationMetric updateMutationBatchFailureMetri
numDeleteMutationsInBatch++;
}
}

long totalFailedMutation = numUpsertMutationsInBatch + numDeleteMutationsInBatch;
//this case should not happen but the if condition makes sense if this ever happens
if (totalFailedMutation < numFailedMutations) {
LOGGER.warn(
"total failed mutation less than num of failed mutation. This is not expected.");
totalFailedMutation = numFailedMutations;
}

long totalNumFailedMutations = allDeletesMutations && !isTransactional
? numDeleteMutationsInBatch : totalFailedMutation;
GLOBAL_MUTATION_BATCH_FAILED_COUNT.update(totalNumFailedMutations);

// Update the MUTATION_BATCH_FAILED_SIZE counter with the number of failed delete mutations
// in case we are dealing with all deletes for a non-transactional table, since there is a
// bug in sendMutations where we don't get the correct value for numFailedMutations when
// we don't use transactions
return new MutationMetricQueue.MutationMetric(0, 0, 0, 0, 0, 0,
allDeletesMutations && !isTransactional ? numDeleteMutationsInBatch : numFailedMutations,
totalNumFailedMutations,
0, 0, 0, 0,
numUpsertMutationsInBatch,
allUpsertsMutations ? 1 : 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1374,8 +1374,7 @@ private void doIndexWritesWithExceptions(BatchMutateContext context, boolean pos
}
span.setStatus(StatusCode.OK);
} catch (IOException e) {
span.setStatus(StatusCode.ERROR);
span.recordException(e);
TraceUtil.setError(span, e);
throw e;
} finally {
span.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,8 +626,7 @@ private void doPostWithExceptions(ObserverContext<RegionCoprocessorEnvironment>
metricSource.updateIndexWriteTime(dataTableName, duration);
span.end();
} catch (Throwable t) {
span.setStatus(StatusCode.ERROR);
span.recordException(t);
TraceUtil.setError(span, t);
throw t;
} finally {
span.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ public RowLock lockRow(ImmutableBytesPtr rowKey, int waitDuration) throws IOExce
InterruptedIOException iie = new InterruptedIOException();
iie.initCause(ie);
span.addEvent("Interrupted exception getting row lock");
span.setStatus(StatusCode.ERROR);
span.recordException(ie);
TraceUtil.setError(span, ie);
Thread.currentThread().interrupt();
throw iie;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.phoenix.monitoring.ScanMetricsHolder;
import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
import org.apache.phoenix.trace.TraceUtil;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.ScanUtil;
Expand Down
Loading

0 comments on commit 1c8efd3

Please sign in to comment.