Commit 4bb0809

HIVE-29188: [hiveACIDReplication] Add src and tgt commit time in replication metrics for better monitoring (#6071)
Details:
* Currently, while Hive ACID replication is running, there is no visibility into how the replication is progressing.
* Even when replication is not running, there is no indication of how far the target is behind the source.
* This commit adds this information to replication_metrics.

Co-authored-by: shivam02 <[email protected]>
1 parent 3b3c1cf commit 4bb0809
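
In concrete terms, the lag this commit makes observable can be derived from the two timestamps recorded on the REPL_LOAD stage. A minimal sketch, assuming a ReplicationMetric instance named metric is already in hand (the accessors are the ones added in Stage.java below; everything else here is illustrative, not part of this commit):

    // Look up the REPL_LOAD stage the same way ReplicationMetricCollector does in this commit.
    Stage replLoad = metric.getProgress().getStageByName("REPL_LOAD");
    if (replLoad != null && replLoad.getEndTimeOnSrc() > 0) {
      // endTimeOnSrc: timestamp of the commit/abort event on the source cluster
      // endTimeOnTgt: wall-clock time when that event was replayed on the target
      long lagMillis = replLoad.getEndTimeOnTgt() - replLoad.getEndTimeOnSrc();
    }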

File tree: 5 files changed (+65 -2 lines changed)

ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java

Lines changed: 14 additions & 0 deletions
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.parse.repl.load.message;
 
 import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
 import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -27,6 +28,8 @@
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 /**
  * AbortTxnHandler
@@ -43,6 +46,17 @@ public List<Task<?>> handle(Context context)
 
     AbortTxnMessage msg = deserializer.getAbortTxnMessage(context.dmd.getPayload());
 
+    // Saving the timestamp of all write abort txn in metric 'progress' to calculate lag between src and tgt
+    List<Long> writeIds = msg.getWriteIds();
+    List<String> databases = Optional.ofNullable(msg.getDbsUpdated())
+        .orElse(Collections.emptyList())
+        .stream()
+        .map(StringUtils::normalizeIdentifier)
+        .toList();
+    if (databases.contains(context.dbName) && writeIds != null && !writeIds.isEmpty()) {
+      context.getMetricCollector().setSrcTimeInProgress(msg.getTimestamp());
+    }
+
     Task<ReplTxnWork> abortTxnTask = TaskFactory.get(
         new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName), context.dbName, null,
             msg.getTxnId(), ReplTxnWork.OperationType.REPL_ABORT_TXN, context.eventOnlyReplicationSpec(),

ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java

Lines changed: 14 additions & 0 deletions
@@ -20,6 +20,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
 import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -33,6 +34,8 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 /**
  * CommitTxnHandler
@@ -54,6 +57,17 @@ public List<Task<?>> handle(Context context)
     String tableNamePrev = null;
     String tblName = null;
 
+    // Saving the timestamp of all write commit txn in metric 'progress' to calculate lag between src and tgt
+    List<Long> writeIds = msg.getWriteIds();
+    List<String> databases = Optional.ofNullable(msg.getDatabases())
+        .orElse(Collections.emptyList())
+        .stream()
+        .map(StringUtils::normalizeIdentifier)
+        .toList();
+    if (databases.contains(dbName) && writeIds != null && !writeIds.isEmpty()) {
+      context.getMetricCollector().setSrcTimeInProgress(msg.getTimestamp());
+    }
+
     ReplTxnWork work = new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName), context.dbName,
         null, msg.getTxnId(), ReplTxnWork.OperationType.REPL_COMMIT_TXN,
         context.eventOnlyReplicationSpec(), context.getDumpDirectory(),

ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java

Lines changed: 16 additions & 0 deletions
@@ -136,6 +136,22 @@ public void reportFailoverStart(String stageName, Map<String, Long> metricMap,
     }
   }
 
+  public void setSrcTimeInProgress(long endTimeOnSrc) throws SemanticException {
+    if (isEnabled) {
+      LOG.debug("Updating last commit time on src in progress as: {}", endTimeOnSrc);
+      Progress progress = replicationMetric.getProgress();
+      Stage stage = progress.getStageByName("REPL_LOAD");
+      if (stage == null) {
+        return;
+      }
+      stage.setEndTimeOnSrc(endTimeOnSrc);
+      stage.setEndTimeOnTgt(getCurrentTimeInMillis());
+      progress.addStage(stage);
+      replicationMetric.setProgress(progress);
+      metricCollector.addMetric(replicationMetric);
+    }
+  }
+
   public void reportStageEnd(String stageName, Status status, long lastReplId,
       SnapshotUtils.ReplSnapshotCount replSnapshotCount, ReplStatsTracker replStatsTracker) throws SemanticException {
     unRegisterMBeanSafe();

ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java

Lines changed: 19 additions & 0 deletions
@@ -32,6 +32,8 @@ public class Stage {
   private Status status;
   private long startTime;
   private long endTime;
+  private long endTimeOnSrc;
+  private long endTimeOnTgt;
   private Map<String, Metric> metrics = new HashMap<>();
   private String errorLogPath;
   private SnapshotUtils.ReplSnapshotCount replSnapshotCount = new SnapshotUtils.ReplSnapshotCount();
@@ -58,6 +60,8 @@ public Stage(Stage stage) {
     this.errorLogPath = stage.errorLogPath;
     this.replSnapshotCount = stage.replSnapshotCount;
     this.replStats = stage.replStats;
+    this.endTimeOnSrc = stage.endTimeOnSrc;
+    this.endTimeOnTgt = stage.endTimeOnTgt;
   }
 
   public String getName() {
@@ -92,6 +96,21 @@ public void setEndTime(long endTime) {
     this.endTime = endTime;
   }
 
+  public long getEndTimeOnSrc() {
+    return endTimeOnSrc;
+  }
+
+  public void setEndTimeOnSrc(long endTimeOnSrc) {
+    this.endTimeOnSrc = endTimeOnSrc;
+  }
+
+  public long getEndTimeOnTgt() {
+    return endTimeOnTgt;
+  }
+
+  public void setEndTimeOnTgt(long endTimeOnTgt) {
+    this.endTimeOnTgt = endTimeOnTgt;
+  }
 
   public void addMetric(Metric metric) {
     this.metrics.put(metric.getName(), metric);
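
Because these Stage fields feed the serialized progress of the replication metric, the two new timestamps now appear alongside startTime and endTime in each stage entry, which is what the updated golden file below reflects, e.g.:

    {"name":"REPL_LOAD","status":"SUCCESS","startTime":0,"endTime":0,"endTimeOnSrc":0,"endTimeOnTgt":0, ...}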

ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out

Lines changed: 2 additions & 2 deletions
@@ -92,5 +92,5 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: sys@replication_metrics
 POSTHOOK: Input: sys@replication_metrics_orig
 #### A masked pattern was here ####
-repl1 1 {"dbName":"src","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.0,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null} H4sIAAAAAAAA/22PwQ6CMBBE/2XPHOTKTSsmJojEwskQ02gDJKUl2+2J9N8tEohEb7sz83ayI1gS5CwkwCvGUs4hmqRGBuk+gha9DN4tLbLHsboUs/sHQCq7KbqLQOrXOveSsHtubp2qnJXnaz6BT4coNTHjNH3yZEioZfXRCpX7Q5b+EvGWiH0d6hENZqYpBLWQaKdUBCgHxbUYbGsW9MsID9lZ8LV/A7NIwGISAQAA {"status":"SUCCESS","stages":[{"name":"REPL_DUMP","status":"SUCCESS","startTime":0,"endTime":0,"metrics":[{"name":"FUNCTIONS","currentCount":0,"totalCount":0},{"name":"TABLES","currentCount":1,"totalCount":1}],"errorLogPath":null,"replSnapshotCount":null,"replStats":null}]} gzip(json-2.0)
-repl2 1 {"dbName":"destination","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.00390625,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null} H4sIAAAAAAAA/22PwQqDMBBE/yVnD/XqzUYLBbFS9VSkBF1UiImsm5Pk3xu1CtLedmb3zbAzm0iQmVjA8pLzOM+Zt1gtOOs1MyUGcLtnnCXv5BFG2/YPgFT0y+nFY6CaYx6AsK9PWbcy5cX9kS5gbRBBEddG0XpPmoTcpfUOqAivSfxL+GfCt5WrR9SY6DYT1LFAGSk9hjDKXIlx6vSOumgzcARB0KzVTkYgYZP2y7hfpy3EVvYDvpfiNy0BAAA= {"status":"SUCCESS","stages":[{"name":"REPL_LOAD","status":"SUCCESS","startTime":0,"endTime":0,"metrics":[{"name":"FUNCTIONS","currentCount":0,"totalCount":0},{"name":"TABLES","currentCount":1,"totalCount":1}],"errorLogPath":null,"replSnapshotCount":{"numCreated":0,"numDeleted":0},"replStats":null}]} gzip(json-2.0)
+repl1 1 {"dbName":"src","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.0,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null} H4sIAAAAAAAA/22PsQ6CMBRF/6Uzg6xsWjExQSC2TIaYBhsgKS15fZ1I/90iwYi6tefe0/c6EYsCnSUJYRWlKWMkmlErA7pNRItBhuyaltn9WF3KJf0jAPJ+ru4iIvXj+1xoBs0W8BZfYJAIfbOZdqpyys9FPj/dOACpkRqnlz4aFGq9+ugt8f0hS3+NeGvEvg47ABjITFsK7EiinVIRATkqpsVoO7OqH0H4sl2Ar/0T5NGeBTQBAAA= {"status":"SUCCESS","stages":[{"name":"REPL_DUMP","status":"SUCCESS","startTime":0,"endTime":0,"endTimeOnSrc":0,"endTimeOnTgt":0,"metrics":[{"name":"FUNCTIONS","currentCount":0,"totalCount":0},{"name":"TABLES","currentCount":1,"totalCount":1}],"errorLogPath":null,"replSnapshotCount":null,"replStats":null}]} gzip(json-2.0)
+repl2 1 {"dbName":"destination","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.00390625,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null} H4sIAAAAAAAA/22QQQuDMAyF/0vPHubV21YdDERl1dOQUVzQQW0lpifpf1/VOXDbLe8l30vIxEaSZEcWMVFxngjBgtlqwVu3iWnZg+9dkyK9p/kxXrt/AKTyOY8eAgb68V3nWmCzN8qWFqMHwmez23auMl5e8myObiwiaOLG6nWeDEm1SRd8oPJ4SpNfItwToav9DYgGU9MWkjoWaatUwBAGJbQcxs5sqI+2PUeQBI9ltZcxKFilezP+G+Ma4mr3Atju2TJPAQAA {"status":"SUCCESS","stages":[{"name":"REPL_LOAD","status":"SUCCESS","startTime":0,"endTime":0,"endTimeOnSrc":0,"endTimeOnTgt":0,"metrics":[{"name":"FUNCTIONS","currentCount":0,"totalCount":0},{"name":"TABLES","currentCount":1,"totalCount":1}],"errorLogPath":null,"replSnapshotCount":{"numCreated":0,"numDeleted":0},"replStats":null}]} gzip(json-2.0)
