Skip to content

Commit 552185a

Browse files
committed
prevent repeititious emissions, add unit tests
1 parent 3bd9e78 commit 552185a

File tree

2 files changed

+100
-12
lines changed

2 files changed

+100
-12
lines changed

src/main/java/rx/javafx/sources/CompositeObservable.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,37 +22,38 @@
2222

2323
/**
2424
* A CompositeObservable can merge multiple Observables that can be added/removed at any time,
25-
* affecting all Subscribers regardless of when they subscribed. This is especailly helpful for merging
25+
* affecting all Subscribers regardless of when they subscribed. This is especially helpful for merging
2626
* multiple UI event sources.
27+
*
2728
* @param <T>
2829
*/
2930
public final class CompositeObservable<T> {
3031

3132
private final ObservableList<Observable<T>> sources;
32-
private final Observable<T> observable;
33+
private final int initialCapacity;
3334

3435
public CompositeObservable() {
3536
this(-1);
3637
}
3738

3839
public CompositeObservable(int initialCapacity) {
40+
this.initialCapacity = initialCapacity;
3941
sources = FXCollections.synchronizedObservableList(FXCollections.observableArrayList());
42+
}
4043

41-
Observable<T> observable = JavaFxObservable.fromObservableList(sources)
42-
.switchMap(list -> Observable.from(list).flatMap((Observable<T> obs) -> obs));
44+
public Observable<T> toObservable() {
45+
Observable<T> updatingSource = Observable.merge(
46+
Observable.from(sources).flatMap(obs -> obs.takeWhile(v -> sources.contains(obs))),
47+
JavaFxObservable.fromObservableListAdds(sources).flatMap(obs -> obs.takeWhile(v -> sources.contains(obs)))
48+
);
4349

4450
if (initialCapacity > 0) {
45-
this.observable = observable.cacheWithInitialCapacity(initialCapacity);
46-
}
47-
else {
48-
this.observable = observable;
51+
return updatingSource.cacheWithInitialCapacity(initialCapacity);
52+
} else {
53+
return updatingSource;
4954
}
5055
}
5156

52-
public Observable<T> toObservable() {
53-
return observable;
54-
}
55-
5657
public void add(Observable<T> observable) {
5758
sources.add(observable);
5859
}

src/test/java/rx/javafx/sources/JavaFxObservableTest.java

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,11 @@
2828
import rx.observables.JavaFxObservable;
2929
import rx.schedulers.JavaFxScheduler;
3030
import rx.schedulers.Schedulers;
31+
import rx.subjects.PublishSubject;
3132

3233
import java.util.Arrays;
34+
import java.util.ArrayList;
35+
import java.util.List;
3336
import java.util.concurrent.CountDownLatch;
3437

3538
import static org.junit.Assert.assertTrue;
@@ -293,4 +296,88 @@ public String toString() {
293296
e.printStackTrace();
294297
}
295298
}
299+
300+
@Test
301+
public void testcompositeObservableInfinite() {
302+
303+
new JFXPanel();
304+
305+
CountDownLatch latch = new CountDownLatch(1);
306+
307+
Platform.runLater(() -> {
308+
final List<String> emissions = new ArrayList<>();
309+
CompositeObservable<String> compositeObservable = new CompositeObservable<>();
310+
311+
PublishSubject<String> source1 = PublishSubject.create();
312+
PublishSubject<String> source2 = PublishSubject.create();
313+
PublishSubject<String> source3 = PublishSubject.create();
314+
315+
compositeObservable.add(source1);
316+
compositeObservable.add(source2);
317+
compositeObservable.add(source3);
318+
319+
compositeObservable.toObservable().subscribe(emissions::add);
320+
321+
source1.onNext("Alpha");
322+
assertTrue(emissions.get(0).equals("Alpha"));
323+
324+
source2.onNext("Beta");
325+
assertTrue(emissions.get(1).equals("Beta"));
326+
327+
source3.onNext("Gamma");
328+
assertTrue(emissions.get(2).equals("Gamma"));
329+
330+
source1.onNext("Delta");
331+
assertTrue(emissions.get(3).equals("Delta"));
332+
333+
compositeObservable.remove(source2);
334+
335+
source2.onNext("Epsilon");
336+
assertTrue(emissions.size() == 4);
337+
338+
latch.countDown();
339+
});
340+
341+
try {
342+
latch.await();
343+
} catch (InterruptedException e) {
344+
e.printStackTrace();
345+
}
346+
}
347+
348+
@Test
349+
public void testcompositeObservableFinite() {
350+
351+
new JFXPanel();
352+
353+
CountDownLatch latch = new CountDownLatch(1);
354+
355+
Platform.runLater(() -> {
356+
final List<String> emissions = new ArrayList<>();
357+
CompositeObservable<String> compositeObservable = new CompositeObservable<>();
358+
359+
Observable<String> source1 = Observable.just("Alpha","Beta");
360+
Observable<String> source2 = Observable.just("Gamma","Delta");
361+
362+
compositeObservable.add(source1);
363+
364+
compositeObservable.toObservable().subscribe(emissions::add);
365+
366+
compositeObservable.add(source2);
367+
368+
assertTrue(emissions.size() == 4);
369+
370+
compositeObservable.remove(source2);
371+
372+
assertTrue(emissions.size() == 4);
373+
374+
latch.countDown();
375+
});
376+
377+
try {
378+
latch.await();
379+
} catch (InterruptedException e) {
380+
e.printStackTrace();
381+
}
382+
}
296383
}

0 commit comments

Comments
 (0)