11package routing
22
33import (
4+ "errors"
45 "fmt"
56 "math"
67 "sync"
@@ -9,6 +10,13 @@ import (
910 "github.com/redis/go-redis/v9/internal/util"
1011)
1112
13+ var (
14+ ErrMaxAggregation = errors .New ("redis: no valid results to aggregate for max operation" )
15+ ErrMinAggregation = errors .New ("redis: no valid results to aggregate for min operation" )
16+ ErrAndAggregation = errors .New ("redis: no valid results to aggregate for logical AND operation" )
17+ ErrOrAggregation = errors .New ("redis: no valid results to aggregate for logical OR operation" )
18+ )
19+
1220// ResponseAggregator defines the interface for aggregating responses from multiple shards.
1321type ResponseAggregator interface {
1422 // Add processes a single shard response.
@@ -171,11 +179,6 @@ func (a *AggSumAggregator) Result() (interface{}, error) {
171179type AggMinAggregator struct {
172180 err atomic.Value
173181 res * util.AtomicMin
174-
175- mu sync.Mutex
176- min int64
177- hasResult bool
178- firstErr error
179182}
180183
181184func (a * AggMinAggregator ) Add (result interface {}, err error ) error {
@@ -207,7 +210,7 @@ func (a *AggMinAggregator) Result() (interface{}, error) {
207210
208211 val , hasVal := a .res .Min ()
209212 if ! hasVal {
210- return nil , fmt . Errorf ( "redis: no valid results to aggregate for min operation" )
213+ return nil , ErrMinAggregation
211214 }
212215 return val , nil
213216}
@@ -247,7 +250,7 @@ func (a *AggMaxAggregator) Result() (interface{}, error) {
247250
248251 val , hasVal := a .res .Max ()
249252 if ! hasVal {
250- return nil , fmt . Errorf ( "redis: no valid results to aggregate for max operation" )
253+ return nil , ErrMaxAggregation
251254 }
252255 return val , nil
253256}
@@ -293,7 +296,7 @@ func (a *AggLogicalAndAggregator) Result() (interface{}, error) {
293296 }
294297
295298 if ! a .hasResult .Load () {
296- return nil , fmt . Errorf ( "redis: no valid results to aggregate for logical AND operation" )
299+ return nil , ErrAndAggregation
297300 }
298301 return a .res .Load () != 0 , nil
299302}
@@ -339,7 +342,7 @@ func (a *AggLogicalOrAggregator) Result() (interface{}, error) {
339342 }
340343
341344 if ! a .hasResult .Load () {
342- return nil , fmt . Errorf ( "redis: no valid results to aggregate for logical OR operation" )
345+ return nil , ErrOrAggregation
343346 }
344347 return a .res .Load () != 0 , nil
345348}
@@ -533,13 +536,6 @@ func (a *SpecialAggregator) Result() (interface{}, error) {
533536 return nil , nil
534537}
535538
536- // SetAggregatorFunc allows setting custom aggregation logic for special commands.
537- func (a * SpecialAggregator ) SetAggregatorFunc (fn func ([]interface {}, []error ) (interface {}, error )) {
538- a .mu .Lock ()
539- defer a .mu .Unlock ()
540- a .aggregatorFunc = fn
541- }
542-
543539// SpecialAggregatorRegistry holds custom aggregation functions for specific commands.
544540var SpecialAggregatorRegistry = make (map [string ]func ([]interface {}, []error ) (interface {}, error ))
545541
@@ -552,7 +548,7 @@ func RegisterSpecialAggregator(cmdName string, fn func([]interface{}, []error) (
552548func NewSpecialAggregator (cmdName string ) * SpecialAggregator {
553549 agg := & SpecialAggregator {}
554550 if fn , exists := SpecialAggregatorRegistry [cmdName ]; exists {
555- agg .SetAggregatorFunc ( fn )
551+ agg .aggregatorFunc = fn
556552 }
557553 return agg
558554}
0 commit comments