Skip to content

Commit 850cc81

Browse files
committed
[#596] Refactoring functions that update pipeline. Moving building of protocol features to ProtocolFeatureStore.
1 parent 184648c commit 850cc81

File tree

5 files changed

+156
-142
lines changed

5 files changed

+156
-142
lines changed

core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ public ProtocolFeatureStore getSupportedFeatures() {
154154

155155
ProtocolFeatureStore fromChannel = ProtocolFeatureStore.loadFromChannel(channel);
156156
if (fromChannel == null) {
157-
return ProtocolFeatureStore.Empty;
157+
return ProtocolFeatureStore.EMPTY;
158158
}
159159
// Features can't be renegotiated.
160160
// Once features is populated into channel it is enough to update cache and no need to

core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import com.datastax.oss.driver.internal.core.protocol.SegmentToFrameDecoder;
4646
import com.datastax.oss.driver.internal.core.util.ProtocolUtils;
4747
import com.datastax.oss.driver.internal.core.util.concurrent.UncaughtExceptions;
48+
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
4849
import com.datastax.oss.protocol.internal.Message;
4950
import com.datastax.oss.protocol.internal.ProtocolConstants;
5051
import com.datastax.oss.protocol.internal.ProtocolConstants.ErrorCode;
@@ -225,16 +226,15 @@ void onResponse(Message response) {
225226
channel.attr(DriverChannel.OPTIONS_KEY).set(supported.options);
226227
featureStore = ProtocolFeatureStore.parseSupportedOptions(supported.options);
227228
featureStore.storeInChannel(channel);
228-
maybeUpdatePipelineWithProtocolOptions(featureStore.isMetadataIdEnabled());
229229
step = Step.STARTUP;
230230
send();
231231
} else if (step == Step.STARTUP && response instanceof Ready) {
232-
maybeSwitchToModernFraming();
232+
maybeUpdatePipeline();
233233
context.getAuthProvider().ifPresent(provider -> provider.onMissingChallenge(endPoint));
234234
step = Step.GET_CLUSTER_NAME;
235235
send();
236236
} else if (step == Step.STARTUP && response instanceof Authenticate) {
237-
maybeSwitchToModernFraming();
237+
maybeUpdatePipeline();
238238
Authenticate authenticate = (Authenticate) response;
239239
authenticator = buildAuthenticator(endPoint, authenticate.authenticator);
240240
authenticator
@@ -400,11 +400,18 @@ public String toString() {
400400
}
401401

402402
/**
403-
* Rearranges the pipeline to deal with the new framing structure in protocol v5 and above. The
403+
* Conditionally rebuilds pipeline.
404+
*
405+
* <p>Rearranges the pipeline to deal with the new framing structure in protocol v5 and above. The
404406
* first messages still use the legacy format, we only do this after a successful response to the
405407
* first STARTUP message.
408+
*
409+
* <p>If <code>SCYLLA_USE_METADATA_ID</code> feature was negotiated we need to replace {@link
410+
* FrameEncoder} and {@link FrameDecoder} handlers with instances aware of a negotiated protocol
411+
* feature.
406412
*/
407-
private void maybeSwitchToModernFraming() {
413+
private void maybeUpdatePipeline() {
414+
ProtocolFeatures protocolFeatures = featureStore.getProtocolFeatures();
408415
if (context
409416
.getProtocolVersionRegistry()
410417
.supports(initialProtocolVersion, DefaultProtocolFeature.MODERN_FRAMING)) {
@@ -432,20 +439,7 @@ private void maybeSwitchToModernFraming() {
432439
ChannelFactory.BYTES_TO_SEGMENT_DECODER_NAME,
433440
ChannelFactory.SEGMENT_TO_FRAME_DECODER_NAME,
434441
new SegmentToFrameDecoder(context.getFrameCodec(), logPrefix));
435-
}
436-
}
437-
438-
/**
439-
* If <code>SCYLLA_USE_METADATA_ID</code> feature was negotiated we need to replace {@link
440-
* FrameEncoder} and {@link FrameDecoder} handlers with instances aware of a negotiated protocol
441-
* feature.
442-
*
443-
* @param metadataIdEnabled indicates if feature is successfully negotiated
444-
*/
445-
private void maybeUpdatePipelineWithProtocolOptions(boolean metadataIdEnabled) {
446-
if (metadataIdEnabled) {
447-
ProtocolFeatures protocolFeatures =
448-
new ProtocolFeatures.Builder().setScyllaUseMetadataId().build();
442+
} else if (protocolFeatures.isScyllaUseMetadataId()) {
449443
int maxFrameLength =
450444
(int)
451445
context
@@ -457,7 +451,10 @@ private void maybeUpdatePipelineWithProtocolOptions(boolean metadataIdEnabled) {
457451
pipeline.replace(
458452
ChannelFactory.FRAME_TO_BYTES_ENCODER_NAME,
459453
ChannelFactory.FRAME_TO_BYTES_ENCODER_NAME,
460-
new FrameEncoder(context.getFrameCodec(), protocolFeatures, maxFrameLength));
454+
new FrameEncoder(
455+
context.getFrameCodec(),
456+
protocolFeatures, // Passing updated protocol features to alter codecs behaviors
457+
maxFrameLength));
461458
pipeline.replace(
462459
ChannelFactory.BYTES_TO_FRAME_DECODER_NAME,
463460
ChannelFactory.BYTES_TO_FRAME_DECODER_NAME,
@@ -468,4 +465,9 @@ private void maybeUpdatePipelineWithProtocolOptions(boolean metadataIdEnabled) {
468465
private String getString(List<ByteBuffer> row, int i) {
469466
return TypeCodecs.TEXT.decode(row.get(i), DefaultProtocolVersion.DEFAULT);
470467
}
468+
469+
@VisibleForTesting
470+
void setFeatureStore(ProtocolFeatureStore featureStore) {
471+
this.featureStore = featureStore;
472+
}
471473
}

core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ProtocolFeatureStore.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.datastax.oss.driver.internal.core.protocol;
22

3+
import com.datastax.oss.protocol.internal.ProtocolFeatures;
34
import edu.umd.cs.findbugs.annotations.NonNull;
45
import io.netty.channel.Channel;
56
import io.netty.util.AttributeKey;
@@ -14,8 +15,9 @@ public class ProtocolFeatureStore {
1415
private final ShardingInfo.ConnectionShardingInfo shardingInfo;
1516
private final TabletInfo tabletInfo;
1617
private final boolean metadataIdEnabled;
18+
private ProtocolFeatures protocolFeatures;
1719

18-
public static final ProtocolFeatureStore Empty =
20+
public static final ProtocolFeatureStore EMPTY =
1921
new ProtocolFeatureStore(null, null, null, false);
2022

2123
ProtocolFeatureStore(
@@ -73,4 +75,19 @@ public static ProtocolFeatureStore loadFromChannel(@NonNull Channel channel) {
7375
public void storeInChannel(@NonNull Channel channel) {
7476
channel.attr(ProtocolFeatureStore.CHANNEL_KEY).set(this);
7577
}
78+
79+
public ProtocolFeatures getProtocolFeatures() {
80+
if (protocolFeatures == null) {
81+
protocolFeatures = buildProtocolFeatures();
82+
}
83+
return protocolFeatures;
84+
}
85+
86+
private ProtocolFeatures buildProtocolFeatures() {
87+
if (metadataIdEnabled) {
88+
return new ProtocolFeatures.Builder().setScyllaUseMetadataId().build();
89+
} else {
90+
return ProtocolFeatures.EMPTY;
91+
}
92+
}
7693
}

core/src/test/java/com/datastax/oss/driver/internal/core/channel/ChannelFactoryTestBase.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.datastax.oss.driver.internal.core.context.NettyOptions;
3535
import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater;
3636
import com.datastax.oss.driver.internal.core.protocol.ByteBufPrimitiveCodec;
37+
import com.datastax.oss.driver.internal.core.protocol.ProtocolFeatureStore;
3738
import com.datastax.oss.protocol.internal.Compressor;
3839
import com.datastax.oss.protocol.internal.Frame;
3940
import com.datastax.oss.protocol.internal.FrameCodec;
@@ -271,6 +272,7 @@ protected void initChannel(Channel channel) throws Exception {
271272
options,
272273
heartbeatHandler,
273274
productType == null);
275+
initHandler.setFeatureStore(ProtocolFeatureStore.EMPTY);
274276
channel
275277
.pipeline()
276278
.addLast(ChannelFactory.INFLIGHT_HANDLER_NAME, inFlightHandler)

0 commit comments

Comments
 (0)