diff --git a/bom/pom.xml b/bom/pom.xml index 927bedaed02..0820c46f02c 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -81,9 +81,9 @@ 4.19.0.2-SNAPSHOT - com.datastax.oss + com.scylladb native-protocol - 1.5.2 + 1.5.2.0 diff --git a/core-shaded/pom.xml b/core-shaded/pom.xml index f0c17c0086e..6e8baa0a4c7 100644 --- a/core-shaded/pom.xml +++ b/core-shaded/pom.xml @@ -58,7 +58,7 @@ option of the shade plugin because it promotes all dependencies, even nested ones, to top level). --> - com.datastax.oss + com.scylladb native-protocol diff --git a/core/pom.xml b/core/pom.xml index 65ec3511345..0855ed77a8c 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -46,7 +46,7 @@ - com.datastax.oss + com.scylladb native-protocol diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java index 0d29b59c2ad..c89401fa9e9 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java @@ -47,6 +47,7 @@ import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting; import com.datastax.oss.driver.shaded.guava.common.base.Preconditions; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap; +import com.datastax.oss.protocol.internal.ProtocolFeatures; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -330,7 +331,7 @@ ChannelInitializer initializer( CompletableFuture resultFuture) { return new ChannelFactoryInitializer( endPoint, protocolVersion, options, nodeMetricUpdater, resultFuture); - }; + } class ChannelFactoryInitializer extends ChannelInitializer { @@ -422,10 +423,10 @@ protected void initChannel(Channel channel) { pipeline .addLast( FRAME_TO_BYTES_ENCODER_NAME, - new FrameEncoder(context.getFrameCodec(), maxFrameLength)) + new FrameEncoder(context.getFrameCodec(), ProtocolFeatures.EMPTY, maxFrameLength)) .addLast( BYTES_TO_FRAME_DECODER_NAME, - new FrameDecoder(context.getFrameCodec(), maxFrameLength)) + new FrameDecoder(context.getFrameCodec(), ProtocolFeatures.EMPTY, maxFrameLength)) // Note: HeartbeatHandler is inserted here once init completes .addLast(INFLIGHT_HANDLER_NAME, inFlightHandler) .addLast(INIT_HANDLER_NAME, initHandler); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java index f0e0176c144..bed12ebad09 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java @@ -154,7 +154,7 @@ public ProtocolFeatureStore getSupportedFeatures() { ProtocolFeatureStore fromChannel = ProtocolFeatureStore.loadFromChannel(channel); if (fromChannel == null) { - return ProtocolFeatureStore.Empty; + return ProtocolFeatureStore.EMPTY; } // Features can't be renegotiated. // Once features is populated into channel it is enough to update cache and no need to diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java index 248b51c89f1..04809b400d9 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java @@ -37,6 +37,8 @@ import com.datastax.oss.driver.internal.core.DefaultProtocolFeature; import com.datastax.oss.driver.internal.core.context.InternalDriverContext; import com.datastax.oss.driver.internal.core.protocol.BytesToSegmentDecoder; +import com.datastax.oss.driver.internal.core.protocol.FrameDecoder; +import com.datastax.oss.driver.internal.core.protocol.FrameEncoder; import com.datastax.oss.driver.internal.core.protocol.FrameToSegmentEncoder; import com.datastax.oss.driver.internal.core.protocol.ProtocolFeatureStore; import com.datastax.oss.driver.internal.core.protocol.SegmentToBytesEncoder; @@ -46,6 +48,7 @@ import com.datastax.oss.protocol.internal.Message; import com.datastax.oss.protocol.internal.ProtocolConstants; import com.datastax.oss.protocol.internal.ProtocolConstants.ErrorCode; +import com.datastax.oss.protocol.internal.ProtocolFeatures; import com.datastax.oss.protocol.internal.request.AuthResponse; import com.datastax.oss.protocol.internal.request.Options; import com.datastax.oss.protocol.internal.request.Query; @@ -91,7 +94,7 @@ class ProtocolInitHandler extends ConnectInitHandler { private String logPrefix; private ChannelHandlerContext ctx; private final boolean querySupportedOptions; - private ProtocolFeatureStore featureStore; + private ProtocolFeatureStore featureStore = ProtocolFeatureStore.EMPTY; /** * @param querySupportedOptions whether to send OPTIONS as the first message, to request which @@ -218,19 +221,19 @@ void onResponse(Message response) { ProtocolUtils.opcodeString(response.opcode)); try { if (step == Step.OPTIONS && response instanceof Supported) { - channel.attr(DriverChannel.OPTIONS_KEY).set(((Supported) response).options); - Supported res = (Supported) response; - featureStore = ProtocolFeatureStore.parseSupportedOptions(res.options); + Supported supported = (Supported) response; + channel.attr(DriverChannel.OPTIONS_KEY).set(supported.options); + featureStore = ProtocolFeatureStore.parseSupportedOptions(supported.options); featureStore.storeInChannel(channel); step = Step.STARTUP; send(); } else if (step == Step.STARTUP && response instanceof Ready) { - maybeSwitchToModernFraming(); + maybeUpdatePipeline(); context.getAuthProvider().ifPresent(provider -> provider.onMissingChallenge(endPoint)); step = Step.GET_CLUSTER_NAME; send(); } else if (step == Step.STARTUP && response instanceof Authenticate) { - maybeSwitchToModernFraming(); + maybeUpdatePipeline(); Authenticate authenticate = (Authenticate) response; authenticator = buildAuthenticator(endPoint, authenticate.authenticator); authenticator @@ -396,11 +399,18 @@ public String toString() { } /** - * Rearranges the pipeline to deal with the new framing structure in protocol v5 and above. The + * Conditionally rebuilds pipeline. + * + *

