Skip to content

Commit 771d704

Browse files
authored
Merge pull request #229 from ReactiveX/refine
Refine Create and Defer operators
2 parents 24db51b + 27a7871 commit 771d704

16 files changed

+208
-197
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ In this example, we create a pool of 32 goroutines that consume items concurrent
277277
* [Range](doc/range.md) — create an Observable that emits a range of sequential integers
278278
* [Repeat](doc/repeat.md) — create an Observable that emits a particular item or sequence of items repeatedly
279279
* [Start](doc/start.md) — create an Observable that emits the return value of a function
280-
* [Timer](doc/timer.md) — create an Observable that emits a single item after a given delay
280+
* [Timer](doc/timer.md) — create an Observable that completes after a specified delay
281281

282282
### Transforming Observables
283283
* [Buffer](doc/buffer.md) — periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time

assert.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,9 @@ loop:
233233
for _, v := range got {
234234
delete(m, v)
235235
}
236-
assert.Equal(t, 0, len(m))
236+
if len(m) != 0 {
237+
assert.Fail(t, "missing elements", "%v", got)
238+
}
237239
}
238240
if checkHasItem, value := ass.itemToBeChecked(); checkHasItem {
239241
length := len(got)

doc/create.md

+1-8
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,10 @@ Create an Observable from scratch by calling observer methods programmatically.
99
## Example
1010

1111
```go
12-
observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item, done func()) {
12+
observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
1313
next <- rxgo.Of(1)
1414
next <- rxgo.Of(2)
1515
next <- rxgo.Of(3)
16-
done()
1716
}})
1817
```
1918

@@ -25,12 +24,6 @@ Output:
2524
3
2625
```
2726

28-
There are two ways to close the Observable:
29-
* Closing the `next` channel.
30-
* Calling the `done()` function.
31-
32-
Yet, as we can pass multiple producers, using the `done()` function is the recommended approach.
33-
3427
## Options
3528

3629
### WithBufferedChannel

doc/defer.md

+1-8
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,10 @@ do not create the Observable until the observer subscribes, and create a fresh O
99
## Example
1010

1111
```go
12-
observable := rxgo.Defer([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item, done func()) {
12+
observable := rxgo.Defer([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
1313
next <- rxgo.Of(1)
1414
next <- rxgo.Of(2)
1515
next <- rxgo.Of(3)
16-
done()
1716
}})
1817
```
1918

@@ -25,12 +24,6 @@ Output:
2524
3
2625
```
2726

28-
There are two ways to close the Observable:
29-
* Closing the `next` channel.
30-
* Calling the `done()` function.
31-
32-
Yet, as we can pass multiple producers, using the `done()` function is the recommended approach.
33-
3427
## Options
3528

3629
### WithBufferedChannel

doc/timer.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
## Overview
44

5-
Create an Observable that emits a single item after a given delay.
5+
Create an Observable that completes after a specified delay.
66

77
![](http://reactivex.io/documentation/operators/images/timer.png)
88

factory.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ func Thrown(err error) Observable {
336336
}
337337
}
338338

339-
// Timer returns an Observable that emits an empty structure after a specified delay, and then completes.
339+
// Timer returns an Observable that completes after a specified delay.
340340
func Timer(d Duration, opts ...Option) Observable {
341341
option := parseOptions(opts...)
342342
next := make(chan Item, 1)
@@ -348,7 +348,7 @@ func Timer(d Duration, opts ...Option) Observable {
348348
case <-ctx.Done():
349349
return
350350
case <-time.After(d.duration()):
351-
next <- Of(struct{}{})
351+
return
352352
}
353353
}()
354354
return &ObservableImpl{

factory_test.go

+55-28
Original file line numberDiff line numberDiff line change
@@ -84,76 +84,97 @@ func Test_Concat_OneEmptyObservable(t *testing.T) {
8484
}
8585

8686
func Test_Create(t *testing.T) {
87-
obs := Create([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
87+
obs := Create([]Producer{func(ctx context.Context, next chan<- Item) {
8888
next <- Of(1)
8989
next <- Of(2)
9090
next <- Of(3)
91-
done()
9291
}})
9392
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNoError())
9493
}
9594

9695
func Test_Create_SingleDup(t *testing.T) {
97-
obs := Create([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
96+
obs := Create([]Producer{func(ctx context.Context, next chan<- Item) {
9897
next <- Of(1)
9998
next <- Of(2)
10099
next <- Of(3)
101-
done()
102100
}})
103101
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNoError())
104102
Assert(context.Background(), t, obs, IsEmpty(), HasNoError())
105103
}
106104

105+
func Test_Create_ContextCancelled(t *testing.T) {
106+
closed1 := make(chan struct{})
107+
ctx, cancel := context.WithCancel(context.Background())
108+
Create([]Producer{
109+
func(ctx context.Context, next chan<- Item) {
110+
cancel()
111+
}, func(ctx context.Context, next chan<- Item) {
112+
<-ctx.Done()
113+
closed1 <- struct{}{}
114+
},
115+
}, WithContext(ctx)).Run()
116+
117+
select {
118+
case <-time.Tick(time.Second):
119+
assert.FailNow(t, "producer not closed")
120+
case <-closed1:
121+
}
122+
}
123+
107124
func Test_Defer(t *testing.T) {
108-
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
125+
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) {
109126
next <- Of(1)
110127
next <- Of(2)
111128
next <- Of(3)
112-
done()
113129
}})
114130
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNoError())
115131
}
116132

117133
func Test_Defer_Multiple(t *testing.T) {
118-
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
134+
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) {
119135
next <- Of(1)
120136
next <- Of(2)
121-
done()
122-
}, func(ctx context.Context, next chan<- Item, done func()) {
137+
}, func(ctx context.Context, next chan<- Item) {
123138
next <- Of(10)
124139
next <- Of(20)
125-
done()
126140
}})
127141
Assert(context.Background(), t, obs, HasItemsNoOrder(1, 2, 10, 20), HasNoError())
128142
}
129143

