Skip to content

Commit 27870c2

Browse files
feat: add registry-level client interceptor support (#42)
* feat: add registry-level client interceptor support * refactor: add in process support * style: spotless
1 parent 04adafb commit 27870c2

File tree

5 files changed

+122
-2
lines changed

5 files changed

+122
-2
lines changed

grpc-client-utils/src/main/java/org/hypertrace/core/grpcutils/client/GrpcChannelRegistry.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.util.Map;
99
import java.util.Objects;
1010
import java.util.concurrent.ConcurrentHashMap;
11+
import java.util.concurrent.ConcurrentMap;
1112
import java.util.concurrent.TimeUnit;
1213
import java.util.function.Supplier;
1314
import org.slf4j.Logger;
@@ -16,16 +17,27 @@
1617
public class GrpcChannelRegistry {
1718
private static final Logger LOG = LoggerFactory.getLogger(GrpcChannelRegistry.class);
1819
private final Map<String, ManagedChannel> channelMap;
20+
private final GrpcRegistryConfig registryConfig;
1921
private volatile boolean isShutdown = false;
2022

2123
public GrpcChannelRegistry(GrpcChannelRegistry sourceRegistry) {
2224
// Copy constructor
25+
this(sourceRegistry.registryConfig, new ConcurrentHashMap<>(sourceRegistry.channelMap));
2326
this.isShutdown = sourceRegistry.isShutdown();
24-
this.channelMap = new ConcurrentHashMap<>(sourceRegistry.channelMap);
2527
}
2628

2729
public GrpcChannelRegistry() {
28-
this.channelMap = new ConcurrentHashMap<>();
30+
this(GrpcRegistryConfig.builder().build());
31+
}
32+
33+
public GrpcChannelRegistry(GrpcRegistryConfig registryConfig) {
34+
this(registryConfig, new ConcurrentHashMap<>());
35+
}
36+
37+
private GrpcChannelRegistry(
38+
GrpcRegistryConfig registryConfig, ConcurrentMap<String, ManagedChannel> channelMap) {
39+
this.registryConfig = registryConfig;
40+
this.channelMap = channelMap;
2941
}
3042

3143
/**
@@ -78,6 +90,7 @@ protected ManagedChannel configureAndBuildChannel(
7890
if (config.getMaxInboundMessageSize() != null) {
7991
builder.maxInboundMessageSize(config.getMaxInboundMessageSize());
8092
}
93+
this.registryConfig.getDefaultInterceptors().forEach(builder::intercept);
8194
return builder.intercept(config.getClientInterceptors()).build();
8295
}
8396

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package org.hypertrace.core.grpcutils.client;
2+
3+
import io.grpc.ClientInterceptor;
4+
import java.util.List;
5+
import lombok.Builder;
6+
import lombok.Singular;
7+
import lombok.Value;
8+
9+
@Value
10+
@Builder(toBuilder = true)
11+
public class GrpcRegistryConfig {
12+
13+
@Singular List<ClientInterceptor> defaultInterceptors;
14+
}

grpc-client-utils/src/main/java/org/hypertrace/core/grpcutils/client/InProcessGrpcChannelRegistry.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,16 @@ public InProcessGrpcChannelRegistry() {
1616
}
1717

1818
public InProcessGrpcChannelRegistry(Map<String, String> authorityToInProcessNamedOverride) {
19+
this(authorityToInProcessNamedOverride, GrpcRegistryConfig.builder().build());
20+
}
21+
22+
public InProcessGrpcChannelRegistry(GrpcRegistryConfig registryConfig) {
23+
this(Collections.emptyMap(), registryConfig);
24+
}
25+
26+
public InProcessGrpcChannelRegistry(
27+
Map<String, String> authorityToInProcessNamedOverride, GrpcRegistryConfig registryConfig) {
28+
super(registryConfig);
1929
this.authorityToInProcessNamedOverride = authorityToInProcessNamedOverride;
2030
}
2131

grpc-client-utils/src/test/java/org/hypertrace/core/grpcutils/client/GrpcChannelRegistryTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,19 @@
55
import static org.junit.jupiter.api.Assertions.assertSame;
66
import static org.junit.jupiter.api.Assertions.assertThrows;
77
import static org.mockito.Answers.RETURNS_SELF;
8+
import static org.mockito.ArgumentMatchers.any;
89
import static org.mockito.ArgumentMatchers.anyInt;
10+
import static org.mockito.ArgumentMatchers.anyList;
911
import static org.mockito.ArgumentMatchers.anyString;
1012
import static org.mockito.Mockito.inOrder;
1113
import static org.mockito.Mockito.mock;
14+
import static org.mockito.Mockito.times;
15+
import static org.mockito.Mockito.verify;
1216
import static org.mockito.Mockito.verifyNoInteractions;
1317
import static org.mockito.Mockito.when;
1418

1519
import io.grpc.Channel;
20+
import io.grpc.ClientInterceptor;
1621
import io.grpc.Deadline;
1722
import io.grpc.Deadline.Ticker;
1823
import io.grpc.ManagedChannel;
@@ -139,4 +144,36 @@ void copyConstructorReusesExistingChannels() {
139144

140145
assertSame(firstChannel, new GrpcChannelRegistry(firstRegistry).forSecureAddress("foo", 1000));
141146
}
147+
148+
@Test
149+
void registersRegistryInterceptors() {
150+
try (MockedStatic<ManagedChannelBuilder> mockedBuilderStatic =
151+
Mockito.mockStatic(ManagedChannelBuilder.class)) {
152+
ManagedChannelBuilder<?> mockBuilder = mock(ManagedChannelBuilder.class);
153+
ManagedChannel mockChannel = mock(ManagedChannel.class);
154+
155+
mockedBuilderStatic
156+
.when(() -> ManagedChannelBuilder.forAddress("foo", 1000))
157+
.thenAnswer(
158+
invocation -> {
159+
when(mockBuilder.intercept(anyList())).then(RETURNS_SELF);
160+
when(mockBuilder.intercept(any(ClientInterceptor.class))).then(RETURNS_SELF);
161+
when(mockBuilder.build()).thenReturn(mockChannel);
162+
return mockBuilder;
163+
});
164+
165+
ClientInterceptor mockInterceptor1 = mock(ClientInterceptor.class);
166+
ClientInterceptor mockInterceptor2 = mock(ClientInterceptor.class);
167+
this.channelRegistry =
168+
new GrpcChannelRegistry(
169+
GrpcRegistryConfig.builder()
170+
.defaultInterceptor(mockInterceptor1)
171+
.defaultInterceptor(mockInterceptor2)
172+
.build());
173+
174+
assertSame(mockChannel, this.channelRegistry.forPlaintextAddress("foo", 1000));
175+
verify(mockBuilder, times(1)).intercept(mockInterceptor1);
176+
verify(mockBuilder, times(1)).intercept(mockInterceptor2);
177+
}
178+
}
142179
}

grpc-client-utils/src/test/java/org/hypertrace/core/grpcutils/client/InProcessGrpcChannelRegistryTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,23 @@
33
import static org.junit.jupiter.api.Assertions.assertEquals;
44
import static org.junit.jupiter.api.Assertions.assertNotSame;
55
import static org.junit.jupiter.api.Assertions.assertSame;
6+
import static org.mockito.Answers.RETURNS_SELF;
7+
import static org.mockito.ArgumentMatchers.any;
8+
import static org.mockito.ArgumentMatchers.anyList;
9+
import static org.mockito.Mockito.mock;
10+
import static org.mockito.Mockito.times;
11+
import static org.mockito.Mockito.verify;
12+
import static org.mockito.Mockito.when;
613

714
import io.grpc.Channel;
15+
import io.grpc.ClientInterceptor;
16+
import io.grpc.ManagedChannel;
17+
import io.grpc.inprocess.InProcessChannelBuilder;
818
import java.util.Map;
919
import org.junit.jupiter.api.BeforeEach;
1020
import org.junit.jupiter.api.Test;
21+
import org.mockito.MockedStatic;
22+
import org.mockito.Mockito;
1123

1224
class InProcessGrpcChannelRegistryTest {
1325
InProcessGrpcChannelRegistry channelRegistry;
@@ -74,4 +86,38 @@ void copyConstructorReusesExistingChannelsAndOverrides() {
7486
firstChannel,
7587
new InProcessGrpcChannelRegistry(firstRegistry).forSecureAddress("foo", 1000));
7688
}
89+
90+
@Test
91+
void registersRegistryInterceptors() {
92+
try (MockedStatic<InProcessChannelBuilder> mockedBuilderStatic =
93+
Mockito.mockStatic(InProcessChannelBuilder.class)) {
94+
InProcessChannelBuilder mockBuilder = mock(InProcessChannelBuilder.class);
95+
ManagedChannel mockChannel = mock(ManagedChannel.class);
96+
97+
mockedBuilderStatic
98+
.when(() -> InProcessChannelBuilder.forName("test"))
99+
.thenAnswer(
100+
invocation -> {
101+
when(mockBuilder.intercept(anyList())).then(RETURNS_SELF);
102+
when(mockBuilder.intercept(any(ClientInterceptor.class))).then(RETURNS_SELF);
103+
when(mockBuilder.build()).thenReturn(mockChannel);
104+
return mockBuilder;
105+
});
106+
107+
ClientInterceptor mockInterceptor1 = mock(ClientInterceptor.class);
108+
ClientInterceptor mockInterceptor2 = mock(ClientInterceptor.class);
109+
this.channelRegistry =
110+
new InProcessGrpcChannelRegistry(
111+
Map.of("host:1000", "test"),
112+
GrpcRegistryConfig.builder()
113+
.defaultInterceptor(mockInterceptor1)
114+
.defaultInterceptor(mockInterceptor2)
115+
.build());
116+
117+
assertSame(mockChannel, this.channelRegistry.forName("test"));
118+
assertSame(mockChannel, this.channelRegistry.forPlaintextAddress("host", 1000));
119+
verify(mockBuilder, times(1)).intercept(mockInterceptor1);
120+
verify(mockBuilder, times(1)).intercept(mockInterceptor2);
121+
}
122+
}
77123
}

0 commit comments

Comments
 (0)