From cf497862ef5ff0dd67cae55d238b70aa0fe13ed4 Mon Sep 17 00:00:00 2001
From: Palash Chauhan <p.chauhan@pchauha-ltm8owy.internal.salesforce.com>
Date: Wed, 15 Jan 2025 11:42:04 -0800
Subject: [PATCH 1/4] initial commit

---
 .../coprocessor/PhoenixMasterObserver.java    | 212 +++++++++++++-----
 1 file changed, 160 insertions(+), 52 deletions(-)

diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
index f51cf7c154e..e2dbc3c175e 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
@@ -17,14 +17,17 @@
  */
 package org.apache.phoenix.coprocessor;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Types;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
@@ -60,10 +63,14 @@ public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor
     private static final String PARTITION_UPSERT_SQL
             = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " VALUES (?,?,?,?,?,?,?,?)";
 
-    private static final String PARENT_PARTITION_QUERY
+    private static final String PARENT_PARTITION_QUERY_FOR_SPLIT
             = "SELECT PARTITION_ID, PARENT_PARTITION_ID FROM " + SYSTEM_CDC_STREAM_NAME
             + " WHERE TABLE_NAME = ? AND STREAM_NAME = ? ";
 
+    private static final String PARENT_PARTITION_QUERY_FOR_MERGE
+            = "SELECT PARENT_PARTITION_ID FROM " + SYSTEM_CDC_STREAM_NAME
+            + " WHERE TABLE_NAME = ? AND STREAM_NAME = ? AND PARTITION_ID = ?";
+
     private static final String PARENT_PARTITION_UPDATE_END_TIME_SQL
             = "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " (TABLE_NAME, STREAM_NAME, PARTITION_ID, "
             + "PARENT_PARTITION_ID, PARTITION_END_TIME) VALUES (?,?,?,?,?)";
@@ -74,7 +81,7 @@ public Optional<MasterObserver> getMasterObserver() {
     }
 
     /**
-     * Update parent -> daughter relationship for CDC Streams.
+     * Update parent -> daughter relationship for CDC Streams when a region splits.
      * - find parent partition id using start/end keys of daughters
      * - upsert partition metadata for the 2 daughters
      * - update the end time on the parent's partition metadata
@@ -95,20 +102,23 @@ public void postCompletedSplitRegionAction(final ObserverContext<MasterCoprocess
                         regionInfoA.getTable());
                 return;
             }
-            // find streamName with ENABLED status
             String tableName = phoenixTable.getName().getString();
-            PreparedStatement pstmt = conn.prepareStatement(STREAM_STATUS_QUERY);
-            pstmt.setString(1, tableName);
-            ResultSet rs = pstmt.executeQuery();
-            if (rs.next()) {
-                String streamName = rs.getString(1);
+            String streamName = getStreamName(conn, tableName);
+            if (streamName != null) {
                 LOGGER.info("Updating partition metadata for table={}, stream={} daughters {} {}",
                         tableName, streamName, regionInfoA.getEncodedName(), regionInfoB.getEncodedName());
-                // ancestorIDs = [parentId, grandparentId1]
-                List<String> ancestorIDs = getAncestorIds(conn, tableName, streamName, regionInfoA, regionInfoB);
-                upsertDaughterPartition(conn, tableName, streamName, ancestorIDs.get(0), regionInfoA);
-                upsertDaughterPartition(conn, tableName, streamName, ancestorIDs.get(0), regionInfoB);
-                updateParentPartitionEndTime(conn, tableName, streamName, ancestorIDs, regionInfoA.getRegionId());
+                // ancestorIDs = [parentId, grandparentId1, grandparentId2...]
+                List<String> ancestorIDs
+                        = getAncestorIdsForSplit(conn, tableName, streamName, regionInfoA, regionInfoB);
+
+                upsertDaughterPartitions(conn, tableName, streamName, ancestorIDs.subList(0, 1),
+                        Arrays.asList(regionInfoA, regionInfoB));
+
+                updateParentPartitionEndTime(conn, tableName, streamName, ancestorIDs,
+                        regionInfoA.getRegionId());
+            } else {
+                LOGGER.info("{} does not have a stream enabled, skipping partition metadata update.",
+                        regionInfoA.getTable());
             }
         } catch (SQLException e) {
             LOGGER.error("Unable to update CDC Stream Partition metadata during split with daughter regions: {} {}",
@@ -116,31 +126,66 @@ public void postCompletedSplitRegionAction(final ObserverContext<MasterCoprocess
         }
     }
 
-    private PTable getPhoenixTable(Connection conn, TableName tableName) throws SQLException {
-        PTable pTable;
-        try {
-            pTable = PhoenixRuntime.getTable(conn, tableName.toString());
-        } catch (TableNotFoundException e) {
-            return null;
+    /**
+     * Update parent -> daughter relationship for CDC Streams when regions merge.
+     * - upsert partition metadata for the merged daughter region, one row per parent
+     * - update the end time on all the parents' partition metadata
+     * @param c the environment to interact with the framework and master
+     * @param regionsToMerge parent regions which merged
+     * @param mergedRegion daughter region
+     */
+    @Override
+    public void postCompletedMergeRegionsAction(final ObserverContext<MasterCoprocessorEnvironment> c,
+                                                final RegionInfo[] regionsToMerge,
+                                                final RegionInfo mergedRegion) {
+        Configuration conf = c.getEnvironment().getConfiguration();
+        try (Connection conn  = QueryUtil.getConnectionOnServer(conf)) {
+            // CDC will be enabled on Phoenix tables only
+            PTable phoenixTable = getPhoenixTable(conn, mergedRegion.getTable());
+            if (phoenixTable == null) {
+                LOGGER.info("{} is not a Phoenix Table, skipping partition metadata update.",
+                        mergedRegion.getTable());
+                return;
+            }
+            String tableName = phoenixTable.getName().getString();
+            String streamName = getStreamName(conn, tableName);
+            if (streamName != null) {
+                // upsert a row for daughter-parent for each merged region
+                upsertDaughterPartitions(conn, tableName, streamName,
+                        Arrays.stream(regionsToMerge).map(RegionInfo::getEncodedName).collect(Collectors.toList()),
+                        Arrays.asList(mergedRegion));
+
+                // lookup all ancestors of a merged region and update the endTime
+                for (RegionInfo ri : regionsToMerge) {
+                    List<String> ancestorIDs = getAncestorIdsForMerge(conn, tableName, streamName, ri);
+                    updateParentPartitionEndTime(conn, tableName, streamName, ancestorIDs,
+                            mergedRegion.getRegionId());
+                }
+            } else {
+                LOGGER.info("{} does not have a stream enabled, skipping partition metadata update.",
+                        mergedRegion.getTable());
+            }
+        } catch (SQLException e) {
+            LOGGER.error("Unable to update CDC Stream Partition metadata during merge with " +
+                            "parent regions: {} and daughter region {}",
+                    regionsToMerge, mergedRegion.getEncodedName(), e);
         }
-        return pTable;
     }
 
     /**
-     * Lookup parent's partition id (region's encoded name) in SYSTEM.CDC_STREAM.
+     * Lookup a split parent's partition id (region's encoded name) in SYSTEM.CDC_STREAM.
      * RegionInfoA is left daughter and RegionInfoB is right daughter so parent's key range would
      * be [RegionInfoA startKey, RegionInfoB endKey]
-     * Return both parent and grandparent partition ids.
+     * Return parent and all grandparent partition ids.
      *
-     * TODO: When we implement merges in this coproc, there could be multiple grandparents.
      */
-    private List<String> getAncestorIds(Connection conn, String tableName, String streamName,
+    private List<String> getAncestorIdsForSplit(Connection conn, String tableName, String streamName,
                                         RegionInfo regionInfoA, RegionInfo regionInfoB)
             throws SQLException {
         byte[] parentStartKey = regionInfoA.getStartKey();
         byte[] parentEndKey = regionInfoB.getEndKey();
 
-        StringBuilder qb = new StringBuilder(PARENT_PARTITION_QUERY);
+        StringBuilder qb = new StringBuilder(PARENT_PARTITION_QUERY_FOR_SPLIT);
         if (parentStartKey.length == 0) {
             qb.append(" AND PARTITION_START_KEY IS NULL ");
         } else {
@@ -173,50 +218,113 @@ private List<String> getAncestorIds(Connection conn, String tableName, String st
                     Bytes.toStringBinary(regionInfoB.getStartKey()),
                     Bytes.toStringBinary(regionInfoB.getEndKey())));
         }
+        // if parent was a result of a merge, there will be multiple grandparents.
+        while (rs.next()) {
+            ancestorIDs.add(rs.getString(2));
+        }
         return ancestorIDs;
     }
 
     /**
-     * Insert partition metadata for a daughter region from the split.
+     * Lookup the parents of a region that is being merged.
+     * If this region was itself the output of a past merge, it will have multiple parents.
      */
-    private void upsertDaughterPartition(Connection conn, String tableName,
-                                         String streamName, String parentPartitionID,
-                                         RegionInfo regionInfo)
-            throws SQLException {
-        String partitionId = regionInfo.getEncodedName();
-        long startTime = regionInfo.getRegionId();
-        byte[] startKey = regionInfo.getStartKey();
-        byte[] endKey = regionInfo.getEndKey();
-        PreparedStatement pstmt = conn.prepareStatement(PARTITION_UPSERT_SQL);
+    private List<String> getAncestorIdsForMerge(Connection conn, String tableName, String streamName,
+                                                RegionInfo parent) throws SQLException {
+        List<String> ancestorIDs = new ArrayList<>();
+        ancestorIDs.add(parent.getEncodedName());
+        PreparedStatement pstmt = conn.prepareStatement(PARENT_PARTITION_QUERY_FOR_MERGE);
         pstmt.setString(1, tableName);
         pstmt.setString(2, streamName);
-        pstmt.setString(3, partitionId);
-        pstmt.setString(4, parentPartitionID);
-        pstmt.setLong(5, startTime);
-        // endTime in not set when inserting a new partition
-        pstmt.setNull(6, Types.BIGINT);
-        pstmt.setBytes(7, startKey.length == 0 ? null : startKey);
-        pstmt.setBytes(8, endKey.length == 0 ? null : endKey);
-        pstmt.executeUpdate();
+        pstmt.setString(3, parent.getEncodedName());
+        ResultSet rs = pstmt.executeQuery();
+        if (rs.next()) {
+            ancestorIDs.add(rs.getString(1));
+        } else {
+            throw new SQLException(String.format(
+                    "Could not find parent of the provided merged region: %s", parent.getEncodedName()));
+        }
+        // if parent was a result of a merge, there will be multiple grandparents.
+        while (rs.next()) {
+            ancestorIDs.add(rs.getString(1));
+        }
+        return ancestorIDs;
+    }
+
+    /**
+     * Insert partition metadata for a daughter region from a split or a merge.
+     * split: 2 daughters, 1 parent
+     * merge: 1 daughter, N parents
+     */
+    private void upsertDaughterPartitions(Connection conn, String tableName,
+                                         String streamName, List<String> parentPartitionIDs,
+                                         List<RegionInfo> daughters)
+            throws SQLException {
+        PreparedStatement pstmt = conn.prepareStatement(PARTITION_UPSERT_SQL);
+        for (RegionInfo daughter : daughters) {
+            for (String parentPartitionID : parentPartitionIDs) {
+                String partitionId = daughter.getEncodedName();
+                long startTime = daughter.getRegionId();
+                byte[] startKey = daughter.getStartKey();
+                byte[] endKey = daughter.getEndKey();
+                pstmt.setString(1, tableName);
+                pstmt.setString(2, streamName);
+                pstmt.setString(3, partitionId);
+                pstmt.setString(4, parentPartitionID);
+                pstmt.setLong(5, startTime);
+                // endTime is not set when inserting a new partition
+                pstmt.setNull(6, Types.BIGINT);
+                pstmt.setBytes(7, startKey.length == 0 ? null : startKey);
+                pstmt.setBytes(8, endKey.length == 0 ? null : endKey);
+                pstmt.executeUpdate();
+            }
+        }
         conn.commit();
     }
 
     /**
      * Update endTime in all rows of parent partition by setting it to daughter's startTime.
+     * If the parent came from a split, there is a single row to update.
+     * If the parent came from a merge, there are multiple rows, one per grandparent.
      *
-     * TODO: When we implement merges in this coproc, update all rows of the parent.
      */
     private void updateParentPartitionEndTime(Connection conn, String tableName,
                                               String streamName, List<String> ancestorIDs,
-                                          long daughterStartTime) throws SQLException {
-        // ancestorIDs = [parentID, grandparentID]
+                                              long daughterStartTime) throws SQLException {
+        // ancestorIDs = [parentID, grandparentID1, grandparentID2...]
         PreparedStatement pstmt = conn.prepareStatement(PARENT_PARTITION_UPDATE_END_TIME_SQL);
-        pstmt.setString(1, tableName);
-        pstmt.setString(2, streamName);
-        pstmt.setString(3, ancestorIDs.get(0));
-        pstmt.setString(4, ancestorIDs.get(1));
-        pstmt.setLong(5, daughterStartTime);
-        pstmt.executeUpdate();
+        for (int i=1; i<ancestorIDs.size(); i++) {
+            pstmt.setString(1, tableName);
+            pstmt.setString(2, streamName);
+            pstmt.setString(3, ancestorIDs.get(0));
+            pstmt.setString(4, ancestorIDs.get(i));
+            pstmt.setLong(5, daughterStartTime);
+            pstmt.executeUpdate();
+        }
         conn.commit();
     }
+
+    /**
+     * Return the stream name for the given table if one exists in ENABLED state, else null.
+     */
+    private String getStreamName(Connection conn, String tableName) throws SQLException {
+        PreparedStatement pstmt = conn.prepareStatement(STREAM_STATUS_QUERY);
+        pstmt.setString(1, tableName);
+        ResultSet rs = pstmt.executeQuery();
+        if (rs.next()) {
+            return rs.getString(1);
+        } else {
+            return null;
+        }
+    }
+
+    private PTable getPhoenixTable(Connection conn, TableName tableName) throws SQLException {
+        PTable pTable;
+        try {
+            pTable = PhoenixRuntime.getTable(conn, tableName.toString());
+        } catch (TableNotFoundException e) {
+            return null;
+        }
+        return pTable;
+    }
 }
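
Reviewer note on the metadata model introduced above: the helpers write one SYSTEM.CDC_STREAM row
per (daughter, parent) pair, so a split produces 2 daughters x 1 parent and a merge produces
1 daughter x N parents; updateParentPartitionEndTime then stamps each (parent, grandparent) row
with the daughter's start time. A minimal standalone sketch of that fan-out, purely illustrative
(class name and partition ids are made up, not from the patch):

    import java.util.Arrays;
    import java.util.List;

    public class PartitionFanOutSketch {
        // one SYSTEM.CDC_STREAM row per (daughter, parent) pair, mirroring PARTITION_UPSERT_SQL
        static void printRows(List<String> daughters, List<String> parents) {
            for (String daughter : daughters) {
                for (String parent : parents) {
                    System.out.println("PARTITION_ID=" + daughter + " PARENT_PARTITION_ID=" + parent);
                }
            }
        }

        public static void main(String[] args) {
            // split: 2 daughters, 1 parent -> 2 rows
            printRows(Arrays.asList("daughterA", "daughterB"), Arrays.asList("parentRegion"));
            // merge: 1 daughter, N parents -> N rows
            printRows(Arrays.asList("mergedRegion"), Arrays.asList("parent1", "parent2"));
        }
    }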

From 7d1e17dec1e4b41e6345d13a487f8b0db6847f2e Mon Sep 17 00:00:00 2001
From: Palash Chauhan <p.chauhan@pchauha-ltm8owy.internal.salesforce.com>
Date: Fri, 17 Jan 2025 16:44:32 -0800
Subject: [PATCH 2/4] add tests

---
 .../coprocessor/PhoenixMasterObserver.java    |   7 +-
 .../apache/phoenix/end2end/CDCStreamIT.java   | 206 ++++++++++++++----
 .../org/apache/phoenix/util/TestUtil.java     |  83 ++++++-
 3 files changed, 248 insertions(+), 48 deletions(-)

diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
index e2dbc3c175e..3f06f8e1216 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
@@ -17,7 +17,6 @@
  */
 package org.apache.phoenix.coprocessor;
 
-import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -65,7 +64,7 @@ public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor
 
     private static final String PARENT_PARTITION_QUERY_FOR_SPLIT
             = "SELECT PARTITION_ID, PARENT_PARTITION_ID FROM " + SYSTEM_CDC_STREAM_NAME
-            + " WHERE TABLE_NAME = ? AND STREAM_NAME = ? ";
+            + " WHERE TABLE_NAME = ? AND STREAM_NAME = ? AND PARTITION_END_TIME IS NULL ";
 
     private static final String PARENT_PARTITION_QUERY_FOR_MERGE
             = "SELECT PARENT_PARTITION_ID FROM " + SYSTEM_CDC_STREAM_NAME
@@ -105,7 +104,7 @@ public void postCompletedSplitRegionAction(final ObserverContext<MasterCoprocess
             String tableName = phoenixTable.getName().getString();
             String streamName = getStreamName(conn, tableName);
             if (streamName != null) {
-                LOGGER.info("Updating partition metadata for table={}, stream={} daughters {} {}",
+                LOGGER.info("Updating split partition metadata for table={}, stream={} daughters {} {}",
                         tableName, streamName, regionInfoA.getEncodedName(), regionInfoB.getEncodedName());
                 // ancestorIDs = [parentId, grandparentId1, grandparentId2...]
                 List<String> ancestorIDs
@@ -150,6 +149,8 @@ public void postCompletedMergeRegionsAction(final ObserverContext<MasterCoproces
             String tableName = phoenixTable.getName().getString();
             String streamName = getStreamName(conn, tableName);
             if (streamName != null) {
+                LOGGER.info("Updating merged partition metadata for table={}, stream={} daughter {}",
+                        tableName, streamName, mergedRegion.getEncodedName());
                 // upsert a row for daughter-parent for each merged region
                 upsertDaughterPartitions(conn, tableName, streamName,
                         Arrays.stream(regionsToMerge).map(RegionInfo::getEncodedName).collect(Collectors.toList()),
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
index d70821bb94c..98dd22f5350 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
@@ -18,13 +18,8 @@
 
 package org.apache.phoenix.end2end;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
@@ -33,12 +28,12 @@
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.util.CDCUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -49,8 +44,12 @@
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
@@ -172,7 +171,7 @@ public void testPartitionMetadataTableWithSingleRegionSplits() throws Exception
         createTableAndEnableCDC(conn, tableName);
 
         //split the only region somewhere in the middle
-        splitTable(conn, tableName, Bytes.toBytes("m"));
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("m"));
 
         //check partition metadata - daughter regions are inserted and parent's end time is updated.
         ResultSet rs = conn.createStatement().executeQuery(
@@ -209,9 +208,9 @@ public void testPartitionMetadataFirstRegionSplits() throws Exception {
         createTableAndEnableCDC(conn, tableName);
 
         //split the only region [null, null]
-        splitTable(conn, tableName, Bytes.toBytes("l"));
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
         // we have 2 regions - [null, l], [l, null], split the first region
-        splitTable(conn, tableName, Bytes.toBytes("d"));
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("d"));
         ResultSet rs = conn.createStatement().executeQuery(
                 "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + tableName + "'");
         PartitionMetadata grandparent = null, splitParent = null, unSplitParent = null;
@@ -253,9 +252,9 @@ public void testPartitionMetadataLastRegionSplits() throws Exception {
         createTableAndEnableCDC(conn, tableName);
 
         //split the only region [null, null]
-        splitTable(conn, tableName, Bytes.toBytes("l"));
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
         // we have 2 regions - [null, l], [l, null], split the second region
-        splitTable(conn, tableName, Bytes.toBytes("q"));
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("q"));
         ResultSet rs = conn.createStatement().executeQuery(
                 "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + tableName + "'");
         PartitionMetadata grandparent = null, splitParent = null, unSplitParent = null;
@@ -297,11 +296,11 @@ public void testPartitionMetadataMiddleRegionSplits() throws Exception {
         createTableAndEnableCDC(conn, tableName);
 
         //split the only region [null, null]
-        splitTable(conn, tableName, Bytes.toBytes("d"));
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("d"));
         // we have 2 regions - [null, d], [d, null], split the second region
-        splitTable(conn, tableName, Bytes.toBytes("q"));
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("q"));
         // we have 3 regions - [null, d], [d, q], [q, null], split the second region
-        splitTable(conn, tableName, Bytes.toBytes("j"));
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("j"));
         // [null, d], [d, j], [j, q], [q, null]
         ResultSet rs = conn.createStatement().executeQuery(
                 "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + tableName + "'");
@@ -324,6 +323,160 @@ public void testPartitionMetadataMiddleRegionSplits() throws Exception {
         assertTrue(daughters.stream().anyMatch(d -> d.startKey[0] == 'j' && d.endKey[0] == 'q'));
     }
 
+    /**
+     * Test split of a region which came from a merge.
+     */
+    @Test
+    public void testPartitionMetadataMergedRegionSplits() throws Exception {
+        // create table, cdc and bootstrap stream metadata
+        Connection conn = newConnection();
+        String tableName = generateUniqueName();
+        createTableAndEnableCDC(conn, tableName);
+
+        //split the only region
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("d"));
+
+        //merge the 2 regions
+        List<HRegionLocation> regions = TestUtil.getAllTableRegions(conn, tableName);
+        TestUtil.mergeTableRegions(conn, tableName, regions.stream()
+                .map(HRegionLocation::getRegion)
+                .map(RegionInfo::getEncodedName)
+                .collect(Collectors.toList()));
+
+        //split again
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
+
+        //verify partition metadata
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + tableName + "'");
+        List<PartitionMetadata> mergedParent = new ArrayList<>();
+        List<PartitionMetadata> splitDaughters = new ArrayList<>();
+
+        while (rs.next()) {
+            PartitionMetadata pm = new PartitionMetadata(rs);
+            if (pm.startKey == null && pm.endKey == null && pm.parentPartitionId != null) {
+                mergedParent.add(pm);
+            }
+            if (pm.endTime == 0) {
+                splitDaughters.add(pm);
+            }
+        }
+        assertEquals(2, mergedParent.size());
+        assertEquals(2, splitDaughters.size());
+        assertEquals(mergedParent.get(0).partitionId, mergedParent.get(1).partitionId);
+        assertEquals(mergedParent.get(0).partitionId, splitDaughters.get(0).parentPartitionId);
+        assertEquals(mergedParent.get(0).partitionId, splitDaughters.get(1).parentPartitionId);
+        assertEquals(splitDaughters.get(0).startTime, splitDaughters.get(1).startTime);
+        assertEquals(splitDaughters.get(0).startTime, mergedParent.get(0).endTime);
+        assertEquals(splitDaughters.get(0).startTime, mergedParent.get(1).endTime);
+    }
+
+
+
+    /**
+     * Test merge of 2 regions which came from a split.
+     */
+    @Test
+    public void testPartitionMetadataSplitRegionsMerge() throws Exception {
+        // create table, cdc and bootstrap stream metadata
+        Connection conn = newConnection();
+        String tableName = generateUniqueName();
+        createTableAndEnableCDC(conn, tableName);
+
+        //split the only region
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
+
+        //merge the 2 regions
+        List<HRegionLocation> regions = TestUtil.getAllTableRegions(conn, tableName);
+        TestUtil.mergeTableRegions(conn, tableName, regions.stream()
+                                                    .map(HRegionLocation::getRegion)
+                                                    .map(RegionInfo::getEncodedName)
+                                                    .collect(Collectors.toList()));
+
+        //verify partition metadata
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + tableName + "'");
+
+        List<PartitionMetadata> splitParents = new ArrayList<>();
+        List<PartitionMetadata> mergedDaughter = new ArrayList<>();
+        while (rs.next()) {
+            PartitionMetadata pm = new PartitionMetadata(rs);
+            if (pm.startKey == null && pm.endKey == null && pm.endTime == 0) {
+                mergedDaughter.add(pm);
+            }
+            if (pm.startKey != null || pm.endKey != null) {
+                splitParents.add(pm);
+            }
+        }
+        assertEquals(2, mergedDaughter.size());
+        assertEquals(2, splitParents.size());
+        assertEquals(mergedDaughter.get(0).startTime, mergedDaughter.get(1).startTime);
+        assertEquals(mergedDaughter.get(0).endTime, mergedDaughter.get(1).endTime);
+        assertEquals(mergedDaughter.get(0).partitionId, mergedDaughter.get(1).partitionId);
+        assertTrue(mergedDaughter.stream().anyMatch(d -> Objects.equals(d.parentPartitionId, splitParents.get(0).partitionId)));
+        assertTrue(mergedDaughter.stream().anyMatch(d -> Objects.equals(d.parentPartitionId, splitParents.get(1).partitionId)));
+        for (PartitionMetadata splitParent : splitParents) {
+            Assert.assertEquals(mergedDaughter.get(0).startTime, splitParent.endTime);
+        }
+    }
+
+    /**
+     * Test merge of 2 regions which came from different merges.
+     */
+    @Test
+    public void testPartitionMetadataMergedRegionsMerge() throws Exception {
+        // create table, cdc and bootstrap stream metadata
+        Connection conn = newConnection();
+        String tableName = generateUniqueName();
+        createTableAndEnableCDC(conn, tableName);
+
+        // split the only region
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("l"));
+        // split both regions
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("d"));
+        TestUtil.splitTable(conn, tableName, Bytes.toBytes("q"));
+        // merge first two and last two regions
+        List<HRegionLocation> regions = TestUtil.getAllTableRegions(conn, tableName);
+        TestUtil.mergeTableRegions(conn, tableName, regions.subList(0,2).stream()
+                .map(HRegionLocation::getRegion)
+                .map(RegionInfo::getEncodedName)
+                .collect(Collectors.toList()));
+        TestUtil.mergeTableRegions(conn, tableName, regions.subList(2,4).stream()
+                .map(HRegionLocation::getRegion)
+                .map(RegionInfo::getEncodedName)
+                .collect(Collectors.toList()));
+        // merge the two regions
+        regions = TestUtil.getAllTableRegions(conn, tableName);
+        TestUtil.mergeTableRegions(conn, tableName, regions.stream()
+                .map(HRegionLocation::getRegion)
+                .map(RegionInfo::getEncodedName)
+                .collect(Collectors.toList()));
+
+        //verify partition metadata
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME='" + tableName + "'");
+
+        List<PartitionMetadata> mergedDaughter = new ArrayList<>();
+        List<PartitionMetadata> mergedParents = new ArrayList<>();
+        while (rs.next()) {
+            PartitionMetadata pm = new PartitionMetadata(rs);
+            if (pm.endTime == 0) {
+                mergedDaughter.add(pm);
+            }
+            // also matches older ancestors; sort by endTime below and verify only the latest 4 rows
+            else if (pm.startKey == null || pm.endKey == null) {
+                mergedParents.add(pm);
+            }
+        }
+        assertEquals(2, mergedDaughter.size());
+        assertEquals(9, mergedParents.size());
+        assertEquals(mergedDaughter.get(0).startTime, mergedDaughter.get(1).startTime);
+        Collections.sort(mergedParents, Comparator.comparing(o -> o.endTime));
+        for (PartitionMetadata mergedParent : mergedParents.subList(mergedParents.size()-4, mergedParents.size())) {
+            assertEquals(mergedDaughter.get(0).startTime, mergedParent.endTime);
+        }
+    }
+
     private String getStreamName(Connection conn, String tableName, String cdcName) throws SQLException {
         return String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcName, CDCUtil.getCDCCreationTimestamp(
                 conn.unwrap(PhoenixConnection.class).getTableNoCache(tableName)));
@@ -381,31 +534,6 @@ private void createTableAndEnableCDC(Connection conn, String tableName) throws E
         conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('z', 8, 'cat')");
     }
 
-    /**
-     * Split the table at the provided split point.
-     */
-    private void splitTable(Connection conn, String tableName, byte[] splitPoint) throws Exception {
-        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
-        Admin admin = services.getAdmin();
-        Configuration configuration =
-                conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration();
-        org.apache.hadoop.hbase.client.Connection hbaseConn =
-                ConnectionFactory.createConnection(configuration);
-        RegionLocator regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName));
-        int nRegions = regionLocator.getAllRegionLocations().size();
-        try {
-            admin.split(TableName.valueOf(tableName), splitPoint);
-            int retryCount = 0;
-            do {
-                Thread.sleep(2000);
-                retryCount++;
-            } while (retryCount < 10 && regionLocator.getAllRegionLocations().size() == nRegions);
-            Assert.assertNotEquals(regionLocator.getAllRegionLocations().size(), nRegions);
-        } finally {
-            admin.close();
-        }
-    }
-
     /**
      * Inner class to represent partition metadata for a region i.e. single row from SYSTEM.CDC_STREAM
      */
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 6ea2a2eb656..969dc803cf7 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -59,22 +59,26 @@
 import java.util.Properties;
 import java.util.stream.Collectors;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.CompactionState;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.CoprocessorDescriptor;
 import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Delete;
 
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -220,7 +224,7 @@ private TestUtil() {
     public final static String ROW7 = "00B723122312312";
     public final static String ROW8 = "00B823122312312";
     public final static String ROW9 = "00C923122312312";
-    
+
     public final static String PARENTID1 = "0500x0000000001";
     public final static String PARENTID2 = "0500x0000000002";
     public final static String PARENTID3 = "0500x0000000003";
@@ -230,9 +234,9 @@ private TestUtil() {
     public final static String PARENTID7 = "0500x0000000007";
     public final static String PARENTID8 = "0500x0000000008";
     public final static String PARENTID9 = "0500x0000000009";
-    
+
     public final static List<String> PARENTIDS = Lists.newArrayList(PARENTID1, PARENTID2, PARENTID3, PARENTID4, PARENTID5, PARENTID6, PARENTID7, PARENTID8, PARENTID9);
-    
+
     public final static String ENTITYHISTID1 = "017x00000000001";
     public final static String ENTITYHISTID2 = "017x00000000002";
     public final static String ENTITYHISTID3 = "017x00000000003";
@@ -244,7 +248,7 @@ private TestUtil() {
     public final static String ENTITYHISTID9 = "017x00000000009";
 
     public final static List<String> ENTITYHISTIDS = Lists.newArrayList(ENTITYHISTID1, ENTITYHISTID2, ENTITYHISTID3, ENTITYHISTID4, ENTITYHISTID5, ENTITYHISTID6, ENTITYHISTID7, ENTITYHISTID8, ENTITYHISTID9);
-    
+
     public static final String LOCALHOST = "localhost";
     public static final String PHOENIX_JDBC_URL = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
     public static final String PHOENIX_CONNECTIONLESS_JDBC_URL = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + CONNECTIONLESS + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
@@ -897,7 +901,7 @@ public static void doMajorCompaction(Connection conn, String tableName) throws E
             if (table.isTransactional()) {
                 mutationState.commit();
             }
-        
+
             Admin hbaseAdmin = services.getAdmin();
             hbaseAdmin.flush(TableName.valueOf(tableName));
             hbaseAdmin.majorCompact(TableName.valueOf(tableName));
@@ -910,7 +914,7 @@ public static void doMajorCompaction(Connection conn, String tableName) throws E
                 scan.withStartRow(markerRowKey);
                 scan.withStopRow(Bytes.add(markerRowKey, new byte[]{0}));
                 scan.setRaw(true);
-        
+
                 try (Table htableForRawScan = services.getTable(Bytes.toBytes(tableName))) {
                     ResultScanner scanner = htableForRawScan.getScanner(scan);
                     List<Result> results = Lists.newArrayList(scanner);
@@ -1450,4 +1454,71 @@ public static Path createTempDirectory() throws IOException {
         return Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), null);
     }
 
+    /**
+     * Split the table at the provided split point.
+     */
+    public static void splitTable(Connection conn, String tableName, byte[] splitPoint)
+            throws Exception {
+        executeHBaseTableRegionOperation(conn, tableName, (admin, regionLocator, nRegions) -> {
+            admin.split(TableName.valueOf(tableName), splitPoint);
+            waitForRegionChange(regionLocator, nRegions);
+        });
+    }
+
+    /**
+     * Merge the given regions of a table.
+     */
+    public static void mergeTableRegions(Connection conn, String tableName, List<String> regions)
+            throws Exception {
+        byte[][] regionsToMerge = regions.stream()
+                .map(Bytes::toBytes)
+                .toArray(byte[][]::new);
+
+        executeHBaseTableRegionOperation(conn, tableName, (admin, regionLocator, nRegions) -> {
+            admin.mergeRegionsAsync(regionsToMerge, true);
+            waitForRegionChange(regionLocator, nRegions);
+        });
+    }
+
+    @FunctionalInterface
+    private interface TableOperation {
+        void execute(Admin admin, RegionLocator regionLocator, int initialRegionCount)
+                throws Exception;
+    }
+
+    private static void executeHBaseTableRegionOperation(Connection conn, String tableName,
+                                                     TableOperation operation) throws Exception {
+        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        Configuration configuration = services.getConfiguration();
+        org.apache.hadoop.hbase.client.Connection hbaseConn
+                = ConnectionFactory.createConnection(configuration);
+        Admin admin = services.getAdmin();
+        RegionLocator regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName));
+        int nRegions = regionLocator.getAllRegionLocations().size();
+        operation.execute(admin, regionLocator, nRegions);
+
+    }
+
+    private static void waitForRegionChange(RegionLocator regionLocator, int initialRegionCount)
+            throws Exception {
+        int retryCount = 0;
+        while (retryCount < 20
+                && regionLocator.getAllRegionLocations().size() == initialRegionCount) {
+            Thread.sleep(5000);
+            retryCount++;
+        }
+        Assert.assertNotEquals(regionLocator.getAllRegionLocations().size(), initialRegionCount);
+    }
+
+    public static List<HRegionLocation> getAllTableRegions(Connection conn, String tableName)
+            throws Exception {
+        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        Configuration configuration = services.getConfiguration();
+        org.apache.hadoop.hbase.client.Connection hbaseConn
+                = ConnectionFactory.createConnection(configuration);
+        RegionLocator regionLocator
+                = hbaseConn.getRegionLocator(TableName.valueOf(tableName));
+        return regionLocator.getAllRegionLocations();
+    }
+
 }
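
A usage sketch for the helpers moved into TestUtil above (illustrative only; it assumes the imports
already present in CDCStreamIT, a running mini-cluster, and a hypothetical method name): split a
table once, then merge the resulting regions back together by their encoded names.

    private static void splitThenMergeBack(Connection conn, String tableName) throws Exception {
        // split the single region at an arbitrary key
        TestUtil.splitTable(conn, tableName, Bytes.toBytes("m"));
        // collect the encoded names of the daughter regions and merge them back
        List<String> encodedNames = TestUtil.getAllTableRegions(conn, tableName).stream()
                .map(HRegionLocation::getRegion)
                .map(RegionInfo::getEncodedName)
                .collect(Collectors.toList());
        TestUtil.mergeTableRegions(conn, tableName, encodedNames);
    }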

From d089cbad881ec705014a1817ffd6ea642e18fae2 Mon Sep 17 00:00:00 2001
From: Palash Chauhan <p.chauhan@pchauha-ltm8owy.internal.salesforce.com>
Date: Mon, 27 Jan 2025 22:53:13 -0800
Subject: [PATCH 3/4] set auto commit to false

---
 .../org/apache/phoenix/coprocessor/PhoenixMasterObserver.java   | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
index 3f06f8e1216..e335d7604a3 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
@@ -261,6 +261,7 @@ private void upsertDaughterPartitions(Connection conn, String tableName,
                                          String streamName, List<String> parentPartitionIDs,
                                          List<RegionInfo> daughters)
             throws SQLException {
+        conn.setAutoCommit(false);
         PreparedStatement pstmt = conn.prepareStatement(PARTITION_UPSERT_SQL);
         for (RegionInfo daughter : daughters) {
             for (String parentPartitionID : parentPartitionIDs) {
@@ -292,6 +293,7 @@ private void upsertDaughterPartitions(Connection conn, String tableName,
     private void updateParentPartitionEndTime(Connection conn, String tableName,
                                               String streamName, List<String> ancestorIDs,
                                               long daughterStartTime) throws SQLException {
+        conn.setAutoCommit(false);
         // ancestorIDs = [parentID, grandparentID1, grandparentID2...]
         PreparedStatement pstmt = conn.prepareStatement(PARENT_PARTITION_UPDATE_END_TIME_SQL);
         for (int i=1; i<ancestorIDs.size(); i++) {
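
Context for this change: both helpers issue several executeUpdate() calls and finish with a single
conn.commit(), so making autocommit explicitly false keeps the per-row upserts buffered client-side
until that commit. A minimal sketch of the pattern on a server-side Phoenix connection (fragment
only; the table name and values are illustrative, and conf is assumed to be the master's
Configuration as in the observer hooks):

    void upsertBatched(Configuration conf) throws Exception {
        try (Connection conn = QueryUtil.getConnectionOnServer(conf)) {
            conn.setAutoCommit(false);                       // buffer mutations client-side
            try (PreparedStatement pstmt =
                         conn.prepareStatement("UPSERT INTO MY_METADATA_TABLE VALUES (?, ?)")) {
                for (int i = 0; i < 3; i++) {
                    pstmt.setInt(1, i);
                    pstmt.setString(2, "row-" + i);
                    pstmt.executeUpdate();                   // queued, nothing flushed yet
                }
            }
            conn.commit();                                   // single batched flush for all rows
        }
    }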

From 433b960726a3106fdf03b039fab4ffbe428ced62 Mon Sep 17 00:00:00 2001
From: Palash Chauhan <p.chauhan@pchauha-ltm8owy.internal.salesforce.com>
Date: Mon, 3 Feb 2025 15:26:09 -0800
Subject: [PATCH 4/4] add stream type column

---
 .../java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java   | 1 +
 .../src/main/java/org/apache/phoenix/query/QueryConstants.java  | 2 ++
 2 files changed, 3 insertions(+)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index ff250f40ba9..1a4b2996b76 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -461,6 +461,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
             SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_CDC_STREAM_TABLE);
     public static final String STREAM_NAME = "STREAM_NAME";
     public static final String STREAM_STATUS = "STREAM_STATUS";
+    public static final String STREAM_TYPE = "STREAM_TYPE";
     public static final String PARTITION_ID = "PARTITION_ID";
     public static final String PARENT_PARTITION_ID = "PARENT_PARTITION_ID";
     public static final String PARTITION_START_TIME = "PARTITION_START_TIME";
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 14e9b05ffff..41c5b250e1b 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -142,6 +142,7 @@
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAM_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAM_STATUS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAM_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_TABLE;
@@ -658,6 +659,7 @@ enum JoinType {INNER, LEFT_OUTER}
             STREAM_NAME + " VARCHAR NOT NULL," +
             // Non-PK columns
             STREAM_STATUS + " VARCHAR,\n" +
+            STREAM_TYPE + " VARCHAR,\n" +
             "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" +
             TABLE_NAME + "," + STREAM_NAME + "))\n" +
             HConstants.VERSIONS + "=%s,\n" +
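
Note: this patch only adds the column definition; nothing in the series writes STREAM_TYPE yet.
Once it is populated, a client could read it back with plain JDBC against the stream status table,
e.g. (sketch only; the jdbc url, the tableName argument, and the SYSTEM.CDC_STREAM_STATUS physical
name are assumptions, not taken from this series):

    static void printStreamStatus(String jdbcUrl, String tableName) throws SQLException {
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
             PreparedStatement pstmt = conn.prepareStatement(
                     "SELECT STREAM_NAME, STREAM_STATUS, STREAM_TYPE "
                             + "FROM SYSTEM.CDC_STREAM_STATUS WHERE TABLE_NAME = ?")) {
            pstmt.setString(1, tableName);
            try (ResultSet rs = pstmt.executeQuery()) {
                while (rs.next()) {
                    System.out.println(rs.getString(1) + " / " + rs.getString(2) + " / " + rs.getString(3));
                }
            }
        }
    }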