@@ -9,10 +9,12 @@ import (
9
9
"time"
10
10
11
11
"github.com/stretchr/testify/assert"
12
+ "go.uber.org/goleak"
12
13
"golang.org/x/sync/errgroup"
13
14
)
14
15
15
16
func Test_Connectable_IterableChannel_Single (t * testing.T ) {
17
+ defer goleak .VerifyNone (t )
16
18
ch := make (chan Item , 10 )
17
19
go func () {
18
20
ch <- Of (1 )
@@ -27,6 +29,7 @@ func Test_Connectable_IterableChannel_Single(t *testing.T) {
27
29
}
28
30
29
31
func Test_Connectable_IterableChannel_Composed (t * testing.T ) {
32
+ defer goleak .VerifyNone (t )
30
33
ch := make (chan Item , 10 )
31
34
go func () {
32
35
ch <- Of (1 )
@@ -41,6 +44,7 @@ func Test_Connectable_IterableChannel_Composed(t *testing.T) {
41
44
}
42
45
43
46
func Test_Connectable_IterableChannel_Disposed (t * testing.T ) {
47
+ defer goleak .VerifyNone (t )
44
48
ch := make (chan Item , 10 )
45
49
go func () {
46
50
ch <- Of (1 )
@@ -51,15 +55,16 @@ func Test_Connectable_IterableChannel_Disposed(t *testing.T) {
51
55
obs := & ObservableImpl {
52
56
iterable : newChannelIterable (ch , WithPublishStrategy ()),
53
57
}
54
- _ , disposable := obs .Connect ()
55
- disposable ()
56
58
ctx , cancel := context .WithTimeout (context .Background (), 50 * time .Millisecond )
57
59
defer cancel ()
60
+ _ , disposable := obs .Connect (ctx )
61
+ disposable ()
58
62
time .Sleep (50 * time .Millisecond )
59
63
Assert (ctx , t , obs , IsEmpty ())
60
64
}
61
65
62
66
func Test_Connectable_IterableChannel_WithoutConnect (t * testing.T ) {
67
+ defer goleak .VerifyNone (t )
63
68
ch := make (chan Item , 10 )
64
69
go func () {
65
70
ch <- Of (1 )
@@ -74,6 +79,7 @@ func Test_Connectable_IterableChannel_WithoutConnect(t *testing.T) {
74
79
}
75
80
76
81
func Test_Connectable_IterableCreate_Single (t * testing.T ) {
82
+ defer goleak .VerifyNone (t )
77
83
ctx , cancel := context .WithCancel (context .Background ())
78
84
defer cancel ()
79
85
obs := & ObservableImpl {
@@ -88,6 +94,7 @@ func Test_Connectable_IterableCreate_Single(t *testing.T) {
88
94
}
89
95
90
96
func Test_Connectable_IterableCreate_Composed (t * testing.T ) {
97
+ defer goleak .VerifyNone (t )
91
98
ctx , cancel := context .WithCancel (context .Background ())
92
99
defer cancel ()
93
100
obs := & ObservableImpl {
@@ -102,6 +109,7 @@ func Test_Connectable_IterableCreate_Composed(t *testing.T) {
102
109
}
103
110
104
111
func Test_Connectable_IterableCreate_Disposed (t * testing.T ) {
112
+ defer goleak .VerifyNone (t )
105
113
ctx , cancel := context .WithCancel (context .Background ())
106
114
defer cancel ()
107
115
obs := & ObservableImpl {
@@ -112,14 +120,15 @@ func Test_Connectable_IterableCreate_Disposed(t *testing.T) {
112
120
cancel ()
113
121
}}, WithPublishStrategy (), WithContext (ctx )),
114
122
}
115
- obs .Connect ()
123
+ obs .Connect (ctx )
116
124
_ , cancel2 := context .WithTimeout (context .Background (), 550 * time .Millisecond )
117
125
defer cancel2 ()
118
126
time .Sleep (50 * time .Millisecond )
119
127
Assert (ctx , t , obs , IsEmpty ())
120
128
}
121
129
122
130
func Test_Connectable_IterableCreate_WithoutConnect (t * testing.T ) {
131
+ defer goleak .VerifyNone (t )
123
132
ctx , cancel := context .WithCancel (context .Background ())
124
133
defer cancel ()
125
134
obs := & ObservableImpl {
@@ -128,12 +137,13 @@ func Test_Connectable_IterableCreate_WithoutConnect(t *testing.T) {
128
137
ch <- Of (2 )
129
138
ch <- Of (3 )
130
139
cancel ()
131
- }}, WithPublishStrategy (), WithContext (ctx )),
140
+ }}, WithBufferedChannel ( 3 ), WithPublishStrategy (), WithContext (ctx )),
132
141
}
133
142
testConnectableWithoutConnect (t , obs )
134
143
}
135
144
136
145
func Test_Connectable_IterableDefer_Single (t * testing.T ) {
146
+ defer goleak .VerifyNone (t )
137
147
ctx , cancel := context .WithCancel (context .Background ())
138
148
defer cancel ()
139
149
obs := & ObservableImpl {
@@ -142,12 +152,13 @@ func Test_Connectable_IterableDefer_Single(t *testing.T) {
142
152
ch <- Of (2 )
143
153
ch <- Of (3 )
144
154
cancel ()
145
- }}, WithPublishStrategy (), WithContext (ctx )),
155
+ }}, WithBufferedChannel ( 3 ), WithPublishStrategy (), WithContext (ctx )),
146
156
}
147
157
testConnectableSingle (t , obs )
148
158
}
149
159
150
160
func Test_Connectable_IterableDefer_Composed (t * testing.T ) {
161
+ defer goleak .VerifyNone (t )
151
162
ctx , cancel := context .WithCancel (context .Background ())
152
163
defer cancel ()
153
164
obs := & ObservableImpl {
@@ -156,12 +167,13 @@ func Test_Connectable_IterableDefer_Composed(t *testing.T) {
156
167
ch <- Of (2 )
157
168
ch <- Of (3 )
158
169
cancel ()
159
- }}, WithPublishStrategy (), WithContext (ctx )),
170
+ }}, WithBufferedChannel ( 3 ), WithPublishStrategy (), WithContext (ctx )),
160
171
}
161
172
testConnectableComposed (t , obs )
162
173
}
163
174
164
175
func Test_Connectable_IterableJust_Single (t * testing.T ) {
176
+ defer goleak .VerifyNone (t )
165
177
ctx , cancel := context .WithCancel (context .Background ())
166
178
defer cancel ()
167
179
obs := & ObservableImpl {
@@ -171,6 +183,7 @@ func Test_Connectable_IterableJust_Single(t *testing.T) {
171
183
}
172
184
173
185
func Test_Connectable_IterableJust_Composed (t * testing.T ) {
186
+ defer goleak .VerifyNone (t )
174
187
ctx , cancel := context .WithCancel (context .Background ())
175
188
defer cancel ()
176
189
obs := & ObservableImpl {
@@ -180,6 +193,7 @@ func Test_Connectable_IterableJust_Composed(t *testing.T) {
180
193
}
181
194
182
195
func Test_Connectable_IterableRange_Single (t * testing.T ) {
196
+ defer goleak .VerifyNone (t )
183
197
ctx , cancel := context .WithCancel (context .Background ())
184
198
defer cancel ()
185
199
obs := & ObservableImpl {
@@ -189,6 +203,7 @@ func Test_Connectable_IterableRange_Single(t *testing.T) {
189
203
}
190
204
191
205
func Test_Connectable_IterableRange_Composed (t * testing.T ) {
206
+ defer goleak .VerifyNone (t )
192
207
ctx , cancel := context .WithCancel (context .Background ())
193
208
defer cancel ()
194
209
obs := & ObservableImpl {
@@ -198,6 +213,7 @@ func Test_Connectable_IterableRange_Composed(t *testing.T) {
198
213
}
199
214
200
215
func Test_Connectable_IterableSlice_Single (t * testing.T ) {
216
+ defer goleak .VerifyNone (t )
201
217
ctx , cancel := context .WithCancel (context .Background ())
202
218
defer cancel ()
203
219
obs := & ObservableImpl {iterable : newSliceIterable ([]Item {Of (1 ), Of (2 ), Of (3 )},
@@ -206,6 +222,7 @@ func Test_Connectable_IterableSlice_Single(t *testing.T) {
206
222
}
207
223
208
224
func Test_Connectable_IterableSlice_Composed (t * testing.T ) {
225
+ defer goleak .VerifyNone (t )
209
226
ctx , cancel := context .WithCancel (context .Background ())
210
227
defer cancel ()
211
228
obs := & ObservableImpl {iterable : newSliceIterable ([]Item {Of (1 ), Of (2 ), Of (3 )},
@@ -241,7 +258,7 @@ func testConnectableSingle(t *testing.T, obs Observable) {
241
258
}
242
259
243
260
wg .Wait ()
244
- obs .Connect ()
261
+ obs .Connect (ctx )
245
262
assert .NoError (t , eg .Wait ())
246
263
}
247
264
@@ -278,7 +295,7 @@ func testConnectableComposed(t *testing.T, obs Observable) {
278
295
}
279
296
280
297
wg .Wait ()
281
- obs .Connect ()
298
+ obs .Connect (ctx )
282
299
assert .NoError (t , eg .Wait ())
283
300
}
284
301
0 commit comments