diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java index e95dfd46606b..5415da4cf91f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java @@ -95,6 +95,7 @@ public Get(Get get) { this.setFilter(get.getFilter()); this.setReplicaId(get.getReplicaId()); this.setConsistency(get.getConsistency()); + this.setQueryMetricsEnabled(get.isQueryMetricsEnabled()); // from Get this.cacheBlocks = get.getCacheBlocks(); this.maxVersions = get.getMaxVersions(); @@ -453,6 +454,7 @@ public Map toMap(int maxCols) { map.put("colFamTimeRangeMap", colFamTimeRangeMapStr); } map.put("priority", getPriority()); + map.put("queryMetricsEnabled", queryMetricsEnabled); return map; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java index 944a70376829..ca2c02316051 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java @@ -46,6 +46,7 @@ public abstract class Query extends OperationWithAttributes { protected Consistency consistency = Consistency.STRONG; protected Map colFamTimeRangeMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); protected Boolean loadColumnFamiliesOnDemand = null; + protected boolean queryMetricsEnabled = false; public Filter getFilter() { return filter; @@ -157,6 +158,15 @@ public Query setIsolationLevel(IsolationLevel level) { return this; } + public Query setQueryMetricsEnabled(boolean enabled) { + this.queryMetricsEnabled = enabled; + return this; + } + + public boolean isQueryMetricsEnabled() { + return queryMetricsEnabled; + } + /** * Returns The isolation level of this query. If no isolation level was set for this query object, * then it returns READ_COMMITTED. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/QueryMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/QueryMetrics.java new file mode 100644 index 000000000000..ed3818fea01a --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/QueryMetrics.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Public +public class QueryMetrics { + private final long blockBytesScanned; + + public QueryMetrics(long blockBytesScanned) { + this.blockBytesScanned = blockBytesScanned; + } + + public long getBlockBytesScanned() { + return blockBytesScanned; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java index f4ac525e5b93..22101845ea40 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java @@ -99,6 +99,7 @@ public class Result implements ExtendedCellScannable, ExtendedCellScanner { */ private int cellScannerIndex = INITIAL_CELLSCANNER_INDEX; private RegionLoadStats stats; + private QueryMetrics metrics = null; private final boolean readonly; @@ -931,6 +932,11 @@ public void setStatistics(RegionLoadStats loadStats) { this.stats = loadStats; } + @InterfaceAudience.Private + public void setMetrics(QueryMetrics metrics) { + this.metrics = metrics; + } + /** * Returns the associated statistics about the region from which this was returned. Can be * null if stats are disabled. @@ -939,6 +945,11 @@ public RegionLoadStats getStats() { return stats; } + /** Returns the query metrics, or {@code null} if we do not enable metrics. */ + public QueryMetrics getMetrics() { + return metrics; + } + /** * All methods modifying state of Result object must call this method to ensure that special * purpose immutable Results can't be accidentally modified. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index c2caac844a8b..62a65e4e6e13 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -217,6 +217,7 @@ public Scan(Scan scan) throws IOException { setPriority(scan.getPriority()); readType = scan.getReadType(); super.setReplicaId(scan.getReplicaId()); + super.setQueryMetricsEnabled(scan.isQueryMetricsEnabled()); } /** @@ -249,6 +250,7 @@ public Scan(Get get) { this.mvccReadPoint = -1L; setPriority(get.getPriority()); super.setReplicaId(get.getReplicaId()); + super.setQueryMetricsEnabled(get.isQueryMetricsEnabled()); } public boolean isGetScan() { @@ -826,6 +828,7 @@ public Map toMap(int maxCols) { map.put("colFamTimeRangeMap", colFamTimeRangeMapStr); } map.put("priority", getPriority()); + map.put("queryMetricsEnabled", queryMetricsEnabled); return map; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index e9eb085ee7bd..f1f5e2269c86 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -91,6 +91,7 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.OnlineLogRecord; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.QueryMetrics; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.RegionLoadStats; import org.apache.hadoop.hbase.client.RegionReplicaUtil; @@ -659,6 +660,7 @@ public static Get toGet(final ClientProtos.Get proto) throws IOException { if (proto.hasLoadColumnFamiliesOnDemand()) { get.setLoadColumnFamiliesOnDemand(proto.getLoadColumnFamiliesOnDemand()); } + get.setQueryMetricsEnabled(proto.getQueryMetricsEnabled()); return get; } @@ -1096,6 +1098,7 @@ public static ClientProtos.Scan toScan(final Scan scan) throws IOException { if (scan.isNeedCursorResult()) { scanBuilder.setNeedCursorResult(true); } + scanBuilder.setQueryMetricsEnabled(scan.isQueryMetricsEnabled()); return scanBuilder.build(); } @@ -1200,6 +1203,7 @@ public static Scan toScan(final ClientProtos.Scan proto) throws IOException { if (proto.getNeedCursorResult()) { scan.setNeedCursorResult(true); } + scan.setQueryMetricsEnabled(proto.getQueryMetricsEnabled()); return scan; } @@ -1279,6 +1283,7 @@ public static ClientProtos.Get toGet(final Get get) throws IOException { if (loadColumnFamiliesOnDemand != null) { builder.setLoadColumnFamiliesOnDemand(loadColumnFamiliesOnDemand); } + builder.setQueryMetricsEnabled(get.isQueryMetricsEnabled()); return builder.build(); } @@ -1434,6 +1439,10 @@ public static ClientProtos.Result toResult(final Result result, boolean encodeTa builder.setStale(result.isStale()); builder.setPartial(result.mayHaveMoreCellsInRow()); + if (result.getMetrics() != null) { + builder.setMetrics(toQueryMetrics(result.getMetrics())); + } + return builder.build(); } @@ -1463,6 +1472,9 @@ public static ClientProtos.Result toResultNoData(final Result result) { ClientProtos.Result.Builder builder = ClientProtos.Result.newBuilder(); builder.setAssociatedCellCount(size); builder.setStale(result.isStale()); + if (result.getMetrics() != null) { + builder.setMetrics(toQueryMetrics(result.getMetrics())); + } return builder.build(); } @@ -1503,7 +1515,11 @@ public static Result toResult(final ClientProtos.Result proto, boolean decodeTag for (CellProtos.Cell c : values) { cells.add(toCell(builder, c, decodeTags)); } - return Result.create(cells, null, proto.getStale(), proto.getPartial()); + Result r = Result.create(cells, null, proto.getStale(), proto.getPartial()); + if (proto.hasMetrics()) { + r.setMetrics(toQueryMetrics(proto.getMetrics())); + } + return r; } /** @@ -1548,9 +1564,15 @@ public static Result toResult(final ClientProtos.Result proto, final CellScanner } } - return (cells == null || cells.isEmpty()) + Result r = (cells == null || cells.isEmpty()) ? (proto.getStale() ? EMPTY_RESULT_STALE : EMPTY_RESULT) : Result.create(cells, null, proto.getStale()); + + if (proto.hasMetrics()) { + r.setMetrics(toQueryMetrics(proto.getMetrics())); + } + + return r; } /** @@ -3811,6 +3833,15 @@ public static ClusterStatusProtos.ServerTask toServerTask(ServerTask task) { .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTime()).build(); } + public static ClientProtos.QueryMetrics toQueryMetrics(QueryMetrics metrics) { + return ClientProtos.QueryMetrics.newBuilder() + .setBlockBytesScanned(metrics.getBlockBytesScanned()).build(); + } + + public static QueryMetrics toQueryMetrics(ClientProtos.QueryMetrics metrics) { + return new QueryMetrics(metrics.getBlockBytesScanned()); + } + /** * Check whether this IPBE indicates EOF or not. *

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java index 09cbc460f22e..4da037e88ee9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.CheckAndMutateResult; +import org.apache.hadoop.hbase.client.QueryMetrics; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.SingleResponse; @@ -417,6 +418,7 @@ public static Result[] getResults(CellScanner cellScanner, ScanResponse response int noOfResults = cellScanner != null ? response.getCellsPerResultCount() : response.getResultsCount(); Result[] results = new Result[noOfResults]; + List queryMetrics = response.getQueryMetricsList(); for (int i = 0; i < noOfResults; i++) { if (cellScanner != null) { // Cells are out in cellblocks. Group them up again as Results. How many to read at a @@ -453,6 +455,12 @@ public static Result[] getResults(CellScanner cellScanner, ScanResponse response // Result is pure pb. results[i] = ProtobufUtil.toResult(response.getResults(i)); } + + // Populate result metrics if they exist + if (queryMetrics.size() > i) { + QueryMetrics metrics = ProtobufUtil.toQueryMetrics(queryMetrics.get(i)); + results[i].setMetrics(metrics); + } } return results; } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestGet.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestGet.java index ca1a708e64f6..e6eed365cd09 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestGet.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestGet.java @@ -182,6 +182,7 @@ public void TestGetRowFromGetCopyConstructor() throws Exception { get.setMaxResultsPerColumnFamily(10); get.setRowOffsetPerColumnFamily(11); get.setCacheBlocks(true); + get.setQueryMetricsEnabled(true); Get copyGet = new Get(get); assertEquals(0, Bytes.compareTo(get.getRow(), copyGet.getRow())); @@ -196,6 +197,7 @@ public void TestGetRowFromGetCopyConstructor() throws Exception { assertEquals(get.getConsistency(), copyGet.getConsistency()); assertEquals(get.getReplicaId(), copyGet.getReplicaId()); assertEquals(get.getIsolationLevel(), copyGet.getIsolationLevel()); + assertTrue(get.isQueryMetricsEnabled()); // from Get class assertEquals(get.isCheckExistenceOnly(), copyGet.isCheckExistenceOnly()); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestOnlineLogRecord.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestOnlineLogRecord.java index 72013b6f294b..4446ab0f7261 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestOnlineLogRecord.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestOnlineLogRecord.java @@ -44,21 +44,21 @@ public void itSerializesScan() { Scan scan = new Scan(); scan.withStartRow(Bytes.toBytes(123)); scan.withStopRow(Bytes.toBytes(456)); - String expectedOutput = - "{\n" + " \"startTime\": 1,\n" + " \"processingTime\": 2,\n" + " \"queueTime\": 3,\n" - + " \"responseSize\": 4,\n" + " \"blockBytesScanned\": 5,\n" + " \"fsReadTime\": 6,\n" - + " \"multiGetsCount\": 6,\n" + " \"multiMutationsCount\": 7,\n" + " \"scan\": {\n" - + " \"startRow\": \"\\\\x00\\\\x00\\\\x00{\",\n" + " \"targetReplicaId\": -1,\n" - + " \"batch\": -1,\n" + " \"totalColumns\": 0,\n" + " \"maxResultSize\": -1,\n" - + " \"families\": {},\n" + " \"priority\": -1,\n" + " \"caching\": -1,\n" - + " \"includeStopRow\": false,\n" + " \"consistency\": \"STRONG\",\n" - + " \"maxVersions\": 1,\n" + " \"storeOffset\": 0,\n" + " \"mvccReadPoint\": -1,\n" - + " \"includeStartRow\": true,\n" + " \"needCursorResult\": false,\n" - + " \"stopRow\": \"\\\\x00\\\\x00\\\\x01\\\\xC8\",\n" + " \"storeLimit\": -1,\n" - + " \"limit\": -1,\n" + " \"cacheBlocks\": true,\n" - + " \"readType\": \"DEFAULT\",\n" + " \"allowPartialResults\": false,\n" - + " \"reversed\": false,\n" + " \"timeRange\": [\n" + " 0,\n" - + " 9223372036854775807\n" + " ]\n" + " }\n" + "}"; + String expectedOutput = "{\n" + " \"startTime\": 1,\n" + " \"processingTime\": 2,\n" + + " \"queueTime\": 3,\n" + " \"responseSize\": 4,\n" + " \"blockBytesScanned\": 5,\n" + + " \"fsReadTime\": 6,\n" + " \"multiGetsCount\": 6,\n" + " \"multiMutationsCount\": 7,\n" + + " \"scan\": {\n" + " \"totalColumns\": 0,\n" + " \"maxResultSize\": -1,\n" + + " \"caching\": -1,\n" + " \"includeStopRow\": false,\n" + + " \"consistency\": \"STRONG\",\n" + " \"maxVersions\": 1,\n" + + " \"mvccReadPoint\": -1,\n" + " \"includeStartRow\": true,\n" + + " \"stopRow\": \"\\\\x00\\\\x00\\\\x01\\\\xC8\",\n" + " \"limit\": -1,\n" + + " \"timeRange\": [\n" + " 0,\n" + " 9223372036854775807\n" + " ],\n" + + " \"startRow\": \"\\\\x00\\\\x00\\\\x00{\",\n" + " \"targetReplicaId\": -1,\n" + + " \"batch\": -1,\n" + " \"families\": {},\n" + " \"priority\": -1,\n" + + " \"storeOffset\": 0,\n" + " \"queryMetricsEnabled\": false,\n" + + " \"needCursorResult\": false,\n" + " \"storeLimit\": -1,\n" + + " \"cacheBlocks\": true,\n" + " \"readType\": \"DEFAULT\",\n" + + " \"allowPartialResults\": false,\n" + " \"reversed\": false\n" + " }\n" + "}"; OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, 6, null, null, null, null, null, null, null, 6, 7, 0, scan, Collections.emptyMap(), Collections.emptyMap()); String actualOutput = o.toJsonPrettyPrint(); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestScan.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestScan.java index 4b124c68f862..40b2263afb4a 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestScan.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestScan.java @@ -76,7 +76,8 @@ public void testGetToScan() throws Exception { .setAttribute("att_v0", Bytes.toBytes("att_v0")) .setColumnFamilyTimeRange(Bytes.toBytes("cf"), 0, 123).setReplicaId(3) .setACL("test_user", new Permission(Permission.Action.READ)) - .setAuthorizations(new Authorizations("test_label")).setPriority(3); + .setAuthorizations(new Authorizations("test_label")).setQueryMetricsEnabled(true) + .setPriority(3); Scan scan = new Scan(get); assertEquals(get.getCacheBlocks(), scan.getCacheBlocks()); @@ -100,6 +101,7 @@ public void testGetToScan() throws Exception { assertEquals(get.getACL(), scan.getACL()); assertEquals(get.getAuthorizations().getLabels(), scan.getAuthorizations().getLabels()); assertEquals(get.getPriority(), scan.getPriority()); + assertEquals(get.isQueryMetricsEnabled(), scan.isQueryMetricsEnabled()); } @Test @@ -216,7 +218,7 @@ public void testScanCopyConstructor() throws Exception { .setReplicaId(3).setReversed(true).setRowOffsetPerColumnFamily(5) .setStartStopRowForPrefixScan(Bytes.toBytes("row_")).setScanMetricsEnabled(true) .setReadType(ReadType.STREAM).withStartRow(Bytes.toBytes("row_1")) - .withStopRow(Bytes.toBytes("row_2")).setTimeRange(0, 13); + .withStopRow(Bytes.toBytes("row_2")).setTimeRange(0, 13).setQueryMetricsEnabled(true); // create a copy of existing scan object Scan scanCopy = new Scan(scan); @@ -252,6 +254,7 @@ public void testScanCopyConstructor() throws Exception { assertEquals(scan.getStartRow(), scanCopy.getStartRow()); assertEquals(scan.getStopRow(), scanCopy.getStopRow()); assertEquals(scan.getTimeRange(), scanCopy.getTimeRange()); + assertEquals(scan.isQueryMetricsEnabled(), scanCopy.isQueryMetricsEnabled()); assertTrue("Make sure copy constructor adds all the fields in the copied object", EqualsBuilder.reflectionEquals(scan, scanCopy)); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java index 02476ad1b961..57d7a7a03860 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java @@ -132,6 +132,7 @@ public void testGet() throws IOException { getBuilder.setMaxVersions(1); getBuilder.setCacheBlocks(true); getBuilder.setTimeRange(ProtobufUtil.toTimeRange(TimeRange.allTime())); + getBuilder.setQueryMetricsEnabled(false); Get get = ProtobufUtil.toGet(proto); assertEquals(getBuilder.build(), ProtobufUtil.toGet(get)); } @@ -260,6 +261,7 @@ public void testScan() throws IOException { scanBuilder.setCaching(1024); scanBuilder.setTimeRange(ProtobufUtil.toTimeRange(TimeRange.allTime())); scanBuilder.setIncludeStopRow(false); + scanBuilder.setQueryMetricsEnabled(false); ClientProtos.Scan expectedProto = scanBuilder.build(); ClientProtos.Scan actualProto = ProtobufUtil.toScan(ProtobufUtil.toScan(expectedProto)); diff --git a/hbase-protocol-shaded/src/main/protobuf/client/Client.proto b/hbase-protocol-shaded/src/main/protobuf/client/Client.proto index 8a2988abf4a6..d694bda37420 100644 --- a/hbase-protocol-shaded/src/main/protobuf/client/Client.proto +++ b/hbase-protocol-shaded/src/main/protobuf/client/Client.proto @@ -90,6 +90,7 @@ message Get { optional Consistency consistency = 12 [default = STRONG]; repeated ColumnFamilyTimeRange cf_time_range = 13; optional bool load_column_families_on_demand = 14; /* DO NOT add defaults to load_column_families_on_demand. */ + optional bool query_metrics_enabled = 15 [default = false]; } message Result { @@ -117,6 +118,9 @@ message Result { // to form a complete result. The equivalent flag in o.a.h.h.client.Result is // mayHaveMoreCellsInRow. optional bool partial = 5 [default = false]; + + // Server side metrics about the result + optional QueryMetrics metrics = 6; } /** @@ -274,6 +278,7 @@ message Scan { } optional ReadType readType = 23 [default = DEFAULT]; optional bool need_cursor_result = 24 [default = false]; + optional bool query_metrics_enabled = 25 [default = false]; } /** @@ -366,6 +371,9 @@ message ScanResponse { // If the Scan need cursor, return the row key we are scanning in heartbeat message. // If the Scan doesn't need a cursor, don't set this field to reduce network IO. optional Cursor cursor = 12; + + // List of QueryMetrics that maps 1:1 to the results in the response based on index + repeated QueryMetrics query_metrics = 13; } /** @@ -458,6 +466,13 @@ message RegionAction { optional Condition condition = 4; } +/* +* Statistics about the Result's server-side metrics +*/ +message QueryMetrics { + optional uint64 block_bytes_scanned = 1; +} + /* * Statistics about the current load on the region */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 53b7e5ede377..9e1bcc2c195f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -75,6 +75,7 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.OperationWithAttributes; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.QueryMetrics; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.Result; @@ -2567,6 +2568,7 @@ private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCal scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); } RegionScannerImpl scanner = null; + long blockBytesScannedBefore = context.getBlockBytesScanned(); try { scanner = region.getScanner(scan); scanner.next(results); @@ -2594,7 +2596,13 @@ private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCal } region.metricsUpdateForGet(); - return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale); + Result r = + Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale); + if (get.isQueryMetricsEnabled()) { + long blockBytesScanned = context.getBlockBytesScanned() - blockBytesScannedBefore; + r.setMetrics(new QueryMetrics(blockBytesScanned)); + } + return r; } private void checkBatchSizeAndLogLargeSize(MultiRequest request) throws ServiceException { @@ -3376,6 +3384,7 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan contextBuilder.setTrackMetrics(trackMetrics); ScannerContext scannerContext = contextBuilder.build(); boolean limitReached = false; + long blockBytesScannedBefore = 0; while (numOfResults < maxResults) { // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The // batch limit is a limit on the number of cells per Result. Thus, if progress is @@ -3387,6 +3396,10 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan // Collect values to be returned here moreRows = scanner.nextRaw(values, scannerContext); + + long blockBytesScanned = scannerContext.getBlockSizeProgress() - blockBytesScannedBefore; + blockBytesScannedBefore = scannerContext.getBlockSizeProgress(); + if (rpcCall == null) { // When there is no RpcCallContext,copy EC to heap, then the scanner would close, // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap @@ -3425,6 +3438,12 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan } boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow(); Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow); + + if (request.getScan().getQueryMetricsEnabled()) { + builder.addQueryMetrics(ClientProtos.QueryMetrics.newBuilder() + .setBlockBytesScanned(blockBytesScanned).build()); + } + results.add(r); numOfResults++; if (!mayHaveMoreCellsInRow && limitOfRows > 0) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableQueryMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableQueryMetrics.java new file mode 100644 index 000000000000..1f1471103975 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableQueryMetrics.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.MetricsRegionServer; +import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource; +import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSourceImpl; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncTableQueryMetrics { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncTableQueryMetrics.class); + + private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); + + private static final TableName TABLE_NAME = TableName.valueOf("ResultMetrics"); + + private static final byte[] CF = Bytes.toBytes("cf"); + + private static final byte[] CQ = Bytes.toBytes("cq"); + + private static final byte[] VALUE = Bytes.toBytes("value"); + + private static final byte[] ROW_1 = Bytes.toBytes("zzz1"); + private static final byte[] ROW_2 = Bytes.toBytes("zzz2"); + private static final byte[] ROW_3 = Bytes.toBytes("zzz3"); + + private static AsyncConnection CONN; + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniCluster(3); + // Create 3 rows in the table, with rowkeys starting with "zzz*" so that + // scan are forced to hit all the regions. + try (Table table = UTIL.createMultiRegionTable(TABLE_NAME, CF)) { + table.put(Arrays.asList(new Put(ROW_1).addColumn(CF, CQ, VALUE), + new Put(ROW_2).addColumn(CF, CQ, VALUE), new Put(ROW_3).addColumn(CF, CQ, VALUE))); + } + CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get(); + CONN.getAdmin().flush(TABLE_NAME).join(); + } + + @AfterClass + public static void tearDown() throws Exception { + Closeables.close(CONN, true); + UTIL.shutdownMiniCluster(); + } + + @Test + public void itTestsGets() throws Exception { + // Test a single Get + Get g1 = new Get(ROW_1); + g1.setQueryMetricsEnabled(true); + + long bbs = getClusterBlockBytesScanned(); + Result result = CONN.getTable(TABLE_NAME).get(g1).get(); + bbs += result.getMetrics().getBlockBytesScanned(); + Assert.assertNotNull(result.getMetrics()); + Assert.assertEquals(getClusterBlockBytesScanned(), bbs); + + // Test multigets + Get g2 = new Get(ROW_2); + g2.setQueryMetricsEnabled(true); + + Get g3 = new Get(ROW_3); + g3.setQueryMetricsEnabled(true); + + List> futures = CONN.getTable(TABLE_NAME).get(List.of(g1, g2, g3)); + + for (CompletableFuture future : futures) { + result = future.join(); + Assert.assertNotNull(result.getMetrics()); + bbs += result.getMetrics().getBlockBytesScanned(); + } + + Assert.assertEquals(getClusterBlockBytesScanned(), bbs); + } + + @Test + public void itTestsDefaultGetNoMetrics() throws Exception { + // Test a single Get + Get g1 = new Get(ROW_1); + + Result result = CONN.getTable(TABLE_NAME).get(g1).get(); + Assert.assertNull(result.getMetrics()); + + // Test multigets + Get g2 = new Get(ROW_2); + Get g3 = new Get(ROW_3); + List> futures = CONN.getTable(TABLE_NAME).get(List.of(g1, g2, g3)); + futures.forEach(f -> Assert.assertNull(f.join().getMetrics())); + + } + + @Test + public void itTestsScans() { + Scan scan = new Scan(); + scan.setQueryMetricsEnabled(true); + + long bbs = getClusterBlockBytesScanned(); + try (ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(scan)) { + for (Result result : scanner) { + Assert.assertNotNull(result.getMetrics()); + bbs += result.getMetrics().getBlockBytesScanned(); + Assert.assertEquals(getClusterBlockBytesScanned(), bbs); + } + } + } + + @Test + public void itTestsDefaultScanNoMetrics() { + Scan scan = new Scan(); + + try (ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(scan)) { + for (Result result : scanner) { + Assert.assertNull(result.getMetrics()); + } + } + } + + private static long getClusterBlockBytesScanned() { + long bbs = 0L; + + for (JVMClusterUtil.RegionServerThread rs : UTIL.getHBaseCluster().getRegionServerThreads()) { + MetricsRegionServer metrics = rs.getRegionServer().getMetrics(); + MetricsRegionServerSourceImpl source = + (MetricsRegionServerSourceImpl) metrics.getMetricsSource(); + + bbs += source.getMetricsRegistry() + .getCounter(MetricsRegionServerSource.BLOCK_BYTES_SCANNED_KEY, 0L).value(); + } + + return bbs; + } +}