130-
func Test_Defer_Close(t *testing.T) {
131-
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
132-
next <- Of(1)
133-
next <- Of(2)
134-
next <- Of(3)
135-
done()
136-
}})
137-
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNoError())
144+
func Test_Defer_ContextCancelled(t *testing.T) {
145+
closed1 := make(chan struct{})
146+
ctx, cancel := context.WithCancel(context.Background())
147+
Defer([]Producer{
148+
func(ctx context.Context, next chan<- Item) {
149+
cancel()
150+
}, func(ctx context.Context, next chan<- Item) {
151+
<-ctx.Done()
152+
closed1 <- struct{}{}
153+
},
154+
}, WithContext(ctx)).Run()
155+
156+
select {
157+
case <-time.Tick(time.Second):
158+
assert.FailNow(t, "producer not closed")
159+
case <-closed1:
160+
}
138161
}
139162

140163
func Test_Defer_SingleDup(t *testing.T) {
141-
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
164+
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) {
142165
next <- Of(1)
143166
next <- Of(2)
144167
next <- Of(3)
145-
done()
146168
}})
147169
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNoError())
148170
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNoError())
149171
}
150172

151173
func Test_Defer_ComposedDup(t *testing.T) {
152-
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
174+
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) {
153175
next <- Of(1)
154176
next <- Of(2)
155177
next <- Of(3)
156-
done()
157178
}}).Map(func(_ context.Context, i interface{}) (_ interface{}, _ error) {
158179
return i.(int) + 1, nil
159180
}).Map(func(_ context.Context, i interface{}) (_ interface{}, _ error) {
@@ -164,11 +185,10 @@ func Test_Defer_ComposedDup(t *testing.T) {
164185
}
165186

166187
func Test_Defer_ComposedDup_EagerObservation(t *testing.T) {
167-
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
188+
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) {
168189
next <- Of(1)
169190
next <- Of(2)
170191
next <- Of(3)
171-
done()
172192
}}).Map(func(_ context.Context, i interface{}) (_ interface{}, _ error) {
173193
return i.(int) + 1, nil
174194
}, WithObservationStrategy(Eager)).Map(func(_ context.Context, i interface{}) (_ interface{}, _ error) {
@@ -181,11 +201,10 @@ func Test_Defer_ComposedDup_EagerObservation(t *testing.T) {
181201
}
182202

183203
func Test_Defer_Error(t *testing.T) {
184-
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
204+
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) {
185205
next <- Of(1)
186206
next <- Of(2)
187207
next <- Error(errFoo)
188-
done()
189208
}})
190209
Assert(context.Background(), t, obs, HasItems(1, 2), HasError(errFoo))
191210
}
@@ -366,7 +385,11 @@ func Test_Thrown(t *testing.T) {
366385

367386
func Test_Timer(t *testing.T) {
368387
obs := Timer(WithDuration(time.Nanosecond))
369-
Assert(context.Background(), t, obs, IsNotEmpty())
388+
select {
389+
case <-time.Tick(time.Second):
390+
assert.FailNow(t, "observable not closed")
391+
case <-obs.Observe():
392+
}
370393
}
371394

372395
func Test_Timer_Empty(t *testing.T) {
@@ -376,5 +399,9 @@ func Test_Timer_Empty(t *testing.T) {
376399
time.Sleep(50 * time.Millisecond)
377400
cancel()
378401
}()
379-
Assert(context.Background(), t, obs, IsEmpty())
402+
select {
403+
case <-time.Tick(time.Second):
404+
assert.FailNow(t, "observable not closed")
405+
case <-obs.Observe():
406+
}
380407
}

item.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,9 @@ func (i Item) SendBlocking(ch chan<- Item) {
8181
ch <- i
8282
}
8383

84-
// SendCtx sends an item and blocks until it is sent or a context canceled.
84+
// SendContext sends an item and blocks until it is sent or a context canceled.
8585
// It returns a boolean to indicate whether the item was sent.
86-
func (i Item) SendCtx(ctx context.Context, ch chan<- Item) bool {
86+
func (i Item) SendContext(ctx context.Context, ch chan<- Item) bool {
8787
select {
8888
case <-ctx.Done():
8989
return false

item_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func Test_Item_SendContext_True(t *testing.T) {
4343
defer close(ch)
4444
ctx, cancel := context.WithCancel(context.Background())
4545
defer cancel()
46-
assert.True(t, Of(5).SendCtx(ctx, ch))
46+
assert.True(t, Of(5).SendContext(ctx, ch))
4747
}
4848

4949
func Test_Item_SendNonBlocking(t *testing.T) {

iterable_create.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@ func newCreateIterable(fs []Producer, opts ...Option) Iterable {
1515
ctx := option.buildContext()
1616

1717
wg := sync.WaitGroup{}
18-
done := func() {
19-
wg.Done()
20-
}
2118
for _, f := range fs {
19+
f := f
2220
wg.Add(1)
23-
go f(ctx, next, done)
21+
go func() {
22+
defer wg.Done()
23+
f(ctx, next)
24+
}()
2425
}
2526
go func() {
2627
wg.Wait()

iterable_defer.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,13 @@ func (i *deferIterable) Observe(opts ...Option) <-chan Item {
2222
ctx := option.buildContext()
2323

2424
wg := sync.WaitGroup{}
25-
done := func() {
26-
wg.Done()
27-
}
2825
for _, f := range i.f {
26+
f := f
2927
wg.Add(1)
30-
go f(ctx, next, done)
28+
go func() {
29+
defer wg.Done()
30+
f(ctx, next)
31+
}()
3132
}
3233
go func() {
3334
wg.Wait()

observable.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ type ObservableImpl struct {
8888
}
8989

9090
func defaultErrorFuncOperator(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) {
91-
item.SendCtx(ctx, dst)
91+
item.SendContext(ctx, dst)
9292
operatorOptions.stop()
9393
}
9494

@@ -297,7 +297,7 @@ func runPar(ctx context.Context, next chan Item, iterable Iterable, operatorFact
297297
case item, ok := <-observe:
298298
if !ok {
299299
if !bypassGather {
300-
Of(op).SendCtx(ctx, gather)
300+
Of(op).SendContext(ctx, gather)
301301
}
302302
return
303303
}

0 commit comments

Comments
 (0)