From 238b4badc4b544ad682bbefa548fc771fb5cb5c1 Mon Sep 17 00:00:00 2001 From: Marius Bogoevici Date: Sun, 15 Feb 2015 19:38:49 -0500 Subject: [PATCH 1/2] Implementation for an RxJava top tags module --- rxjava-top-tags/README.md | 57 +++++++++++++ rxjava-top-tags/pom.xml | 61 ++++++++++++++ .../src/main/java/com/acme/TopTags.java | 81 +++++++++++++++++++ .../java/com/acme/TopTagsOptionsMetadata.java | 49 +++++++++++ .../main/resources/config/rxjava-top-tags.xml | 29 +++++++ .../resources/config/spring-module.properties | 1 + 6 files changed, 278 insertions(+) create mode 100644 rxjava-top-tags/README.md create mode 100644 rxjava-top-tags/pom.xml create mode 100644 rxjava-top-tags/src/main/java/com/acme/TopTags.java create mode 100644 rxjava-top-tags/src/main/java/com/acme/TopTagsOptionsMetadata.java create mode 100644 rxjava-top-tags/src/main/resources/config/rxjava-top-tags.xml create mode 100644 rxjava-top-tags/src/main/resources/config/spring-module.properties diff --git a/rxjava-top-tags/README.md b/rxjava-top-tags/README.md new file mode 100644 index 0000000..074c4ef --- /dev/null +++ b/rxjava-top-tags/README.md @@ -0,0 +1,57 @@ +Spring XD Reactor Stream Example +================================ + +This is an example of a custom module that uses RxJava's Observable API. + +## Requirements + +In order to install the module run it in your Spring XD installation, you will need to have installed: + +* Spring XD version 1.1.x ([Instructions](http://docs.spring.io/spring-xd/docs/current/reference/html/#getting-started)). You'll need to build Spring XD with Java 8+ to use this sample (which uses lambda expressions). + +## Code Tour + +The heart of the sample is the processing module named [TopTags.java](src/main/java/com/acme/TopTags.java). +This uses the Observable API to calculate the most referenced tags in a given time window. The[Tuple](http://docs.spring.io/spring-xd/docs/current/reference/html/#tuples) data type is used as a generic container for keyed data. + + +## Building + + $ mvn package + +## Using the Custom Module + +The uber-jar will be in `target/rxjava-top-tags-1.0.0.BUILD-SNAPSHOT.jar`. To install and register the module to your Spring XD distribution, use the `module upload` Spring XD shell command. Start Spring XD and the shell: + +``` + _____ __ _______ +/ ___| (-) \ \ / / _ \ +\ `--. _ __ _ __ _ _ __ __ _ \ V /| | | | +`--. \ '_ \| '__| | '_ \ / _` | / ^ \| | | | +/\__/ / |_) | | | | | | | (_| | / / \ \ |/ / +\____/| .__/|_| |_|_| |_|\__, | \/ \/___/ + | | __/ | + |_| |___/ +eXtreme Data +1.1.0.BUILD-SNAPSHOT | Admin Server Target: http://localhost:9393 +Welcome to the Spring XD shell. For assistance hit TAB or type "help". +xd:>module upload --file [path-to]/spring-xd-samples/rxjava-top-tags/target/rxjava-top-tags-1.0.0.BUILD-SNAPSHOT.jar --name rxjava-top-tags --type processor +Successfully uploaded module 'processor:reactor-top-tags' +xd:> +``` + +Now create an deploy a stream: + +``` +xd:>stream create reactor --definition "tweetstream | rxjava-top-tags | log" --deploy +``` + +The `rxjava-top-tags` processor also supports the `timeWindow` and `topN` parameters for customizing the processor's +behavior. + +You should see the stream output in the Spring XD log, indicating the top N tags for the given interval: + +``` +2015-02-15 20:13:49,077 1.1.0.RELEASE INFO RxComputationThreadPool-3 sink.top-tags - {"id":"8df84f9b-40ee-23c3-7473-fa611c43a19d","timestamp":1424049229077,"topTags":{"SNL40":18,"NBAAllStarNYC":4,"SpringXD":4}} + +``` diff --git a/rxjava-top-tags/pom.xml b/rxjava-top-tags/pom.xml new file mode 100644 index 0000000..720b838 --- /dev/null +++ b/rxjava-top-tags/pom.xml @@ -0,0 +1,61 @@ + + 4.0.0 + com.acme + rxjava-top-tags + 1.0.0.BUILD-SNAPSHOT + + org.springframework.xd + spring-xd-module-parent + 1.1.0.RELEASE + + + + + spring-milestones + Spring Milestones + http://repo.spring.io/libs-milestone + + false + + + + spring-releases + Spring Releases + http://repo.spring.io/release + + false + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.2 + + 1.8 + 1.8 + + + + + + + + org.springframework.xd. + spring-xd-rxjava + 1.1.0.RELEASE + + + + io.reactivex + rxjava + 1.0.0 + + + + + diff --git a/rxjava-top-tags/src/main/java/com/acme/TopTags.java b/rxjava-top-tags/src/main/java/com/acme/TopTags.java new file mode 100644 index 0000000..5f907e8 --- /dev/null +++ b/rxjava-top-tags/src/main/java/com/acme/TopTags.java @@ -0,0 +1,81 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.acme; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.springframework.xd.tuple.TupleBuilder.tuple; + +import java.util.LinkedHashMap; +import java.util.stream.Collectors; + +import com.gs.collections.api.tuple.Pair; +import com.gs.collections.impl.tuple.Tuples; +import com.jayway.jsonpath.JsonPath; +import net.minidev.json.JSONArray; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import rx.Observable; + +import org.springframework.xd.rxjava.Processor; +import org.springframework.xd.tuple.Tuple; + +/** + * @author Marius Bogoevici + */ +public class TopTags implements Processor { + + private int timeWindow; + + private int topN; + + public TopTags(int timeWindow, int topN) { + this.timeWindow = timeWindow; + this.topN = topN; + } + + private static Log logger = LogFactory.getLog(TopTags.class); + + @Override + public Observable process(Observable inputStream) { + return inputStream.flatMap(tweet -> { + JSONArray array = JsonPath.read(tweet, "$.entities.hashtags[*].text"); + return Observable.from(array.toArray(new String[array.size()])); + }) + // create (tag,1) tuple for each incoming tag + .map(tag -> Tuples.pair(tag, 1)) + // batch all tags in the time window + .window(timeWindow, SECONDS) + // with each time window stream + .flatMap(windowBuffer -> + windowBuffer + // reduce by tag, counting all entries with the same tag + .groupBy(Pair::getOne) + .flatMap( + groupedStream -> + groupedStream.reduce((acc, v) -> Tuples.pair(acc.getOne(), acc.getTwo() + v.getTwo())) + ) + // sort the results + .toSortedList((a, b) -> -a.getTwo().compareTo(b.getTwo())) + // convert the output to a friendlier format + .map(l -> tuple().of("topTags", + l.subList(0, Math.min(topN, l.size())) + .stream().collect(Collectors.toMap(Pair::getOne, Pair::getTwo, (v1, v2) -> v1, LinkedHashMap::new) + ) + ) + ) + ); + } +} diff --git a/rxjava-top-tags/src/main/java/com/acme/TopTagsOptionsMetadata.java b/rxjava-top-tags/src/main/java/com/acme/TopTagsOptionsMetadata.java new file mode 100644 index 0000000..55101c0 --- /dev/null +++ b/rxjava-top-tags/src/main/java/com/acme/TopTagsOptionsMetadata.java @@ -0,0 +1,49 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.acme; + +import org.springframework.xd.module.options.spi.ModuleOption; + +/** + * Holds options for the TopTags module + * + * @author Mark Pollack + * @author Marius Bogoevici + */ +public class TopTagsOptionsMetadata { + + private int timeWindow = 1; + + private int topN = 10; + + public int getTopN() { + return topN; + } + + @ModuleOption("The number of entires to include in the top N listing") + public void setTopN(int topN) { + this.topN = topN; + } + + public int getTimeWindow() { + return timeWindow; + } + + @ModuleOption("The length in seconds of the time window") + public void setTimeWindow(int timeWindow) { + this.timeWindow = timeWindow; + } +} diff --git a/rxjava-top-tags/src/main/resources/config/rxjava-top-tags.xml b/rxjava-top-tags/src/main/resources/config/rxjava-top-tags.xml new file mode 100644 index 0000000..04df222 --- /dev/null +++ b/rxjava-top-tags/src/main/resources/config/rxjava-top-tags.xml @@ -0,0 +1,29 @@ + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/rxjava-top-tags/src/main/resources/config/spring-module.properties b/rxjava-top-tags/src/main/resources/config/spring-module.properties new file mode 100644 index 0000000..4aea295 --- /dev/null +++ b/rxjava-top-tags/src/main/resources/config/spring-module.properties @@ -0,0 +1 @@ +options_class = com.acme.TopTagsOptionsMetadata From 439a82c90a849770a9fbb6327d8bb6591a36677b Mon Sep 17 00:00:00 2001 From: Marius Bogoevici Date: Tue, 17 Feb 2015 14:35:00 -0500 Subject: [PATCH 2/2] Convert example to a sliding time window --- rxjava-top-tags/src/main/java/com/acme/TopTags.java | 7 +++++-- .../main/java/com/acme/TopTagsOptionsMetadata.java | 13 ++++++++++++- .../src/main/resources/config/rxjava-top-tags.xml | 3 ++- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/rxjava-top-tags/src/main/java/com/acme/TopTags.java b/rxjava-top-tags/src/main/java/com/acme/TopTags.java index 5f907e8..167765b 100644 --- a/rxjava-top-tags/src/main/java/com/acme/TopTags.java +++ b/rxjava-top-tags/src/main/java/com/acme/TopTags.java @@ -39,10 +39,13 @@ public class TopTags implements Processor { private int timeWindow; + private int timeShift; + private int topN; - public TopTags(int timeWindow, int topN) { + public TopTags(int timeWindow, int timeShift, int topN) { this.timeWindow = timeWindow; + this.timeShift = timeShift; this.topN = topN; } @@ -57,7 +60,7 @@ public Observable process(Observable inputStream) { // create (tag,1) tuple for each incoming tag .map(tag -> Tuples.pair(tag, 1)) // batch all tags in the time window - .window(timeWindow, SECONDS) + .window(timeWindow, timeShift, SECONDS) // with each time window stream .flatMap(windowBuffer -> windowBuffer diff --git a/rxjava-top-tags/src/main/java/com/acme/TopTagsOptionsMetadata.java b/rxjava-top-tags/src/main/java/com/acme/TopTagsOptionsMetadata.java index 55101c0..d5310d3 100644 --- a/rxjava-top-tags/src/main/java/com/acme/TopTagsOptionsMetadata.java +++ b/rxjava-top-tags/src/main/java/com/acme/TopTagsOptionsMetadata.java @@ -27,6 +27,8 @@ public class TopTagsOptionsMetadata { private int timeWindow = 1; + private int timeShift = 1; + private int topN = 10; public int getTopN() { @@ -42,8 +44,17 @@ public int getTimeWindow() { return timeWindow; } - @ModuleOption("The length in seconds of the time window") + @ModuleOption("The length in seconds of the time window over which the top N tags are calculated") public void setTimeWindow(int timeWindow) { this.timeWindow = timeWindow; } + + public int getTimeShift() { + return timeShift; + } + + @ModuleOption("The frequency in seconds with which the top N tags are calculated") + public void setTimeShift(int timeShift) { + this.timeShift = timeShift; + } } diff --git a/rxjava-top-tags/src/main/resources/config/rxjava-top-tags.xml b/rxjava-top-tags/src/main/resources/config/rxjava-top-tags.xml index 04df222..2ce2e36 100644 --- a/rxjava-top-tags/src/main/resources/config/rxjava-top-tags.xml +++ b/rxjava-top-tags/src/main/resources/config/rxjava-top-tags.xml @@ -8,7 +8,8 @@ - + +