-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PHOENIX-7499 : Update stream metadata when data table regions merge #2057
base: master
Are you sure you want to change the base?
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,8 +23,10 @@ | |
import java.sql.SQLException; | ||
import java.sql.Types; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import java.util.stream.Collectors; | ||
|
||
import org.apache.hadoop.conf.Configuration; | ||
import org.apache.hadoop.hbase.TableName; | ||
|
@@ -60,9 +62,13 @@ public class PhoenixMasterObserver implements MasterObserver, MasterCoprocessor | |
private static final String PARTITION_UPSERT_SQL | ||
= "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " VALUES (?,?,?,?,?,?,?,?)"; | ||
|
||
private static final String PARENT_PARTITION_QUERY | ||
private static final String PARENT_PARTITION_QUERY_FOR_SPLIT | ||
= "SELECT PARTITION_ID, PARENT_PARTITION_ID FROM " + SYSTEM_CDC_STREAM_NAME | ||
+ " WHERE TABLE_NAME = ? AND STREAM_NAME = ? "; | ||
+ " WHERE TABLE_NAME = ? AND STREAM_NAME = ? AND PARTITION_END_TIME IS NULL "; | ||
|
||
private static final String PARENT_PARTITION_QUERY_FOR_MERGE | ||
= "SELECT PARENT_PARTITION_ID FROM " + SYSTEM_CDC_STREAM_NAME | ||
+ " WHERE TABLE_NAME = ? AND STREAM_NAME = ? AND PARTITION_ID = ?"; | ||
|
||
private static final String PARENT_PARTITION_UPDATE_END_TIME_SQL | ||
= "UPSERT INTO " + SYSTEM_CDC_STREAM_NAME + " (TABLE_NAME, STREAM_NAME, PARTITION_ID, " | ||
|
@@ -74,7 +80,7 @@ public Optional<MasterObserver> getMasterObserver() { | |
} | ||
|
||
/** | ||
* Update parent -> daughter relationship for CDC Streams. | ||
* Update parent -> daughter relationship for CDC Streams when a region splits. | ||
* - find parent partition id using start/end keys of daughters | ||
* - upsert partition metadata for the 2 daughters | ||
* - update the end time on the parent's partition metadata | ||
|
@@ -95,52 +101,92 @@ public void postCompletedSplitRegionAction(final ObserverContext<MasterCoprocess | |
regionInfoA.getTable()); | ||
return; | ||
} | ||
// find streamName with ENABLED status | ||
String tableName = phoenixTable.getName().getString(); | ||
PreparedStatement pstmt = conn.prepareStatement(STREAM_STATUS_QUERY); | ||
pstmt.setString(1, tableName); | ||
ResultSet rs = pstmt.executeQuery(); | ||
if (rs.next()) { | ||
String streamName = rs.getString(1); | ||
LOGGER.info("Updating partition metadata for table={}, stream={} daughters {} {}", | ||
String streamName = getStreamName(conn, tableName); | ||
if (streamName != null) { | ||
LOGGER.info("Updating split partition metadata for table={}, stream={} daughters {} {}", | ||
tableName, streamName, regionInfoA.getEncodedName(), regionInfoB.getEncodedName()); | ||
// ancestorIDs = [parentId, grandparentId1] | ||
List<String> ancestorIDs = getAncestorIds(conn, tableName, streamName, regionInfoA, regionInfoB); | ||
upsertDaughterPartition(conn, tableName, streamName, ancestorIDs.get(0), regionInfoA); | ||
upsertDaughterPartition(conn, tableName, streamName, ancestorIDs.get(0), regionInfoB); | ||
updateParentPartitionEndTime(conn, tableName, streamName, ancestorIDs, regionInfoA.getRegionId()); | ||
// ancestorIDs = [parentId, grandparentId1, grandparentId2...] | ||
List<String> ancestorIDs | ||
= getAncestorIdsForSplit(conn, tableName, streamName, regionInfoA, regionInfoB); | ||
|
||
upsertDaughterPartitions(conn, tableName, streamName, ancestorIDs.subList(0, 1), | ||
Arrays.asList(regionInfoA, regionInfoB)); | ||
|
||
updateParentPartitionEndTime(conn, tableName, streamName, ancestorIDs, | ||
regionInfoA.getRegionId()); | ||
} else { | ||
LOGGER.info("{} does not have a stream enabled, skipping partition metadata update.", | ||
regionInfoA.getTable()); | ||
} | ||
} catch (SQLException e) { | ||
LOGGER.error("Unable to update CDC Stream Partition metadata during split with daughter regions: {} {}", | ||
regionInfoA.getEncodedName(), regionInfoB.getEncodedName(), e); | ||
} | ||
} | ||
|
||
private PTable getPhoenixTable(Connection conn, TableName tableName) throws SQLException { | ||
PTable pTable; | ||
try { | ||
pTable = PhoenixRuntime.getTable(conn, tableName.toString()); | ||
} catch (TableNotFoundException e) { | ||
return null; | ||
/** | ||
* Update parent -> daughter relationship for CDC Streams when regions merge. | ||
* - upsert partition metadata for the daughter with each parent | ||
* - update the end time on all the parents' partition metadata | ||
* @param c the environment to interact with the framework and master | ||
* @param regionsToMerge parent regions which merged | ||
* @param mergedRegion daughter region | ||
*/ | ||
@Override | ||
public void postCompletedMergeRegionsAction(final ObserverContext<MasterCoprocessorEnvironment> c, | ||
final RegionInfo[] regionsToMerge, | ||
final RegionInfo mergedRegion) { | ||
Configuration conf = c.getEnvironment().getConfiguration(); | ||
try (Connection conn = QueryUtil.getConnectionOnServer(conf)) { | ||
// CDC will be enabled on Phoenix tables only | ||
PTable phoenixTable = getPhoenixTable(conn, mergedRegion.getTable()); | ||
if (phoenixTable == null) { | ||
LOGGER.info("{} is not a Phoenix Table, skipping partition metadata update.", | ||
mergedRegion.getTable()); | ||
return; | ||
} | ||
String tableName = phoenixTable.getName().getString(); | ||
String streamName = getStreamName(conn, tableName); | ||
if (streamName != null) { | ||
LOGGER.info("Updating merged partition metadata for table={}, stream={} daughter {}", | ||
tableName, streamName, mergedRegion.getEncodedName()); | ||
// upsert a row for daughter-parent for each merged region | ||
upsertDaughterPartitions(conn, tableName, streamName, | ||
Arrays.stream(regionsToMerge).map(RegionInfo::getEncodedName).collect(Collectors.toList()), | ||
Arrays.asList(mergedRegion)); | ||
|
||
// lookup all ancestors of a merged region and update the endTime | ||
for (RegionInfo ri : regionsToMerge) { | ||
List<String> ancestorIDs = getAncestorIdsForMerge(conn, tableName, streamName, ri); | ||
updateParentPartitionEndTime(conn, tableName, streamName, ancestorIDs, | ||
mergedRegion.getRegionId()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit optimization: In case of merge, the parent will appear multiple times, but needs to be updated only once. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @haridsv During merges, we decided to insert one row for the daughter per parent. (parent partition id is part of PK) Parents in a merge can also be results of merges in the past and hence they can have multiple rows in the table. We would need to update all those rows with the end time. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. By looking up the parent of the parent, we are actually updating the grand parents right? Is that really the intention? See my other comment visualizing the operations and let me know what I am missing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just looking up the grandparent to form the complete PK for updating the parent end time. |
||
} | ||
} else { | ||
LOGGER.info("{} does not have a stream enabled, skipping partition metadata update.", | ||
mergedRegion.getTable()); | ||
} | ||
} catch (SQLException e) { | ||
LOGGER.error("Unable to update CDC Stream Partition metadata during merge with " + | ||
"parent regions: {} and daughter region {}", | ||
regionsToMerge, mergedRegion.getEncodedName(), e); | ||
} | ||
return pTable; | ||
} | ||
|
||
/** | ||
* Lookup parent's partition id (region's encoded name) in SYSTEM.CDC_STREAM. | ||
* Lookup a split parent's partition id (region's encoded name) in SYSTEM.CDC_STREAM. | ||
* RegionInfoA is left daughter and RegionInfoB is right daughter so parent's key range would | ||
* be [RegionInfoA startKey, RegionInfoB endKey] | ||
* Return both parent and grandparent partition ids. | ||
* Return parent and all grandparent partition ids. | ||
* | ||
* TODO: When we implement merges in this coproc, there could be multiple grandparents. | ||
*/ | ||
private List<String> getAncestorIds(Connection conn, String tableName, String streamName, | ||
private List<String> getAncestorIdsForSplit(Connection conn, String tableName, String streamName, | ||
RegionInfo regionInfoA, RegionInfo regionInfoB) | ||
throws SQLException { | ||
byte[] parentStartKey = regionInfoA.getStartKey(); | ||
byte[] parentEndKey = regionInfoB.getEndKey(); | ||
|
||
StringBuilder qb = new StringBuilder(PARENT_PARTITION_QUERY); | ||
StringBuilder qb = new StringBuilder(PARENT_PARTITION_QUERY_FOR_SPLIT); | ||
if (parentStartKey.length == 0) { | ||
qb.append(" AND PARTITION_START_KEY IS NULL "); | ||
} else { | ||
|
@@ -173,50 +219,113 @@ private List<String> getAncestorIds(Connection conn, String tableName, String st | |
Bytes.toStringBinary(regionInfoB.getStartKey()), | ||
Bytes.toStringBinary(regionInfoB.getEndKey()))); | ||
} | ||
// if parent was a result of a merge, there will be multiple grandparents. | ||
while (rs.next()) { | ||
ancestorIDs.add(rs.getString(2)); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the situation in which a split will have multiple ancestors? In fact, isn't it an error if we actually find 2 regions with exactly the same start and end keys? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @haridsv By ancestors here I mean both parent and grandparent. The parent in a split can be a result of a merge. For example: A and B merged to form C. We would have inserted 2 rows for the daughter C with 2 parents, (C-->A) and (C-->B), with the same start/end key. If C splits into D and E, we will insert (D-->C) and (E-->C). When we lookup the parent of D and E, we will find those 2 rows with the same start/end key - both of which will need to be marked with the end time. (Parent's partition id is now part of the PK for CDC_STREAM table, so we will have to find all grandparents to mark the end time for a parent which has split). Let me know if I am missing something here or over-complicating things. FYI @virajjasani There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't get the part about grand parents, but then again I must be missing something, please see my other comment with visualization and let me know what I am missing. |
||
return ancestorIDs; | ||
} | ||
|
||
/** | ||
* Insert partition metadata for a daughter region from the split. | ||
* Lookup the parent of a merged region. | ||
* If the merged region was an output of a merge in the past, it will have multiple parents. | ||
*/ | ||
private void upsertDaughterPartition(Connection conn, String tableName, | ||
String streamName, String parentPartitionID, | ||
RegionInfo regionInfo) | ||
throws SQLException { | ||
String partitionId = regionInfo.getEncodedName(); | ||
long startTime = regionInfo.getRegionId(); | ||
byte[] startKey = regionInfo.getStartKey(); | ||
byte[] endKey = regionInfo.getEndKey(); | ||
PreparedStatement pstmt = conn.prepareStatement(PARTITION_UPSERT_SQL); | ||
private List<String> getAncestorIdsForMerge(Connection conn, String tableName, String streamName, | ||
RegionInfo parent) throws SQLException { | ||
List<String> ancestorIDs = new ArrayList<>(); | ||
ancestorIDs.add(parent.getEncodedName()); | ||
PreparedStatement pstmt = conn.prepareStatement(PARENT_PARTITION_QUERY_FOR_MERGE); | ||
pstmt.setString(1, tableName); | ||
pstmt.setString(2, streamName); | ||
pstmt.setString(3, partitionId); | ||
pstmt.setString(4, parentPartitionID); | ||
pstmt.setLong(5, startTime); | ||
// endTime in not set when inserting a new partition | ||
pstmt.setNull(6, Types.BIGINT); | ||
pstmt.setBytes(7, startKey.length == 0 ? null : startKey); | ||
pstmt.setBytes(8, endKey.length == 0 ? null : endKey); | ||
pstmt.executeUpdate(); | ||
pstmt.setString(3, parent.getEncodedName()); | ||
ResultSet rs = pstmt.executeQuery(); | ||
if (rs.next()) { | ||
ancestorIDs.add(rs.getString(1)); | ||
} else { | ||
throw new SQLException(String.format( | ||
"Could not find parent of the provided merged region: %s", parent.getEncodedName())); | ||
} | ||
// if parent was a result of a merge, there will be multiple grandparents. | ||
while (rs.next()) { | ||
ancestorIDs.add(rs.getString(1)); | ||
} | ||
return ancestorIDs; | ||
} | ||
|
||
/** | ||
* Insert partition metadata for a daughter region from a split or a merge. | ||
* split: 2 daughters, 1 parent | ||
* merge: 1 daughter, N parents | ||
*/ | ||
private void upsertDaughterPartitions(Connection conn, String tableName, | ||
String streamName, List<String> parentPartitionIDs, | ||
List<RegionInfo> daughters) | ||
throws SQLException { | ||
PreparedStatement pstmt = conn.prepareStatement(PARTITION_UPSERT_SQL); | ||
for (RegionInfo daughter : daughters) { | ||
for (String parentPartitionID : parentPartitionIDs) { | ||
String partitionId = daughter.getEncodedName(); | ||
long startTime = daughter.getRegionId(); | ||
byte[] startKey = daughter.getStartKey(); | ||
byte[] endKey = daughter.getEndKey(); | ||
pstmt.setString(1, tableName); | ||
pstmt.setString(2, streamName); | ||
pstmt.setString(3, partitionId); | ||
pstmt.setString(4, parentPartitionID); | ||
pstmt.setLong(5, startTime); | ||
// endTime is not set when inserting a new partition | ||
pstmt.setNull(6, Types.BIGINT); | ||
pstmt.setBytes(7, startKey.length == 0 ? null : startKey); | ||
pstmt.setBytes(8, endKey.length == 0 ? null : endKey); | ||
pstmt.executeUpdate(); | ||
} | ||
} | ||
conn.commit(); | ||
palashc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
/** | ||
* Update endTime in all rows of parent partition by setting it to daughter's startTime. | ||
* parent came from a split : there will only be one record | ||
* parent came from a merge : there will be multiple rows, one per grandparent | ||
* | ||
* TODO: When we implement merges in this coproc, update all rows of the parent. | ||
*/ | ||
private void updateParentPartitionEndTime(Connection conn, String tableName, | ||
String streamName, List<String> ancestorIDs, | ||
long daughterStartTime) throws SQLException { | ||
// ancestorIDs = [parentID, grandparentID] | ||
long daughterStartTime) throws SQLException { | ||
// ancestorIDs = [parentID, grandparentID1, grandparentID2...] | ||
PreparedStatement pstmt = conn.prepareStatement(PARENT_PARTITION_UPDATE_END_TIME_SQL); | ||
pstmt.setString(1, tableName); | ||
pstmt.setString(2, streamName); | ||
pstmt.setString(3, ancestorIDs.get(0)); | ||
pstmt.setString(4, ancestorIDs.get(1)); | ||
pstmt.setLong(5, daughterStartTime); | ||
pstmt.executeUpdate(); | ||
for (int i=1; i<ancestorIDs.size(); i++) { | ||
pstmt.setString(1, tableName); | ||
pstmt.setString(2, streamName); | ||
pstmt.setString(3, ancestorIDs.get(0)); | ||
pstmt.setString(4, ancestorIDs.get(i)); | ||
pstmt.setLong(5, daughterStartTime); | ||
pstmt.executeUpdate(); | ||
} | ||
conn.commit(); | ||
} | ||
|
||
/** | ||
* Get the stream name on the given table if one exists in ENABLED state. | ||
*/ | ||
private String getStreamName(Connection conn, String tableName) throws SQLException { | ||
PreparedStatement pstmt = conn.prepareStatement(STREAM_STATUS_QUERY); | ||
pstmt.setString(1, tableName); | ||
ResultSet rs = pstmt.executeQuery(); | ||
if (rs.next()) { | ||
return rs.getString(1); | ||
} else { | ||
return null; | ||
} | ||
} | ||
|
||
private PTable getPhoenixTable(Connection conn, TableName tableName) throws SQLException { | ||
PTable pTable; | ||
try { | ||
pTable = PhoenixRuntime.getTable(conn, tableName.toString()); | ||
} catch (TableNotFoundException e) { | ||
return null; | ||
} | ||
return pTable; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice to list the column names here to make it easier to read the code instead of having to jump to QueryConstants for the name and order of the columns.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I put the list of columns in a comment just above this line.