Skip to content

Commit 308be13

Browse files
committedFeb 24, 2020
Connectable first version
1 parent 27a7871 commit 308be13

9 files changed

+235
-9
lines changed
 

‎factory.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -185,9 +185,9 @@ func Empty() Observable {
185185
}
186186

187187
// FromChannel creates a cold observable from a channel.
188-
func FromChannel(next <-chan Item) Observable {
188+
func FromChannel(next <-chan Item, opts ...Option) Observable {
189189
return &ObservableImpl{
190-
iterable: newChannelIterable(next),
190+
iterable: newChannelIterable(next, opts...),
191191
}
192192
}
193193

‎factory_connectable_test.go

+121
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package rxgo
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/stretchr/testify/assert"
7+
"golang.org/x/sync/errgroup"
8+
"reflect"
9+
"sync"
10+
"testing"
11+
"time"
12+
)
13+
14+
func testConnectableSingle(t *testing.T, obs Observable) {
15+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
16+
defer cancel()
17+
eg, _ := errgroup.WithContext(ctx)
18+
19+
expected := []interface{}{1, 2, 3}
20+
21+
nbConsumers := 3
22+
wg := sync.WaitGroup{}
23+
wg.Add(nbConsumers)
24+
// Before Connect() is called we create multiple observers
25+
// We check all observers receive the same items
26+
for i := 0; i < nbConsumers; i++ {
27+
eg.Go(func() error {
28+
observer := obs.Observe(WithContext(ctx))
29+
wg.Done()
30+
got, err := collect(ctx, observer)
31+
if err != nil {
32+
return err
33+
}
34+
if !reflect.DeepEqual(got, expected) {
35+
return fmt.Errorf("expected: %v, got: %v", expected, got)
36+
}
37+
return nil
38+
})
39+
}
40+
41+
wg.Wait()
42+
obs.Connect()
43+
assert.NoError(t, eg.Wait())
44+
}
45+
46+
func testConnectableComposed(t *testing.T, obs Observable) {
47+
obs = obs.Map(func(_ context.Context, i interface{}) (interface{}, error) {
48+
return i.(int) + 1, nil
49+
}, WithPublishStrategy())
50+
51+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
52+
defer cancel()
53+
eg, _ := errgroup.WithContext(ctx)
54+
55+
expected := []interface{}{2, 3, 4}
56+
57+
nbConsumers := 3
58+
wg := sync.WaitGroup{}
59+
wg.Add(nbConsumers)
60+
// Before Connect() is called we create multiple observers
61+
// We check all observers receive the same items
62+
for i := 0; i < nbConsumers; i++ {
63+
eg.Go(func() error {
64+
observer := obs.Observe(WithContext(ctx))
65+
wg.Done()
66+
67+
got, err := collect(ctx, observer)
68+
if err != nil {
69+
return err
70+
}
71+
if !reflect.DeepEqual(got, expected) {
72+
return fmt.Errorf("expected: %v, got: %v", expected, got)
73+
}
74+
return nil
75+
})
76+
}
77+
78+
wg.Wait()
79+
obs.Connect()
80+
assert.NoError(t, eg.Wait())
81+
}
82+
83+
func testConnectableWithoutConnect(t *testing.T, obs Observable) {
84+
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
85+
defer cancel()
86+
Assert(ctx, t, obs, IsEmpty())
87+
}
88+
89+
func Test_Connectable_IterableChannel_Single(t *testing.T) {
90+
ch := make(chan Item, 10)
91+
go func() {
92+
ch <- Of(1)
93+
ch <- Of(2)
94+
ch <- Of(3)
95+
close(ch)
96+
}()
97+
testConnectableSingle(t, FromChannel(ch, WithPublishStrategy()))
98+
}
99+
100+
func Test_Connectable_IterableChannel_Composed(t *testing.T) {
101+
ch := make(chan Item, 10)
102+
go func() {
103+
ch <- Of(1)
104+
ch <- Of(2)
105+
ch <- Of(3)
106+
close(ch)
107+
}()
108+
testConnectableComposed(t, FromChannel(ch, WithPublishStrategy()))
109+
}
110+
111+
func Test_Connectable_IterableChannel_WithoutConnect(t *testing.T) {
112+
ch := make(chan Item, 10)
113+
go func() {
114+
ch <- Of(1)
115+
ch <- Of(2)
116+
ch <- Of(3)
117+
close(ch)
118+
}()
119+
obs := FromChannel(ch, WithPublishStrategy(), WithBufferedChannel(10))
120+
testConnectableWithoutConnect(t, obs)
121+
}

‎factory_test.go

+19
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,25 @@ import (
99
"github.com/stretchr/testify/assert"
1010
)
1111

