Skip to content

Commit

Permalink
Added retry for connection errors on server stream call (grpc-ecosyst…
Browse files Browse the repository at this point in the history
…em#161)

* Added retry on server stream call

* Updated interceptor suite with restart server and added test cases for stream connection retry

* Fixed a typo when initializing the restart channel in InterceptorTestSuite

* Added a delay when stopping the interceptor suite server

* Added another test for ServerStream CallFailsOnDeadlineExceeded

* Actually validates the errors in the stream call retry test cases
  • Loading branch information
lonnblad authored and Michal Witkowski committed Aug 30, 2018
1 parent 15ea740 commit 498ae20
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 40 deletions.
53 changes: 38 additions & 15 deletions retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,22 +85,45 @@ func StreamClientInterceptor(optFuncs ...CallOption) grpc.StreamClientIntercepto
if desc.ClientStreams {
return nil, grpc.Errorf(codes.Unimplemented, "grpc_retry: cannot retry on ClientStreams, set grpc_retry.Disable()")
}
logTrace(parentCtx, "grpc_retry attempt: %d, no backoff for this call", 0)
callCtx := perCallContext(parentCtx, callOpts, 0)
newStreamer, err := streamer(callCtx, desc, cc, method, grpcOpts...)
if err != nil {
// TODO(mwitkow): Maybe dial and transport errors should be retriable?
return nil, err
}
retryingStreamer := &serverStreamingRetryingStream{
ClientStream: newStreamer,
callOpts: callOpts,
parentCtx: parentCtx,
streamerCall: func(ctx context.Context) (grpc.ClientStream, error) {
return streamer(ctx, desc, cc, method, grpcOpts...)
},

var lastErr error
for attempt := uint(0); attempt < callOpts.max; attempt++ {
if err := waitRetryBackoff(attempt, parentCtx, callOpts); err != nil {
return nil, err
}
callCtx := perCallContext(parentCtx, callOpts, 0)

var newStreamer grpc.ClientStream
newStreamer, lastErr = streamer(callCtx, desc, cc, method, grpcOpts...)
if lastErr == nil {
retryingStreamer := &serverStreamingRetryingStream{
ClientStream: newStreamer,
callOpts: callOpts,
parentCtx: parentCtx,
streamerCall: func(ctx context.Context) (grpc.ClientStream, error) {
return streamer(ctx, desc, cc, method, grpcOpts...)
},
}
return retryingStreamer, nil
}

logTrace(parentCtx, "grpc_retry attempt: %d, got err: %v", attempt, lastErr)
if isContextError(lastErr) {
if parentCtx.Err() != nil {
logTrace(parentCtx, "grpc_retry attempt: %d, parent context error: %v", attempt, parentCtx.Err())
// it's the parent context deadline or cancellation.
return nil, lastErr
} else {
logTrace(parentCtx, "grpc_retry attempt: %d, context error from retry call", attempt)
// it's the callCtx deadline or cancellation, in which case try again.
continue
}
}
if !isRetriable(lastErr, callOpts) {
return nil, lastErr
}
}
return retryingStreamer, nil
return nil, lastErr
}
}

