Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class AdvancedShardAwarenessIT {

@ClassRule
public static final CustomCcmRule CCM_RULE =
CustomCcmRule.builder().withNodes(2).withJvmArgs("--smp=3").build();
CustomCcmRule.builder().withNodes(1).withJvmArgs("--smp=3").build();

public static ch.qos.logback.classic.Logger channelPoolLogger =
(ch.qos.logback.classic.Logger) LoggerFactory.getLogger(ChannelPool.class);
Expand Down Expand Up @@ -100,33 +100,22 @@ public void stopCapturingLogs() {
public void should_initialize_all_channels(boolean reuseAddress) {
int expectedChannelsPerNode = 6; // Divisible by smp
String node1 = CCM_RULE.getCcmBridge().getNodeIpAddress(1);
String node2 = CCM_RULE.getCcmBridge().getNodeIpAddress(2);
Pattern reconnectionPattern1 =
Pattern.compile(".*" + Pattern.quote(node1) + ".*Scheduling next reconnection in.*");
Pattern reconnectionPattern2 =
Pattern.compile(".*" + Pattern.quote(node2) + ".*Scheduling next reconnection in.*");
Set<Pattern> forbiddenOccurences =
ImmutableSet.of(shardMismatchPattern, reconnectionPattern1, reconnectionPattern2);
Set<Pattern> forbiddenOccurences = ImmutableSet.of(shardMismatchPattern, reconnectionPattern1);
Map<Pattern, Integer> expectedOccurences =
ImmutableMap.of(
Pattern.compile(
".*"
+ Pattern.quote(node1)
+ ":19042.*Reconnection attempt complete, 6/6 channels.*"),
1,
Pattern.compile(
".*"
+ Pattern.quote(node2)
+ ":19042.*Reconnection attempt complete, 6/6 channels.*"),
1);
".*"
+ Pattern.quote(node1)
+ ":19042.*Reconnection attempt complete, 6/6 channels.*"),
1);
DriverConfigLoader loader =
SessionUtils.configLoaderBuilder()
.withBoolean(DefaultDriverOption.SOCKET_REUSE_ADDRESS, reuseAddress)
.withBoolean(DefaultDriverOption.CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED, true)
.withInt(DefaultDriverOption.ADVANCED_SHARD_AWARENESS_PORT_LOW, 10000)
.withInt(DefaultDriverOption.ADVANCED_SHARD_AWARENESS_PORT_HIGH, 60000)
// Due to rounding up the connections per shard this will result in 6 connections per
// node
.withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, expectedChannelsPerNode)
.build();
try (CqlSession session =
Expand All @@ -149,13 +138,13 @@ public void should_initialize_all_channels(boolean reuseAddress) {

@Test
public void should_see_mismatched_shard() {
int expectedChannelsPerNode = 66; // Divisible by smp
int expectedChannelsPerNode = 33; // Divisible by smp
DriverConfigLoader loader =
SessionUtils.configLoaderBuilder()
.withBoolean(DefaultDriverOption.CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED, true)
.withInt(DefaultDriverOption.ADVANCED_SHARD_AWARENESS_PORT_LOW, 10000)
.withInt(DefaultDriverOption.ADVANCED_SHARD_AWARENESS_PORT_HIGH, 60000)
.withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, 66)
.withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, expectedChannelsPerNode)
.build();
try (CqlSession session =
CqlSession.builder()
Expand All @@ -176,13 +165,13 @@ public void should_see_mismatched_shard() {
// There is no need to run this as a test, but it serves as a comparison
@SuppressWarnings("unused")
public void should_struggle_to_fill_pools() {
int expectedChannelsPerNode = 66; // Divisible by smp
int expectedChannelsPerNode = 33; // Divisible by smp
DriverConfigLoader loader =
SessionUtils.configLoaderBuilder()
.withBoolean(DefaultDriverOption.CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED, false)
.withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, 66)
.withDuration(DefaultDriverOption.RECONNECTION_BASE_DELAY, Duration.ofMillis(200))
.withDuration(DefaultDriverOption.RECONNECTION_MAX_DELAY, Duration.ofMillis(4000))
.withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, expectedChannelsPerNode)
.withDuration(DefaultDriverOption.RECONNECTION_BASE_DELAY, Duration.ofMillis(300))
.withDuration(DefaultDriverOption.RECONNECTION_MAX_DELAY, Duration.ofMillis(3200))
.build();
CqlSessionBuilder builder =
CqlSession.builder()
Expand Down Expand Up @@ -210,13 +199,13 @@ public void should_struggle_to_fill_pools() {

@Test
public void should_not_struggle_to_fill_pools() {
int expectedChannelsPerNode = 66;
int expectedChannelsPerNode = 33;
DriverConfigLoader loader =
SessionUtils.configLoaderBuilder()
.withBoolean(DefaultDriverOption.CONNECTION_ADVANCED_SHARD_AWARENESS_ENABLED, true)
.withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, expectedChannelsPerNode)
.withDuration(DefaultDriverOption.RECONNECTION_BASE_DELAY, Duration.ofMillis(10))
.withDuration(DefaultDriverOption.RECONNECTION_MAX_DELAY, Duration.ofMillis(20))
.withDuration(DefaultDriverOption.RECONNECTION_BASE_DELAY, Duration.ofMillis(300))
.withDuration(DefaultDriverOption.RECONNECTION_MAX_DELAY, Duration.ofMillis(3200))
.build();
CqlSessionBuilder builder =
CqlSession.builder()
Expand All @@ -239,29 +228,24 @@ public void should_not_struggle_to_fill_pools() {
.until(() -> areAllPoolsFullyInitialized(allSessions, expectedChannelsPerNode));
int tolerance = 2; // Sometimes socket ends up already in use
String node1 = CCM_RULE.getCcmBridge().getNodeIpAddress(1);
String node2 = CCM_RULE.getCcmBridge().getNodeIpAddress(2);
Pattern reconnectionPattern1 =
Pattern.compile(".*" + Pattern.quote(node1) + ".*Scheduling next reconnection in.*");
Pattern reconnectionPattern2 =
Pattern.compile(".*" + Pattern.quote(node2) + ".*Scheduling next reconnection in.*");
Map<Pattern, Integer> expectedOccurences =
ImmutableMap.of(
Pattern.compile(
".*"
+ Pattern.quote(node1)
+ ":19042.*Reconnection attempt complete, 66/66 channels.*"),
1 * sessions,
Pattern.compile(
".*"
+ Pattern.quote(node2)
+ ":19042.*Reconnection attempt complete, 66/66 channels.*"),
1 * sessions);
".*"
+ Pattern.quote(node1)
+ ":19042.*Reconnection attempt complete, "
+ expectedChannelsPerNode
+ "/"
+ expectedChannelsPerNode
+ " channels.*"),
sessions);
List<ILoggingEvent> logsCopy = ImmutableList.copyOf(appender.list);
expectedOccurences.forEach(
(pattern, times) -> assertMatchesAtLeast(pattern, times, logsCopy));
assertNoLogMatches(shardMismatchPattern, logsCopy);
assertMatchesAtMost(reconnectionPattern1, tolerance, logsCopy);
assertMatchesAtMost(reconnectionPattern2, tolerance, logsCopy);
}
}

Expand Down
Loading