Skip to content

Commit

Permalink
Adding more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
lokiore committed Feb 11, 2025
1 parent e4aa099 commit 1de43c9
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,31 +117,6 @@ void transitClusterUrl(HighAvailabilityGroup haGroup, ClusterRoleRecord oldRecor
LOG.info("Couldn't find active url in old ClusterRoleRecord, " +
"Doing nothing for FailoverPhoenixConnections");
}

// if (!Objects.equals(oldRecord.getUrl1(), newRecord.getUrl1())) {
// //If role changes then it is already handled at the first transit step {@link #transitClusterRole}
// //And we only care about ACTIVE connections as STANDBY we are not creating connections
// if (oldRecord.getRole1() == ACTIVE && newRecord.getRole1() == ACTIVE) {
// LOG.info("Cluster {} is changed to {} in HA group {}, now closing all its connections",
// oldRecord.getUrl1(), newRecord.getUrl1(), haGroup);
// closeConnections(haGroup, oldRecord.getUrl1(), oldRecord.getRegistryType());
// //Do we need to invalidate now?
// } else {
// LOG.info("Cluster {} is changed to {} in HA group {}, which is not active",
// oldRecord.getUrl1(), newRecord.getUrl1(), haGroup);
// }
// }
// if (!Objects.equals(oldRecord.getUrl2(), newRecord.getUrl2())) {
// if (oldRecord.getRole2() == ACTIVE && newRecord.getRole2() == ACTIVE) {
// LOG.info("Cluster {} is changed to {} in HA group {}, now closing all its connections",
// oldRecord.getUrl2(), newRecord.getUrl2(), haGroup);
// closeConnections(haGroup, oldRecord.getUrl2(), oldRecord.getRegistryType());
// //Do we need to invalidate now?
// } else {
// LOG.info("Cluster {} is changed to {} in HA group {}, which is not active",
// oldRecord.getUrl2(), newRecord.getUrl2(), haGroup);
// }
// }
}

