Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
douglascraigschmidt committed Mar 6, 2024
1 parent 4863a64 commit 78fc8df
Show file tree
Hide file tree
Showing 16 changed files with 129 additions and 97 deletions.
2 changes: 1 addition & 1 deletion Reactor/ex2/.run/ex2.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="true" />
<option name="MAIN_CLASS_NAME" value="ex2" />
<module name="ex2.main" />
<option name="PROGRAM_PARAMETERS" value="-c 100 -i 10 -d true -P 1 -T ex2,Emitter,BlockingSubscriber,PrimeUtils" />
<option name="PROGRAM_PARAMETERS" value="-c 100 -i 10 -d true -P 5 -T ex2,Emitter,BlockingSubscriber,PrimeUtils" />
<method v="2">
<option name="Make" enabled="true" />
</method>
Expand Down
83 changes: 44 additions & 39 deletions Reactor/ex2/src/main/java/ex2.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
import java.util.function.Function;

/**
* This program applies Project Reactor features to check the primality
* of randomly generate {@link Integer} objects via a publisher and
* a subscriber that (conditionally) run in different threads/schedulers.
* This program applies Project Reactor features to check the
* primality of randomly generate {@link Integer} objects via a
* publisher and a subscriber that (conditionally) run in different
* threads/schedulers.
*/
@SuppressWarnings("ReactiveStreamsUnusedPublisher")
public class ex2 {
/**
* Debugging tag used by the logger.
Expand All @@ -32,9 +34,9 @@ public class ex2 {
.newParallel("publisher", 1);

/**
* The {@link Scheduler} used to consume random integers by checking
* if they are prime or not. This {@link Scheduler} can be either
* sequential or parallel, depending on program options.
* The {@link Scheduler} used to consume random integers by
* checking if they are prime or not. This {@link Scheduler} can
* be either sequential or parallel, depending on program options.
*/
private final Scheduler mSubscriberScheduler;

Expand All @@ -51,14 +53,15 @@ public class ex2 {
// Print the results of prime number checking.
PrimeUtils.printResult(result);
Options.debug(TAG, "subscriber pending items: "
+ pendingItems);
+ pendingItems);
}
},
throwable -> {
Options.print("failure " + throwable);
},
() -> Options.print("completed"),
Integer.MAX_VALUE);
},
throwable -> {
Options.print("failure " + throwable);
},
() -> Options.print("completed"),
// Disable backpressure.
Integer.MAX_VALUE);

/**
* Track all disposables to dispose them all at once.
Expand All @@ -71,7 +74,7 @@ public class ex2 {
static public void main(String[] argv) {
// Create an instance to run the test.
new ex2(argv).run(PrimeUtils::isPrime,
"pub/sub prime checker");
"pub/sub prime checker");
}

/**
Expand All @@ -83,17 +86,18 @@ static public void main(String[] argv) {

// Conditionally run the subscriber in a different thread.
mSubscriberScheduler = Options.instance().parallel()
// Choose a different scheduler if we're running in parallel.
// Choose a different scheduler if we're running in
// parallel.
? Schedulers.newParallel("subscriber",
Options.instance().parallelism())
Options.instance().parallelism())

// Otherwise run everything on the publisher's scheduler.
// Otherwise, run everything on the publisher's scheduler.
: mPublisherScheduler;

mDisposables = Disposables
.composite(mPublisherScheduler,
mSubscriberScheduler,
mSubscriber);
mSubscriberScheduler,
mSubscriber);
}

/**
Expand All @@ -106,19 +110,19 @@ static public void main(String[] argv) {
*/
@SuppressWarnings("SameParameterValue")
private void run
(Function<Integer, Integer> primeChecker,
String testName) {
(Function<Integer, Integer> primeChecker,
String testName) {
Options.print("Starting "
+ testName
+ " with count = "
+ Options.instance().count());
+ testName
+ " with count = "
+ Options.instance().count());

// Generate a list of 'count' odd random Integers whose values
// don't exceed the given maximum.
var randomIntegers = RandomUtils
.generateRandomIntegers(Options.instance().count(),
Options.instance().maxValue(),
true);
Options.instance().maxValue(),
true);

// This Function asynchronously determines if a random # is
// prime or not.
Expand All @@ -129,15 +133,15 @@ static public void main(String[] argv) {
// Create a publisher that runs on its own scheduler and
// returns a Flux that emits random Integer objects.
.publishIntegers(mPublisherScheduler,
randomIntegers)
randomIntegers)

// Conditionally enable logging.
.transform(ReactorUtils
// Conditionally enable logging.
.logIf(Options.instance().loggingEnabled()))
// Conditionally enable logging.
.logIf(Options.instance().loggingEnabled()))

