diff --git a/recovery/interceptors.go b/recovery/interceptors.go index c0fb5ac85..f687e2ba8 100644 --- a/recovery/interceptors.go +++ b/recovery/interceptors.go @@ -12,13 +12,17 @@ import ( // RecoveryHandlerFunc is a function that recovers from the panic `p` by returning an `error`. type RecoveryHandlerFunc func(p interface{}) (err error) +// RecoveryHandlerFuncContext is a function that recovers from the panic `p` by returning an `error`. +// The context can be used to extract request scoped metadata and context values. +type RecoveryHandlerFuncContext func(ctx context.Context, p interface{}) (err error) + // UnaryServerInterceptor returns a new unary server interceptor for panic recovery. func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor { o := evaluateOptions(opts) return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (_ interface{}, err error) { defer func() { if r := recover(); r != nil { - err = recoverFrom(r, o.recoveryHandlerFunc) + err = recoverFrom(ctx, r, o.recoveryHandlerFunc) } }() @@ -32,7 +36,7 @@ func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor { return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) { defer func() { if r := recover(); r != nil { - err = recoverFrom(r, o.recoveryHandlerFunc) + err = recoverFrom(stream.Context(), r, o.recoveryHandlerFunc) } }() @@ -40,9 +44,9 @@ func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor { } } -func recoverFrom(p interface{}, r RecoveryHandlerFunc) error { +func recoverFrom(ctx context.Context, p interface{}, r RecoveryHandlerFuncContext) error { if r == nil { return grpc.Errorf(codes.Internal, "%s", p) } - return r(p) + return r(ctx, p) } diff --git a/recovery/options.go b/recovery/options.go index e482d7a57..07bacda3a 100644 --- a/recovery/options.go +++ b/recovery/options.go @@ -3,6 +3,8 @@ package grpc_recovery +import "golang.org/x/net/context" + var ( defaultOptions = &options{ recoveryHandlerFunc: nil, @@ -10,7 +12,7 @@ var ( ) type options struct { - recoveryHandlerFunc RecoveryHandlerFunc + recoveryHandlerFunc RecoveryHandlerFuncContext } func evaluateOptions(opts []Option) *options { @@ -26,6 +28,15 @@ type Option func(*options) // WithRecoveryHandler customizes the function for recovering from a panic. func WithRecoveryHandler(f RecoveryHandlerFunc) Option { + return func(o *options) { + o.recoveryHandlerFunc = RecoveryHandlerFuncContext(func(ctx context.Context, p interface{}) error { + return f(p) + }) + } +} + +// WithRecoveryHandlerContext customizes the function for recovering from a panic. +func WithRecoveryHandlerContext(f RecoveryHandlerFuncContext) Option { return func(o *options) { o.recoveryHandlerFunc = f } diff --git a/retry/options.go b/retry/options.go index 90aea19be..71a03caa3 100644 --- a/retry/options.go +++ b/retry/options.go @@ -6,6 +6,7 @@ package grpc_retry import ( "time" + "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" ) @@ -22,7 +23,9 @@ var ( perCallTimeout: 0, // disabled includeHeader: true, codes: DefaultRetriableCodes, - backoffFunc: BackoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10), + backoffFunc: BackoffFuncContext(func(ctx context.Context, attempt uint) time.Duration { + return BackoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10)(attempt) + }), } ) @@ -34,6 +37,14 @@ var ( // with the next iteration. type BackoffFunc func(attempt uint) time.Duration +// BackoffFuncContext denotes a family of functions that control the backoff duration between call retries. +// +// They are called with an identifier of the attempt, and should return a time the system client should +// hold off for. If the time returned is longer than the `context.Context.Deadline` of the request +// the deadline of the request takes precedence and the wait will be interrupted before proceeding +// with the next iteration. The context can be used to extract request scoped metadata and context values. +type BackoffFuncContext func(ctx context.Context, attempt uint) time.Duration + // Disable disables the retry behaviour on this call, or this interceptor. // // Its semantically the same to `WithMax` @@ -48,8 +59,17 @@ func WithMax(maxRetries uint) CallOption { }} } -// WithBackoff sets the `BackoffFunc `used to control time between retries. +// WithBackoff sets the `BackoffFunc` used to control time between retries. func WithBackoff(bf BackoffFunc) CallOption { + return CallOption{applyFunc: func(o *options) { + o.backoffFunc = BackoffFuncContext(func(ctx context.Context, attempt uint) time.Duration { + return bf(attempt) + }) + }} +} + +// WithBackoffContext sets the `BackoffFuncContext` used to control time between retries. +func WithBackoffContext(bf BackoffFuncContext) CallOption { return CallOption{applyFunc: func(o *options) { o.backoffFunc = bf }} @@ -87,7 +107,7 @@ type options struct { perCallTimeout time.Duration includeHeader bool codes []codes.Code - backoffFunc BackoffFunc + backoffFunc BackoffFuncContext } // CallOption is a grpc.CallOption that is local to grpc_retry. diff --git a/retry/retry.go b/retry/retry.go index 42b2fe994..bf098582f 100644 --- a/retry/retry.go +++ b/retry/retry.go @@ -254,7 +254,7 @@ func (s *serverStreamingRetryingStream) reestablishStreamAndResendBuffer(callCtx func waitRetryBackoff(attempt uint, parentCtx context.Context, callOpts *options) error { var waitTime time.Duration = 0 if attempt > 0 { - waitTime = callOpts.backoffFunc(attempt) + waitTime = callOpts.backoffFunc(parentCtx, attempt) } if waitTime > 0 { logTrace(parentCtx, "grpc_retry attempt: %d, backoff for %v", attempt, waitTime)