private void transitStandby(HighAvailabilityGroup haGroup, String url,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,10 @@ public String getJdbcUrl(HighAvailabilityGroup haGroup, String url, String princ
return String.format("%s:%s",interimUrl, principal);
}

public String getJdbcUrl(String url) {
return String.format("jdbc:phoenix+zk:%s:%s", url, PRINCIPAL);
}

public String getJdbcUrlWithoutPrincipal(HighAvailabilityGroup haGroup, String url) {
String interimUrl = getUrlWithoutPrincipal(haGroup, url);
if (interimUrl.endsWith("::")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public void testClusterUnavailableNormalConnection() throws Exception {
@Test
public void testClusterReplication() throws Exception {
try (Connection conn = CLUSTERS.getClusterConnection(0, haGroup)) {
doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
doTestBasicOperationsWithConnection(conn, tableName, null);
}

CLUSTERS.checkReplicationComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public static final void doSetup() throws Exception {
GLOBAL_PROPERTIES.put(PHOENIX_HA_GROUP_ATTR, PARALLEL.name());


CONNECTIONS = Lists.newArrayList(getConnection(CLUSTERS.getZkUrl1()), getConnection(CLUSTERS.getZkUrl2()));
CONNECTIONS = Lists.newArrayList(getConnection(CLUSTERS.getJdbcUrl(CLUSTERS.getZkUrl1())),
getConnection(CLUSTERS.getJdbcUrl(CLUSTERS.getZkUrl2())));
LOG.info(String.format("************* Num connections : %d", CONNECTIONS.size()));

for (Connection conn : CONNECTIONS) {
Expand All @@ -97,7 +98,7 @@ public static final void doSetup() throws Exception {
}

//preload some data
try (Connection connection = getConnection(CLUSTERS.getZkUrl1())) {
try (Connection connection = getConnection(CLUSTERS.getJdbcUrl(CLUSTERS.getZkUrl1()))) {
loadData(connection, ORG_ID, GROUP_ID, 100, 20);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,7 @@
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.*;

import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR;
import static org.apache.phoenix.jdbc.HighAvailabilityPolicy.PARALLEL;
Expand Down Expand Up @@ -93,16 +90,38 @@ public class ParallelPhoenixConnectionWorkflowIT {
/**
* JDBC connection string for this test HA group.
*/
private String jdbcUrl;
private String jdbcHAUrl;
/**
* HA group for this test.
*/
private HighAvailabilityGroup haGroup;
/**
* registry Type to use in CRR
*/
private ClusterRoleRecord.RegistryType registryType;

@Parameters(name="ParallelPhoenixConnectionWorkflowIT_resultSetType={0}, registryType={1}") // name is used by failsafe as file name in reports
public static Collection<Object> data() {
return Arrays.asList( new Object[][]{
{ParallelPhoenixResultSetType.PARALLEL_PHOENIX_RESULT_SET.getName(),
ClusterRoleRecord.RegistryType.ZK},
{ParallelPhoenixResultSetType.PARALLEL_PHOENIX_NULL_COMPARING_RESULT_SET.getName(),
ClusterRoleRecord.RegistryType.ZK},
{ParallelPhoenixResultSetType.PARALLEL_PHOENIX_RESULT_SET.getName(),
ClusterRoleRecord.RegistryType.MASTER},
{ParallelPhoenixResultSetType.PARALLEL_PHOENIX_NULL_COMPARING_RESULT_SET.getName(),
ClusterRoleRecord.RegistryType.MASTER},
{ParallelPhoenixResultSetType.PARALLEL_PHOENIX_RESULT_SET.getName(),
ClusterRoleRecord.RegistryType.RPC},
{ParallelPhoenixResultSetType.PARALLEL_PHOENIX_NULL_COMPARING_RESULT_SET.getName(),
ClusterRoleRecord.RegistryType.RPC},
{ParallelPhoenixResultSetType.PARALLEL_PHOENIX_RESULT_SET.getName(),
null},
{ParallelPhoenixResultSetType.PARALLEL_PHOENIX_NULL_COMPARING_RESULT_SET.getName(),
null}

});

@Parameters(name="ParallelPhoenixConnectionWorkflowIT_resultSetType={0}") // name is used by failsafe as file name in reports
public static Collection<String> data() {
return Arrays.asList(ParallelPhoenixResultSetType.PARALLEL_PHOENIX_RESULT_SET.getName(),
ParallelPhoenixResultSetType.PARALLEL_PHOENIX_NULL_COMPARING_RESULT_SET.getName());
}

@BeforeClass
Expand Down Expand Up @@ -131,7 +150,8 @@ public static void setUpBeforeClass() throws Exception {
" ) \n" +
") IMMUTABLE_ROWS=true, VERSIONS=1, REPLICATION_SCOPE=1", tableName);

CONNECTIONS = Lists.newArrayList(getConnection(CLUSTERS.getZkUrl1()), getConnection(CLUSTERS.getZkUrl2()));
CONNECTIONS = Lists.newArrayList(getConnection(CLUSTERS.getJdbcUrl(CLUSTERS.getZkUrl1())),
getConnection(CLUSTERS.getJdbcUrl(CLUSTERS.getZkUrl2())));

for (Connection conn : CONNECTIONS) {
try (Statement statement = conn.createStatement()) {
Expand All @@ -143,7 +163,7 @@ public static void setUpBeforeClass() throws Exception {
CLUSTERS.checkReplicationComplete();

//preload some data
try (Connection connection = getConnection(CLUSTERS.getZkUrl1())) {
try (Connection connection = getConnection(CLUSTERS.getJdbcUrl(CLUSTERS.getZkUrl1()))) {
loadData(connection, USER_ID, WORK_ID, 100, 20);
}
CLUSTERS.checkReplicationComplete();
Expand Down Expand Up @@ -187,8 +207,10 @@ private static void loadData(Connection connection, String userId, String workId
}
}

public ParallelPhoenixConnectionWorkflowIT(String resultSetType) {
public ParallelPhoenixConnectionWorkflowIT(String resultSetType,
ClusterRoleRecord.RegistryType registryType) {
GLOBAL_PROPERTIES.setProperty(ParallelPhoenixResultSetFactory.PHOENIX_PARALLEL_RESULTSET_TYPE, resultSetType);
this.registryType = registryType;
}

@Before
Expand All @@ -198,11 +220,15 @@ public void setup() throws Exception {
clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName);

// Make first cluster ACTIVE
CLUSTERS.initClusterRole(haGroupName, PARALLEL);
if (registryType == null) {
CLUSTERS.initClusterRole(haGroupName, PARALLEL);
} else {
CLUSTERS.initClusterRole(haGroupName, PARALLEL, registryType);
}

jdbcUrl = CLUSTERS.getJdbcHAUrl();
haGroup = HighAvailabilityTestingUtility.getHighAvailibilityGroup(jdbcUrl, clientProperties);
LOG.info("Initialized haGroup {} with URL {}", haGroup.getGroupInfo().getName(), jdbcUrl);
jdbcHAUrl = CLUSTERS.getJdbcHAUrl();
haGroup = HighAvailabilityTestingUtility.getHighAvailibilityGroup(jdbcHAUrl, clientProperties);
LOG.info("Initialized haGroup {} with URL {}", haGroup.getGroupInfo().getName(), jdbcHAUrl);
}

@Test
Expand Down Expand Up @@ -460,7 +486,7 @@ public void testGetAllKeys() throws SQLException {
* @throws SQLException
*/
private Connection getParallelConnection() throws SQLException {
Connection connection = DriverManager.getConnection(jdbcUrl, clientProperties);
Connection connection = DriverManager.getConnection(jdbcHAUrl, clientProperties);
connection.setAutoCommit(true);
return connection;
}
Expand Down

0 comments on commit 1de43c9

Please sign in to comment.