// Concurrently (maybe) check each random # to see if it's
// prime. This operation may run on the subscriber's
// Check each random # (potentially concurrenty) to see if
// it's prime. This operation may run on the subscriber's
// scheduler, depending on the options used to run the
// program.
.flatMap(check4Prime)
Expand All @@ -163,12 +167,12 @@ static public void main(String[] argv) {
*
* @param primeChecker A {@link Function} that checks a number's
* primality
* @return A {@link Function} that asynchronously determines if a
* random # is prime or not
* @return A {@link Function} that asynchronously determines if a
* random # is prime or not
*/
private Function<Integer,
Mono<PrimeUtils.Result>> makePrimeCheckFunction
(Function<Integer, Integer> primeChecker) {
Mono<PrimeUtils.Result>> makePrimeCheckFunction
(Function<Integer, Integer> primeChecker) {
return number -> Mono
.fromSupplier(() -> number)

Expand All @@ -177,8 +181,8 @@ static public void main(String[] argv) {

// Check if the # is prime.
.map(__ -> PrimeUtils
.checkIfPrime(number,
primeChecker));
.checkIfPrime(number,
primeChecker));
}

/**
Expand All @@ -187,8 +191,9 @@ static public void main(String[] argv) {
*
* @return The formatted exit string
*/
private String makeExitString(String testName,
Function<Integer, Integer> primeChecker) {
private String makeExitString
(String testName,
Function<Integer, Integer> primeChecker) {
return "Leaving "
+ testName
+ " with "
Expand Down
2 changes: 1 addition & 1 deletion Reactor/ex2/src/main/java/publisher/Emitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private Emitter() {}
+ pendingItems);

// Only publish an item if the sink hasn't been
// cancelled.
// canceled.
if (!sink.isCancelled())
// Publish the next item.
sink.next(item);
Expand Down
2 changes: 2 additions & 0 deletions Reactor/ex2/src/main/java/publisher/Publisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public class Publisher {
*
* @param scheduler {@link Scheduler} to publish the random
* numbers on
* @param randomIntegers The {@link List} of random {@link Integer}
* objects to publish
* @return Return a {@link Flux} that publishes random numbers
*/
public static Flux<Integer> publishIntegers
Expand Down
12 changes: 10 additions & 2 deletions Reactor/ex2/src/main/java/subscriber/BackpressureSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* that handles blocking (which is otherwise not well-supported by
* Project Reactor).
*/
@SuppressWarnings({"UnusedReturnValue", "BlockingMethodInNonBlockingContext", "CallToPrintStackTrace"})
public class BackpressureSubscriber<T>
implements CoreSubscriber<T>,
Disposable {
Expand Down Expand Up @@ -95,7 +96,9 @@ public BackpressureSubscriber(Consumer<? super T> consumer,
public void onSubscribe(Subscription subscription) {
mSubscription = subscription;

// Set the backpressure value.
// Set the backpressure value (mRequestSize could be
// Long.MAX_VALUE, in which case there is no back-
// pressure at all.
mSubscription.request(mRequestSize);
}

Expand All @@ -110,6 +113,9 @@ public void onNext(T element) {
mConsumer.accept(element);

mTotalEvents.incrementAndGet();

// This code is only called for truly backpressure aware
// applications.
if (mEventsProcessedThusFar.incrementAndGet() == mRequestSize) {
mSubscription.request(mRequestSize);
mEventsProcessedThusFar.set(0);
Expand All @@ -135,7 +141,7 @@ public void onError(Throwable t) {
*/
@Override
public void onComplete() {
// Run the completeRunnable's hook method.
// Run the mCompleteRunnable's hook method.
mCompleteRunnable.run();

// Release the latch.
Expand All @@ -157,6 +163,8 @@ public int totalEvents() {
*/
public Mono<Void> await() {
try {
// This await() call will block until all the
// asynchronous processing completes.
mLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
Expand Down
7 changes: 4 additions & 3 deletions Reactor/ex2/src/main/java/utils/PrimeUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,13 @@ public Result(int primeCandidate, int smallestFactor) {
* {@code primeCandidate} and either 0 if it's prime or its
* smallest factor if it's not prime.
*/
public static PrimeUtils.Result checkIfPrime(Integer primeCandidate,
Function<Integer, Integer> primeChecker) {
public static PrimeUtils.Result checkIfPrime
(Integer primeCandidate,
Function<Integer, Integer> primeChecker) {
// Return a tuple containing the prime candidate and the
// result of checking if it's prime.
return new Result(primeCandidate,
primeChecker.apply(primeCandidate));
primeChecker.apply(primeCandidate));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
import java.util.List;

import static edu.vandy.quoteservices.common.Constants.EndPoint.*;
import static edu.vandy.quoteservices.common.Constants.Params.PARALLEL;
import static edu.vandy.quoteservices.common.Constants.Params.ROUTENAME;
import static edu.vandy.quoteservices.common.Constants.Params.*;

/**
* This interface provides the contract for the RESTful {@code
Expand All @@ -30,15 +29,15 @@ public interface QuoteAPI {
* Get a {@link List} containing the requested {@link Quote}
* objects.
*
* @param routename The microservice that performs the request,
* @param service The microservice that performs the request,
* which is dynamically inserted into the URI via
* the {@code Path} annotation
* @return An {@link Call} object that yields a {@link List}
* containing all the {@link Quote} objects on success and
* an error message on failure
*/
@GET("{routename}" + "/" + GET_ALL_QUOTES)
Call<List<Quote>> getAllQuotes(@Path("routename") String routename);
@GET(SERVICE_PREFIX + "/" + GET_ALL_QUOTES)
Call<List<Quote>> getAllQuotes(@Path(SERVICE) String service);

/**
* Get a {@link Quote} corresponding to the given id.
Expand All @@ -47,15 +46,15 @@ public interface QuoteAPI {
* {@code quoteId}
* @return A {@link Quote} containing the requested {@code quoteId}
*/
@GET("{routename}" + "/" + GET_QUOTE)
Call<Quote> getQuote(@Path("routename") String routename,
@GET(SERVICE_PREFIX + "/" + GET_QUOTE)
Call<Quote> getQuote(@Path(SERVICE) String service,
@Query("quoteId") Integer quoteId);

/**
* Get a {@link List} containing the requested {@link Quote}
* objects.
*
* @param routename The microservice that performs the request,
* @param service The microservice that performs the request,
* which is dynamically inserted into the URI via
* the {@code Path} annotation
* @param quoteIds A {@link List} containing the given random
Expand All @@ -68,16 +67,16 @@ Call<Quote> getQuote(@Path("routename") String routename,
* containing the {@link Quote} objects on success and
* an error message on failure
*/
@POST("{routename}" + "/" + POST_QUOTES)
Call<List<Quote>> postQuotes(@Path(ROUTENAME) String routename,
@POST(SERVICE_PREFIX + "/" + POST_QUOTES)
Call<List<Quote>> postQuotes(@Path(SERVICE) String service,
@Body List<Integer> quoteIds,
@Query(PARALLEL) Boolean parallel);

/**
* Search for quotes containing any of the given {@link List} of
* {@code queries}.
*
* @param routename The microservice that performs the request,
* @param service The microservice that performs the request,
* which is dynamically inserted into the URI via
* the {@code Path} annotation
* @param queries The {@link List} of {@code queries} to search
Expand All @@ -90,16 +89,16 @@ Call<List<Quote>> postQuotes(@Path(ROUTENAME) String routename,
* containing all the {@link Quote} objects on success and
* an error message on failure
*/
@POST("{routename}" + "/" + POST_SEARCHES)
Call<List<Quote>> search(@Path(ROUTENAME) String routename,
@POST(SERVICE_PREFIX + "/" + POST_SEARCHES)
Call<List<Quote>> search(@Path(SERVICE) String service,
@Body List<String> queries,
@Query(PARALLEL) Boolean parallel);

/**
* Search for quotes containing all the given {@link List} of
* {@code queries}.
*
* @param routename The microservice that performs the request,
* @param service The microservice that performs the request,
* which is dynamically inserted into the URI via
* the {@code Path} annotation
* @param queries The {@link List} of {@code queries} to search
Expand All @@ -112,8 +111,8 @@ Call<List<Quote>> search(@Path(ROUTENAME) String routename,
* containing the {@link Quote} objects on success and
* an error message on failure
*/
@POST("{routename}" + "/" + POST_SEARCHES_EX)
Call<List<Quote>> searchEx(@Path(ROUTENAME) String routename,
@POST(SERVICE_PREFIX + "/" + POST_SEARCHES_EX)
Call<List<Quote>> searchEx(@Path(SERVICE) String service,
@Body List<String> queries,
@Query(PARALLEL) Boolean parallel);
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public List<Quote> getAllQuotes(String route) {
*
* @param quoteId An {@link Integer} containing the given
* {@code quoteId}
* @return A {@link Quote} containing the requested {@code quoteId}
* @return A {@link Quote} containing the requested {@code
* quoteId}
*/
public Quote getQuote(String route,
Integer quoteId) {
Expand All @@ -70,8 +71,8 @@ public Quote getQuote(String route,
// Invoke the remote call and return the result.
return WebUtils
.makeGetRequest(mRestTemplate,
uri,
Quote.class);
uri,
Quote.class);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public static class Service {
* Supported HTTP request parameters identifiers.
*/
public static class Params {
public static final String ROUTENAME = "routename";
public static final String SERVICE = "service";
public static final String SERVICE_PREFIX = "{service}";
public static final String PARALLEL = "parallel";
public static final String QUOTE_ID = "quoteId";
}
Expand Down
Loading

0 comments on commit 78fc8df

Please sign in to comment.