Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions openfeature-provider/java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@
<artifactId>grpc-services</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
<version>${grpc.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
* <li>Production customization: custom TLS settings, proxies, connection pooling
* <li>Debugging: add custom logging or tracing interceptors
* </ul>
*
* <p><strong>Channel Lifecycle:</strong> Channels created by this factory must be shut down by the
* caller when they are no longer needed. The factory does not manage channel lifecycles.
*/
@FunctionalInterface
public interface ChannelFactory {
/**
* Creates a gRPC channel with the given target and interceptors.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
* </ul>
*/
public class DefaultChannelFactory implements ChannelFactory {

@Override
public ManagedChannel create(String target, List<ClientInterceptor> defaultInterceptors) {
final boolean useGrpcPlaintext =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.spotify.confidence;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Default implementation of HttpClientFactory that creates standard HTTP connections.
*
* <p>This factory:
*
* <ul>
* <li>Creates HttpURLConnection instances for the given URLs
* <li>Uses default timeouts and settings from the JVM
* <li>Can be extended or replaced for testing or custom behavior
* </ul>
*/
public class DefaultHttpClientFactory implements HttpClientFactory {
private static final Logger logger = LoggerFactory.getLogger(DefaultHttpClientFactory.class);

@Override
public HttpURLConnection create(String url) throws IOException {
return (HttpURLConnection) new URL(url).openConnection();
}

@Override
public void shutdown() {
// HTTP connections are stateless and don't require cleanup
logger.debug("DefaultHttpClientFactory shutdown called (no-op for stateless HTTP)");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
Expand All @@ -26,15 +25,17 @@ class FlagsAdminStateFetcher implements AccountStateProvider {
"https://confidence-resolver-state-cdn.spotifycdn.com/";

private final String clientSecret;
private final HttpClientFactory httpClientFactory;
// ETag for conditional GETs of resolver state
private final AtomicReference<String> etagHolder = new AtomicReference<>();
private final AtomicReference<byte[]> rawResolverStateHolder =
new AtomicReference<>(
com.spotify.confidence.flags.admin.v1.ResolverState.newBuilder().build().toByteArray());
private String accountId = "";

public FlagsAdminStateFetcher(String clientSecret) {
public FlagsAdminStateFetcher(String clientSecret, HttpClientFactory httpClientFactory) {
this.clientSecret = clientSecret;
this.httpClientFactory = httpClientFactory;
}

public AtomicReference<byte[]> rawStateHolder() {
Expand Down Expand Up @@ -64,7 +65,7 @@ private void fetchAndUpdateStateIfChanged() {
// Build CDN URL using SHA256 hash of client secret
final var cdnUrl = CDN_BASE_URL + sha256Hex(clientSecret);
try {
final HttpURLConnection conn = (HttpURLConnection) new URL(cdnUrl).openConnection();
final HttpURLConnection conn = httpClientFactory.create(cdnUrl);
final String previousEtag = etagHolder.get();
if (previousEtag != null) {
conn.setRequestProperty("if-none-match", previousEtag);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,13 @@
import com.google.common.annotations.VisibleForTesting;
import com.spotify.confidence.flags.resolver.v1.InternalFlagLoggerServiceGrpc;
import com.spotify.confidence.flags.resolver.v1.WriteFlagLogsRequest;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.*;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -28,54 +24,39 @@ public class GrpcWasmFlagLogger implements WasmFlagLogger {
private static final Logger logger = LoggerFactory.getLogger(GrpcWasmFlagLogger.class);
// Max number of flag_assigned entries per chunk to avoid exceeding gRPC max message size
private static final int MAX_FLAG_ASSIGNED_PER_CHUNK = 1000;
private static final Duration DEFAULT_SHUTDOWN_TIMEOUT = Duration.ofSeconds(10);
private final InternalFlagLoggerServiceGrpc.InternalFlagLoggerServiceBlockingStub stub;
private final String clientSecret;
private final ExecutorService executorService;
private final FlagLogWriter writer;
private final Duration shutdownTimeout;
private ManagedChannel channel;

@VisibleForTesting
public GrpcWasmFlagLogger(String clientSecret, FlagLogWriter writer) {
this.stub = createStub(new DefaultChannelFactory());
this.clientSecret = clientSecret;
this.stub = createAuthStub(new DefaultChannelFactory(), clientSecret);
this.executorService = Executors.newCachedThreadPool();
this.writer = writer;
this.shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
}

@VisibleForTesting
public GrpcWasmFlagLogger(String clientSecret, FlagLogWriter writer, Duration shutdownTimeout) {
this.stub = createAuthStub(new DefaultChannelFactory(), clientSecret);
this.executorService = Executors.newCachedThreadPool();
this.writer = writer;
this.shutdownTimeout = shutdownTimeout;
}

public GrpcWasmFlagLogger(String clientSecret, ChannelFactory channelFactory) {
this.stub = createStub(channelFactory);
this.clientSecret = clientSecret;
this.stub = createAuthStub(channelFactory, clientSecret);
this.executorService = Executors.newCachedThreadPool();
this.shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
this.writer =
request ->
executorService.submit(
() -> {
try {
// Create a stub with authorization header interceptor
InternalFlagLoggerServiceGrpc.InternalFlagLoggerServiceBlockingStub
stubWithAuth =
stub.withInterceptors(
new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions,
Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall<
ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(
Listener<RespT> responseListener, Metadata headers) {
Metadata.Key<String> authKey =
Metadata.Key.of(
"authorization", Metadata.ASCII_STRING_MARSHALLER);
headers.put(authKey, "ClientSecret " + clientSecret);
super.start(responseListener, headers);
}
};
}
});

stubWithAuth.clientWriteFlagLogs(request);
stub.clientWriteFlagLogs(request);
logger.debug(
"Successfully sent flag log with {} entries",
request.getFlagAssignedCount());
Expand All @@ -85,10 +66,10 @@ public void start(
});
}

private static InternalFlagLoggerServiceGrpc.InternalFlagLoggerServiceBlockingStub createStub(
ChannelFactory channelFactory) {
final var channel = createConfidenceChannel(channelFactory);
return InternalFlagLoggerServiceGrpc.newBlockingStub(channel);
private InternalFlagLoggerServiceGrpc.InternalFlagLoggerServiceBlockingStub createAuthStub(
ChannelFactory channelFactory, String clientSecret) {
this.channel = createConfidenceChannel(channelFactory);
return addAuthInterceptor(InternalFlagLoggerServiceGrpc.newBlockingStub(channel), clientSecret);
}

@Override
Expand Down Expand Up @@ -150,12 +131,107 @@ private void sendAsync(WriteFlagLogsRequest request) {
writer.write(request);
}

@Override
public void writeSync(WriteFlagLogsRequest request) {
if (request.getClientResolveInfoList().isEmpty()
&& request.getFlagAssignedList().isEmpty()
&& request.getFlagResolveInfoList().isEmpty()) {
logger.debug("Skipping empty flag log request");
return;
}

final int flagAssignedCount = request.getFlagAssignedCount();

// If flag_assigned list is small enough, send everything as-is
if (flagAssignedCount <= MAX_FLAG_ASSIGNED_PER_CHUNK) {
sendSync(request);
return;
}

// Split flag_assigned into chunks and send each chunk synchronously
logger.debug(
"Synchronously splitting {} flag_assigned entries into chunks of {}",
flagAssignedCount,
MAX_FLAG_ASSIGNED_PER_CHUNK);

final List<WriteFlagLogsRequest> chunks = createFlagAssignedChunks(request);
for (WriteFlagLogsRequest chunk : chunks) {
sendSync(chunk);
}
}

private void sendSync(WriteFlagLogsRequest request) {
try {
stub.clientWriteFlagLogs(request);
logger.debug("Synchronously sent flag log with {} entries", request.getFlagAssignedCount());
} catch (Exception e) {
logger.error("Failed to write flag logs synchronously", e);
}
}

/**
* Shutdown the executor service. This will allow any pending async writes to complete. Call this
* when the application is shutting down.
* Shutdown the executor service and wait for pending async writes to complete. This method will
* block for up to the configured shutdown timeout (default 10 seconds) waiting for pending log
* writes to complete. Call this when the application is shutting down.
*/
@Override
public void shutdown() {
executorService.shutdown();
try {
if (!executorService.awaitTermination(shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
logger.warn(
"Flag logger executor did not terminate within {} seconds, some logs may be lost",
shutdownTimeout.getSeconds());
executorService.shutdownNow();
} else {
logger.debug("Flag logger executor terminated gracefully");
}
} catch (InterruptedException e) {
logger.warn("Interrupted while waiting for flag logger shutdown", e);
executorService.shutdownNow();
Thread.currentThread().interrupt();
}

if (channel != null) {
channel.shutdown();
try {
if (!channel.awaitTermination(shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
logger.warn(
"Channel did not terminate within {} seconds, forcing shutdown",
shutdownTimeout.getSeconds());
channel.shutdownNow();
} else {
logger.debug("Channel terminated gracefully");
}
} catch (InterruptedException e) {
logger.warn("Interrupted while waiting for channel shutdown", e);
channel.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

private static InternalFlagLoggerServiceGrpc.InternalFlagLoggerServiceBlockingStub
addAuthInterceptor(
InternalFlagLoggerServiceGrpc.InternalFlagLoggerServiceBlockingStub stub,
String clientSecret) {
// Create a stub with authorization header interceptor
return stub.withInterceptors(
new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
Metadata.Key<String> authKey =
Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);
headers.put(authKey, "ClientSecret " + clientSecret);
super.start(responseListener, headers);
}
};
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.spotify.confidence;

import java.io.IOException;
import java.net.HttpURLConnection;

/**
* HttpClientFactory is an advanced/testing hook allowing callers to customize how HTTP connections
* are created. The provider will pass the URL that needs to be fetched.
*
* <p>Implementations may modify request properties, change URLs, or replace the connection creation
* mechanism entirely. This is particularly useful for:
*
* <ul>
* <li>Unit testing: inject mock HTTP responses
* <li>Integration testing: point to local mock HTTP servers
* <li>Production customization: custom timeouts, proxies, headers
* <li>Debugging: add custom logging or request tracking
* </ul>
*
* <p><strong>Lifecycle:</strong> The factory is responsible for managing any resources it creates.
* When {@link #shutdown()} is called, it should clean up any resources that were allocated.
*/
public interface HttpClientFactory {
/**
* Creates an HTTP connection for the given URL.
*
* @param url the URL to connect to (e.g.,
* "https://confidence-resolver-state-cdn.spotifycdn.com/...")
* @return a configured HttpURLConnection
* @throws IOException if an I/O error occurs while opening the connection
*/
HttpURLConnection create(String url) throws IOException;

/**
* Shuts down this factory and cleans up any resources. This method should be called when the
* provider is shutting down to ensure proper resource cleanup.
*
* <p>Implementations should clean up any resources that were created and wait for them to
* terminate gracefully if applicable.
*/
void shutdown();
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,27 @@

public class LocalProviderConfig {
private final ChannelFactory channelFactory;
private final HttpClientFactory httpClientFactory;

public LocalProviderConfig() {
this(null);
this(null, null);
}

public LocalProviderConfig(ChannelFactory channelFactory) {
this(channelFactory, null);
}

public LocalProviderConfig(ChannelFactory channelFactory, HttpClientFactory httpClientFactory) {
this.channelFactory = channelFactory != null ? channelFactory : new DefaultChannelFactory();
this.httpClientFactory =
httpClientFactory != null ? httpClientFactory : new DefaultHttpClientFactory();
}

public ChannelFactory getChannelFactory() {
return channelFactory;
}

public HttpClientFactory getHttpClientFactory() {
return httpClientFactory;
}
}
Loading
Loading