Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
douglascraigschmidt committed Apr 3, 2023
1 parent 3d346ae commit cc679ec
Show file tree
Hide file tree
Showing 65 changed files with 2,459 additions and 796 deletions.
14 changes: 7 additions & 7 deletions Loom/ex1/src/main/java/utils/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class Options {
/**
* The iteration when a diagnostic should be printed.
*/
private int mPrintDiagnostic = 100;
private int mPrintDiagnosticOnIteration = 100;

/**
* @return The one and only singleton uniqueInstance
Expand Down Expand Up @@ -72,7 +72,7 @@ public boolean virtualThreads() {
*/
public boolean printDiagnostic(int i) {
return mDiagnosticsEnabled
&& (i % mPrintDiagnostic) == 0;
&& (i % mPrintDiagnosticOnIteration) == 0;
}

/**
Expand All @@ -84,7 +84,7 @@ public void parseArgs(String[] argv) {
switch (argv[argc]) {
case "-d" -> mDiagnosticsEnabled = argv[argc + 1].equals("true");
case "-n" -> mNumberOfElements = Integer.parseInt(argv[argc + 1]);
case "-p" -> mPrintDiagnostic = Integer.parseInt(argv[argc + 1]);
case "-p" -> mPrintDiagnosticOnIteration = Integer.parseInt(argv[argc + 1]);
case "-t" -> mVirtualThreads = argv[argc + 1].equals("v");
default -> {
printUsage();
Expand All @@ -99,10 +99,10 @@ public void parseArgs(String[] argv) {
*/
private void printUsage() {
System.out.println("Usage: ");
System.out.println("-d [true|false]");
System.out.println("-n [numberOfElements]");
System.out.println("-p [printDiagnostic]");
System.out.println("-t [p|v]");
System.out.println("-d [true|false]\n"
+ "-i [iteration]\n"
+ "-n [numberOfElements]\n"
+ "-t [p|v]";
}

/**
Expand Down
12 changes: 6 additions & 6 deletions Reactor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@ Here's an overview of what's included:
various types of backpressure strategies (e.g., ignore,
buffer, error, latest, drop, and push/pull) between a
publisher and a subscriber that (conditionally) run in
different threads/schedulers. This program also measures the
different threads/schedulers.

. ex4 - This program applies Project Reactor features to measure the
performance of checking random numbers for primality with and
without various types of memoizers (e.g., untimed and timed)
based on Java ConcurrentHashMap. In addition, it demonstrates
the use of slicing with the Flux takeWhile() and skipWhile()
operations.
based on Java ConcurrentHashMap.

. ex4 - This example shows how to apply timeouts with the Project
. ex5 - This example shows how to apply timeouts with the Project
Reactor framework.

. ex5 - This program shows performance differences between Project
. ex6 - This program shows performance differences between Project
Reactor's single-threaded Flux, the flatMap() concurrency
idiom, and several variants of ParallelFlux when creating Set
objects containing the unique words appearing in the complete
Expand Down
2 changes: 1 addition & 1 deletion Reactor/ex2/.run/ex2.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="true" />
<option name="MAIN_CLASS_NAME" value="ex2" />
<module name="ex2.main" />
<option name="PROGRAM_PARAMETERS" value="-c 5000 -d true -P 5 -T ex2,Emitter,BlockingSubscriber" />
<option name="PROGRAM_PARAMETERS" value="-c 100 -d true -P 5 -T ex2,Emitter,BlockingSubscriber,PrimeUtils" />
<method v="2">
<option name="Make" enabled="true" />
</method>
Expand Down
28 changes: 22 additions & 6 deletions Reactor/ex2/src/main/java/common/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public class Options {
*/
private boolean mDiagnosticsEnabled = false;

/**
* The iteration when a diagnostic should be printed.
*/
private int mPrintDiagnosticOnIteration = 10;

/**
* Controls whether backpressure is enabled (defaults to true).
*/
Expand Down Expand Up @@ -83,6 +88,15 @@ public boolean diagnosticsEnabled() {
return mDiagnosticsEnabled;
}

/**
* @return True if {@code i} modulus the print diagnostic == 0,
* else false
*/
public boolean printDiagnostic(int i) {
return mDiagnosticsEnabled
&& (i % mPrintDiagnosticOnIteration) == 0;
}

/**
* @return True the producer and consumer should run in parallel,
* else false.
Expand Down Expand Up @@ -161,6 +175,7 @@ public void parseArgs(String[] argv) {
case "-d" -> mDiagnosticsEnabled = argv[argc + 1].equals("true");
case "-l" -> mLoggingEnabled = argv[argc + 1].equals("true");
case "-c" -> mCount = Integer.parseInt(argv[argc + 1]);
case "-i" -> mPrintDiagnosticOnIteration = Integer.parseInt(argv[argc + 1]);
case "-m" -> mMaxValue = Integer.parseInt(argv[argc + 1]);
case "-p" -> mParallel = argv[argc + 1].equals("true");
case "-P" -> mParallelism = Integer.parseInt(argv[argc + 1]);
Expand All @@ -183,12 +198,13 @@ public void parseArgs(String[] argv) {
*/
private void printUsage() {
System.out.println("Usage: ");
System.out.println("-c [n] "
+ "-d [true|false] "
+ "-l [true|false] "
+ "-m [maxValue] "
+ "-p [true|false]"
+ "-P [parallelism]"
System.out.println("-c [n]\n"
+ "-d [true|false]\n"
+ "-i [iteration]\n"
+ "-l [true|false]\n"
+ "-m [maxValue]\n"
+ "-p [true|false]\n"
+ "-P [parallelism]\n"
+ "-T [tag,...]");
}

Expand Down
73 changes: 46 additions & 27 deletions Reactor/ex2/src/main/java/ex2.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
import static publisher.Publisher.sPendingItemCount;

/**
* This program applies Project Reactor features to check the primality
* of randomly generate {@link Integer} objects via a publisher and
* a subscriber that (conditionally) run in different threads/schedulers.
* This program applies Project Reactor features to check the primality
* of randomly generate {@link Integer} objects via a publisher and
* a subscriber that (conditionally) run in different threads/schedulers.
*/
public class ex2 {
/**
Expand All @@ -31,7 +31,7 @@ public class ex2 {
* objects in its own thread.
*/
private final Scheduler mPublisherScheduler = Schedulers
.newParallel("publisher", 1);
.newParallel("publisher", 1);

/**
* The {@link Scheduler} used to consume random integers by checking
Expand All @@ -40,11 +40,30 @@ public class ex2 {
*/
private final Scheduler mSubscriberScheduler;

// Track all disposables to dispose them all at once.
/**
* A {@link Subscriber} that blocks the caller.
*/
private final BlockingSubscriber mSubscriber =
new BlockingSubscriber(sPendingItemCount);
private final BlockingSubscriber<PrimeUtils.Result> mSubscriber =
new BlockingSubscriber<>(result -> {


// Store the current pending item count.
int pendingItems = Publisher
.sPendingItemCount.decrementAndGet();

if (Options.instance().printDiagnostic(pendingItems)) {
// Print the results of prime number checking.
PrimeUtils.printResult(result);
Options.debug(TAG, "subscriber pending items: "
+ pendingItems);
}
},
throwable -> {
Options.print("failure " + throwable);
},
() -> Options.print("completed"),
Integer.MAX_VALUE);

/**
* Track all disposables to dispose them all at once.
Expand All @@ -57,7 +76,7 @@ public class ex2 {
static public void main(String[] argv) {
// Create an instance to run the test.
new ex2(argv).run(PrimeUtils::isPrime,
"pub/sub prime checker");
"pub/sub prime checker");
}

/**
Expand All @@ -71,16 +90,15 @@ static public void main(String[] argv) {
mSubscriberScheduler = Options.instance().parallel()
// Choose a different scheduler if we're running in parallel.
? Schedulers.newParallel("subscriber",
Options.instance().parallelism())
Options.instance().parallelism())

// Otherwise run everything on the publisher's scheduler.
: mPublisherScheduler;

// Track all disposables to dispose them all at once.
mDisposables = Disposables
.composite(mPublisherScheduler,
mSubscriberScheduler,
mSubscriber);
mSubscriberScheduler,
mSubscriber);
}

/**
Expand All @@ -89,22 +107,23 @@ static public void main(String[] argv) {
* @param primeChecker A {@link Function} that maps candidate
* primes to their smallest factor (if they
* aren't prime) or 0 if they are prime
* @param testName Name of the test
* @param testName Name of the test
*/
@SuppressWarnings("SameParameterValue")
private void run
(Function<Integer, Integer> primeChecker,
String testName) {
(Function<Integer, Integer> primeChecker,
String testName) {
Options.print("Starting "
+ testName
+ " with count = "
+ Options.instance().count());
+ testName
+ " with count = "
+ Options.instance().count());

// Generate a list of 'count' random Integers whose values
// Generate a list of 'count' odd random Integers whose values
// don't exceed the given maximum.
var randomIntegers = RandomUtils
.generateRandomIntegers(Options.instance().count(),
Options.instance().maxValue());
Options.instance().maxValue(),
true);

// This Function asynchronously determines if a random # is
// prime or not.
Expand All @@ -115,12 +134,12 @@ static public void main(String[] argv) {
// Create a publisher that runs on its own scheduler and
// returns a Flux that emits random Integer objects.
.publishIntegers(mPublisherScheduler,
randomIntegers)
randomIntegers)

// Conditionally enable logging.
.transform(ReactorUtils
// Conditionally enable logging.
.logIf(Options.instance().loggingEnabled()))
// Conditionally enable logging.
.logIf(Options.instance().loggingEnabled()))

// Concurrently (maybe) check each random # to see if it's
// prime. This operation may run on the subscriber's
Expand Down Expand Up @@ -150,11 +169,11 @@ static public void main(String[] argv) {
* @param primeChecker A {@link Function} that checks a number's
* primality
* @return A {@link Function} that asynchronously determines if a
* random # is prime or not
* random # is prime or not
*/
private Function<Integer,
Mono<PrimeUtils.Result>> makePrimeCheckFunction
(Function<Integer, Integer> primeChecker) {
Mono<PrimeUtils.Result>> makePrimeCheckFunction
(Function<Integer, Integer> primeChecker) {
return number -> Mono
.fromSupplier(() -> number)

Expand All @@ -163,8 +182,8 @@ static public void main(String[] argv) {

// Check if the # is prime.
.map(__ -> PrimeUtils
.checkIfPrime(number,
primeChecker));
.checkIfPrime(number,
primeChecker));
}

/**
Expand Down
11 changes: 6 additions & 5 deletions Reactor/ex2/src/main/java/publisher/Emitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,12 @@ private Emitter() {}
int pendingItems =
pendingItemCount.incrementAndGet();

Options.debug(TAG,
"published item: "
+ item
+ ", pending items = "
+ pendingItems);
if (Options.instance().printDiagnostic(pendingItems))
Options.debug(TAG,
"published item: "
+ item
+ ", pending items = "
+ pendingItems);

// Only publish an item if the sink hasn't been
// cancelled.
Expand Down
Loading

0 comments on commit cc679ec

Please sign in to comment.