Skip to content

Commit 2c86562

Browse files
authored
GroupBy dynamic distribution function type change (#267)
1 parent c016903 commit 2c86562

4 files changed

+17
-14
lines changed

doc/groupbydynamic.md

+5-3
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,18 @@
44

55
Divides an Observable into a dynamic set of Observables that each emit GroupedObservable from the original Observable, organized by key.
66

7-
`GroupByDyDynamic` differs from [GroupBy](groupby.md) in the sense that it does not require to pass the set length.
7+
`GroupByDyDynamic` differs from [GroupBy](groupby.md) for two reasons:
8+
* We don't need to pass a fixed set length.
9+
* The distribution function is a `func(rxgo.Item) string` instead of a `func(rxgo.Item) int`. The rationale is because of possible collisions. For example, if our distribution function produces 128-bit UUIDs, there is a collision risk if such a UUID has to be casted into an int.
810

911
![](http://reactivex.io/documentation/operators/images/groupBy.c.png)
1012

1113
## Example
1214

1315
```go
1416
count := 3
15-
observable := rxgo.Range(0, 10).GroupByDynamic(func(item rxgo.Item) int {
16-
return item.V.(int) % count
17+
observable := rxgo.Range(0, 10).GroupByDynamic(func(item rxgo.Item) string {
18+
return strconv.Itoa(item.V.(int) % count)
1719
}, rxgo.WithBufferedChannel(10))
1820

1921
for i := range observable.Observe() {

observable.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type Observable interface {
4646
FlatMap(apply ItemToObservable, opts ...Option) Observable
4747
ForEach(nextFunc NextFunc, errFunc ErrFunc, completedFunc CompletedFunc, opts ...Option) Disposed
4848
GroupBy(length int, distribution func(Item) int, opts ...Option) Observable
49-
GroupByDynamic(distribution func(Item) int, opts ...Option) Observable
49+
GroupByDynamic(distribution func(Item) string, opts ...Option) Observable
5050
IgnoreElements(opts ...Option) Observable
5151
Join(joiner Func2, right Observable, timeExtractor func(interface{}) time.Time, window Duration, opts ...Option) Observable
5252
Last(opts ...Option) OptionalSingle

observable_operator.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -1353,15 +1353,15 @@ func (o *ObservableImpl) GroupBy(length int, distribution func(Item) int, opts .
13531353
type GroupedObservable struct {
13541354
Observable
13551355
// Key is the distribution key
1356-
Key int
1356+
Key string
13571357
}
13581358

13591359
// GroupByDynamic divides an Observable into a dynamic set of Observables that each emit GroupedObservable from the original Observable, organized by key.
1360-
func (o *ObservableImpl) GroupByDynamic(distribution func(Item) int, opts ...Option) Observable {
1360+
func (o *ObservableImpl) GroupByDynamic(distribution func(Item) string, opts ...Option) Observable {
13611361
option := parseOptions(opts...)
13621362
next := option.buildChannel()
13631363
ctx := option.buildContext()
1364-
chs := make(map[int]chan Item)
1364+
chs := make(map[string]chan Item)
13651365

13661366
go func() {
13671367
observe := o.Observe(opts...)

observable_operator_test.go

+8-7
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"errors"
77
"fmt"
8+
"strconv"
89
"testing"
910
"time"
1011

@@ -960,11 +961,11 @@ func Test_Observable_GroupByDynamic(t *testing.T) {
960961
count := 3
961962
max := 10
962963

963-
obs := Range(0, max).GroupByDynamic(func(item Item) int {
964+
obs := Range(0, max).GroupByDynamic(func(item Item) string {
964965
if item.V == 10 {
965-
return 10
966+
return "10"
966967
}
967-
return item.V.(int) % count
968+
return strconv.Itoa(item.V.(int) % count)
968969
}, WithBufferedChannel(max))
969970
s, err := obs.ToSlice(0)
970971
if err != nil {
@@ -975,13 +976,13 @@ func Test_Observable_GroupByDynamic(t *testing.T) {
975976
}
976977

977978
Assert(ctx, t, s[0].(GroupedObservable), HasItems(0, 3, 6, 9), HasNoError())
978-
assert.Equal(t, 0, s[0].(GroupedObservable).Key)
979+
assert.Equal(t, "0", s[0].(GroupedObservable).Key)
979980
Assert(ctx, t, s[1].(GroupedObservable), HasItems(1, 4, 7), HasNoError())
980-
assert.Equal(t, 1, s[1].(GroupedObservable).Key)
981+
assert.Equal(t, "1", s[1].(GroupedObservable).Key)
981982
Assert(ctx, t, s[2].(GroupedObservable), HasItems(2, 5, 8), HasNoError())
982-
assert.Equal(t, 2, s[2].(GroupedObservable).Key)
983+
assert.Equal(t, "2", s[2].(GroupedObservable).Key)
983984
Assert(ctx, t, s[3].(GroupedObservable), HasItems(10), HasNoError())
984-
assert.Equal(t, 10, s[3].(GroupedObservable).Key)
985+
assert.Equal(t, "10", s[3].(GroupedObservable).Key)
985986
}
986987

987988
func joinTest(ctx context.Context, t *testing.T, left, right []interface{}, window Duration, expected []int64) {

0 commit comments

Comments
 (0)