requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
// The requests use the same multiplexed session.
String session = requests.get(0).getSession();
for (ExecuteSqlRequest request : requests) {
assertEquals(session, request.getSession());
}
- // The requests use all gRPC channels.
- assertEquals(
- numChannels,
- SERVER_ADDRESSES
- .getOrDefault("google.spanner.v1.Spanner/ExecuteStreamingSql", ImmutableSet.of())
- .size());
+ // Each attempt, including retries, must use a distinct channel hint.
+ int totalRequests = mockSpanner.countRequestsOfType(ExecuteSqlRequest.class);
+ int distinctHints =
+ CHANNEL_HINTS
+ .getOrDefault("google.spanner.v1.Spanner/ExecuteStreamingSql", new HashSet<>())
+ .size();
+ assertEquals(totalRequests, distinctHints);
}
}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java
index 9fc065f944c..be292eab5aa 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java
@@ -1100,6 +1100,7 @@ public void testDefaultNumChannelsWithGrpcGcpExtensionDisabled() {
SpannerOptions.newBuilder()
.setProjectId("test-project")
.setCredentials(NoCredentials.getInstance())
+ .disableGrpcGcpExtension()
.build();
assertEquals(SpannerOptions.DEFAULT_CHANNELS, options.getNumChannels());
@@ -1135,7 +1136,8 @@ public void testNumChannelsWithGrpcGcpExtensionEnabled() {
@Test
public void checkCreatedInstanceWhenGrpcGcpExtensionDisabled() {
- SpannerOptions options = SpannerOptions.newBuilder().setProjectId("test-project").build();
+ SpannerOptions options =
+ SpannerOptions.newBuilder().setProjectId("test-project").disableGrpcGcpExtension().build();
SpannerOptions options1 = options.toBuilder().build();
assertEquals(false, options.isGrpcGcpExtensionEnabled());
assertEquals(options.isGrpcGcpExtensionEnabled(), options1.isGrpcGcpExtensionEnabled());
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionChannelHintTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionChannelHintTest.java
index b68ef4667d5..c8f3162255f 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionChannelHintTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionChannelHintTest.java
@@ -21,7 +21,6 @@
import static com.google.cloud.spanner.MockSpannerTestUtil.READ_ONE_KEY_VALUE_RESULTSET;
import static com.google.cloud.spanner.MockSpannerTestUtil.READ_ONE_KEY_VALUE_STATEMENT;
import static com.google.cloud.spanner.MockSpannerTestUtil.READ_TABLE_NAME;
-import static io.grpc.Grpc.TRANSPORT_ATTR_REMOTE_ADDR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -35,7 +34,6 @@
import com.google.spanner.v1.StructType;
import com.google.spanner.v1.StructType.Field;
import com.google.spanner.v1.TypeCode;
-import io.grpc.Attributes;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.Metadata;
@@ -62,7 +60,8 @@
* transaction, they go via same channel. For regular session, the hint is stored per session. For
* multiplexed sessions this hint is stored per transaction.
*
- * The below tests assert this behavior for both kinds of sessions.
+ *
The below tests assert this behavior by verifying that all operations within a transaction use
+ * the same channel hint (extracted from the X-Goog-Spanner-Request-Id header).
*/
@RunWith(JUnit4.class)
public class TransactionChannelHintTest {
@@ -94,10 +93,10 @@ public class TransactionChannelHintTest {
private static MockSpannerServiceImpl mockSpanner;
private static Server server;
private static InetSocketAddress address;
- private static final Set executeSqlLocalIps = ConcurrentHashMap.newKeySet();
- private static final Set beginTransactionLocalIps =
- ConcurrentHashMap.newKeySet();
- private static final Set streamingReadLocalIps = ConcurrentHashMap.newKeySet();
+ // Track channel hints (from X-Goog-Spanner-Request-Id header) per RPC method
+ private static final Set executeSqlChannelHints = ConcurrentHashMap.newKeySet();
+ private static final Set beginTransactionChannelHints = ConcurrentHashMap.newKeySet();
+ private static final Set streamingReadChannelHints = ConcurrentHashMap.newKeySet();
private static Level originalLogLevel;
@BeforeClass
@@ -113,8 +112,8 @@ public static void startServer() throws Exception {
server =
NettyServerBuilder.forAddress(address)
.addService(mockSpanner)
- // Add a server interceptor to register the remote addresses that we are seeing. This
- // indicates how many channels are used client side to communicate with the server.
+ // Add a server interceptor to extract channel hints from X-Goog-Spanner-Request-Id
+ // header. This verifies that all operations in a transaction use the same channel hint.
.intercept(
new ServerInterceptor() {
@Override
@@ -122,25 +121,30 @@ public ServerCall.Listener interceptCall(
ServerCall call,
Metadata headers,
ServerCallHandler next) {
- Attributes attributes = call.getAttributes();
- @SuppressWarnings({"unchecked", "deprecation"})
- Attributes.Key key =
- (Attributes.Key)
- attributes.keys().stream()
- .filter(k -> k.equals(TRANSPORT_ATTR_REMOTE_ADDR))
- .findFirst()
- .orElse(null);
- if (key != null) {
- if (call.getMethodDescriptor()
- .equals(SpannerGrpc.getExecuteStreamingSqlMethod())) {
- executeSqlLocalIps.add(attributes.get(key));
- }
- if (call.getMethodDescriptor().equals(SpannerGrpc.getStreamingReadMethod())) {
- streamingReadLocalIps.add(attributes.get(key));
- }
- if (call.getMethodDescriptor()
- .equals(SpannerGrpc.getBeginTransactionMethod())) {
- beginTransactionLocalIps.add(attributes.get(key));
+ // Extract channel hint from X-Goog-Spanner-Request-Id header
+ String requestId = headers.get(XGoogSpannerRequestId.REQUEST_ID_HEADER_KEY);
+ if (requestId != null) {
+ // Format:
+ // .....
+ String[] parts = requestId.split("\\.");
+ if (parts.length >= 4) {
+ try {
+ long channelHint = Long.parseLong(parts[3]);
+ if (call.getMethodDescriptor()
+ .equals(SpannerGrpc.getExecuteStreamingSqlMethod())) {
+ executeSqlChannelHints.add(channelHint);
+ }
+ if (call.getMethodDescriptor()
+ .equals(SpannerGrpc.getStreamingReadMethod())) {
+ streamingReadChannelHints.add(channelHint);
+ }
+ if (call.getMethodDescriptor()
+ .equals(SpannerGrpc.getBeginTransactionMethod())) {
+ beginTransactionChannelHints.add(channelHint);
+ }
+ } catch (NumberFormatException e) {
+ // Ignore parse errors
+ }
}
}
return Contexts.interceptCall(Context.current(), call, headers, next);
@@ -172,9 +176,9 @@ public static void resetLogging() {
@After
public void reset() {
mockSpanner.reset();
- executeSqlLocalIps.clear();
- streamingReadLocalIps.clear();
- beginTransactionLocalIps.clear();
+ executeSqlChannelHints.clear();
+ streamingReadChannelHints.clear();
+ beginTransactionChannelHints.clear();
}
private SpannerOptions createSpannerOptions() {
@@ -195,18 +199,18 @@ private SpannerOptions createSpannerOptions() {
}
@Test
- public void testSingleUseReadOnlyTransaction_usesSingleChannel() {
+ public void testSingleUseReadOnlyTransaction_usesSingleChannelHint() {
try (Spanner spanner = createSpannerOptions().getService()) {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
try (ResultSet resultSet = client.singleUseReadOnlyTransaction().executeQuery(SELECT1)) {
while (resultSet.next()) {}
}
}
- assertEquals(1, executeSqlLocalIps.size());
+ assertEquals(1, executeSqlChannelHints.size());
}
@Test
- public void testSingleUseReadOnlyTransaction_withTimestampBound_usesSingleChannel() {
+ public void testSingleUseReadOnlyTransaction_withTimestampBound_usesSingleChannelHint() {
try (Spanner spanner = createSpannerOptions().getService()) {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
try (ResultSet resultSet =
@@ -216,11 +220,11 @@ public void testSingleUseReadOnlyTransaction_withTimestampBound_usesSingleChanne
while (resultSet.next()) {}
}
}
- assertEquals(1, executeSqlLocalIps.size());
+ assertEquals(1, executeSqlChannelHints.size());
}
@Test
- public void testReadOnlyTransaction_usesSingleChannel() {
+ public void testReadOnlyTransaction_usesSingleChannelHint() {
try (Spanner spanner = createSpannerOptions().getService()) {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
try (ReadOnlyTransaction transaction = client.readOnlyTransaction()) {
@@ -232,13 +236,14 @@ public void testReadOnlyTransaction_usesSingleChannel() {
}
}
}
- assertEquals(1, executeSqlLocalIps.size());
- assertEquals(1, beginTransactionLocalIps.size());
- assertEquals(executeSqlLocalIps, beginTransactionLocalIps);
+ // All ExecuteSql calls within the transaction should use the same channel hint
+ assertEquals(1, executeSqlChannelHints.size());
+ // BeginTransaction should use a single channel hint
+ assertEquals(1, beginTransactionChannelHints.size());
}
@Test
- public void testReadOnlyTransaction_withTimestampBound_usesSingleChannel() {
+ public void testReadOnlyTransaction_withTimestampBound_usesSingleChannelHint() {
try (Spanner spanner = createSpannerOptions().getService()) {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
try (ReadOnlyTransaction transaction =
@@ -251,13 +256,14 @@ public void testReadOnlyTransaction_withTimestampBound_usesSingleChannel() {
}
}
}
- assertEquals(1, executeSqlLocalIps.size());
- assertEquals(1, beginTransactionLocalIps.size());
- assertEquals(executeSqlLocalIps, beginTransactionLocalIps);
+ // All ExecuteSql calls within the transaction should use the same channel hint
+ assertEquals(1, executeSqlChannelHints.size());
+ // BeginTransaction should use a single channel hint
+ assertEquals(1, beginTransactionChannelHints.size());
}
@Test
- public void testTransactionManager_usesSingleChannel() {
+ public void testTransactionManager_usesSingleChannelHint() {
try (Spanner spanner = createSpannerOptions().getService()) {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
try (TransactionManager manager = client.transactionManager()) {
@@ -282,11 +288,11 @@ public void testTransactionManager_usesSingleChannel() {
}
}
}
- assertEquals(1, executeSqlLocalIps.size());
+ assertEquals(1, executeSqlChannelHints.size());
}
@Test
- public void testTransactionRunner_usesSingleChannel() {
+ public void testTransactionRunner_usesSingleChannelHint() {
try (Spanner spanner = createSpannerOptions().getService()) {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
TransactionRunner runner = client.readWriteTransaction();
@@ -312,6 +318,6 @@ public void testTransactionRunner_usesSingleChannel() {
return null;
});
}
- assertEquals(1, streamingReadLocalIps.size());
+ assertEquals(1, streamingReadChannelHints.size());
}
}
diff --git a/pom.xml b/pom.xml
index 29e65ec3502..ce528ca5cc7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,7 @@
UTF-8
github
google-cloud-spanner-parent
+ 1.8.0
@@ -103,7 +104,6 @@
google-cloud-spanner
6.104.0