Skip to content

Commit

Permalink
Add context functions to retry and recover (grpc-ecosystem#172)
Browse files Browse the repository at this point in the history
* retry: add new BackoffFuncContext type

BackoffFuncContext adds the context parameter, allowing
the retry function access to the contents of the parent
context.

Fixes grpc-ecosystem#171

* recovery: add new RecoveryHandlerFuncContext type

Allows the user to configure a custom recovery handler
that has access to the request scoped values in the
context.

Fixes grpc-ecosystem#168
  • Loading branch information
johanbrandhorst authored and domgreen committed Nov 12, 2018
1 parent 498ae20 commit 3304cc8
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 9 deletions.
12 changes: 8 additions & 4 deletions recovery/interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@ import (
// RecoveryHandlerFunc is a function that recovers from the panic `p` by returning an `error`.
type RecoveryHandlerFunc func(p interface{}) (err error)

// RecoveryHandlerFuncContext is a function that recovers from the panic `p` by returning an `error`.
// The context can be used to extract request scoped metadata and context values.
type RecoveryHandlerFuncContext func(ctx context.Context, p interface{}) (err error)

// UnaryServerInterceptor returns a new unary server interceptor for panic recovery.
func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
o := evaluateOptions(opts)
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (_ interface{}, err error) {
defer func() {
if r := recover(); r != nil {
err = recoverFrom(r, o.recoveryHandlerFunc)
err = recoverFrom(ctx, r, o.recoveryHandlerFunc)
}
}()

Expand All @@ -32,17 +36,17 @@ func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
defer func() {
if r := recover(); r != nil {
err = recoverFrom(r, o.recoveryHandlerFunc)
err = recoverFrom(stream.Context(), r, o.recoveryHandlerFunc)
}
}()

return handler(srv, stream)
}
}

func recoverFrom(p interface{}, r RecoveryHandlerFunc) error {
func recoverFrom(ctx context.Context, p interface{}, r RecoveryHandlerFuncContext) error {
if r == nil {
return grpc.Errorf(codes.Internal, "%s", p)
}
return r(p)
return r(ctx, p)
}
13 changes: 12 additions & 1 deletion recovery/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@

package grpc_recovery

import "golang.org/x/net/context"

var (
defaultOptions = &options{
recoveryHandlerFunc: nil,
}
)

type options struct {
recoveryHandlerFunc RecoveryHandlerFunc
recoveryHandlerFunc RecoveryHandlerFuncContext
}

func evaluateOptions(opts []Option) *options {
Expand All @@ -26,6 +28,15 @@ type Option func(*options)

// WithRecoveryHandler customizes the function for recovering from a panic.
func WithRecoveryHandler(f RecoveryHandlerFunc) Option {
return func(o *options) {
o.recoveryHandlerFunc = RecoveryHandlerFuncContext(func(ctx context.Context, p interface{}) error {
return f(p)
})
}
}

// WithRecoveryHandlerContext customizes the function for recovering from a panic.
func WithRecoveryHandlerContext(f RecoveryHandlerFuncContext) Option {
return func(o *options) {
o.recoveryHandlerFunc = f
}
Expand Down
26 changes: 23 additions & 3 deletions retry/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package grpc_retry
import (
"time"

"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
Expand All @@ -22,7 +23,9 @@ var (
perCallTimeout: 0, // disabled
includeHeader: true,
codes: DefaultRetriableCodes,
backoffFunc: BackoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10),
backoffFunc: BackoffFuncContext(func(ctx context.Context, attempt uint) time.Duration {
return BackoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10)(attempt)
}),
}
)

Expand All @@ -34,6 +37,14 @@ var (
// with the next iteration.
type BackoffFunc func(attempt uint) time.Duration

// BackoffFuncContext denotes a family of functions that control the backoff duration between call retries.
//
// They are called with an identifier of the attempt, and should return a time the system client should
// hold off for. If the time returned is longer than the `context.Context.Deadline` of the request
// the deadline of the request takes precedence and the wait will be interrupted before proceeding
// with the next iteration. The context can be used to extract request scoped metadata and context values.
type BackoffFuncContext func(ctx context.Context, attempt uint) time.Duration

// Disable disables the retry behaviour on this call, or this interceptor.
//
// Its semantically the same to `WithMax`
Expand All @@ -48,8 +59,17 @@ func WithMax(maxRetries uint) CallOption {
}}
}

// WithBackoff sets the `BackoffFunc `used to control time between retries.
// WithBackoff sets the `BackoffFunc` used to control time between retries.
func WithBackoff(bf BackoffFunc) CallOption {
return CallOption{applyFunc: func(o *options) {
o.backoffFunc = BackoffFuncContext(func(ctx context.Context, attempt uint) time.Duration {
return bf(attempt)
})
}}
}

// WithBackoffContext sets the `BackoffFuncContext` used to control time between retries.
func WithBackoffContext(bf BackoffFuncContext) CallOption {
return CallOption{applyFunc: func(o *options) {
o.backoffFunc = bf
}}
Expand Down Expand Up @@ -87,7 +107,7 @@ type options struct {
perCallTimeout time.Duration
includeHeader bool
codes []codes.Code
backoffFunc BackoffFunc
backoffFunc BackoffFuncContext
}

// CallOption is a grpc.CallOption that is local to grpc_retry.
Expand Down
2 changes: 1 addition & 1 deletion retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (s *serverStreamingRetryingStream) reestablishStreamAndResendBuffer(callCtx
func waitRetryBackoff(attempt uint, parentCtx context.Context, callOpts *options) error {
var waitTime time.Duration = 0
if attempt > 0 {
waitTime = callOpts.backoffFunc(attempt)
waitTime = callOpts.backoffFunc(parentCtx, attempt)
}
if waitTime > 0 {
logTrace(parentCtx, "grpc_retry attempt: %d, backoff for %v", attempt, waitTime)
Expand Down

0 comments on commit 3304cc8

Please sign in to comment.