From 2bb29bc76d7cd406e83974fd782d8faef4876ef3 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Tue, 16 Sep 2025 11:28:01 -0400 Subject: [PATCH 1/4] Implement ProtocolFeatureStore Implement class to store information about all features negotiated. --- .../internal/core/channel/DriverChannel.java | 4 +- .../core/channel/ProtocolInitHandler.java | 16 +++---- .../internal/core/protocol/LwtInfo.java | 4 +- .../core/protocol/ProtocolFeatureStore.java | 42 +++++++++++++++++++ 4 files changed, 52 insertions(+), 14 deletions(-) create mode 100644 core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ProtocolFeatureStore.java diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java index afd5daee392..2032a27dbfd 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java @@ -32,6 +32,7 @@ import com.datastax.oss.driver.internal.core.adminrequest.ThrottledAdminRequestHandler; import com.datastax.oss.driver.internal.core.pool.ChannelPool; import com.datastax.oss.driver.internal.core.protocol.LwtInfo; +import com.datastax.oss.driver.internal.core.protocol.ProtocolFeatureStore; import com.datastax.oss.driver.internal.core.protocol.ShardingInfo; import com.datastax.oss.driver.internal.core.protocol.ShardingInfo.ConnectionShardingInfo; import com.datastax.oss.driver.internal.core.session.DefaultSession; @@ -63,7 +64,6 @@ public class DriverChannel { AttributeKey.valueOf("options"); static final AttributeKey SHARDING_INFO_KEY = AttributeKey.valueOf("sharding_info"); - static final AttributeKey LWT_INFO_KEY = AttributeKey.valueOf("lwt_info"); @SuppressWarnings("RedundantStringConstructorCall") static final Object GRACEFUL_CLOSE_MESSAGE = new String("GRACEFUL_CLOSE_MESSAGE"); @@ -159,7 +159,7 @@ public ShardingInfo getShardingInfo() { } public LwtInfo getLwtInfo() { - return channel.attr(LWT_INFO_KEY).get(); + return ProtocolFeatureStore.loadFromChannel(channel).getLwtFeatureInfo(); } /** diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java index 95cbf1e200b..4b82f5940a3 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java @@ -23,8 +23,6 @@ */ package com.datastax.oss.driver.internal.core.channel; -import static com.datastax.oss.driver.internal.core.channel.DriverChannel.LWT_INFO_KEY; - import com.datastax.oss.driver.api.core.DefaultProtocolVersion; import com.datastax.oss.driver.api.core.InvalidKeyspaceException; import com.datastax.oss.driver.api.core.ProtocolVersion; @@ -40,7 +38,7 @@ import com.datastax.oss.driver.internal.core.context.InternalDriverContext; import com.datastax.oss.driver.internal.core.protocol.BytesToSegmentDecoder; import com.datastax.oss.driver.internal.core.protocol.FrameToSegmentEncoder; -import com.datastax.oss.driver.internal.core.protocol.LwtInfo; +import com.datastax.oss.driver.internal.core.protocol.ProtocolFeatureStore; import com.datastax.oss.driver.internal.core.protocol.SegmentToBytesEncoder; import com.datastax.oss.driver.internal.core.protocol.SegmentToFrameDecoder; import com.datastax.oss.driver.internal.core.protocol.ShardingInfo; @@ -96,7 +94,7 @@ class ProtocolInitHandler extends ConnectInitHandler { private String logPrefix; private ChannelHandlerContext ctx; private final boolean querySupportedOptions; - private LwtInfo lwtInfo; + private ProtocolFeatureStore featureStore; private TabletInfo tabletInfo; /** @@ -192,8 +190,8 @@ Message getRequest() { return request = Options.INSTANCE; case STARTUP: Map startupOptions = new HashMap<>(context.getStartupOptions()); - if (lwtInfo != null) { - lwtInfo.addOption(startupOptions); + if (featureStore != null) { + featureStore.populateStartupOptions(startupOptions); } if (tabletInfo != null && tabletInfo.isEnabled()) { TabletInfo.addOption(startupOptions); @@ -229,15 +227,13 @@ void onResponse(Message response) { if (step == Step.OPTIONS && response instanceof Supported) { channel.attr(DriverChannel.OPTIONS_KEY).set(((Supported) response).options); Supported res = (Supported) response; + featureStore = ProtocolFeatureStore.parseSupportedOptions(res.options); ConnectionShardingInfo shardingInfo = ShardingInfo.parseShardingInfo(res.options); if (shardingInfo != null) { channel.attr(DriverChannel.SHARDING_INFO_KEY).set(shardingInfo); } - lwtInfo = LwtInfo.parseLwtInfo(res.options); - if (lwtInfo != null) { - channel.attr(LWT_INFO_KEY).set(lwtInfo); - } tabletInfo = TabletInfo.parseTabletInfo(res.options); + featureStore.storeInChannel(channel); step = Step.STARTUP; send(); } else if (step == Step.STARTUP && response instanceof Ready) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/LwtInfo.java b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/LwtInfo.java index 5ac8abc2e53..2058e8d317f 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/LwtInfo.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/LwtInfo.java @@ -37,7 +37,7 @@ public boolean isLwt(int flags) { return (flags & mask) == mask; } - public static LwtInfo parseLwtInfo(Map> supported) { + public static LwtInfo loadFromSupportedOptions(Map> supported) { if (!supported.containsKey(SCYLLA_LWT_ADD_METADATA_MARK_KEY)) { return null; } @@ -67,7 +67,7 @@ public static LwtInfo parseLwtInfo(Map> supported) { return new LwtInfo((int) mask); } - public void addOption(Map options) { + public void populateStartupOptions(Map options) { options.put(SCYLLA_LWT_ADD_METADATA_MARK_KEY, Integer.toString(mask)); } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ProtocolFeatureStore.java b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ProtocolFeatureStore.java new file mode 100644 index 00000000000..ac15bee1751 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ProtocolFeatureStore.java @@ -0,0 +1,42 @@ +package com.datastax.oss.driver.internal.core.protocol; + +import edu.umd.cs.findbugs.annotations.NonNull; +import io.netty.channel.Channel; +import io.netty.util.AttributeKey; +import java.util.List; +import java.util.Map; + +public class ProtocolFeatureStore { + private static final AttributeKey CHANNEL_KEY = + AttributeKey.valueOf("protocol_feature_store"); + + private final LwtInfo lwtInfo; + + ProtocolFeatureStore(LwtInfo lwtInfo) { + this.lwtInfo = lwtInfo; + } + + public LwtInfo getLwtFeatureInfo() { + return lwtInfo; + } + + public static ProtocolFeatureStore parseSupportedOptions( + @NonNull Map> options) { + LwtInfo lwtInfo = LwtInfo.loadFromSupportedOptions(options); + return new ProtocolFeatureStore(lwtInfo); + } + + public void populateStartupOptions(@NonNull Map options) { + if (lwtInfo != null) { + lwtInfo.populateStartupOptions(options); + } + } + + public static ProtocolFeatureStore loadFromChannel(@NonNull Channel channel) { + return channel.attr(ProtocolFeatureStore.CHANNEL_KEY).get(); + } + + public void storeInChannel(@NonNull Channel channel) { + channel.attr(ProtocolFeatureStore.CHANNEL_KEY).set(this); + } +} From e193c801ec7be9668bd484f838616beaf46c8279 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Tue, 16 Sep 2025 11:38:12 -0400 Subject: [PATCH 2/4] Pull ShardingInfo into ProtocolFeatureStore --- .../driver/internal/core/channel/DriverChannel.java | 10 ++++------ .../internal/core/channel/ProtocolInitHandler.java | 6 ------ .../internal/core/protocol/ProtocolFeatureStore.java | 11 +++++++++-- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java index 2032a27dbfd..f3adb9a4907 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java @@ -62,8 +62,6 @@ public class DriverChannel { static final AttributeKey CLUSTER_NAME_KEY = AttributeKey.valueOf("cluster_name"); static final AttributeKey>> OPTIONS_KEY = AttributeKey.valueOf("options"); - static final AttributeKey SHARDING_INFO_KEY = - AttributeKey.valueOf("sharding_info"); @SuppressWarnings("RedundantStringConstructorCall") static final Object GRACEFUL_CLOSE_MESSAGE = new String("GRACEFUL_CLOSE_MESSAGE"); @@ -149,13 +147,13 @@ public Map> getOptions() { } public int getShardId() { - return channel.hasAttr(SHARDING_INFO_KEY) ? channel.attr(SHARDING_INFO_KEY).get().shardId : 0; + ConnectionShardingInfo info = ProtocolFeatureStore.loadFromChannel(channel).getShardingInfo(); + return info != null ? info.shardId : 0; } public ShardingInfo getShardingInfo() { - return channel.hasAttr(SHARDING_INFO_KEY) - ? channel.attr(SHARDING_INFO_KEY).get().shardingInfo - : null; + ConnectionShardingInfo info = ProtocolFeatureStore.loadFromChannel(channel).getShardingInfo(); + return info != null ? info.shardingInfo : null; } public LwtInfo getLwtInfo() { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java index 4b82f5940a3..238261bf009 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java @@ -41,8 +41,6 @@ import com.datastax.oss.driver.internal.core.protocol.ProtocolFeatureStore; import com.datastax.oss.driver.internal.core.protocol.SegmentToBytesEncoder; import com.datastax.oss.driver.internal.core.protocol.SegmentToFrameDecoder; -import com.datastax.oss.driver.internal.core.protocol.ShardingInfo; -import com.datastax.oss.driver.internal.core.protocol.ShardingInfo.ConnectionShardingInfo; import com.datastax.oss.driver.internal.core.protocol.TabletInfo; import com.datastax.oss.driver.internal.core.util.ProtocolUtils; import com.datastax.oss.driver.internal.core.util.concurrent.UncaughtExceptions; @@ -228,10 +226,6 @@ void onResponse(Message response) { channel.attr(DriverChannel.OPTIONS_KEY).set(((Supported) response).options); Supported res = (Supported) response; featureStore = ProtocolFeatureStore.parseSupportedOptions(res.options); - ConnectionShardingInfo shardingInfo = ShardingInfo.parseShardingInfo(res.options); - if (shardingInfo != null) { - channel.attr(DriverChannel.SHARDING_INFO_KEY).set(shardingInfo); - } tabletInfo = TabletInfo.parseTabletInfo(res.options); featureStore.storeInChannel(channel); step = Step.STARTUP; diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ProtocolFeatureStore.java b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ProtocolFeatureStore.java index ac15bee1751..974a5801080 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ProtocolFeatureStore.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ProtocolFeatureStore.java @@ -11,19 +11,26 @@ public class ProtocolFeatureStore { AttributeKey.valueOf("protocol_feature_store"); private final LwtInfo lwtInfo; + private final ShardingInfo.ConnectionShardingInfo shardingInfo; - ProtocolFeatureStore(LwtInfo lwtInfo) { + ProtocolFeatureStore(LwtInfo lwtInfo, ShardingInfo.ConnectionShardingInfo shardingInfo) { this.lwtInfo = lwtInfo; + this.shardingInfo = shardingInfo; } public LwtInfo getLwtFeatureInfo() { return lwtInfo; } + public ShardingInfo.ConnectionShardingInfo getShardingInfo() { + return shardingInfo; + } + public static ProtocolFeatureStore parseSupportedOptions( @NonNull Map> options) { LwtInfo lwtInfo = LwtInfo.loadFromSupportedOptions(options); - return new ProtocolFeatureStore(lwtInfo); + ShardingInfo.ConnectionShardingInfo shardingInfo = ShardingInfo.parseShardingInfo(options); + return new ProtocolFeatureStore(lwtInfo, shardingInfo); } public void populateStartupOptions(@NonNull Map options) { From 7c9a37f2704d75644d72e11f7ff0a76c40bc7d90 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Tue, 16 Sep 2025 11:49:27 -0400 Subject: [PATCH 3/4] Pull TabletInfo into ProtocolFeatureStore --- .../core/channel/ProtocolInitHandler.java | 6 ------ .../core/protocol/ProtocolFeatureStore.java | 15 +++++++++++++-- .../driver/internal/core/protocol/TabletInfo.java | 4 ++-- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java index 238261bf009..248b51c89f1 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java @@ -41,7 +41,6 @@ import com.datastax.oss.driver.internal.core.protocol.ProtocolFeatureStore; import com.datastax.oss.driver.internal.core.protocol.SegmentToBytesEncoder; import com.datastax.oss.driver.internal.core.protocol.SegmentToFrameDecoder; -import com.datastax.oss.driver.internal.core.protocol.TabletInfo; import com.datastax.oss.driver.internal.core.util.ProtocolUtils; import com.datastax.oss.driver.internal.core.util.concurrent.UncaughtExceptions; import com.datastax.oss.protocol.internal.Message; @@ -93,7 +92,6 @@ class ProtocolInitHandler extends ConnectInitHandler { private ChannelHandlerContext ctx; private final boolean querySupportedOptions; private ProtocolFeatureStore featureStore; - private TabletInfo tabletInfo; /** * @param querySupportedOptions whether to send OPTIONS as the first message, to request which @@ -191,9 +189,6 @@ Message getRequest() { if (featureStore != null) { featureStore.populateStartupOptions(startupOptions); } - if (tabletInfo != null && tabletInfo.isEnabled()) { - TabletInfo.addOption(startupOptions); - } return request = new Startup(startupOptions); case GET_CLUSTER_NAME: return request = CLUSTER_NAME_QUERY; @@ -226,7 +221,6 @@ void onResponse(Message response) { channel.attr(DriverChannel.OPTIONS_KEY).set(((Supported) response).options); Supported res = (Supported) response; featureStore = ProtocolFeatureStore.parseSupportedOptions(res.options); - tabletInfo = TabletInfo.parseTabletInfo(res.options); featureStore.storeInChannel(channel); step = Step.STARTUP; send(); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ProtocolFeatureStore.java b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ProtocolFeatureStore.java index 974a5801080..d63f80774a8 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ProtocolFeatureStore.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ProtocolFeatureStore.java @@ -12,10 +12,13 @@ public class ProtocolFeatureStore { private final LwtInfo lwtInfo; private final ShardingInfo.ConnectionShardingInfo shardingInfo; + private final TabletInfo tabletInfo; - ProtocolFeatureStore(LwtInfo lwtInfo, ShardingInfo.ConnectionShardingInfo shardingInfo) { + ProtocolFeatureStore( + LwtInfo lwtInfo, ShardingInfo.ConnectionShardingInfo shardingInfo, TabletInfo tabletInfo) { this.lwtInfo = lwtInfo; this.shardingInfo = shardingInfo; + this.tabletInfo = tabletInfo; } public LwtInfo getLwtFeatureInfo() { @@ -26,17 +29,25 @@ public ShardingInfo.ConnectionShardingInfo getShardingInfo() { return shardingInfo; } + public TabletInfo getTabletFeatureInfo() { + return tabletInfo; + } + public static ProtocolFeatureStore parseSupportedOptions( @NonNull Map> options) { LwtInfo lwtInfo = LwtInfo.loadFromSupportedOptions(options); ShardingInfo.ConnectionShardingInfo shardingInfo = ShardingInfo.parseShardingInfo(options); - return new ProtocolFeatureStore(lwtInfo, shardingInfo); + TabletInfo tabletInfo = TabletInfo.loadFromSupportedOptions(options); + return new ProtocolFeatureStore(lwtInfo, shardingInfo, tabletInfo); } public void populateStartupOptions(@NonNull Map options) { if (lwtInfo != null) { lwtInfo.populateStartupOptions(options); } + if (tabletInfo != null && tabletInfo.isEnabled()) { + TabletInfo.populateStartupOptions(options); + } } public static ProtocolFeatureStore loadFromChannel(@NonNull Channel channel) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/TabletInfo.java b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/TabletInfo.java index 8c33e803fd5..09ca42c9295 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/TabletInfo.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/TabletInfo.java @@ -19,7 +19,7 @@ public boolean isEnabled() { return enabled; } - public static TabletInfo parseTabletInfo(Map> supported) { + public static TabletInfo loadFromSupportedOptions(Map> supported) { List values = supported.get(SCYLLA_TABLETS_STARTUP_OPTION_KEY); return new TabletInfo( values != null @@ -27,7 +27,7 @@ public static TabletInfo parseTabletInfo(Map> supported) { && values.get(0).equals(SCYLLA_TABLETS_STARTUP_OPTION_VALUE)); } - public static void addOption(Map options) { + public static void populateStartupOptions(Map options) { options.put(SCYLLA_TABLETS_STARTUP_OPTION_KEY, SCYLLA_TABLETS_STARTUP_OPTION_VALUE); } } From 245c2c5665ac0e00e7a29abcfdff973c115ce939 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Tue, 16 Sep 2025 14:34:02 -0400 Subject: [PATCH 4/4] Add ProtocolFeatureStore cache in DriverChannel No need to pull this information all the time from the channel. Let's cache it when it is populated to the channel and use it. --- .../internal/core/channel/DriverChannel.java | 24 ++++++++++++++++--- .../core/protocol/ProtocolFeatureStore.java | 2 ++ 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java index f3adb9a4907..f0e0176c144 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java @@ -76,6 +76,7 @@ public class DriverChannel { private final ProtocolVersion protocolVersion; private final AtomicBoolean closing = new AtomicBoolean(); private final AtomicBoolean forceClosing = new AtomicBoolean(); + private ProtocolFeatureStore featureStore; DriverChannel( EndPoint endPoint, @@ -146,18 +147,35 @@ public Map> getOptions() { return channel.attr(OPTIONS_KEY).get(); } + public ProtocolFeatureStore getSupportedFeatures() { + if (featureStore != null) { + return featureStore; + } + + ProtocolFeatureStore fromChannel = ProtocolFeatureStore.loadFromChannel(channel); + if (fromChannel == null) { + return ProtocolFeatureStore.Empty; + } + // Features can't be renegotiated. + // Once features is populated into channel it is enough to update cache and no need to + // invalidate it further. + + featureStore = fromChannel; + return featureStore; + } + public int getShardId() { - ConnectionShardingInfo info = ProtocolFeatureStore.loadFromChannel(channel).getShardingInfo(); + ConnectionShardingInfo info = getSupportedFeatures().getShardingInfo(); return info != null ? info.shardId : 0; } public ShardingInfo getShardingInfo() { - ConnectionShardingInfo info = ProtocolFeatureStore.loadFromChannel(channel).getShardingInfo(); + ConnectionShardingInfo info = getSupportedFeatures().getShardingInfo(); return info != null ? info.shardingInfo : null; } public LwtInfo getLwtInfo() { - return ProtocolFeatureStore.loadFromChannel(channel).getLwtFeatureInfo(); + return getSupportedFeatures().getLwtFeatureInfo(); } /** diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ProtocolFeatureStore.java b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ProtocolFeatureStore.java index d63f80774a8..1d026d10dd0 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ProtocolFeatureStore.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/protocol/ProtocolFeatureStore.java @@ -14,6 +14,8 @@ public class ProtocolFeatureStore { private final ShardingInfo.ConnectionShardingInfo shardingInfo; private final TabletInfo tabletInfo; + public static final ProtocolFeatureStore Empty = new ProtocolFeatureStore(null, null, null); + ProtocolFeatureStore( LwtInfo lwtInfo, ShardingInfo.ConnectionShardingInfo shardingInfo, TabletInfo tabletInfo) { this.lwtInfo = lwtInfo;