Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@
<version>4.19.0.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<groupId>com.scylladb</groupId>
<artifactId>native-protocol</artifactId>
<version>1.5.2</version>
<version>1.5.2.0</version>
</dependency>
</dependencies>
</dependencyManagement>
Expand Down
2 changes: 1 addition & 1 deletion core-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
option of the shade plugin because it promotes all dependencies, even nested ones, to top level).
-->
<dependency>
<groupId>com.datastax.oss</groupId>
<groupId>com.scylladb</groupId>
<artifactId>native-protocol</artifactId>
</dependency>
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.datastax.oss</groupId>
<groupId>com.scylladb</groupId>
<artifactId>native-protocol</artifactId>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import com.datastax.oss.driver.shaded.guava.common.base.Preconditions;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
import com.datastax.oss.protocol.internal.ProtocolFeatures;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
Expand Down Expand Up @@ -330,7 +331,7 @@ ChannelInitializer<Channel> initializer(
CompletableFuture<DriverChannel> resultFuture) {
return new ChannelFactoryInitializer(
endPoint, protocolVersion, options, nodeMetricUpdater, resultFuture);
};
}

class ChannelFactoryInitializer extends ChannelInitializer<Channel> {

Expand Down Expand Up @@ -422,10 +423,10 @@ protected void initChannel(Channel channel) {
pipeline
.addLast(
FRAME_TO_BYTES_ENCODER_NAME,
new FrameEncoder(context.getFrameCodec(), maxFrameLength))
new FrameEncoder(context.getFrameCodec(), ProtocolFeatures.EMPTY, maxFrameLength))
.addLast(
BYTES_TO_FRAME_DECODER_NAME,
new FrameDecoder(context.getFrameCodec(), maxFrameLength))
new FrameDecoder(context.getFrameCodec(), ProtocolFeatures.EMPTY, maxFrameLength))
// Note: HeartbeatHandler is inserted here once init completes
.addLast(INFLIGHT_HANDLER_NAME, inFlightHandler)
.addLast(INIT_HANDLER_NAME, initHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public ProtocolFeatureStore getSupportedFeatures() {

ProtocolFeatureStore fromChannel = ProtocolFeatureStore.loadFromChannel(channel);
if (fromChannel == null) {
return ProtocolFeatureStore.Empty;
return ProtocolFeatureStore.EMPTY;
}
// Features can't be renegotiated.
// Once features is populated into channel it is enough to update cache and no need to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import com.datastax.oss.driver.internal.core.DefaultProtocolFeature;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.protocol.BytesToSegmentDecoder;
import com.datastax.oss.driver.internal.core.protocol.FrameDecoder;
import com.datastax.oss.driver.internal.core.protocol.FrameEncoder;
import com.datastax.oss.driver.internal.core.protocol.FrameToSegmentEncoder;
import com.datastax.oss.driver.internal.core.protocol.ProtocolFeatureStore;
import com.datastax.oss.driver.internal.core.protocol.SegmentToBytesEncoder;
Expand All @@ -46,6 +48,7 @@
import com.datastax.oss.protocol.internal.Message;
import com.datastax.oss.protocol.internal.ProtocolConstants;
import com.datastax.oss.protocol.internal.ProtocolConstants.ErrorCode;
import com.datastax.oss.protocol.internal.ProtocolFeatures;
import com.datastax.oss.protocol.internal.request.AuthResponse;
import com.datastax.oss.protocol.internal.request.Options;
import com.datastax.oss.protocol.internal.request.Query;
Expand Down Expand Up @@ -91,7 +94,7 @@ class ProtocolInitHandler extends ConnectInitHandler {
private String logPrefix;
private ChannelHandlerContext ctx;
private final boolean querySupportedOptions;
private ProtocolFeatureStore featureStore;
private ProtocolFeatureStore featureStore = ProtocolFeatureStore.EMPTY;

/**
* @param querySupportedOptions whether to send OPTIONS as the first message, to request which
Expand Down Expand Up @@ -218,19 +221,19 @@ void onResponse(Message response) {
ProtocolUtils.opcodeString(response.opcode));
try {
if (step == Step.OPTIONS && response instanceof Supported) {
channel.attr(DriverChannel.OPTIONS_KEY).set(((Supported) response).options);
Supported res = (Supported) response;
featureStore = ProtocolFeatureStore.parseSupportedOptions(res.options);
Supported supported = (Supported) response;
channel.attr(DriverChannel.OPTIONS_KEY).set(supported.options);
featureStore = ProtocolFeatureStore.parseSupportedOptions(supported.options);
featureStore.storeInChannel(channel);
step = Step.STARTUP;
send();
} else if (step == Step.STARTUP && response instanceof Ready) {
maybeSwitchToModernFraming();
maybeUpdatePipeline();
context.getAuthProvider().ifPresent(provider -> provider.onMissingChallenge(endPoint));
step = Step.GET_CLUSTER_NAME;
send();
} else if (step == Step.STARTUP && response instanceof Authenticate) {
maybeSwitchToModernFraming();
maybeUpdatePipeline();
Authenticate authenticate = (Authenticate) response;
authenticator = buildAuthenticator(endPoint, authenticate.authenticator);
authenticator
Expand Down Expand Up @@ -396,11 +399,18 @@ public String toString() {
}

/**
* Rearranges the pipeline to deal with the new framing structure in protocol v5 and above. The
* Conditionally rebuilds pipeline.
*
* <p>Rearranges the pipeline to deal with the new framing structure in protocol v5 and above. The
* first messages still use the legacy format, we only do this after a successful response to the
* first STARTUP message.
*
* <p>If <code>SCYLLA_USE_METADATA_ID</code> feature was negotiated we need to replace {@link
* FrameEncoder} and {@link FrameDecoder} handlers with instances aware of a negotiated protocol
* feature.
*/
private void maybeSwitchToModernFraming() {
private void maybeUpdatePipeline() {
ProtocolFeatures protocolFeatures = featureStore.getProtocolFeatures();
if (context
.getProtocolVersionRegistry()
.supports(initialProtocolVersion, DefaultProtocolFeature.MODERN_FRAMING)) {
Expand Down Expand Up @@ -428,6 +438,26 @@ private void maybeSwitchToModernFraming() {
ChannelFactory.BYTES_TO_SEGMENT_DECODER_NAME,
ChannelFactory.SEGMENT_TO_FRAME_DECODER_NAME,
new SegmentToFrameDecoder(context.getFrameCodec(), logPrefix));
} else if (protocolFeatures.isScyllaUseMetadataId()) {
int maxFrameLength =
(int)
context
.getConfig()
.getDefaultProfile()
.getBytes(DefaultDriverOption.PROTOCOL_MAX_FRAME_LENGTH);

ChannelPipeline pipeline = ctx.pipeline();
pipeline.replace(
ChannelFactory.FRAME_TO_BYTES_ENCODER_NAME,
ChannelFactory.FRAME_TO_BYTES_ENCODER_NAME,
new FrameEncoder(
context.getFrameCodec(),
protocolFeatures, // Passing updated protocol features to alter codecs behaviors
maxFrameLength));
pipeline.replace(
ChannelFactory.BYTES_TO_FRAME_DECODER_NAME,
ChannelFactory.BYTES_TO_FRAME_DECODER_NAME,
new FrameDecoder(context.getFrameCodec(), protocolFeatures, maxFrameLength));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.datastax.oss.protocol.internal.Frame;
import com.datastax.oss.protocol.internal.FrameCodec;
import com.datastax.oss.protocol.internal.ProtocolConstants;
import com.datastax.oss.protocol.internal.ProtocolFeatures;
import com.datastax.oss.protocol.internal.response.Error;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -41,11 +42,16 @@ public class FrameDecoder extends LengthFieldBasedFrameDecoder {
private static final int LENGTH_FIELD_LENGTH = 4;

private final FrameCodec<ByteBuf> frameCodec;
private final ProtocolFeatures protocolFeatures;
private boolean isFirstResponse;

public FrameDecoder(FrameCodec<ByteBuf> frameCodec, int maxFrameLengthInBytes) {
public FrameDecoder(
FrameCodec<ByteBuf> frameCodec,
ProtocolFeatures protocolFeatures,
int maxFrameLengthInBytes) {
super(maxFrameLengthInBytes, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, 0, 0, true);
this.frameCodec = frameCodec;
this.protocolFeatures = protocolFeatures;
}

@Override
Expand Down Expand Up @@ -87,7 +93,7 @@ protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception
ByteBuf buffer = (ByteBuf) super.decode(ctx, in);
return (buffer == null)
? null // did not receive whole frame yet, keep reading
: frameCodec.decode(buffer);
: frameCodec.decode(buffer, protocolFeatures);
} catch (Exception e) {
// If decoding failed, try to read at least the stream id, so that the error can be
// propagated to the client request matching that id (otherwise we have to fail all
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.datastax.oss.driver.api.core.connection.FrameTooLongException;
import com.datastax.oss.protocol.internal.Frame;
import com.datastax.oss.protocol.internal.FrameCodec;
import com.datastax.oss.protocol.internal.ProtocolFeatures;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -32,17 +33,20 @@
public class FrameEncoder extends MessageToMessageEncoder<Frame> {

private final FrameCodec<ByteBuf> frameCodec;
private final ProtocolFeatures protocolFeatures;
private final int maxFrameLength;

public FrameEncoder(FrameCodec<ByteBuf> frameCodec, int maxFrameLength) {
public FrameEncoder(
FrameCodec<ByteBuf> frameCodec, ProtocolFeatures protocolFeatures, int maxFrameLength) {
super(Frame.class);
this.frameCodec = frameCodec;
this.protocolFeatures = protocolFeatures;
this.maxFrameLength = maxFrameLength;
}

@Override
protected void encode(ChannelHandlerContext ctx, Frame frame, List<Object> out) throws Exception {
ByteBuf buffer = frameCodec.encode(frame);
ByteBuf buffer = frameCodec.encode(frame, protocolFeatures);
int actualLength = buffer.readableBytes();
if (actualLength > maxFrameLength) {
throw new FrameTooLongException(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.datastax.oss.driver.internal.core.protocol;

import java.util.List;
import java.util.Map;

public class MetadataIdInfo {
private static final String SCYLLA_USE_METADATA_ID_STARTUP_OPTION_KEY = "SCYLLA_USE_METADATA_ID";
private static final String SCYLLA_USE_METADATA_ID_STARTUP_OPTION_VALUE = "";

private MetadataIdInfo() {}

public static boolean loadFromSupportedOptions(Map<String, List<String>> supported) {
if (!supported.containsKey(SCYLLA_USE_METADATA_ID_STARTUP_OPTION_KEY)) {
return false;
}
List<String> values = supported.get(SCYLLA_USE_METADATA_ID_STARTUP_OPTION_KEY);
return values != null
&& values.size() == 1
&& values.get(0).equals(SCYLLA_USE_METADATA_ID_STARTUP_OPTION_VALUE);
}

public static void populateStartupOptions(Map<String, String> options) {
options.put(
SCYLLA_USE_METADATA_ID_STARTUP_OPTION_KEY, SCYLLA_USE_METADATA_ID_STARTUP_OPTION_VALUE);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.datastax.oss.driver.internal.core.protocol;

import com.datastax.oss.protocol.internal.ProtocolFeatures;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.netty.channel.Channel;
import io.netty.util.AttributeKey;
Expand All @@ -13,14 +14,21 @@ public class ProtocolFeatureStore {
private final LwtInfo lwtInfo;
private final ShardingInfo.ConnectionShardingInfo shardingInfo;
private final TabletInfo tabletInfo;
private final boolean metadataIdEnabled;
private ProtocolFeatures protocolFeatures;

public static final ProtocolFeatureStore Empty = new ProtocolFeatureStore(null, null, null);
public static final ProtocolFeatureStore EMPTY =
new ProtocolFeatureStore(null, null, null, false);

ProtocolFeatureStore(
LwtInfo lwtInfo, ShardingInfo.ConnectionShardingInfo shardingInfo, TabletInfo tabletInfo) {
LwtInfo lwtInfo,
ShardingInfo.ConnectionShardingInfo shardingInfo,
TabletInfo tabletInfo,
boolean metadataIdEnabled) {
this.lwtInfo = lwtInfo;
this.shardingInfo = shardingInfo;
this.tabletInfo = tabletInfo;
this.metadataIdEnabled = metadataIdEnabled;
}

public LwtInfo getLwtFeatureInfo() {
Expand All @@ -35,12 +43,17 @@ public TabletInfo getTabletFeatureInfo() {
return tabletInfo;
}

public boolean isMetadataIdEnabled() {
return metadataIdEnabled;
}

public static ProtocolFeatureStore parseSupportedOptions(
@NonNull Map<String, List<String>> options) {
LwtInfo lwtInfo = LwtInfo.loadFromSupportedOptions(options);
ShardingInfo.ConnectionShardingInfo shardingInfo = ShardingInfo.parseShardingInfo(options);
TabletInfo tabletInfo = TabletInfo.loadFromSupportedOptions(options);
return new ProtocolFeatureStore(lwtInfo, shardingInfo, tabletInfo);
boolean metadataIdEnabled = MetadataIdInfo.loadFromSupportedOptions(options);
return new ProtocolFeatureStore(lwtInfo, shardingInfo, tabletInfo, metadataIdEnabled);
}

public void populateStartupOptions(@NonNull Map<String, String> options) {
Expand All @@ -50,6 +63,9 @@ public void populateStartupOptions(@NonNull Map<String, String> options) {
if (tabletInfo != null && tabletInfo.isEnabled()) {
TabletInfo.populateStartupOptions(options);
}
if (metadataIdEnabled) {
MetadataIdInfo.populateStartupOptions(options);
}
}

public static ProtocolFeatureStore loadFromChannel(@NonNull Channel channel) {
Expand All @@ -59,4 +75,19 @@ public static ProtocolFeatureStore loadFromChannel(@NonNull Channel channel) {
public void storeInChannel(@NonNull Channel channel) {
channel.attr(ProtocolFeatureStore.CHANNEL_KEY).set(this);
}

public ProtocolFeatures getProtocolFeatures() {
if (protocolFeatures == null) {
protocolFeatures = buildProtocolFeatures();
}
return protocolFeatures;
}

private ProtocolFeatures buildProtocolFeatures() {
if (metadataIdEnabled) {
return new ProtocolFeatures.Builder().setScyllaUseMetadataId().build();
} else {
return ProtocolFeatures.EMPTY;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.datastax.oss.protocol.internal.Frame;
import com.datastax.oss.protocol.internal.FrameCodec;
import com.datastax.oss.protocol.internal.ProtocolFeatures;
import com.datastax.oss.protocol.internal.Segment;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -73,7 +74,7 @@ private void decodeSelfContained(Segment<ByteBuf> segment, List<Object> out) {
int frameCount = 0;
try {
do {
Frame frame = frameCodec.decode(payload);
Frame frame = frameCodec.decode(payload, ProtocolFeatures.EMPTY);
LOG.trace(
"[{}] Decoded response frame {} from self-contained segment",
logPrefix,
Expand Down Expand Up @@ -110,7 +111,7 @@ private void decodeSlice(Segment<ByteBuf> segment, ByteBufAllocator allocator, L
encodedFrame.addComponents(true, accumulatedSlices);
Frame frame;
try {
frame = frameCodec.decode(encodedFrame);
frame = frameCodec.decode(encodedFrame, ProtocolFeatures.EMPTY);
} finally {
encodedFrame.release();
// Reset our state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class ProtocolInitHandlerTest extends ChannelHandlerTestBase {
@Mock private DriverExecutionProfile defaultProfile;
@Mock private Appender<ILoggingEvent> appender;

private ProtocolVersionRegistry protocolVersionRegistry =
private final ProtocolVersionRegistry protocolVersionRegistry =
new DefaultProtocolVersionRegistry("test");
private HeartbeatHandler heartbeatHandler;

Expand Down Expand Up @@ -365,7 +365,7 @@ public void should_invoke_auth_provider_when_server_does_not_send_challenge() {
}

@Test
public void should_fail_to_initialize_if_server_sends_auth_error() throws Throwable {
public void should_fail_to_initialize_if_server_sends_auth_error() {
channel
.pipeline()
.addLast(
Expand Down Expand Up @@ -443,7 +443,7 @@ public void should_check_cluster_name_if_provided() {
}

@Test
public void should_fail_to_initialize_if_cluster_name_does_not_match() throws Throwable {
public void should_fail_to_initialize_if_cluster_name_does_not_match() {
channel
.pipeline()
.addLast(
Expand Down
Loading
Loading