Expand Down
41 changes: 38 additions & 3 deletions retry/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ import (

pb_testproto "github.com/grpc-ecosystem/go-grpc-middleware/testing/testproto"

"github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/grpc-ecosystem/go-grpc-middleware/testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"golang.org/x/net/context"
Expand All @@ -31,16 +32,18 @@ var (

type failingService struct {
pb_testproto.TestServiceServer
mu sync.Mutex

reqCounter uint
reqModulo uint
reqSleep time.Duration
reqError codes.Code
mu sync.Mutex
}

func (s *failingService) resetFailingConfiguration(modulo uint, errorCode codes.Code, sleepTime time.Duration) {
s.mu.Lock()
defer s.mu.Unlock()

s.reqCounter = 0
s.reqModulo = modulo
s.reqError = errorCode
Expand Down Expand Up @@ -242,6 +245,38 @@ func (s *RetrySuite) TestServerStream_PerCallDeadline_FailsOnParent() {
require.Equal(s.T(), codes.DeadlineExceeded, grpc.Code(err), "failre code must be a gRPC error of Deadline class")
}

// TestServerStream_CallFailsOnOutOfRetries requests a server restart with a
// start delay of 3*retryTimeout, then opens a PingList stream and expects the
// call to fail with codes.Unavailable.
func (s *RetrySuite) TestServerStream_CallFailsOnOutOfRetries() {
	serverBack := s.RestartServer(3 * retryTimeout)

	ctx := s.SimpleCtx()
	_, callErr := s.Client.PingList(ctx, goodPing)
	require.Error(s.T(), callErr, "establishing the connection should not succeed")

	gotCode := grpc.Code(callErr)
	assert.Equal(s.T(), codes.Unavailable, gotCode)

	// Wait for the signal on the channel returned by RestartServer before
	// returning, so the restart has completed when the test ends.
	<-serverBack
}

// TestServerStream_CallFailsOnDeadlineExceeded requests a server restart with
// a start delay of 3*retryTimeout, then opens a PingList stream under a
// context that times out after retryTimeout. It expects the call to fail with
// codes.DeadlineExceeded.
func (s *RetrySuite) TestServerStream_CallFailsOnDeadlineExceeded() {
	restarted := s.RestartServer(3 * retryTimeout)

	// Keep the CancelFunc and release it on return: discarding it keeps the
	// context's resources alive until the deadline fires (go vet: lostcancel).
	ctx, cancel := context.WithTimeout(context.TODO(), retryTimeout)
	defer cancel()

	_, err := s.Client.PingList(ctx, goodPing)

	require.Error(s.T(), err, "establishing the connection should not succeed")
	assert.Equal(s.T(), codes.DeadlineExceeded, grpc.Code(err))

	// Wait for the signal on the channel returned by RestartServer before
	// returning, so the restart has completed when the test ends.
	<-restarted
}

// TestServerStream_CallRetrySucceeds requests a server restart with a start
// delay of retryTimeout, then opens a PingList stream with the retry limit
// raised to 40 and expects the call to succeed.
func (s *RetrySuite) TestServerStream_CallRetrySucceeds() {
	serverBack := s.RestartServer(retryTimeout)

	ctx := s.SimpleCtx()
	maxRetries := grpc_retry.WithMax(40)
	_, callErr := s.Client.PingList(ctx, goodPing, maxRetries)

	assert.NoError(s.T(), callErr, "establishing the connection should succeed")

	// Wait for the signal on the channel returned by RestartServer before
	// returning, so the restart has completed when the test ends.
	<-serverBack
}

func (s *RetrySuite) assertPingListWasCorrect(stream pb_testproto.TestService_PingListClient) {
count := 0
for {
Expand Down
72 changes: 51 additions & 21 deletions testing/interceptor_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,36 +36,66 @@ type InterceptorTestSuite struct {
ServerOpts []grpc.ServerOption
ClientOpts []grpc.DialOption

serverAddr string
ServerListener net.Listener
Server *grpc.Server
clientConn *grpc.ClientConn
Client pb_testproto.TestServiceClient

restartServerWithDelayedStart chan time.Duration
serverRunning chan bool
}

func (s *InterceptorTestSuite) SetupSuite() {
var err error
s.ServerListener, err = net.Listen("tcp", "127.0.0.1:0")
require.NoError(s.T(), err, "must be able to allocate a port for serverListener")
if *flagTls {
creds, err := credentials.NewServerTLSFromFile(
path.Join(getTestingCertsPath(), "localhost.crt"),
path.Join(getTestingCertsPath(), "localhost.key"),
)
require.NoError(s.T(), err, "failed reading server credentials for localhost.crt")
s.ServerOpts = append(s.ServerOpts, grpc.Creds(creds))
}
// This is the point where we hook up the interceptor
s.Server = grpc.NewServer(s.ServerOpts...)
// Create a service if the instantiator hasn't provided one.
if s.TestService == nil {
s.TestService = &TestPingService{T: s.T()}
}
pb_testproto.RegisterTestServiceServer(s.Server, s.TestService)
s.restartServerWithDelayedStart = make(chan time.Duration)
s.serverRunning = make(chan bool)

s.serverAddr = "127.0.0.1:0"

go func() {
s.Server.Serve(s.ServerListener)
for {
var err error
s.ServerListener, err = net.Listen("tcp", s.serverAddr)
s.serverAddr = s.ServerListener.Addr().String()
require.NoError(s.T(), err, "must be able to allocate a port for serverListener")
if *flagTls {
creds, err := credentials.NewServerTLSFromFile(
path.Join(getTestingCertsPath(), "localhost.crt"),
path.Join(getTestingCertsPath(), "localhost.key"),
)
require.NoError(s.T(), err, "failed reading server credentials for localhost.crt")
s.ServerOpts = append(s.ServerOpts, grpc.Creds(creds))
}
// This is the point where we hook up the interceptor
s.Server = grpc.NewServer(s.ServerOpts...)
// Create a service if the instantiator hasn't provided one.
if s.TestService == nil {
s.TestService = &TestPingService{T: s.T()}
}
pb_testproto.RegisterTestServiceServer(s.Server, s.TestService)

go func() {
s.Server.Serve(s.ServerListener)
}()
if s.Client == nil {
s.Client = s.NewClient(s.ClientOpts...)
}

s.serverRunning <- true

d := <-s.restartServerWithDelayedStart
s.Server.Stop()
time.Sleep(d)
}
}()
s.Client = s.NewClient(s.ClientOpts...)

<-s.serverRunning
}

// RestartServer sends delayedStart on the suite's restart channel, pauses for
// 10ms, and returns the suite's serverRunning channel for the caller to
// receive from.
//
// The send blocks until the receiving side takes the value. The pause
// presumably gives the server loop time to stop the server before the caller
// proceeds — confirm against the loop in SetupSuite.
func (s *InterceptorTestSuite) RestartServer(delayedStart time.Duration) <-chan bool {
	const settle = 10 * time.Millisecond

	s.restartServerWithDelayedStart <- delayedStart
	time.Sleep(settle)

	var running <-chan bool = s.serverRunning
	return running
}

func (s *InterceptorTestSuite) NewClient(dialOpts ...grpc.DialOption) pb_testproto.TestServiceClient {
Expand All @@ -84,7 +114,7 @@ func (s *InterceptorTestSuite) NewClient(dialOpts ...grpc.DialOption) pb_testpro
}

func (s *InterceptorTestSuite) ServerAddr() string {
return s.ServerListener.Addr().String()
return s.serverAddr
}

func (s *InterceptorTestSuite) SimpleCtx() context.Context {
Expand Down
2 changes: 1 addition & 1 deletion tracing/opentracing/interceptors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (jaegerFormatInjector) Inject(ctx mocktracer.MockSpanContext, carrier inter
// jaegerFormatExtractor is a field-less type whose Extract method builds a
// mocktracer.MockSpanContext from a carrier.
type jaegerFormatExtractor struct{}

func (jaegerFormatExtractor) Extract(carrier interface{}) (mocktracer.MockSpanContext, error) {
rval := mocktracer.MockSpanContext{0, 0, true, nil}
rval := mocktracer.MockSpanContext{Sampled: true}
reader, ok := carrier.(opentracing.TextMapReader)
if !ok {
return rval, opentracing.ErrInvalidCarrier
Expand Down

0 comments on commit 498ae20

Please sign in to comment.