12+
func collect(ctx context.Context, ch <-chan Item) ([]interface{}, error) {
13+
s := make([]interface{}, 0)
14+
for {
15+
select {
16+
case <-ctx.Done():
17+
return nil, ctx.Err()
18+
case item, ok := <-ch:
19+
if !ok {
20+
return s, nil
21+
}
22+
if item.Error() {
23+
s = append(s, item.E)
24+
} else {
25+
s = append(s, item.V)
26+
}
27+
}
28+
}
29+
}
30+
1231
func Test_Amb1(t *testing.T) {
1332
obs := Amb([]Observable{testObservable(1, 2, 3), Empty()})
1433
Assert(context.Background(), t, obs, HasItems(1, 2, 3))

‎go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@ require (
66
github.com/cenkalti/backoff/v4 v4.0.0
77
github.com/emirpasic/gods v1.12.0
88
github.com/stretchr/testify v1.4.0
9+
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
910
)

‎go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
1010
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
1111
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
1212
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
13+
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
14+
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
1315
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
1416
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
1517
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=

‎iterable_channel.go

+58-5
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,66 @@
11
package rxgo
22

3+
import (
4+
"sync"
5+
)
6+
37
type channelIterable struct {
4-
next <-chan Item
8+
next <-chan Item
9+
opts []Option
10+
subscribers []chan Item
11+
mutex sync.RWMutex
12+
producerAlreadyCreated bool
513
}
614

7-
func newChannelIterable(next <-chan Item) Iterable {
8-
return &channelIterable{next: next}
15+
func newChannelIterable(next <-chan Item, opts ...Option) Iterable {
16+
return &channelIterable{
17+
next: next,
18+
subscribers: make([]chan Item, 0),
19+
opts: opts,
20+
}
921
}
1022

11-
func (i *channelIterable) Observe(_ ...Option) <-chan Item {
12-
return i.next
23+
func (i *channelIterable) Observe(opts ...Option) <-chan Item {
24+
mergedOptions := append(i.opts, opts...)
25+
option := parseOptions(mergedOptions...)
26+
27+
if !option.isConnectable() {
28+
return i.next
29+
}
30+
31+
if option.isConnectOperation() {
32+
i.connect()
33+
return nil
34+
}
35+
36+
ch := option.buildChannel()
37+
i.mutex.Lock()
38+
i.subscribers = append(i.subscribers, ch)
39+
i.mutex.Unlock()
40+
return ch
41+
}
42+
43+
func (i *channelIterable) connect() {
44+
i.mutex.Lock()
45+
if !i.producerAlreadyCreated {
46+
go i.produce()
47+
i.producerAlreadyCreated = true
48+
}
49+
i.mutex.Unlock()
50+
}
51+
52+
func (i *channelIterable) produce() {
53+
for item := range i.next {
54+
i.mutex.RLock()
55+
for _, subscriber := range i.subscribers {
56+
subscriber <- item
57+
}
58+
i.mutex.RUnlock()
59+
}
60+
61+
i.mutex.RLock()
62+
for _, subscriber := range i.subscribers {
63+
close(subscriber)
64+
}
65+
i.mutex.RUnlock()
1366
}

‎observable.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
// Observable is the standard interface for Observables.
1111
type Observable interface {
1212
Iterable
13+
Connect() Observable
1314
All(predicate Predicate, opts ...Option) Single
1415
AverageFloat32(opts ...Option) Single
1516
AverageFloat64(opts ...Option) Single
@@ -117,7 +118,7 @@ func observable(iterable Iterable, operatorFactory func() operator, forceSeq, by
117118
return &ObservableImpl{
118119
iterable: newFactoryIterable(func(propagatedOptions ...Option) <-chan Item {
119120
mergedOptions := append(opts, propagatedOptions...)
120-
option = parseOptions(mergedOptions...)
121+
option := parseOptions(mergedOptions...)
121122

122123
next := option.buildChannel()
123124
ctx := option.buildContext()
@@ -196,10 +197,10 @@ func optionalSingle(iterable Iterable, operatorFactory func() operator, forceSeq
196197
}
197198

198199
func runSeq(ctx context.Context, next chan Item, iterable Iterable, operatorFactory func() operator, option Option, opts ...Option) {
200+
observe := iterable.Observe(opts...)
199201
go func() {
200202
op := operatorFactory()
201203
stopped := false
202-
observe := iterable.Observe(opts...)
203204
operator := operatorOptions{
204205
stop: func() {
205206
if option.getErrorStrategy() == StopOnError {

‎observable_operator.go

+5
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,11 @@ func (o *ObservableImpl) BufferWithTimeOrCount(timespan Duration, count int, opt
541541
return customObservableOperator(f, opts...)
542542
}
543543

544+
func (o *ObservableImpl) Connect() Observable {
545+
o.Observe(connect())
546+
return o
547+
}
548+
544549
// Contains determines whether an Observable emits a particular item or not.
545550
func (o *ObservableImpl) Contains(equal Predicate, opts ...Option) Single {
546551
return single(o, func() operator {

‎options.go

+24
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ type Option interface {
1515
buildContext() context.Context
1616
getBackPressureStrategy() BackpressureStrategy
1717
getErrorStrategy() OnErrorStrategy
18+
isConnectable() bool
19+
isConnectOperation() bool
1820
}
1921

2022
type funcOption struct {
@@ -27,6 +29,8 @@ type funcOption struct {
2729
backPressureStrategy BackpressureStrategy
2830
onErrorStrategy OnErrorStrategy
2931
propagate bool
32+
connectable bool
33+
connectOperation bool
3034
}
3135

3236
func (fdo *funcOption) toPropagate() bool {
@@ -63,6 +67,14 @@ func (fdo *funcOption) getErrorStrategy() OnErrorStrategy {
6367
return fdo.onErrorStrategy
6468
}
6569

70+
func (fdo *funcOption) isConnectable() bool {
71+
return fdo.connectable
72+
}
73+
74+
func (fdo *funcOption) isConnectOperation() bool {
75+
return fdo.connectOperation
76+
}
77+
6678
func (fdo *funcOption) apply(do *funcOption) {
6779
fdo.f(do)
6880
}
@@ -131,3 +143,15 @@ func WithErrorStrategy(strategy OnErrorStrategy) Option {
131143
options.onErrorStrategy = strategy
132144
})
133145
}
146+
147+
func WithPublishStrategy() Option {
148+
return newFuncOption(func(options *funcOption) {
149+
options.connectable = true
150+
})
151+
}
152+
153+
func connect() Option {
154+
return newFuncOption(func(options *funcOption) {
155+
options.connectOperation = true
156+
})
157+
}

0 commit comments

Comments
 (0)
Please sign in to comment.