diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java index b432fea..3de2178 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java @@ -29,7 +29,9 @@ public class Practice1 { * 参数 Observable["a","b","c"] * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 */ - public Observable> indexable(Observable observable) { - throw new UnsupportedOperationException("implementation"); + public static Observable> indexable(Observable observable) { + return observable.zipWith(Observable.range(1, Integer.MAX_VALUE), (a1,a2)->{return new Tuple2<>(a2,a1);}); } + + } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java index 08c7dcd..2583007 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java @@ -17,10 +17,12 @@ package cn.nextop.rxjava.share.practices; +import cn.nextop.rxjava.share.util.Tuples; import cn.nextop.rxjava.share.util.type.Tuple2; import io.reactivex.Observable; import io.reactivex.Single; +import java.util.HashMap; import java.util.Map; /** @@ -34,7 +36,7 @@ public class Practice2 { * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] */ public Observable> wordCount1(Observable words) { - throw new UnsupportedOperationException("implementation"); + return words.groupBy(e -> e).flatMap(e -> e.count().toObservable().map(x -> Tuples.of(e.getKey(), x.intValue()))); } /* @@ -43,7 +45,10 @@ public Observable> wordCount1(Observable words) * 返回: Single[Map{a=2, b=1, c=2}] */ public Single> wordCount2(Observable words) { - throw new UnsupportedOperationException("implementation"); + return words.reduce(new HashMap(), (a, b) -> { + if (a.containsKey(b)) { a.put(b, a.get(b) + 1); } else { a.put(b, 1); } + return a; + }); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java index a43bd78..bb6fd7c 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java @@ -23,13 +23,18 @@ * @author Baoyi Chen */ public class Practice3 { - + public static void main(String[] args) { + Observable.just(4,3,5,6,7,8).reduce((x,y)->x+y).subscribe(e->System.out.println(e));; + } /* * 根据iterate的结果求和 */ - public Maybe sum(Observable observable) { - throw new UnsupportedOperationException("implementation"); - } + public Maybe sum(Observable observable) { + return this.iterate(observable).reduce((x, y) -> { + return x + y; + }); + + } /* * 举例: @@ -42,7 +47,22 @@ public Maybe sum(Observable observable) { * return Observable[4, 3, 6, 7, 5] 顺序无关 */ public Observable iterate(Observable observable) { - throw new UnsupportedOperationException("implementation"); + Observable ob = Observable.create( emitter -> { + observable.map( e -> { + if (e.left != null) { + //left = iterate(Observable.just(e.left)).elementAt(0, Observable.just(0)).subscribe() + iterate(Observable.just(e.left)).subscribe(l -> emitter.onNext(l)); + } + if (e.right != null) { + iterate(Observable.just(e.right)).subscribe(r -> emitter.onNext(r)); + } + emitter.onNext(e.value); + return e.value; + }).subscribe(); + emitter.onComplete(); + }); + + return ob; } public static class Node { diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java index 33a5804..c1c713f 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java @@ -18,6 +18,7 @@ import io.reactivex.Observable; +import io.reactivex.schedulers.Schedulers; /** @@ -44,7 +45,7 @@ public class Practice4 { * */ public Observable runInMultiThread(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return observable.concatMap(e->Observable.just(e).observeOn(Schedulers.newThread())); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index 1193642..6434fa3 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -16,14 +16,17 @@ package cn.nextop.rxjava.share.practices; -import io.reactivex.Maybe; -import io.reactivex.Observable; -import io.reactivex.Single; - +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; +import io.reactivex.Maybe; +import io.reactivex.Observable; +import io.reactivex.Single; + /** * @author Baoyi Chen */ @@ -35,7 +38,7 @@ public class Practice5 { * return: Single[3] */ public Single count(Observable source) { - throw new UnsupportedOperationException("implementation"); + return source.toList().map(e->(long)e.size()); } /* @@ -44,7 +47,10 @@ public Single count(Observable source) { * return: Observable["a", "b", "c","b", "c", "d"] */ public Observable convert(Observable> source) { - throw new UnsupportedOperationException("implementation"); + List r = new ArrayList<>(); + source.subscribe(e->r.addAll(e)); + return Observable.fromIterable(r); + } /* @@ -52,9 +58,17 @@ public Observable convert(Observable> source) { * param: Observable["a", "a", "b", "b", "c"] * return: Observable["a", "b", "c"] */ - public Observable distinct(Observable source) { - throw new UnsupportedOperationException("implementation"); - } + public Observable distinct(Observable source) { + Map a = new HashMap<>(); + return source.filter(e -> { + if (a.containsKey(e)) + return false; + else { + a.put(e, e); + return true; + } + }); + } /* * example: @@ -62,7 +76,9 @@ public Observable distinct(Observable source) { * return: Observable[3, 4] */ public Observable filter(Observable source, Predicate conditon) { - throw new UnsupportedOperationException("implementation"); + return source.concatMap(e -> { + if (conditon.test(e)) return Observable.just(e); else return Observable.empty(); + }); } /* @@ -71,16 +87,17 @@ public Observable filter(Observable source, Predicate * return: Maybe[3] */ public Maybe elementAt(Observable source, int index) { - throw new UnsupportedOperationException("implementation"); + return source.skip(index).take(1).firstElement(); } + /* * example: * param: Observable["a", "b"] , count = 2 * return: Observable["a", "b", "a", "b"] */ public Observable repeat(Observable source, int count) { - throw new UnsupportedOperationException("implementation"); + return Observable.range(0, count).concatMap(e -> source); } /* @@ -89,7 +106,7 @@ public Observable repeat(Observable source, int count) { * return: Observable["a", "b"] */ public Observable concat(List> source) { - throw new UnsupportedOperationException("implementation"); + return Observable.fromIterable(source).concatMap(e -> e); } /* @@ -98,7 +115,7 @@ public Observable concat(List> source) { * return: Observable["a", "b"] */ public Observable merge(List> source) { - throw new UnsupportedOperationException("implementation"); + return Observable.fromIterable(source).flatMap(e -> e); } /* @@ -107,7 +124,7 @@ public Observable merge(List> source) { * return: Observable["a", "b", "c"], 每个元素都延迟1秒 */ public Observable delayAll(Observable source, long delay, TimeUnit unit) { - throw new UnsupportedOperationException("implementation"); + return source.concatMap(e -> Observable.just(e).delay(delay, unit)); } }