Skip to content

Commit 440bb72

Browse files
committed
[#597] Enhancing protocol feature handling.
# Conflicts: # core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java # core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java
1 parent 814b46d commit 440bb72

File tree

5 files changed

+196
-7
lines changed

5 files changed

+196
-7
lines changed

core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import com.datastax.oss.driver.internal.core.session.DefaultSession;
3939
import com.datastax.oss.driver.internal.core.util.concurrent.UncaughtExceptions;
4040
import com.datastax.oss.protocol.internal.Message;
41-
import com.datastax.oss.protocol.internal.ProtocolFeatures;
4241
import io.netty.channel.Channel;
4342
import io.netty.channel.ChannelConfig;
4443
import io.netty.channel.ChannelFuture;

core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,11 @@
3737
import com.datastax.oss.driver.internal.core.DefaultProtocolFeature;
3838
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
3939
import com.datastax.oss.driver.internal.core.protocol.BytesToSegmentDecoder;
40+
import com.datastax.oss.driver.internal.core.protocol.FrameDecoder;
41+
import com.datastax.oss.driver.internal.core.protocol.FrameEncoder;
4042
import com.datastax.oss.driver.internal.core.protocol.FrameToSegmentEncoder;
43+
import com.datastax.oss.driver.internal.core.protocol.ProtocolFeatureManager;
44+
import com.datastax.oss.driver.internal.core.protocol.ProtocolFeatureParser;
4145
import com.datastax.oss.driver.internal.core.protocol.ProtocolFeatureStore;
4246
import com.datastax.oss.driver.internal.core.protocol.SegmentToBytesEncoder;
4347
import com.datastax.oss.driver.internal.core.protocol.SegmentToFrameDecoder;
@@ -46,6 +50,7 @@
4650
import com.datastax.oss.protocol.internal.Message;
4751
import com.datastax.oss.protocol.internal.ProtocolConstants;
4852
import com.datastax.oss.protocol.internal.ProtocolConstants.ErrorCode;
53+
import com.datastax.oss.protocol.internal.ProtocolFeatures;
4954
import com.datastax.oss.protocol.internal.request.AuthResponse;
5055
import com.datastax.oss.protocol.internal.request.Options;
5156
import com.datastax.oss.protocol.internal.request.Query;
@@ -66,6 +71,7 @@
6671
import java.util.List;
6772
import java.util.Map;
6873
import java.util.Objects;
74+
import java.util.Optional;
6975
import net.jcip.annotations.NotThreadSafe;
7076
import org.slf4j.Logger;
7177
import org.slf4j.LoggerFactory;
@@ -91,6 +97,7 @@ class ProtocolInitHandler extends ConnectInitHandler {
9197
private String logPrefix;
9298
private ChannelHandlerContext ctx;
9399
private final boolean querySupportedOptions;
100+
private ProtocolFeatureManager protocolFeatureManager;
94101
private ProtocolFeatureStore featureStore;
95102

96103
/**
@@ -189,6 +196,12 @@ Message getRequest() {
189196
if (featureStore != null) {
190197
featureStore.populateStartupOptions(startupOptions);
191198
}
199+
Optional.ofNullable(protocolFeatureManager)
200+
.ifPresent(m -> m.optionallyAddLwtInfoOption(startupOptions));
201+
Optional.ofNullable(protocolFeatureManager)
202+
.ifPresent(m -> m.optionallyAddTabletInfoOption(startupOptions));
203+
Optional.ofNullable(protocolFeatureManager)
204+
.ifPresent(m -> m.optionallyAddMetadataIdOption(startupOptions));
192205
return request = new Startup(startupOptions);
193206
case GET_CLUSTER_NAME:
194207
return request = CLUSTER_NAME_QUERY;
@@ -222,6 +235,14 @@ void onResponse(Message response) {
222235
Supported res = (Supported) response;
223236
featureStore = ProtocolFeatureStore.parseSupportedOptions(res.options);
224237
featureStore.storeInChannel(channel);
238+
Supported supported = (Supported) response;
239+
ProtocolFeatureParser featureParser =
240+
ProtocolFeatureParser.Builder.fromOptions(supported).build();
241+
protocolFeatureManager = featureParser.parse();
242+
protocolFeatureManager.updateOptionsAttributeForChannel(channel);
243+
protocolFeatureManager.updateShardingInfoAttributeForChannel(channel);
244+
protocolFeatureManager.updateLwtInfoAttributeForChannel(channel);
245+
maybeUpdatePipelineWithProtocolOptions(protocolFeatureManager.isMetadataIdEnabled());
225246
step = Step.STARTUP;
226247
send();
227248
} else if (step == Step.STARTUP && response instanceof Ready) {
@@ -431,6 +452,29 @@ private void maybeSwitchToModernFraming() {
431452
}
432453
}
433454

455+
private void maybeUpdatePipelineWithProtocolOptions(boolean metadataIdEnabled) {
456+
if (metadataIdEnabled) {
457+
ProtocolFeatures protocolFeatures = new ProtocolFeatures();
458+
protocolFeatures.addFeature(ProtocolFeatures.Feature.SCYLLA_USE_METADATA_ID);
459+
int maxFrameLength =
460+
(int)
461+
context
462+
.getConfig()
463+
.getDefaultProfile()
464+
.getBytes(DefaultDriverOption.PROTOCOL_MAX_FRAME_LENGTH);
465+
466+
ChannelPipeline pipeline = ctx.pipeline();
467+
pipeline.replace(
468+
ChannelFactory.FRAME_TO_BYTES_ENCODER_NAME,
469+
ChannelFactory.FRAME_TO_BYTES_ENCODER_NAME,
470+
new FrameEncoder(context.getFrameCodec(), protocolFeatures, maxFrameLength));
471+
pipeline.replace(
472+
ChannelFactory.BYTES_TO_FRAME_DECODER_NAME,
473+
ChannelFactory.BYTES_TO_FRAME_DECODER_NAME,
474+
new FrameDecoder(context.getFrameCodec(), protocolFeatures, maxFrameLength));
475+
}
476+
}
477+
434478
private String getString(List<ByteBuffer> row, int i) {
435479
return TypeCodecs.TEXT.decode(row.get(i), DefaultProtocolVersion.DEFAULT);
436480
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package com.datastax.oss.driver.internal.core.protocol;
2+
3+
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
4+
import io.netty.channel.Channel;
5+
import java.util.List;
6+
import java.util.Map;
7+
8+
/**
9+
* <code>ProtocolFeatureManager</code> is a helper class storing and managing protocol related info
10+
* such as:
11+
*
12+
* <ul>
13+
* <li>{@link ShardingInfo}
14+
* <li>{@link LwtInfo}
15+
* <li>{@link TabletInfo}
16+
* <li>whether any of {@link com.datastax.oss.protocol.internal.ProtocolFeatures.Feature} is
17+
* negotiated
18+
* </ul>
19+
*/
20+
public class ProtocolFeatureManager {
21+
private final Map<String, List<String>> options;
22+
private final ShardingInfo.ConnectionShardingInfo shardingInfo;
23+
private final LwtInfo lwtInfo;
24+
private final TabletInfo tabletInfo;
25+
private final boolean metadataIdEnabled;
26+
27+
public ProtocolFeatureManager(
28+
Map<String, List<String>> options,
29+
ShardingInfo.ConnectionShardingInfo shardingInfo,
30+
LwtInfo lwtInfo,
31+
TabletInfo tabletInfo,
32+
boolean metadataIdEnabled) {
33+
this.options = options;
34+
this.shardingInfo = shardingInfo;
35+
this.lwtInfo = lwtInfo;
36+
this.tabletInfo = tabletInfo;
37+
this.metadataIdEnabled = metadataIdEnabled;
38+
}
39+
40+
public Map<String, List<String>> getOptions() {
41+
return options;
42+
}
43+
44+
public ShardingInfo.ConnectionShardingInfo getShardingInfo() {
45+
return shardingInfo;
46+
}
47+
48+
public LwtInfo getLwtInfo() {
49+
return lwtInfo;
50+
}
51+
52+
public TabletInfo getTabletInfo() {
53+
return tabletInfo;
54+
}
55+
56+
public boolean isMetadataIdEnabled() {
57+
return metadataIdEnabled;
58+
}
59+
60+
public void updateOptionsAttributeForChannel(Channel channel) {
61+
if (channel != null) {
62+
channel.attr(DriverChannel.OPTIONS_KEY).set(options);
63+
}
64+
}
65+
66+
public void updateShardingInfoAttributeForChannel(Channel channel) {
67+
if (shardingInfo != null) {
68+
channel.attr(DriverChannel.SHARDING_INFO_KEY).set(shardingInfo);
69+
}
70+
}
71+
72+
public void updateLwtInfoAttributeForChannel(Channel channel) {
73+
if (lwtInfo != null) {
74+
channel.attr(DriverChannel.LWT_INFO_KEY).set(lwtInfo);
75+
}
76+
}
77+
78+
public void optionallyAddLwtInfoOption(Map<String, String> options) {
79+
if (lwtInfo != null) {
80+
lwtInfo.addOption(options);
81+
}
82+
}
83+
84+
public void optionallyAddMetadataIdOption(Map<String, String> options) {
85+
if (metadataIdEnabled) {
86+
MetadataIdInfo.addOption(options);
87+
}
88+
}
89+
90+
public void optionallyAddTabletInfoOption(Map<String, String> options) {
91+
if (tabletInfo != null && tabletInfo.isEnabled()) {
92+
TabletInfo.addOption(options);
93+
}
94+
}
95+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package com.datastax.oss.driver.internal.core.protocol;
2+
3+
import com.datastax.oss.protocol.internal.response.Supported;
4+
import java.util.List;
5+
import java.util.Map;
6+
7+
/**
8+
* <code>ProtocolFeatureParser</code> is a utility class handling parsing of {@link Supported}
9+
* response options providing an API to check for supported features
10+
*/
11+
public class ProtocolFeatureParser {
12+
13+
/** A builder class for a {@link ProtocolFeatureParser}. */
14+
public static class Builder {
15+
16+
private final Supported supported;
17+
18+
private Builder(Supported supported) {
19+
this.supported = supported;
20+
}
21+
22+
public static Builder fromOptions(Supported supported) {
23+
return new Builder(supported);
24+
}
25+
26+
public ProtocolFeatureParser build() {
27+
return new ProtocolFeatureParser(supported.options);
28+
}
29+
}
30+
31+
private final Map<String, List<String>> options;
32+
33+
private ProtocolFeatureParser(Map<String, List<String>> options) {
34+
this.options = options;
35+
}
36+
37+
/**
38+
* Parses {@link Supported#options} field of {@link Supported} message
39+
*
40+
* @return instance of {@link ProtocolFeatureManager} containing parsed properties
41+
*/
42+
public ProtocolFeatureManager parse() {
43+
ShardingInfo.ConnectionShardingInfo shardingInfo = ShardingInfo.parseShardingInfo(options);
44+
LwtInfo lwtInfo = LwtInfo.parseLwtInfo(options);
45+
TabletInfo tabletInfo = TabletInfo.parseTabletInfo(options);
46+
boolean metadataIdEnabled = MetadataIdInfo.parseMetadataId(options);
47+
return new ProtocolFeatureManager(
48+
options, shardingInfo, lwtInfo, tabletInfo, metadataIdEnabled);
49+
}
50+
}

integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementIT.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,7 @@ public void should_have_non_empty_variable_definitions_for_select_query_with_bou
164164
@Test
165165
@BackendRequirement(type = BackendType.CASSANDRA, minInclusive = "4.0")
166166
@BackendRequirement(type = BackendType.SCYLLA)
167-
public void
168-
should_update_metadata_when_schema_changed_across_executions() {
167+
public void should_update_metadata_when_schema_changed_across_executions() {
169168
// Given
170169
CqlSession session = sessionRule.session();
171170
PreparedStatement ps = session.prepare("SELECT * FROM prepared_statement_test WHERE a = ?");
@@ -463,11 +462,13 @@ private void should_not_store_metadata_for_conditional_updates(CqlSession sessio
463462
} else if (CcmBridge.isDistributionOf(BackendType.SCYLLA)) {
464463
assertThat(nextRow.isNull("d")).isTrue();
465464
assertThat(ps.getResultSetDefinitions()).hasSize(5);
466-
assertThat(Bytes.toHexString(ps.getResultMetadataId())).isNotEqualTo(Bytes.toHexString(idBefore));
465+
assertThat(Bytes.toHexString(ps.getResultMetadataId()))
466+
.isNotEqualTo(Bytes.toHexString(idBefore));
467467
} else {
468468
assertThat(nextRow.isNull("d")).isTrue();
469469
assertThat(ps.getResultSetDefinitions()).hasSize(0);
470-
assertThat(Bytes.toHexString(ps.getResultMetadataId())).isEqualTo(Bytes.toHexString(idBefore));
470+
assertThat(Bytes.toHexString(ps.getResultMetadataId()))
471+
.isEqualTo(Bytes.toHexString(idBefore));
471472
}
472473
}
473474

@@ -712,7 +713,7 @@ private static long getPreparedCacheSize(CqlSession session) {
712713

713714
private static boolean hasNoScyllaMetadataIdSupport() {
714715
return CcmBridge.isDistributionOf(BackendType.SCYLLA)
715-
&& CcmBridge.getScyllaVersion().isPresent()
716-
&& CcmBridge.getScyllaVersion().get().compareTo(SCYLLA_METADATA_ID_SUPPORT_VERSION) < 0;
716+
&& CcmBridge.getScyllaVersion().isPresent()
717+
&& CcmBridge.getScyllaVersion().get().compareTo(SCYLLA_METADATA_ID_SUPPORT_VERSION) < 0;
717718
}
718719
}

0 commit comments

Comments
 (0)