Rearranges the pipeline to deal with the new framing structure in protocol v5 and above. The * first messages still use the legacy format, we only do this after a successful response to the * first STARTUP message. + * + *

If SCYLLA_USE_METADATA_ID feature was negotiated we need to replace {@link + * FrameEncoder} and {@link FrameDecoder} handlers with instances aware of a negotiated protocol + * feature. */ - private void maybeSwitchToModernFraming() { + private void maybeUpdatePipeline() { + ProtocolFeatures protocolFeatures = featureStore.getProtocolFeatures(); if (context .getProtocolVersionRegistry() .supports(initialProtocolVersion, DefaultProtocolFeature.MODERN_FRAMING)) { @@ -428,6 +438,26 @@ private void maybeSwitchToModernFraming() { ChannelFactory.BYTES_TO_SEGMENT_DECODER_NAME, ChannelFactory.SEGMENT_TO_FRAME_DECODER_NAME, new SegmentToFrameDecoder(context.getFrameCodec(), logPrefix)); + } else if (protocolFeatures.isScyllaUseMetadataId()) { + int maxFrameLength = + (int) + context + .getConfig() + .getDefaultProfile() + .getBytes(DefaultDriverOption.PROTOCOL_MAX_FRAME_LENGTH); + + ChannelPipeline pipeline = ctx.pipeline(); + pipeline.replace( + ChannelFactory.FRAME_TO_BYTES_ENCODER_NAME, + ChannelFactory.FRAME_TO_BYTES_ENCODER_NAME, + new FrameEncoder( + context.getFrameCodec(), + protocolFeatures, // Passing updated protocol features to alter codecs behaviors + maxFrameLength)); + pipeline.replace( + ChannelFactory.BYTES_TO_FRAME_DECODER_NAME, + ChannelFactory.BYTES_TO_FRAME_DECODER_NAME, + new FrameDecoder(context.getFrameCodec(), protocolFeatures, maxFrameLength)); } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/FrameDecoder.java b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/FrameDecoder.java index 20816ba581b..add57fa9fc6 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/FrameDecoder.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/FrameDecoder.java @@ -22,6 +22,7 @@ import com.datastax.oss.protocol.internal.Frame; import com.datastax.oss.protocol.internal.FrameCodec; import com.datastax.oss.protocol.internal.ProtocolConstants; +import com.datastax.oss.protocol.internal.ProtocolFeatures; import com.datastax.oss.protocol.internal.response.Error; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; @@ -41,11 +42,16 @@ public class FrameDecoder extends LengthFieldBasedFrameDecoder { private static final int LENGTH_FIELD_LENGTH = 4; private final FrameCodec frameCodec; + private final ProtocolFeatures protocolFeatures; private boolean isFirstResponse; - public FrameDecoder(FrameCodec frameCodec, int maxFrameLengthInBytes) { + public FrameDecoder( + FrameCodec frameCodec, + ProtocolFeatures protocolFeatures, + int maxFrameLengthInBytes) { super(maxFrameLengthInBytes, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, 0, 0, true); this.frameCodec = frameCodec; + this.protocolFeatures = protocolFeatures; } @Override @@ -87,7 +93,7 @@ protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception ByteBuf buffer = (ByteBuf) super.decode(ctx, in); return (buffer == null) ? null // did not receive whole frame yet, keep reading - : frameCodec.decode(buffer); + : frameCodec.decode(buffer, protocolFeatures); } catch (Exception e) { // If decoding failed, try to read at least the stream id, so that the error can be // propagated to the client request matching that id (otherwise we have to fail all diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/FrameEncoder.java b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/FrameEncoder.java index 6504ab29728..ca69afeb9da 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/FrameEncoder.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/FrameEncoder.java @@ -20,6 +20,7 @@ import com.datastax.oss.driver.api.core.connection.FrameTooLongException; import com.datastax.oss.protocol.internal.Frame; import com.datastax.oss.protocol.internal.FrameCodec; +import com.datastax.oss.protocol.internal.ProtocolFeatures; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; @@ -32,17 +33,20 @@ public class FrameEncoder extends MessageToMessageEncoder { private final FrameCodec frameCodec; + private final ProtocolFeatures protocolFeatures; private final int maxFrameLength; - public FrameEncoder(FrameCodec frameCodec, int maxFrameLength) { + public FrameEncoder( + FrameCodec frameCodec, ProtocolFeatures protocolFeatures, int maxFrameLength) { super(Frame.class); this.frameCodec = frameCodec; + this.protocolFeatures = protocolFeatures; this.maxFrameLength = maxFrameLength; } @Override protected void encode(ChannelHandlerContext ctx, Frame frame, List out) throws Exception { - ByteBuf buffer = frameCodec.encode(frame); + ByteBuf buffer = frameCodec.encode(frame, protocolFeatures); int actualLength = buffer.readableBytes(); if (actualLength > maxFrameLength) { throw new FrameTooLongException( diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/MetadataIdInfo.java b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/MetadataIdInfo.java new file mode 100644 index 00000000000..72dd1a31dd4 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/MetadataIdInfo.java @@ -0,0 +1,26 @@ +package com.datastax.oss.driver.internal.core.protocol; + +import java.util.List; +import java.util.Map; + +public class MetadataIdInfo { + private static final String SCYLLA_USE_METADATA_ID_STARTUP_OPTION_KEY = "SCYLLA_USE_METADATA_ID"; + private static final String SCYLLA_USE_METADATA_ID_STARTUP_OPTION_VALUE = ""; + + private MetadataIdInfo() {} + + public static boolean loadFromSupportedOptions(Map> supported) { + if (!supported.containsKey(SCYLLA_USE_METADATA_ID_STARTUP_OPTION_KEY)) { + return false; + } + List values = supported.get(SCYLLA_USE_METADATA_ID_STARTUP_OPTION_KEY); + return values != null + && values.size() == 1 + && values.get(0).equals(SCYLLA_USE_METADATA_ID_STARTUP_OPTION_VALUE); + } + + public static void populateStartupOptions(Map options) { + options.put( + SCYLLA_USE_METADATA_ID_STARTUP_OPTION_KEY, SCYLLA_USE_METADATA_ID_STARTUP_OPTION_VALUE); + } +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ProtocolFeatureStore.java b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ProtocolFeatureStore.java index 1d026d10dd0..87134d32030 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ProtocolFeatureStore.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ProtocolFeatureStore.java @@ -1,5 +1,6 @@ package com.datastax.oss.driver.internal.core.protocol; +import com.datastax.oss.protocol.internal.ProtocolFeatures; import edu.umd.cs.findbugs.annotations.NonNull; import io.netty.channel.Channel; import io.netty.util.AttributeKey; @@ -13,14 +14,21 @@ public class ProtocolFeatureStore { private final LwtInfo lwtInfo; private final ShardingInfo.ConnectionShardingInfo shardingInfo; private final TabletInfo tabletInfo; + private final boolean metadataIdEnabled; + private ProtocolFeatures protocolFeatures; - public static final ProtocolFeatureStore Empty = new ProtocolFeatureStore(null, null, null); + public static final ProtocolFeatureStore EMPTY = + new ProtocolFeatureStore(null, null, null, false); ProtocolFeatureStore( - LwtInfo lwtInfo, ShardingInfo.ConnectionShardingInfo shardingInfo, TabletInfo tabletInfo) { + LwtInfo lwtInfo, + ShardingInfo.ConnectionShardingInfo shardingInfo, + TabletInfo tabletInfo, + boolean metadataIdEnabled) { this.lwtInfo = lwtInfo; this.shardingInfo = shardingInfo; this.tabletInfo = tabletInfo; + this.metadataIdEnabled = metadataIdEnabled; } public LwtInfo getLwtFeatureInfo() { @@ -35,12 +43,17 @@ public TabletInfo getTabletFeatureInfo() { return tabletInfo; } + public boolean isMetadataIdEnabled() { + return metadataIdEnabled; + } + public static ProtocolFeatureStore parseSupportedOptions( @NonNull Map> options) { LwtInfo lwtInfo = LwtInfo.loadFromSupportedOptions(options); ShardingInfo.ConnectionShardingInfo shardingInfo = ShardingInfo.parseShardingInfo(options); TabletInfo tabletInfo = TabletInfo.loadFromSupportedOptions(options); - return new ProtocolFeatureStore(lwtInfo, shardingInfo, tabletInfo); + boolean metadataIdEnabled = MetadataIdInfo.loadFromSupportedOptions(options); + return new ProtocolFeatureStore(lwtInfo, shardingInfo, tabletInfo, metadataIdEnabled); } public void populateStartupOptions(@NonNull Map options) { @@ -50,6 +63,9 @@ public void populateStartupOptions(@NonNull Map options) { if (tabletInfo != null && tabletInfo.isEnabled()) { TabletInfo.populateStartupOptions(options); } + if (metadataIdEnabled) { + MetadataIdInfo.populateStartupOptions(options); + } } public static ProtocolFeatureStore loadFromChannel(@NonNull Channel channel) { @@ -59,4 +75,19 @@ public static ProtocolFeatureStore loadFromChannel(@NonNull Channel channel) { public void storeInChannel(@NonNull Channel channel) { channel.attr(ProtocolFeatureStore.CHANNEL_KEY).set(this); } + + public ProtocolFeatures getProtocolFeatures() { + if (protocolFeatures == null) { + protocolFeatures = buildProtocolFeatures(); + } + return protocolFeatures; + } + + private ProtocolFeatures buildProtocolFeatures() { + if (metadataIdEnabled) { + return new ProtocolFeatures.Builder().setScyllaUseMetadataId().build(); + } else { + return ProtocolFeatures.EMPTY; + } + } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/SegmentToFrameDecoder.java b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/SegmentToFrameDecoder.java index b15a17bb87f..3e1a5e27080 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/SegmentToFrameDecoder.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/SegmentToFrameDecoder.java @@ -19,6 +19,7 @@ import com.datastax.oss.protocol.internal.Frame; import com.datastax.oss.protocol.internal.FrameCodec; +import com.datastax.oss.protocol.internal.ProtocolFeatures; import com.datastax.oss.protocol.internal.Segment; import edu.umd.cs.findbugs.annotations.NonNull; import io.netty.buffer.ByteBuf; @@ -73,7 +74,7 @@ private void decodeSelfContained(Segment segment, List out) { int frameCount = 0; try { do { - Frame frame = frameCodec.decode(payload); + Frame frame = frameCodec.decode(payload, ProtocolFeatures.EMPTY); LOG.trace( "[{}] Decoded response frame {} from self-contained segment", logPrefix, @@ -110,7 +111,7 @@ private void decodeSlice(Segment segment, ByteBufAllocator allocator, L encodedFrame.addComponents(true, accumulatedSlices); Frame frame; try { - frame = frameCodec.decode(encodedFrame); + frame = frameCodec.decode(encodedFrame, ProtocolFeatures.EMPTY); } finally { encodedFrame.release(); // Reset our state diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandlerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandlerTest.java index 4835e0ac7be..1f87e23f647 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandlerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandlerTest.java @@ -84,7 +84,7 @@ public class ProtocolInitHandlerTest extends ChannelHandlerTestBase { @Mock private DriverExecutionProfile defaultProfile; @Mock private Appender appender; - private ProtocolVersionRegistry protocolVersionRegistry = + private final ProtocolVersionRegistry protocolVersionRegistry = new DefaultProtocolVersionRegistry("test"); private HeartbeatHandler heartbeatHandler; @@ -365,7 +365,7 @@ public void should_invoke_auth_provider_when_server_does_not_send_challenge() { } @Test - public void should_fail_to_initialize_if_server_sends_auth_error() throws Throwable { + public void should_fail_to_initialize_if_server_sends_auth_error() { channel .pipeline() .addLast( @@ -443,7 +443,7 @@ public void should_check_cluster_name_if_provided() { } @Test - public void should_fail_to_initialize_if_cluster_name_does_not_match() throws Throwable { + public void should_fail_to_initialize_if_cluster_name_does_not_match() { channel .pipeline() .addLast( diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/protocol/FrameDecoderTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/protocol/FrameDecoderTest.java index 0ab61771da0..1388b51b8aa 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/protocol/FrameDecoderTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/protocol/FrameDecoderTest.java @@ -26,6 +26,7 @@ import com.datastax.oss.protocol.internal.Compressor; import com.datastax.oss.protocol.internal.Frame; import com.datastax.oss.protocol.internal.FrameCodec; +import com.datastax.oss.protocol.internal.ProtocolFeatures; import com.datastax.oss.protocol.internal.response.AuthSuccess; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; @@ -63,7 +64,7 @@ public void setup() { @Test public void should_decode_valid_payload() { // Given - FrameDecoder decoder = new FrameDecoder(frameCodec, 1024); + FrameDecoder decoder = new FrameDecoder(frameCodec, ProtocolFeatures.EMPTY, 1024); channel.pipeline().addLast(decoder); // When @@ -83,7 +84,8 @@ public void should_decode_valid_payload() { @Test public void should_fail_to_decode_if_payload_is_valid_but_too_long() { // Given - FrameDecoder decoder = new FrameDecoder(frameCodec, VALID_PAYLOAD.readableBytes() - 1); + FrameDecoder decoder = + new FrameDecoder(frameCodec, ProtocolFeatures.EMPTY, VALID_PAYLOAD.readableBytes() - 1); channel.pipeline().addLast(decoder); // When @@ -102,7 +104,7 @@ public void should_fail_to_decode_if_payload_is_valid_but_too_long() { @Test public void should_fail_to_decode_if_payload_cannot_be_decoded() { // Given - FrameDecoder decoder = new FrameDecoder(frameCodec, 1024); + FrameDecoder decoder = new FrameDecoder(frameCodec, ProtocolFeatures.EMPTY, 1024); channel.pipeline().addLast(decoder); // When diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/protocol/SegmentToFrameDecoderTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/protocol/SegmentToFrameDecoderTest.java index 2886adeab4e..5f2da481241 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/protocol/SegmentToFrameDecoderTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/protocol/SegmentToFrameDecoderTest.java @@ -25,6 +25,7 @@ import com.datastax.oss.protocol.internal.FrameCodec; import com.datastax.oss.protocol.internal.Message; import com.datastax.oss.protocol.internal.ProtocolConstants; +import com.datastax.oss.protocol.internal.ProtocolFeatures; import com.datastax.oss.protocol.internal.ProtocolV5ClientCodecs; import com.datastax.oss.protocol.internal.ProtocolV5ServerCodecs; import com.datastax.oss.protocol.internal.Segment; @@ -93,6 +94,6 @@ private static ByteBuf encodeFrame(Message message) { Collections.emptyMap(), Collections.emptyList(), message); - return FRAME_CODEC.encode(frame); + return FRAME_CODEC.encode(frame, ProtocolFeatures.EMPTY); } } diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index fb7902cafed..234540d3bb6 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -194,7 +194,7 @@ test - com.datastax.oss.simulacron + com.scylladb.oss.simulacron simulacron-native-server test diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementIT.java index 86824c42cbc..dbf6ceb7dd5 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementIT.java @@ -30,6 +30,7 @@ import com.codahale.metrics.Gauge; import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.Version; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverConfigLoader; import com.datastax.oss.driver.api.core.cql.AsyncResultSet; @@ -57,11 +58,11 @@ import com.google.common.collect.ImmutableList; import java.nio.ByteBuffer; import java.time.Duration; +import java.util.Objects; import java.util.concurrent.CompletionStage; import junit.framework.TestCase; import org.assertj.core.api.AbstractThrowableAssert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -79,9 +80,12 @@ @Category(ParallelizableTests.class) public class PreparedStatementIT { - private CcmRule ccmRule = CcmRule.getInstance(); + private static final Version SCYLLA_METADATA_ID_SUPPORT_VERSION = + Objects.requireNonNull(Version.parse("2025.3")); - private SessionRule sessionRule = + private final CcmRule ccmRule = CcmRule.getInstance(); + + private final SessionRule sessionRule = SessionRule.builder(ccmRule) .withConfigLoader( SessionUtils.configLoaderBuilder() @@ -165,7 +169,7 @@ public void should_update_metadata_when_schema_changed_across_executions() { CqlSession session = sessionRule.session(); PreparedStatement ps = session.prepare("SELECT * FROM prepared_statement_test WHERE a = ?"); ByteBuffer idBefore = ps.getResultMetadataId(); - if (CcmBridge.isDistributionOf(BackendType.SCYLLA)) { + if (hasNoScyllaMetadataIdSupport()) { // Scylla does not support CQL5 extensions and metadata id assertThat(idBefore).isNull(); } @@ -180,7 +184,7 @@ public void should_update_metadata_when_schema_changed_across_executions() { // Then ByteBuffer idAfter = ps.getResultMetadataId(); - if (CcmBridge.isDistributionOf(BackendType.SCYLLA)) { + if (hasNoScyllaMetadataIdSupport()) { // Scylla does not support CQL5 extensions and metadata id assertThat(idAfter).isNull(); for (ColumnDefinitions columnDefinitions : @@ -213,7 +217,7 @@ public void should_update_metadata_when_schema_changed_across_pages() { CqlSession session = sessionRule.session(); PreparedStatement ps = session.prepare("SELECT * FROM prepared_statement_test"); ByteBuffer idBefore = ps.getResultMetadataId(); - if (CcmBridge.isDistributionOf(BackendType.SCYLLA)) { + if (hasNoScyllaMetadataIdSupport()) { // Scylla does not support CQL5 and result metadata id assertThat(idBefore).isNull(); } @@ -250,7 +254,7 @@ public void should_update_metadata_when_schema_changed_across_pages() { assertThat(rows.getColumnDefinitions().get("d").getType()).isEqualTo(DataTypes.INT); // Should have updated the prepared statement too ByteBuffer idAfter = ps.getResultMetadataId(); - if (CcmBridge.isDistributionOf(BackendType.SCYLLA)) { + if (hasNoScyllaMetadataIdSupport()) { // Scylla does not support CQL5 and result metadata id assertThat(idAfter).isNull(); assertThat(ps.getResultSetDefinitions()).hasSize(3); @@ -275,7 +279,7 @@ public void should_update_metadata_when_schema_changed_across_sessions() { ByteBuffer id1a = ps1.getResultMetadataId(); ByteBuffer id2a = ps2.getResultMetadataId(); - if (CcmBridge.isDistributionOf(BackendType.SCYLLA)) { + if (hasNoScyllaMetadataIdSupport()) { // Scylla does not support CQL5 extensions and metadata id assertThat(id1a).isNull(); assertThat(id2a).isNull(); @@ -299,7 +303,7 @@ public void should_update_metadata_when_schema_changed_across_sessions() { ByteBuffer id2b = ps2.getResultMetadataId(); // Then - if (CcmBridge.isDistributionOf(BackendType.SCYLLA)) { + if (hasNoScyllaMetadataIdSupport()) { // Scylla does not support CQL5 extensions and metadata id assertThat(id1b).isNull(); assertThat(id2b).isNull(); @@ -440,7 +444,7 @@ private void should_not_store_metadata_for_conditional_updates(CqlSession sessio // Then // Failed conditional update => regular metadata that should also contain the new column assertThat(rs.wasApplied()).isFalse(); - if (CcmBridge.isDistributionOf(BackendType.SCYLLA)) { + if (hasNoScyllaMetadataIdSupport()) { // Scylla does not update column definitions is such case assertThat(rs.getColumnDefinitions()).hasSize(4); } else { @@ -450,16 +454,22 @@ private void should_not_store_metadata_for_conditional_updates(CqlSession sessio assertThat(nextRow.getBoolean("[applied]")).isFalse(); assertThat(nextRow.getInt("a")).isEqualTo(5); assertThat(nextRow.getInt("b")).isEqualTo(5); - if (CcmBridge.isDistributionOf(BackendType.SCYLLA)) { + if (hasNoScyllaMetadataIdSupport()) { // Scylla does not support CQL5 and metadata id, that is why response metadata does not // contain "d" assertThrows(IllegalArgumentException.class, () -> nextRow.isNull("d")); assertThat(ps.getResultSetDefinitions()).hasSize(4); + } else if (CcmBridge.isDistributionOf(BackendType.SCYLLA)) { + assertThat(nextRow.isNull("d")).isTrue(); + assertThat(ps.getResultSetDefinitions()).hasSize(5); + assertThat(Bytes.toHexString(ps.getResultMetadataId())) + .isNotEqualTo(Bytes.toHexString(idBefore)); } else { assertThat(nextRow.isNull("d")).isTrue(); assertThat(ps.getResultSetDefinitions()).hasSize(0); + assertThat(Bytes.toHexString(ps.getResultMetadataId())) + .isEqualTo(Bytes.toHexString(idBefore)); } - assertThat(Bytes.toHexString(ps.getResultMetadataId())).isEqualTo(Bytes.toHexString(idBefore)); } @Test @@ -563,6 +573,7 @@ public void should_create_separate_instances_for_different_statement_parameters( // Add version bounds to the DSE requirement if there is a version containing fix for // CASSANDRA-15252 + @BackendRequirement( type = BackendType.DSE, description = "No DSE version contains fix for CASSANDRA-15252") @@ -572,8 +583,8 @@ public void should_create_separate_instances_for_different_statement_parameters( minInclusive = "3.11.0", maxExclusive = "3.11.12") @BackendRequirement(type = BackendType.CASSANDRA, minInclusive = "4.0.0", maxExclusive = "4.0.2") + @BackendRequirement(type = BackendType.SCYLLA) @Test - @Ignore("@IntegrationTestDisabledCassandra3Failure") public void should_fail_fast_if_id_changes_on_reprepare() { assertableReprepareAfterIdChange() .isInstanceOf(IllegalStateException.class) @@ -699,4 +710,10 @@ private static long getPreparedCacheSize(CqlSession session) { "Could not access metric " + DefaultSessionMetric.CQL_PREPARED_CACHE_SIZE.getPath())); } + + private static boolean hasNoScyllaMetadataIdSupport() { + return CcmBridge.isDistributionOf(BackendType.SCYLLA) + && CcmBridge.getScyllaVersion().isPresent() + && CcmBridge.getScyllaVersion().get().compareTo(SCYLLA_METADATA_ID_SUPPORT_VERSION) < 0; + } } diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/mapper/ProfileIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/mapper/ProfileIT.java index 3ed2a48cced..5e422805a16 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/mapper/ProfileIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/mapper/ProfileIT.java @@ -306,8 +306,9 @@ private static void primeUpdateQuery() { when(query( "UPDATE ks.simple SET data=:data WHERE pk=:pk", Lists.newArrayList( + com.datastax.oss.simulacron.common.codec.ConsistencyLevel.LOCAL_ONE, com.datastax.oss.simulacron.common.codec.ConsistencyLevel.ONE, - com.datastax.oss.simulacron.common.codec.ConsistencyLevel.ANY), + com.datastax.oss.simulacron.common.codec.ConsistencyLevel.TWO), params, paramTypes)) .then(noRows())); diff --git a/osgi-tests/src/test/java/com/datastax/oss/driver/internal/osgi/support/BundleOptions.java b/osgi-tests/src/test/java/com/datastax/oss/driver/internal/osgi/support/BundleOptions.java index 713eb68f207..911d4e544f5 100644 --- a/osgi-tests/src/test/java/com/datastax/oss/driver/internal/osgi/support/BundleOptions.java +++ b/osgi-tests/src/test/java/com/datastax/oss/driver/internal/osgi/support/BundleOptions.java @@ -40,7 +40,7 @@ public static CompositeOption commonBundles() { mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(), mavenBundle("org.hdrhistogram", "HdrHistogram").versionAsInProject(), mavenBundle("com.typesafe", "config").versionAsInProject(), - mavenBundle("com.datastax.oss", "native-protocol").versionAsInProject(), + mavenBundle("com.scylladb", "native-protocol").versionAsInProject(), logbackBundles(), debugOptions()); } diff --git a/pom.xml b/pom.xml index 674abd8f7a0..aa4a5130e64 100644 --- a/pom.xml +++ b/pom.xml @@ -85,7 +85,7 @@ 7.0.5 4.13.4 2.6.4 - 0.11.0 + 0.13.0.0 1.1.4 2.31 2.5.0 @@ -99,6 +99,7 @@ false ${skipTests} false + @@ -249,7 +250,7 @@ ${rxjava.version} - com.datastax.oss.simulacron + com.scylladb.oss.simulacron simulacron-native-server ${simulacron.version} diff --git a/test-infra/pom.xml b/test-infra/pom.xml index 6d5425456e9..14d704d535a 100644 --- a/test-infra/pom.xml +++ b/test-infra/pom.xml @@ -65,7 +65,7 @@ - com.datastax.oss.simulacron + com.scylladb.oss.simulacron simulacron-native-server true diff --git a/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/ccm/CcmBridge.java b/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/ccm/CcmBridge.java index 0d066ca45c1..011c77c5a27 100644 --- a/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/ccm/CcmBridge.java +++ b/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/ccm/CcmBridge.java @@ -284,7 +284,7 @@ private static Version parseCcmVersion() { return result; } - public Optional getScyllaVersion() { + public static Optional getScyllaVersion() { return isDistributionOf(BackendType.SCYLLA) ? Optional.of(VERSION) : Optional.empty(); } @@ -628,12 +628,12 @@ public String getNodeIpAddress(int nodeId) { return ipPrefix + nodeId; } - private static String IN_MS_STR = "_in_ms"; - private static int IN_MS_STR_LENGTH = IN_MS_STR.length(); - private static String ENABLE_STR = "enable_"; - private static int ENABLE_STR_LENGTH = ENABLE_STR.length(); - private static String IN_KB_STR = "_in_kb"; - private static int IN_KB_STR_LENGTH = IN_KB_STR.length(); + private static final String IN_MS_STR = "_in_ms"; + private static final int IN_MS_STR_LENGTH = IN_MS_STR.length(); + private static final String ENABLE_STR = "enable_"; + private static final int ENABLE_STR_LENGTH = ENABLE_STR.length(); + private static final String IN_KB_STR = "_in_kb"; + private static final int IN_KB_STR_LENGTH = IN_KB_STR.length(); @SuppressWarnings("unused") private String getConfigKey(String originalKey, Object originalValue, Version cassandraVersion) {