1818import static org .junit .Assert .*;
1919import static org .mockito .Matchers .*;
2020import static org .mockito .Mockito .*;
21- import static rx .operators .OperatorRetry .*;
2221
2322import java .util .concurrent .atomic .AtomicInteger ;
2423
2524import org .junit .Test ;
2625import org .mockito .InOrder ;
2726
2827import rx .Observable ;
28+ import rx .Observable .OnSubscribeFunc ;
2929import rx .Observer ;
3030import rx .Subscription ;
3131import rx .functions .Action1 ;
@@ -130,7 +130,7 @@ public Subscription onSubscribe(Observer<? super String> o) {
130130 return Subscriptions .empty ();
131131 }
132132 }
133-
133+
134134 @ Test
135135 public void testUnsubscribeFromRetry () {
136136 PublishSubject <Integer > subject = PublishSubject .create ();
@@ -139,10 +139,44 @@ public void testUnsubscribeFromRetry() {
139139 @ Override
140140 public void call (Integer n ) {
141141 count .incrementAndGet ();
142- }});
142+ }
143+ });
143144 subject .onNext (1 );
144145 sub .unsubscribe ();
145146 subject .onNext (2 );
146- assertEquals (1 ,count .get ());
147+ assertEquals (1 , count .get ());
148+ }
149+
150+ @ Test
151+ public void testRetryAllowsSubscriptionAfterAllSubscriptionsUnsubsribed () throws InterruptedException {
152+ final AtomicInteger subsCount = new AtomicInteger (0 );
153+ OnSubscribeFunc <String > onSubscribe = new OnSubscribeFunc <String >() {
154+ @ Override
155+ public Subscription onSubscribe (Observer <? super String > observer ) {
156+ subsCount .incrementAndGet ();
157+ return new Subscription () {
158+ boolean unsubscribed = false ;
159+
160+ @ Override
161+ public void unsubscribe () {
162+ subsCount .decrementAndGet ();
163+ unsubscribed = true ;
164+ }
165+
166+ @ Override
167+ public boolean isUnsubscribed () {
168+ return unsubscribed ;
169+ }
170+ };
171+ }
172+ };
173+ Observable <String > stream = Observable .create (onSubscribe );
174+ Observable <String > streamWithRetry = stream .retry ();
175+ Subscription sub = streamWithRetry .subscribe ();
176+ assertEquals (1 , subsCount .get ());
177+ sub .unsubscribe ();
178+ assertEquals (0 , subsCount .get ());
179+ streamWithRetry .subscribe ();
180+ assertEquals (1 , subsCount .get ());
147181 }
148182}
0 commit comments