Skip to content

Commit c18b712

Browse files
authored
HBASE-29494: Capture Scan RPC processing time and queuing time in Scan Metrics (#7242)
Signed-off-by: Viraj Jasani <[email protected]>
Signed-off-by: Hari Krishna Dara <[email protected]>
1 parent 494cbe4 commit c18b712

File tree

4 files changed

+121
-24
lines changed

4 files changed

+121
-24
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ public void moveToNextRegion() {
5656
currentRegionScanMetricsData.createCounter(BYTES_READ_FROM_BLOCK_CACHE_METRIC_NAME);
5757
currentRegionScanMetricsData.createCounter(BYTES_READ_FROM_MEMSTORE_METRIC_NAME);
5858
currentRegionScanMetricsData.createCounter(BLOCK_READ_OPS_COUNT_METRIC_NAME);
59+
currentRegionScanMetricsData.createCounter(RPC_SCAN_PROCESSING_TIME_METRIC_NAME);
60+
currentRegionScanMetricsData.createCounter(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME);
5961
}
6062

6163
/**
@@ -77,6 +79,8 @@ protected AtomicLong createCounter(String counterName) {
7779
"BYTES_READ_FROM_BLOCK_CACHE";
7880
public static final String BYTES_READ_FROM_MEMSTORE_METRIC_NAME = "BYTES_READ_FROM_MEMSTORE";
7981
public static final String BLOCK_READ_OPS_COUNT_METRIC_NAME = "BLOCK_READ_OPS_COUNT";
82+
public static final String RPC_SCAN_PROCESSING_TIME_METRIC_NAME = "RPC_SCAN_PROCESSING_TIME";
83+
public static final String RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME = "RPC_SCAN_QUEUE_WAIT_TIME";
8084

8185
/**
8286
* number of rows filtered during scan RPC
@@ -105,6 +109,12 @@ protected AtomicLong createCounter(String counterName) {
105109

106110
public final AtomicLong blockReadOpsCount = createCounter(BLOCK_READ_OPS_COUNT_METRIC_NAME);
107111

112+
public final AtomicLong rpcScanProcessingTime =
113+
createCounter(RPC_SCAN_PROCESSING_TIME_METRIC_NAME);
114+
115+
public final AtomicLong rpcScanQueueWaitTime =
116+
createCounter(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME);
117+
108118
/**
109119
* Sets counter with counterName to passed in value, does nothing if counter does not exist. If
110120
* region level scan metrics are enabled then sets the value of counter for the current region

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3316,7 +3316,8 @@ private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean mo
33163316
// return whether we have more results in region.
33173317
private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
33183318
long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
3319-
ScanResponse.Builder builder, RpcCall rpcCall) throws IOException {
3319+
ScanResponse.Builder builder, RpcCall rpcCall, ServerSideScanMetrics scanMetrics)
3320+
throws IOException {
33203321
HRegion region = rsh.r;
33213322
RegionScanner scanner = rsh.s;
33223323
long maxResultSize;
@@ -3369,8 +3370,6 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
33693370
final LimitScope timeScope =
33703371
allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
33713372

3372-
boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();
3373-
33743373
// Configure with limits for this RPC. Set keep progress true since size progress
33753374
// towards size limit should be kept between calls to nextRaw
33763375
ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
@@ -3392,7 +3391,8 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
33923391
contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize);
33933392
contextBuilder.setBatchLimit(scanner.getBatch());
33943393
contextBuilder.setTimeLimit(timeScope, timeLimit);
3395-
contextBuilder.setTrackMetrics(trackMetrics);
3394+
contextBuilder.setTrackMetrics(scanMetrics != null);
3395+
contextBuilder.setScanMetrics(scanMetrics);
33963396
ScannerContext scannerContext = contextBuilder.build();
33973397
boolean limitReached = false;
33983398
long blockBytesScannedBefore = 0;
@@ -3514,27 +3514,15 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
35143514
builder.setMoreResultsInRegion(moreRows);
35153515
// Check to see if the client requested that we track metrics server side. If the
35163516
// client requested metrics, retrieve the metrics from the scanner context.
3517-
if (trackMetrics) {
3517+
if (scanMetrics != null) {
35183518
// rather than increment yet another counter in StoreScanner, just set the value here
35193519
// from block size progress before writing into the response
3520-
scannerContext.getMetrics().setCounter(
3521-
ServerSideScanMetrics.BLOCK_BYTES_SCANNED_KEY_METRIC_NAME,
3520+
scanMetrics.setCounter(ServerSideScanMetrics.BLOCK_BYTES_SCANNED_KEY_METRIC_NAME,
35223521
scannerContext.getBlockSizeProgress());
35233522
if (rpcCall != null) {
3524-
scannerContext.getMetrics().setCounter(ServerSideScanMetrics.FS_READ_TIME_METRIC_NAME,
3523+
scanMetrics.setCounter(ServerSideScanMetrics.FS_READ_TIME_METRIC_NAME,
35253524
rpcCall.getFsReadTime());
35263525
}
3527-
Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
3528-
ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
3529-
NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
3530-
3531-
for (Entry<String, Long> entry : metrics.entrySet()) {
3532-
pairBuilder.setName(entry.getKey());
3533-
pairBuilder.setValue(entry.getValue());
3534-
metricBuilder.addMetrics(pairBuilder.build());
3535-
}
3536-
3537-
builder.setScanMetrics(metricBuilder.build());
35383526
}
35393527
}
35403528
} finally {
@@ -3671,6 +3659,8 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
36713659
boolean scannerClosed = false;
36723660
try {
36733661
List<Result> results = new ArrayList<>(Math.min(rows, 512));
3662+
boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();
3663+
ServerSideScanMetrics scanMetrics = trackMetrics ? new ServerSideScanMetrics() : null;
36743664
if (rows > 0) {
36753665
boolean done = false;
36763666
// Call coprocessor. Get region info from scanner.
@@ -3690,7 +3680,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
36903680
}
36913681
if (!done) {
36923682
scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
3693-
results, builder, rpcCall);
3683+
results, builder, rpcCall, scanMetrics);
36943684
} else {
36953685
builder.setMoreResultsInRegion(!results.isEmpty());
36963686
}
@@ -3742,6 +3732,28 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
37423732
throw new TimeoutIOException("Client deadline exceeded, cannot return results");
37433733
}
37443734

3735+
if (scanMetrics != null) {
3736+
if (rpcCall != null) {
3737+
long rpcScanTime = EnvironmentEdgeManager.currentTime() - rpcCall.getStartTime();
3738+
long rpcQueueWaitTime = rpcCall.getStartTime() - rpcCall.getReceiveTime();
3739+
scanMetrics.addToCounter(ServerSideScanMetrics.RPC_SCAN_PROCESSING_TIME_METRIC_NAME,
3740+
rpcScanTime);
3741+
scanMetrics.addToCounter(ServerSideScanMetrics.RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME,
3742+
rpcQueueWaitTime);
3743+
}
3744+
Map<String, Long> metrics = scanMetrics.getMetricsMap();
3745+
ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
3746+
NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
3747+
3748+
for (Entry<String, Long> entry : metrics.entrySet()) {
3749+
pairBuilder.setName(entry.getKey());
3750+
pairBuilder.setValue(entry.getValue());
3751+
metricBuilder.addMetrics(pairBuilder.build());
3752+
}
3753+
3754+
builder.setScanMetrics(metricBuilder.build());
3755+
}
3756+
37453757
return builder.build();
37463758
} catch (IOException e) {
37473759
try {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,11 @@ public class ScannerContext {
124124
final ServerSideScanMetrics metrics;
125125

126126
ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean trackMetrics) {
127+
this(keepProgress, limitsToCopy, trackMetrics, null);
128+
}
129+
130+
ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean trackMetrics,
131+
ServerSideScanMetrics scanMetrics) {
127132
this.limits = new LimitFields();
128133
if (limitsToCopy != null) {
129134
this.limits.copy(limitsToCopy);
@@ -134,7 +139,8 @@ public class ScannerContext {
134139

135140
this.keepProgress = keepProgress;
136141
this.scannerState = DEFAULT_STATE;
137-
this.metrics = trackMetrics ? new ServerSideScanMetrics() : null;
142+
this.metrics =
143+
trackMetrics ? (scanMetrics != null ? scanMetrics : new ServerSideScanMetrics()) : null;
138144
}
139145

140146
public boolean isTrackingMetrics() {
@@ -417,6 +423,7 @@ public static final class Builder {
417423
boolean keepProgress = DEFAULT_KEEP_PROGRESS;
418424
boolean trackMetrics = false;
419425
LimitFields limits = new LimitFields();
426+
ServerSideScanMetrics scanMetrics = null;
420427

421428
private Builder() {
422429
}
@@ -455,8 +462,13 @@ public Builder setBatchLimit(int batchLimit) {
455462
return this;
456463
}
457464

465+
public Builder setScanMetrics(ServerSideScanMetrics scanMetrics) {
466+
this.scanMetrics = scanMetrics;
467+
return this;
468+
}
469+
458470
public ScannerContext build() {
459-
return new ScannerContext(keepProgress, limits, trackMetrics);
471+
return new ScannerContext(keepProgress, limits, trackMetrics, scanMetrics);
460472
}
461473
}
462474

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
2222
import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_RETRIES_METRIC_NAME;
2323
import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
24+
import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.RPC_SCAN_PROCESSING_TIME_METRIC_NAME;
25+
import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME;
2426
import static org.junit.Assert.assertEquals;
2527

2628
import java.io.IOException;
@@ -34,16 +36,20 @@
3436
import java.util.concurrent.CountDownLatch;
3537
import java.util.concurrent.Executors;
3638
import java.util.concurrent.ThreadPoolExecutor;
39+
import java.util.concurrent.TimeUnit;
3740
import java.util.concurrent.atomic.AtomicInteger;
41+
import java.util.concurrent.atomic.AtomicLong;
42+
import org.apache.hadoop.conf.Configuration;
3843
import org.apache.hadoop.hbase.HBaseClassTestRule;
3944
import org.apache.hadoop.hbase.HBaseTestingUtil;
45+
import org.apache.hadoop.hbase.HConstants;
4046
import org.apache.hadoop.hbase.HRegionLocation;
4147
import org.apache.hadoop.hbase.ServerName;
4248
import org.apache.hadoop.hbase.TableName;
4349
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
4450
import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
4551
import org.apache.hadoop.hbase.testclassification.ClientTests;
46-
import org.apache.hadoop.hbase.testclassification.MediumTests;
52+
import org.apache.hadoop.hbase.testclassification.LargeTests;
4753
import org.apache.hadoop.hbase.util.Bytes;
4854
import org.apache.hadoop.hbase.util.FutureUtils;
4955
import org.junit.AfterClass;
@@ -55,7 +61,7 @@
5561
import org.junit.runners.Parameterized.Parameter;
5662
import org.junit.runners.Parameterized.Parameters;
5763

58-
@Category({ ClientTests.class, MediumTests.class })
64+
@Category({ ClientTests.class, LargeTests.class })
5965
public class TestTableScanMetrics extends FromClientSideBase {
6066
@ClassRule
6167
public static final HBaseClassTestRule CLASS_RULE =
@@ -330,6 +336,8 @@ public void run() {
330336
.entrySet()) {
331337
ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
332338
Map<String, Long> metricsMap = entry.getValue();
339+
metricsMap.remove(RPC_SCAN_PROCESSING_TIME_METRIC_NAME);
340+
metricsMap.remove(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME);
333341
Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
334342
Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
335343
Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
@@ -347,6 +355,59 @@ public void run() {
347355
}
348356
}
349357

358+
@Test
359+
public void testRPCCallProcessingAndQueueWaitTimeMetrics() throws Exception {
360+
final int numThreads = 20;
361+
Configuration conf = TEST_UTIL.getConfiguration();
362+
// Handler count is 3 by default.
363+
int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
364+
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
365+
// Keep the number of threads to be high enough for RPC calls to queue up. For now going with 6
366+
// times the handler count.
367+
Assert.assertTrue(numThreads > 6 * handlerCount);
368+
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numThreads);
369+
TableName tableName = TableName.valueOf(
370+
TestTableScanMetrics.class.getSimpleName() + "_testRPCCallProcessingAndQueueWaitTimeMetrics");
371+
AtomicLong totalScanRpcTime = new AtomicLong(0);
372+
AtomicLong totalQueueWaitTime = new AtomicLong(0);
373+
CountDownLatch latch = new CountDownLatch(numThreads);
374+
try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
375+
TEST_UTIL.loadTable(table, CF);
376+
for (int i = 0; i < numThreads; i++) {
377+
executor.execute(new Runnable() {
378+
@Override
379+
public void run() {
380+
try {
381+
Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
382+
scan.setEnableScanMetricsByRegion(true);
383+
scan.setCaching(2);
384+
try (ResultScanner rs = table.getScanner(scan)) {
385+
Result r;
386+
while ((r = rs.next()) != null) {
387+
Assert.assertFalse(r.isEmpty());
388+
}
389+
ScanMetrics scanMetrics = rs.getScanMetrics();
390+
Map<String, Long> metricsMap = scanMetrics.getMetricsMap();
391+
totalScanRpcTime.addAndGet(metricsMap.get(RPC_SCAN_PROCESSING_TIME_METRIC_NAME));
392+
totalQueueWaitTime.addAndGet(metricsMap.get(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME));
393+
}
394+
latch.countDown();
395+
} catch (IOException e) {
396+
throw new RuntimeException(e);
397+
}
398+
}
399+
});
400+
}
401+
latch.await();
402+
executor.shutdown();
403+
executor.awaitTermination(10, TimeUnit.SECONDS);
404+
Assert.assertTrue(totalScanRpcTime.get() > 0);
405+
Assert.assertTrue(totalQueueWaitTime.get() > 0);
406+
} finally {
407+
TEST_UTIL.deleteTable(tableName);
408+
}
409+
}
410+
350411
@Test
351412
public void testScanMetricsByRegionWithRegionMove() throws Exception {
352413
TableName tableName = TableName.valueOf(
@@ -578,6 +639,8 @@ private void mergeScanMetricsByRegion(Map<ScanMetricsRegionInfo, Map<String, Lon
578639
for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : srcMap.entrySet()) {
579640
ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
580641
Map<String, Long> metricsMap = entry.getValue();
642+
metricsMap.remove(RPC_SCAN_PROCESSING_TIME_METRIC_NAME);
643+
metricsMap.remove(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME);
581644
if (dstMap.containsKey(scanMetricsRegionInfo)) {
582645
Map<String, Long> dstMetricsMap = dstMap.get(scanMetricsRegionInfo);
583646
for (Map.Entry<String, Long> metricEntry : metricsMap.entrySet()) {

0 commit comments

Comments (0)