Skip to content

Commit fddb351

Browse files
committed
more apis to enqueue messages.
1 parent 66e108d commit fddb351

File tree

5 files changed

+100
-42
lines changed

5 files changed

+100
-42
lines changed

CHANGELOG.md

+7-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
**NOTE**: The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
44

5+
## [2.1.1] - 24-Sep-2020
6+
### Added
7+
* More apis to enqueue unique message
8+
59
## [2.1.0] - 16-Sep-2020
610
### Added
711
* Allow application to provide message id while enqueuing messages
@@ -102,5 +106,6 @@ Fixes:
102106
[2.0.0]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/2.0.0-RELEASE
103107
[2.0.1]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/2.0.1-RELEASE
104108
[2.0.2]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/2.0.2-RELEASE
105-
[2.0.4]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/2.0.4-RELEASE
106-
[2.1.0]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/2.1.0-RELEASE
109+
[2.0.4]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue-core/2.0.4-RELEASE
110+
[2.1.0]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue-core/2.1.0-RELEASE
111+
[2.1.1]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue-core/2.1.1-RELEASE

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ ext {
6767

6868
subprojects {
6969
group = 'com.github.sonus21'
70-
version = '2.1.0-RELEASE'
70+
version = '2.1.1-RELEASE'
7171

7272
dependencies {
7373
// https://mvnrepository.com/artifact/org.springframework/spring-messaging

rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/application/ApplicationBasicConfiguration.java

+14-8
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,20 @@ public abstract class ApplicationBasicConfiguration {
4141
private static final Logger monitorLogger = LoggerFactory.getLogger("monitor");
4242
protected RedisServer redisServer;
4343
protected ExecutorService executorService;
44-
protected List<RProcess> processes;
44+
protected List<MonitorProcess> processes;
45+
4546
@Value("${mysql.db.name}")
4647
protected String dbName;
48+
4749
@Value("${spring.redis.port}")
4850
protected int redisPort;
51+
4952
@Value("${spring.redis.host}")
5053
protected String redisHost;
54+
5155
@Value("${use.system.redis:false}")
5256
protected boolean useSystemRedis;
57+
5358
@Value("${monitor.thread.count:0}")
5459
protected int monitorThreads;
5560

@@ -73,10 +78,10 @@ protected void destroy() {
7378
}
7479

7580
if (processes != null) {
76-
for (RProcess rProcess : processes) {
77-
rProcess.process.destroy();
78-
monitorLogger.info("RedisNode {} ", rProcess.redisNode);
79-
for (String line : rProcess.out) {
81+
for (MonitorProcess monitorProcess : processes) {
82+
monitorProcess.process.destroy();
83+
monitorLogger.info("RedisNode {} ", monitorProcess.redisNode);
84+
for (String line : monitorProcess.out) {
8085
monitorLogger.info("{}", line);
8186
}
8287
}
@@ -94,8 +99,9 @@ protected void monitor(String host, int port) {
9499
Runtime.getRuntime()
95100
.exec("redis-cli " + " -h " + host + " -p " + port + " monitor");
96101
List<String> lines = new LinkedList<>();
97-
RProcess rProcess = new RProcess(process, new RedisNode(host, port), lines);
98-
processes.add(rProcess);
102+
MonitorProcess monitorProcess =
103+
new MonitorProcess(process, new RedisNode(host, port), lines);
104+
processes.add(monitorProcess);
99105
BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
100106
String s;
101107
while ((s = br.readLine()) != null) {
@@ -131,7 +137,7 @@ public ObjectMapper objectMapper() {
131137
}
132138

133139
@AllArgsConstructor
134-
public static class RProcess {
140+
public static class MonitorProcess {
135141
Process process;
136142
RedisNode redisNode;
137143
List<String> out;

rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/application/MultiRedisSprigBaseApplication.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ public void preDestroy() {
5454
if (redisServer2 != null) {
5555
redisServer2.stop();
5656
}
57-
for (RProcess rProcess : processes) {
58-
for (String line : rProcess.out) {
57+
for (MonitorProcess monitorProcess : processes) {
58+
for (String line : monitorProcess.out) {
5959
assert line.equals("OK");
6060
}
6161
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageEnqueuer.java

+76-29
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,19 @@
2929
* com.github.sonus21.rqueue.exception.QueueDoesNotExist}. In such case register your queue using
3030
* {@link RqueueEndpointManager#registerQueue(String, String...)} method.
3131
*
32-
* <p>There are three types of interfaces in this 1. enqueueXYZ 2. enqueueInXYZ 3. enqueueAtXYZ
32+
* <p>There are four types of interfaces in this
3333
*
34-
* <p>Messages enqueue using enqueueXYZ shall be consumed as soon as possible
34+
* <p>enqueueXYZ : Messages enqueue using this method shall be consumed as soon as possible
3535
*
36-
* <p>Messages enqueue using enqueueInXYZ shall be consumed once the given time is elapsed, like in
37-
* 30 seconds.
36+
* <p>enqueueInXYZ: Messages enqueue using enqueueInXYZ shall be consumed once the given time is
37+
* elapsed, like in 30 seconds.
3838
*
39-
* <p>Messages send using enqueueAtXYZ shall be consumed as soon as the given time is reached for
40-
* example 3PM tomorrow.
39+
* <p>enqueueAtXYZ: Messages send using enqueueAtXYZ shall be consumed as soon as the given time is
40+
* reached for example 3PM tomorrow.
41+
*
42+
* <p>enqueueUniqueXYZ: This method enqueue unique messages on a queue. New messages overwrite the
43+
* existing message, the overwriting only works till the point message is not consumed, once message
44+
* is consumed it's of no use.
4145
*
4246
* @author Sonu Kumar
4347
*/
@@ -61,6 +65,18 @@ public interface RqueueMessageEnqueuer {
6165
*/
6266
boolean enqueue(String queueName, String messageId, Object message);
6367

68+
/**
69+
* Enqueue unique message on a given queue without any delay, consume as soon as possible.
70+
*
71+
* @param queueName on which queue message has to be send
72+
* @param messageId the message id for uniqueness
73+
* @param message message object it could be any arbitrary object.
74+
* @return message id on successful enqueue otherwise null.
75+
*/
76+
default boolean enqueueUnique(String queueName, String messageId, Object message) {
77+
return enqueue(queueName, messageId, message);
78+
}
79+
6480
/**
6581
* Enqueue a message on the given queue with the given retry count. This message would not be
6682
* consumed more than the specified time due to failure in underlying systems.
@@ -107,18 +123,6 @@ public interface RqueueMessageEnqueuer {
107123
*/
108124
boolean enqueueWithPriority(String queueName, String priority, String messageId, Object message);
109125

110-
/**
111-
* Enqueue unique message on a given queue without any delay, consume as soon as possible.
112-
*
113-
* @param queueName on which queue message has to be send
114-
* @param messageId the message id for uniqueness
115-
* @param message message object it could be any arbitrary object.
116-
* @return message id on successful enqueue otherwise null.
117-
*/
118-
default boolean enqueueUnique(String queueName, String messageId, Object message) {
119-
return enqueue(queueName, messageId, message);
120-
}
121-
122126
/**
123127
* Enqueue unique message on given queue, that will be consumed as soon as possible.
124128
*
@@ -213,6 +217,20 @@ default boolean enqueueIn(
213217
return enqueueIn(queueName, messageId, message, unit.toMillis(delay));
214218
}
215219

220+
/**
221+
* Enqueue a message on given queue with delay, consume as soon as the delayed is expired.
222+
*
223+
* @param queueName on which queue message has to be send
224+
* @param messageId the message id for uniqueness
225+
* @param message message object it could be any arbitrary object.
226+
* @param delayInMillisecond total execution delay
227+
* @return message id on successful enqueue otherwise {@literal null}.
228+
*/
229+
default boolean enqueueUniqueIn(
230+
String queueName, String messageId, Object message, long delayInMillisecond) {
231+
return enqueueIn(queueName, messageId, message, delayInMillisecond);
232+
}
233+
216234
/**
217235
* Enqueue a task that would be scheduled to run in the specified milli seconds.
218236
*
@@ -339,17 +357,25 @@ default boolean enqueueInWithPriority(
339357
}
340358

341359
/**
342-
* Enqueue a message on given queue with delay, consume as soon as the delayed is expired.
360+
* Schedule unique message on the given queue at the provided time. It will be executed as soon as
361+
* the given delay is elapse.
343362
*
344363
* @param queueName on which queue message has to be send
345-
* @param messageId the message id for uniqueness
364+
* @param priority the name of the priority level
365+
* @param messageId the message id
346366
* @param message message object it could be any arbitrary object.
347-
* @param delayInMillisecond total execution delay
348-
* @return message id on successful enqueue otherwise {@literal null}.
367+
* @param delay time to wait before it can be consumed.
368+
* @param unit unit of the delay
369+
* @return message was enqueue successfully or failed.
349370
*/
350-
default boolean enqueueUniqueIn(
351-
String queueName, String messageId, Object message, long delayInMillisecond) {
352-
return enqueueIn(queueName, messageId, message, delayInMillisecond);
371+
default boolean enqueueUniqueInWithPriority(
372+
String queueName,
373+
String priority,
374+
String messageId,
375+
Object message,
376+
long delay,
377+
TimeUnit unit) {
378+
return enqueueInWithPriority(queueName, priority, messageId, message, unit.toMillis(delay));
353379
}
354380

355381
/**
@@ -435,6 +461,22 @@ default boolean enqueueAt(String queueName, String messageId, Object message, Da
435461
return enqueueAt(queueName, messageId, message, starTime.toInstant());
436462
}
437463

464+
/**
465+
* Schedule unique messages on the given queue at the provided time. It will be available to
466+
* consume as soon as the given time is reached.
467+
*
468+
* @param queueName on which queue message has to be send
469+
* @param message message object it could be any arbitrary object.
470+
* @param messageId a unique identifier message id for this message
471+
* @param timeInMilliSeconds time at which this message has to be consumed.
472+
* @return message was enqueue successfully or failed.
473+
*/
474+
default boolean enqueueUniqueAt(
475+
String queueName, String messageId, Object message, long timeInMilliSeconds) {
476+
return enqueueUniqueIn(
477+
queueName, messageId, message, timeInMilliSeconds - System.currentTimeMillis());
478+
}
479+
438480
/**
439481
* Schedule a message on the given queue at the provided time. It will be executed as soon as the
440482
* given time is reached, time must be in the future.
@@ -543,14 +585,19 @@ default boolean enqueueAtWithPriority(
543585
* consume as soon as the given time is reached.
544586
*
545587
* @param queueName on which queue message has to be send
588+
* @param priority priority of the given message
546589
* @param message message object it could be any arbitrary object.
547590
* @param messageId a unique identifier message id for this message
548591
* @param timeInMilliSeconds time at which this message has to be consumed.
549592
* @return message was enqueue successfully or failed.
550593
*/
551-
default boolean enqueueUniqueAt(
552-
String queueName, String messageId, Object message, long timeInMilliSeconds) {
553-
return enqueueUniqueIn(
554-
queueName, messageId, message, timeInMilliSeconds - System.currentTimeMillis());
594+
default boolean enqueueUniqueAtWithPriority(
595+
String queueName,
596+
String priority,
597+
String messageId,
598+
Object message,
599+
long timeInMilliSeconds) {
600+
return enqueueAtWithPriority(
601+
queueName, priority, messageId, message, timeInMilliSeconds - System.currentTimeMillis());
555602
}
556603
}

0 commit comments

Comments
 (0)