Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
douglascraigschmidt committed Apr 22, 2023
1 parent c2faac5 commit 3b9d32d
Show file tree
Hide file tree
Showing 45 changed files with 1,238 additions and 641 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;
import java.util.UUID;

/**
Expand Down Expand Up @@ -31,9 +32,9 @@ public class Subscription {
private SubscriptionStatus status = SubscriptionStatus.PENDING;

/**
* The type of the subscription, e.g., ZIPPY vs. HANDEY.
* The type of the subscriptions, e.g., ZIPPY, HANDEY, or both.
*/
private SubscriptionType type = SubscriptionType.NONE;
private List<SubscriptionType> type;

/**
* The constructor initializes the field.
Expand All @@ -42,7 +43,7 @@ public class Subscription {
* @param type The type of the subscription
*/
public Subscription(UUID requestId,
SubscriptionType type) {
List<SubscriptionType> type) {
this.requestId = requestId;
this.type = type;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
import org.springframework.stereotype.Repository;
import quotes.common.model.Quote;

import quotes.common.model.SubscriptionType;
import reactor.core.publisher.Flux;

import java.util.List;

/**
* A persistent repository containing information about
* {@link Quote} objects using the R2DBC reactive database.
Expand All @@ -18,10 +21,14 @@
public interface ReactiveQuoteRepository
extends ReactiveCrudRepository<Quote, Integer> {
/**
* This method finds all quotes of the given type.
* This method finds all {@link Quote} objects of the
* given {@link SubscriptionType}.
*
* @param type The type of quote to find, e.g., Zippy = 1, Handey = 2, etc.
* @return A {@link Flux} that emits all quotes of the given type.
* @param types A {@link List} of {@link SubscriptionType}
* objects to find, e.g., ZIPPY = 1, HANDEY
* = 2, etc.
* @return A {@link Flux} that emits all {@link Quote} objects
* of the given {@link SubscriptionType}
*/
Flux<Quote> findAllByType(int type);
Flux<Quote> findAllByTypeIn(List<Integer> types);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* This controller enables RSocket clients to get random Zippy th'
* Pinhead quotes, subscribe to receive Flux streams of these quotes,
* as well as cancel earlier subscriptions. It demonstrates the
* following RSocket interaction models
* following RSocket interaction models:
*
* <ul>
* <li>Request/Response, where each two-way async request receives a
Expand Down
44 changes: 27 additions & 17 deletions RSocket/ex2/src/main/java/quotes/server/QuotesMessageService.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package quotes.server;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import quotes.common.Options;
import quotes.common.model.Quote;
Expand All @@ -11,15 +10,10 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static quotes.common.Constants.HANDEY_QUOTES;
import static quotes.common.Constants.ZIPPY_QUOTES;
import static quotes.common.model.SubscriptionType.ZIPPY;

/**
* This class defines methods that return quotes from Zippy th'
* Pinhead and Jack Handey.
Expand Down Expand Up @@ -185,35 +179,51 @@ public class QuotesMessageService {
* where each request receives a stream of responses from the
* server.
*
* @param subscriptionRequest A {@link Mono} that emits a {@link
* Subscription} request
* @param subscription A {@link Mono} that emits a {@link
* Subscription}
* @return A {@link Flux} that emits a {@link Quote} of the type
* associated with a {@link Subscription} or an empty
* {@link Flux} otherwise.
*/
Flux<Quote> getAllQuotes
(Mono<Subscription> subscriptionRequest) {
return subscriptionRequest
.doOnNext(sr -> Options
(Mono<Subscription> subscription) {
return subscription
.doOnNext(sub -> Options
.debug(TAG,
"getAllQuotes::"
+ sr.getType()
+ sub.getType()
+ ":"
+ sr.getStatus()
+ sub.getStatus()
+ ":"
+ sr.getRequestId()))
+ sub.getRequestId()))

// Check to ensure the subscription request is registered
// and confirmed.
.flatMapMany(sr -> mSubscriptions
.contains(sr)
.flatMapMany(sub -> mSubscriptions
.contains(sub)
// If the request is not confirmed return a
// Flux that emits the list of quotes.
? mQuoteRepository
.findAllByType(sr.getType().ordinal())
.findAllByTypeIn(type2Int(sub))

// If the request is not confirmed return an
// empty Flux.
: Flux.empty());
}

/**
* Convert a {@link Subscription} type into a {@link List} of
* {@link Integer} representing the type.
*
* @param subscription The {@link Subscription}
* @return A {@link List} of {@link Integer} representing the type
*/
private static List<Integer> type2Int
(Subscription subscription) {
return subscription
.getType()
.stream()
.map(Enum::ordinal)
.toList();
}
}
34 changes: 0 additions & 34 deletions RSocket/ex2/src/main/java/quotes/utils/RandomUtils.java

This file was deleted.

Loading

0 comments on commit 3b9d32d

Please sign in to comment.