-
Notifications
You must be signed in to change notification settings - Fork 341
/
Copy pathobservable_operator_option_test.go
102 lines (91 loc) · 2.91 KB
/
observable_operator_option_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package rxgo
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
)
func Test_Observable_Option_WithOnErrorStrategy_Single(t *testing.T) {
obs := testObservable(1, 2, 3).
Map(func(_ context.Context, i interface{}) (interface{}, error) {
if i == 2 {
return nil, errFoo
}
return i, nil
}, WithErrorStrategy(ContinueOnError))
Assert(context.Background(), t, obs, HasItems(1, 3), HasError(errFoo))
}
func Test_Observable_Option_WithOnErrorStrategy_Propagate(t *testing.T) {
obs := testObservable(1, 2, 3).
Map(func(_ context.Context, i interface{}) (interface{}, error) {
if i == 1 {
return nil, errFoo
}
return i, nil
}).
Map(func(_ context.Context, i interface{}) (interface{}, error) {
if i == 2 {
return nil, errBar
}
return i, nil
}, WithErrorStrategy(ContinueOnError))
Assert(context.Background(), t, obs, HasItems(3), HasErrors(errFoo, errBar))
}
// Test_Observable_Option_SimpleCapacity creates an observable with
// Just(1)(WithBufferedChannel(5)) and asserts that the channel returned by
// Observe has a capacity of 5.
func Test_Observable_Option_SimpleCapacity(t *testing.T) {
	const wantCap = 5

	items := Just(1)(WithBufferedChannel(wantCap)).Observe()

	assert.Equal(t, wantCap, cap(items))
}
// Test_Observable_Option_ComposedCapacity chains two Map operators, the first
// with WithBufferedChannel(11) and the second with WithBufferedChannel(12),
// and asserts that the channel observed from each stage has the capacity
// given to that stage.
func Test_Observable_Option_ComposedCapacity(t *testing.T) {
	const (
		firstCap  = 11
		secondCap = 12
	)
	// alwaysOne ignores its input and emits the constant 1.
	alwaysOne := func(_ context.Context, _ interface{}) (interface{}, error) {
		return 1, nil
	}

	first := Just(1)().Map(alwaysOne, WithBufferedChannel(firstCap))
	second := first.Map(alwaysOne, WithBufferedChannel(secondCap))

	// Observe and check each stage in turn, first stage before second.
	gotFirst := cap(first.Observe())
	assert.Equal(t, firstCap, gotFirst)
	gotSecond := cap(second.Observe())
	assert.Equal(t, secondCap, gotSecond)
}
// Test_Observable_Option_ContextPropagation passes a context to Map through
// WithContext, records the context received by the mapping function, waits on
// the channel returned by Run, and asserts that the recorded context equals
// the one that was supplied.
func Test_Observable_Option_ContextPropagation(t *testing.T) {
	want := context.Background()

	var seen context.Context
	// capture remembers the context handed to the mapper and forwards the
	// item unchanged.
	capture := func(ctx context.Context, item interface{}) (interface{}, error) {
		seen = ctx
		return item, nil
	}

	done := Just(1)().Map(capture, WithContext(want)).Run()
	<-done

	assert.Equal(t, want, seen)
}
func Test_Observable_Option_Serialize(t *testing.T) {
idx := 0
<-Range(0, 10000).Map(func(_ context.Context, i interface{}) (interface{}, error) {
return i, nil
}, WithBufferedChannel(10), WithCPUPool(), Serialize(func(i interface{}) int {
return i.(int)
})).DoOnNext(func(i interface{}) {
v := i.(int)
if v != idx {
assert.FailNow(t, "not sequential", "expected=%d, got=%d", idx, v)
}
idx++
})
}
func Test_Observable_Option_Serialize_SingleElement(t *testing.T) {
idx := 0
<-Just(0)().Map(func(_ context.Context, i interface{}) (interface{}, error) {
return i, nil
}, WithBufferedChannel(10), WithCPUPool(), Serialize(func(i interface{}) int {
return i.(int)
})).DoOnNext(func(i interface{}) {
v := i.(int)
if v != idx {
assert.FailNow(t, "not sequential", "expected=%d, got=%d", idx, v)
}
idx++
})
}
// Test_Observable_Option_Serialize_Error builds an observable from
// testObservable(errFoo, 2, 3, 4), maps it through an identity function with
// WithBufferedChannel(10), WithCPUPool() and Serialize keyed on the item's int
// value, and asserts that the result is empty and carries the error errFoo.
func Test_Observable_Option_Serialize_Error(t *testing.T) {
	identity := func(_ context.Context, item interface{}) (interface{}, error) {
		return item, nil
	}
	asInt := func(item interface{}) int { return item.(int) }

	source := testObservable(errFoo, 2, 3, 4)
	mapped := source.Map(identity, WithBufferedChannel(10), WithCPUPool(), Serialize(asInt))

	Assert(context.Background(), t, mapped, IsEmpty(), HasError(errFoo))
}