55import com .google .common .annotations .VisibleForTesting ;
66import com .spotify .confidence .flags .resolver .v1 .InternalFlagLoggerServiceGrpc ;
77import com .spotify .confidence .flags .resolver .v1 .WriteFlagLogsRequest ;
8- import io .grpc .CallOptions ;
9- import io .grpc .Channel ;
10- import io .grpc .ClientCall ;
11- import io .grpc .ClientInterceptor ;
12- import io .grpc .ForwardingClientCall ;
13- import io .grpc .Metadata ;
14- import io .grpc .MethodDescriptor ;
8+ import io .grpc .*;
9+ import java .time .Duration ;
1510import java .util .ArrayList ;
1611import java .util .List ;
1712import java .util .concurrent .ExecutorService ;
1813import java .util .concurrent .Executors ;
14+ import java .util .concurrent .TimeUnit ;
1915import org .slf4j .Logger ;
2016import org .slf4j .LoggerFactory ;
2117
@@ -28,54 +24,39 @@ public class GrpcWasmFlagLogger implements WasmFlagLogger {
2824 private static final Logger logger = LoggerFactory .getLogger (GrpcWasmFlagLogger .class );
2925 // Max number of flag_assigned entries per chunk to avoid exceeding gRPC max message size
3026 private static final int MAX_FLAG_ASSIGNED_PER_CHUNK = 1000 ;
27+ private static final Duration DEFAULT_SHUTDOWN_TIMEOUT = Duration .ofSeconds (10 );
3128 private final InternalFlagLoggerServiceGrpc .InternalFlagLoggerServiceBlockingStub stub ;
32- private final String clientSecret ;
3329 private final ExecutorService executorService ;
3430 private final FlagLogWriter writer ;
31+ private final Duration shutdownTimeout ;
32+ private ManagedChannel channel ;
3533
3634 @ VisibleForTesting
3735 public GrpcWasmFlagLogger (String clientSecret , FlagLogWriter writer ) {
38- this .stub = createStub (new DefaultChannelFactory ());
39- this .clientSecret = clientSecret ;
36+ this .stub = createAuthStub (new DefaultChannelFactory (), clientSecret );
4037 this .executorService = Executors .newCachedThreadPool ();
4138 this .writer = writer ;
39+ this .shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT ;
40+ }
41+
42+ @ VisibleForTesting
43+ public GrpcWasmFlagLogger (String clientSecret , FlagLogWriter writer , Duration shutdownTimeout ) {
44+ this .stub = createAuthStub (new DefaultChannelFactory (), clientSecret );
45+ this .executorService = Executors .newCachedThreadPool ();
46+ this .writer = writer ;
47+ this .shutdownTimeout = shutdownTimeout ;
4248 }
4349
4450 public GrpcWasmFlagLogger (String clientSecret , ChannelFactory channelFactory ) {
45- this .stub = createStub (channelFactory );
46- this .clientSecret = clientSecret ;
51+ this .stub = createAuthStub (channelFactory , clientSecret );
4752 this .executorService = Executors .newCachedThreadPool ();
53+ this .shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT ;
4854 this .writer =
4955 request ->
5056 executorService .submit (
5157 () -> {
5258 try {
53- // Create a stub with authorization header interceptor
54- InternalFlagLoggerServiceGrpc .InternalFlagLoggerServiceBlockingStub
55- stubWithAuth =
56- stub .withInterceptors (
57- new ClientInterceptor () {
58- @ Override
59- public <ReqT , RespT > ClientCall <ReqT , RespT > interceptCall (
60- MethodDescriptor <ReqT , RespT > method ,
61- CallOptions callOptions ,
62- Channel next ) {
63- return new ForwardingClientCall .SimpleForwardingClientCall <
64- ReqT , RespT >(next .newCall (method , callOptions )) {
65- @ Override
66- public void start (
67- Listener <RespT > responseListener , Metadata headers ) {
68- Metadata .Key <String > authKey =
69- Metadata .Key .of (
70- "authorization" , Metadata .ASCII_STRING_MARSHALLER );
71- headers .put (authKey , "ClientSecret " + clientSecret );
72- super .start (responseListener , headers );
73- }
74- };
75- }
76- });
77-
78- stubWithAuth .clientWriteFlagLogs (request );
59+ stub .clientWriteFlagLogs (request );
7960 logger .debug (
8061 "Successfully sent flag log with {} entries" ,
8162 request .getFlagAssignedCount ());
@@ -85,10 +66,10 @@ public void start(
8566 });
8667 }
8768
88- private static InternalFlagLoggerServiceGrpc .InternalFlagLoggerServiceBlockingStub createStub (
89- ChannelFactory channelFactory ) {
90- final var channel = createConfidenceChannel (channelFactory );
91- return InternalFlagLoggerServiceGrpc .newBlockingStub (channel );
69+ private InternalFlagLoggerServiceGrpc .InternalFlagLoggerServiceBlockingStub createAuthStub (
70+ ChannelFactory channelFactory , String clientSecret ) {
71+ this . channel = createConfidenceChannel (channelFactory );
72+ return addAuthInterceptor ( InternalFlagLoggerServiceGrpc .newBlockingStub (channel ), clientSecret );
9273 }
9374
9475 @ Override
@@ -150,12 +131,107 @@ private void sendAsync(WriteFlagLogsRequest request) {
150131 writer .write (request );
151132 }
152133
134+ @ Override
135+ public void writeSync (WriteFlagLogsRequest request ) {
136+ if (request .getClientResolveInfoList ().isEmpty ()
137+ && request .getFlagAssignedList ().isEmpty ()
138+ && request .getFlagResolveInfoList ().isEmpty ()) {
139+ logger .debug ("Skipping empty flag log request" );
140+ return ;
141+ }
142+
143+ final int flagAssignedCount = request .getFlagAssignedCount ();
144+
145+ // If flag_assigned list is small enough, send everything as-is
146+ if (flagAssignedCount <= MAX_FLAG_ASSIGNED_PER_CHUNK ) {
147+ sendSync (request );
148+ return ;
149+ }
150+
151+ // Split flag_assigned into chunks and send each chunk synchronously
152+ logger .debug (
153+ "Synchronously splitting {} flag_assigned entries into chunks of {}" ,
154+ flagAssignedCount ,
155+ MAX_FLAG_ASSIGNED_PER_CHUNK );
156+
157+ final List <WriteFlagLogsRequest > chunks = createFlagAssignedChunks (request );
158+ for (WriteFlagLogsRequest chunk : chunks ) {
159+ sendSync (chunk );
160+ }
161+ }
162+
163+ private void sendSync (WriteFlagLogsRequest request ) {
164+ try {
165+ stub .clientWriteFlagLogs (request );
166+ logger .debug ("Synchronously sent flag log with {} entries" , request .getFlagAssignedCount ());
167+ } catch (Exception e ) {
168+ logger .error ("Failed to write flag logs synchronously" , e );
169+ }
170+ }
171+
153172 /**
154- * Shutdown the executor service. This will allow any pending async writes to complete. Call this
155- * when the application is shutting down.
173+ * Shutdown the executor service and wait for pending async writes to complete. This method will
174+ * block for up to the configured shutdown timeout (default 10 seconds) waiting for pending log
175+ * writes to complete. Call this when the application is shutting down.
156176 */
157177 @ Override
158178 public void shutdown () {
159179 executorService .shutdown ();
180+ try {
181+ if (!executorService .awaitTermination (shutdownTimeout .toMillis (), TimeUnit .MILLISECONDS )) {
182+ logger .warn (
183+ "Flag logger executor did not terminate within {} seconds, some logs may be lost" ,
184+ shutdownTimeout .getSeconds ());
185+ executorService .shutdownNow ();
186+ } else {
187+ logger .debug ("Flag logger executor terminated gracefully" );
188+ }
189+ } catch (InterruptedException e ) {
190+ logger .warn ("Interrupted while waiting for flag logger shutdown" , e );
191+ executorService .shutdownNow ();
192+ Thread .currentThread ().interrupt ();
193+ }
194+
195+ if (channel != null ) {
196+ channel .shutdown ();
197+ try {
198+ if (!channel .awaitTermination (shutdownTimeout .toMillis (), TimeUnit .MILLISECONDS )) {
199+ logger .warn (
200+ "Channel did not terminate within {} seconds, forcing shutdown" ,
201+ shutdownTimeout .getSeconds ());
202+ channel .shutdownNow ();
203+ } else {
204+ logger .debug ("Channel terminated gracefully" );
205+ }
206+ } catch (InterruptedException e ) {
207+ logger .warn ("Interrupted while waiting for channel shutdown" , e );
208+ channel .shutdownNow ();
209+ Thread .currentThread ().interrupt ();
210+ }
211+ }
212+ }
213+
214+ private static InternalFlagLoggerServiceGrpc .InternalFlagLoggerServiceBlockingStub
215+ addAuthInterceptor (
216+ InternalFlagLoggerServiceGrpc .InternalFlagLoggerServiceBlockingStub stub ,
217+ String clientSecret ) {
218+ // Create a stub with authorization header interceptor
219+ return stub .withInterceptors (
220+ new ClientInterceptor () {
221+ @ Override
222+ public <ReqT , RespT > ClientCall <ReqT , RespT > interceptCall (
223+ MethodDescriptor <ReqT , RespT > method , CallOptions callOptions , Channel next ) {
224+ return new ForwardingClientCall .SimpleForwardingClientCall <ReqT , RespT >(
225+ next .newCall (method , callOptions )) {
226+ @ Override
227+ public void start (Listener <RespT > responseListener , Metadata headers ) {
228+ Metadata .Key <String > authKey =
229+ Metadata .Key .of ("authorization" , Metadata .ASCII_STRING_MARSHALLER );
230+ headers .put (authKey , "ClientSecret " + clientSecret );
231+ super .start (responseListener , headers );
232+ }
233+ };
234+ }
235+ });
160236 }
161237}
0 commit comments