Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 1 addition & 37 deletions pkg/lumera/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@ const (

keepaliveTime = 30 * time.Second
keepaliveTimeout = 10 * time.Second
retryDelay = 2 * time.Second
maxRetryDelay = 30 * time.Second
maxRetries = 5
backoffFactor = 2
)

// Connection defines the interface for a client connection.
Expand Down Expand Up @@ -108,7 +104,7 @@ func normaliseAddr(raw string) (hostPort string, useTLS bool, serverName string,
return net.JoinHostPort(host, port), false, host, nil
}

// createGRPCConnection creates a gRPC connection with keepalive and retry interceptor
// createGRPCConnection creates a gRPC connection with keepalive
func createGRPCConnection(ctx context.Context, hostPort string, creds credentials.TransportCredentials) (*grpc.ClientConn, error) {
_ = ctx // Keeping this for api compatibility
opts := []grpc.DialOption{
Expand All @@ -118,43 +114,11 @@ func createGRPCConnection(ctx context.Context, hostPort string, creds credential
Timeout: keepaliveTimeout,
PermitWithoutStream: true,
}),
grpc.WithUnaryInterceptor(retryInterceptor),
}

return grpc.NewClient(hostPort, opts...)
}

// retryInterceptor retries failed calls with exponential backoff
func retryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
delay := retryDelay
var lastErr error

for attempt := 0; attempt < maxRetries; attempt++ {
err := invoker(ctx, method, req, reply, cc, opts...)
if err == nil {
return nil
}

lastErr = err

// Don't wait after the last attempt
if attempt < maxRetries-1 {
select {
case <-time.After(delay):
// Exponential backoff: 2s → 4s → 8s → 16s → 30s (capped)
delay *= backoffFactor
if delay > maxRetryDelay {
delay = maxRetryDelay
}
case <-ctx.Done():
return ctx.Err()
}
}
}

return lastErr // Return the last error after all retries exhausted
}

// Close closes the gRPC connection.
func (c *grpcConnection) Close() error {
if c.conn != nil {
Expand Down
72 changes: 0 additions & 72 deletions pkg/lumera/connection_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
package lumera

import (
"context"
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func TestNormaliseAddr(t *testing.T) {
Expand Down Expand Up @@ -124,72 +119,5 @@ func TestConnectionConstants(t *testing.T) {
if keepaliveTimeout >= keepaliveTime {
t.Errorf("keepaliveTimeout should be less than keepaliveTime: %v >= %v", keepaliveTimeout, keepaliveTime)
}

if retryDelay < 100*time.Millisecond {
t.Errorf("retryDelay too short: %v", retryDelay)
}

if maxRetryDelay <= retryDelay {
t.Errorf("maxRetryDelay should be greater than retryDelay: %v <= %v", maxRetryDelay, retryDelay)
}

if maxRetries < 1 {
t.Errorf("maxRetries should be at least 1: %v", maxRetries)
}

if backoffFactor < 1 {
t.Errorf("backoffFactor should be at least 1: %v", backoffFactor)
}
}

func TestRetryInterceptorSuccess(t *testing.T) {
attempts := 0

// Mock invoker that fails twice, then succeeds
mockInvoker := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error {
attempts++
if attempts < 3 {
return status.Error(codes.DeadlineExceeded, "simulated timeout")
}
return nil // Success on 3rd attempt
}

// Call retry interceptor
err := retryInterceptor(context.Background(), "/test", nil, nil, nil, mockInvoker)

// Should succeed
if err != nil {
t.Errorf("Expected success after retries, got error: %v", err)
}

if attempts != 3 {
t.Errorf("Expected 3 attempts, got %d", attempts)
}
}

func TestRetryInterceptorContextCancellation(t *testing.T) {
attempts := 0

// Mock invoker that always fails
mockInvoker := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error {
attempts++
return status.Error(codes.Unavailable, "simulated failure")
}

// Context that cancels quickly
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

// Call retry interceptor
err := retryInterceptor(ctx, "/test", nil, nil, nil, mockInvoker)

// Should return context error
if err != context.DeadlineExceeded {
t.Errorf("Expected context deadline exceeded, got: %v", err)
}

// Should have made at least one attempt
if attempts < 1 {
t.Errorf("Expected at least 1 attempt, got %d", attempts)
}
}