Skip to content

Commit 3da0034

Browse files
committed
Initial implementation
1 parent 33c069f commit 3da0034

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+3457
-145
lines changed

.github/workflows/java_coverage.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ jobs:
1919
java-version: "11"
2020
distribution: "adopt"
2121
- name: Generate coverage report
22-
run: mvn test --file ./java/pom.xml
22+
working-directory: ./java
23+
run: mvn test --file ./pom.xml
2324
- name: Test summary
2425
uses: test-summary/action@v1
2526
with:

java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientServiceProvider.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.util.Iterator;
2121
import java.util.ServiceLoader;
22+
import org.apache.rocketmq.client.apis.consumer.PullConsumerBuilder;
2223
import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;
2324
import org.apache.rocketmq.client.apis.consumer.SimpleConsumerBuilder;
2425
import org.apache.rocketmq.client.apis.message.MessageBuilder;
@@ -65,4 +66,11 @@ static ClientServiceProvider loadService() {
6566
* @return the simple consumer builder instance.
6667
*/
6768
SimpleConsumerBuilder newSimpleConsumerBuilder();
69+
70+
/**
71+
* Get the pull consumer builder by the current provider.
72+
*
73+
* @return the pull consumer builder instance.
74+
*/
75+
PullConsumerBuilder newPullConsumerBuilder();
6876
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.client.apis.consumer;
19+
20+
import java.io.Closeable;
21+
import java.io.IOException;
22+
import java.time.Duration;
23+
import java.util.Collection;
24+
import java.util.List;
25+
import java.util.Optional;
26+
import org.apache.rocketmq.client.apis.ClientException;
27+
import org.apache.rocketmq.client.apis.message.MessageQueue;
28+
import org.apache.rocketmq.client.apis.message.MessageView;
29+
30+
public interface PullConsumer extends Closeable {
31+
/**
32+
* Get the consumer group of the consumer.
33+
*/
34+
String getConsumerGroup();
35+
36+
/**
37+
* @param topic the topic that needs to be monitored.
38+
* @param listener the callback to detect the message queue changes.
39+
*/
40+
void registerMessageQueueChangeListenerByTopic(String topic, TopicMessageQueueChangeListener listener);
41+
42+
/**
43+
* Fetch message queues of the topic.
44+
*/
45+
Collection<MessageQueue> fetchMessageQueues(String topic) throws ClientException;
46+
47+
/**
48+
* Manually assign a list of message queues to this consumer.
49+
*
50+
* <p>This interface does not allow for incremental assignment and will replace the previous assignment (if
51+
* previous assignment existed).
52+
*
53+
* @param messageQueues the list of message queues that are to be assigned to this consumer.
54+
*/
55+
void assign(Collection<MessageQueue> messageQueues);
56+
57+
/**
58+
* Fetch messages from assigned message queues specified by {@link #assign(Collection)}.
59+
*
60+
* @param timeout the maximum time to block.
61+
* @return list of fetched messages.
62+
*/
63+
List<MessageView> poll(Duration timeout) throws InterruptedException;
64+
65+
/**
66+
* Overrides the fetch offsets that the consumer will use on the next poll. If this method is invoked for the same
67+
* message queue more than once, the latest offset will be used on the next {@link #poll(Duration)}.
68+
*
69+
* @param messageQueue the message queue to override the fetch offset.
70+
* @param offset message offset.
71+
*/
72+
void seek(MessageQueue messageQueue, long offset);
73+
74+
/**
75+
* Suspending message pulling from the message queues.
76+
*
77+
* @param messageQueues message queues that need to be suspended.
78+
*/
79+
void pause(Collection<MessageQueue> messageQueues);
80+
81+
/**
82+
* Resuming message pulling from the message queues.
83+
*
84+
* @param messageQueues message queues that need to be resumed.
85+
*/
86+
void resume(Collection<MessageQueue> messageQueues);
87+
88+
/**
89+
* Look up the offsets for the given message queue by timestamp. The returned offset for each message queue is the
90+
* earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding message
91+
* queue.
92+
*
93+
* @param messageQueue message queue that needs to be looked up.
94+
* @param timestamp the timestamp for which to search.
95+
* @return the offset of the message queue, or {@link Optional#empty()} if there is no message.
96+
*/
97+
Optional<Long> offsetForTimestamp(MessageQueue messageQueue, long timestamp) throws ClientException;
98+
99+
/**
100+
* Get the latest committed offset for the given message queue.
101+
*
102+
* @return the latest committed offset, or {@link Optional#empty()} if there was no prior commit.
103+
*/
104+
Optional<Long> committed(MessageQueue messageQueue) throws ClientException;
105+
106+
/**
107+
* Commit offset manually.
108+
*/
109+
void commit() throws ClientException;
110+
111+
/**
112+
* Overrides the fetch offsets with the beginning offset that the consumer will use on the next poll. If this
113+
* method is invoked for the same message queue more than once, the latest offset will be used on the next
114+
* {@link #poll(Duration)}.
115+
*
116+
* @param messageQueue the message queue to seek.
117+
*/
118+
void seekToBegin(MessageQueue messageQueue) throws ClientException;
119+
120+
/**
121+
* Overrides the fetch offsets with the end offset that the consumer will use on the next poll. If this method is
122+
* invoked for the same message queue more than once, the latest offset will be used on the next
123+
* {@link #poll(Duration)}.
124+
*
125+
* @param messageQueue the message queue to seek.
126+
*/
127+
void seekToEnd(MessageQueue messageQueue) throws ClientException;
128+
129+
/**
130+
* Close the pull consumer and release all related resources.
131+
*/
132+
@Override
133+
void close() throws IOException;
134+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.client.apis.consumer;
19+
20+
import java.time.Duration;
21+
import org.apache.rocketmq.client.apis.ClientConfiguration;
22+
import org.apache.rocketmq.client.apis.ClientException;
23+
24+
public interface PullConsumerBuilder {
25+
/**
26+
* Set the client configuration for the consumer.
27+
*
28+
* @param clientConfiguration client's configuration.
29+
* @return the consumer builder instance.
30+
*/
31+
PullConsumerBuilder setClientConfiguration(ClientConfiguration clientConfiguration);
32+
33+
/**
34+
* Set the load balancing group for the consumer.
35+
*
36+
* @param consumerGroup consumer load balancing group.
37+
* @return the consumer builder instance.
38+
*/
39+
PullConsumerBuilder setConsumerGroup(String consumerGroup);
40+
41+
/**
42+
* Set the consumer's offset commit interval if auto commit is enabled.
43+
*
44+
* @param duration offset commit interval
45+
* @return the consumer builder instance.
46+
*/
47+
PullConsumerBuilder setAutoCommitInterval(Duration duration);
48+
49+
/**
50+
* Set the maximum number of messages cached locally.
51+
*
52+
* @param count message count.
53+
* @return the consumer builder instance.
54+
*/
55+
PullConsumerBuilder setMaxCacheMessageCountEachQueue(int count);
56+
57+
/**
58+
* Set the maximum bytes of messages cached locally.
59+
*
60+
* @param bytes message size.
61+
* @return the consumer builder instance.
62+
*/
63+
PullConsumerBuilder setMaxCacheMessageSizeInBytesEachQueue(int bytes);
64+
65+
/**
66+
* Finalize the build of {@link PullConsumer} and start.
67+
*
68+
* <p>This method will block until the pull consumer starts successfully.
69+
*
70+
* <p>Especially, if this method is invoked more than once, different pull consumer will be created and started.
71+
*
72+
* @return the pull consumer instance.
73+
*/
74+
PullConsumer build() throws ClientException;
75+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.client.apis.consumer;
19+
20+
import java.util.Set;
21+
import org.apache.rocketmq.client.apis.message.MessageQueue;
22+
23+
public interface TopicMessageQueueChangeListener {
24+
/**
25+
* This method will be invoked in the condition of queue numbers changed, These scenarios occur when the topic is
26+
* expanded or shrunk.
27+
*
28+
* @param topic the topic to listen.
29+
* @param messageQueues latest message queues of the topic.
30+
*/
31+
void onChanged(String topic, Set<MessageQueue> messageQueues);
32+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.client.apis.message;
19+
20+
public interface MessageQueue {
21+
/**
22+
* Topic of the current message queue.
23+
*/
24+
String getTopic();
25+
26+
/**
27+
* Get the identifier of the current message queue.
28+
*/
29+
String getId();
30+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.client.java.example;
19+
20+
import java.time.Duration;
21+
import java.util.Collection;
22+
import java.util.List;
23+
import org.apache.rocketmq.client.apis.ClientConfiguration;
24+
import org.apache.rocketmq.client.apis.ClientException;
25+
import org.apache.rocketmq.client.apis.ClientServiceProvider;
26+
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
27+
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
28+
import org.apache.rocketmq.client.apis.consumer.PullConsumer;
29+
import org.apache.rocketmq.client.apis.message.MessageQueue;
30+
import org.apache.rocketmq.client.apis.message.MessageView;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
public class PullConsumerExample {
35+
private static final Logger log = LoggerFactory.getLogger(PullConsumerExample.class);
36+
37+
private PullConsumerExample() {
38+
}
39+
40+
@SuppressWarnings({"resource", "InfiniteLoopStatement"})
41+
public static void main(String[] args) throws ClientException, InterruptedException {
42+
final ClientServiceProvider provider = ClientServiceProvider.loadService();
43+
44+
// Credential provider is optional for client configuration.
45+
String accessKey = "yourAccessKey";
46+
String secretKey = "yourSecretKey";
47+
SessionCredentialsProvider sessionCredentialsProvider =
48+
new StaticSessionCredentialsProvider(accessKey, secretKey);
49+
50+
String endpoints = "foobar.com:8080";
51+
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
52+
.setEndpoints(endpoints)
53+
.setCredentialProvider(sessionCredentialsProvider)
54+
.build();
55+
String topic = "yourTopic";
56+
final PullConsumer consumer = provider.newPullConsumerBuilder()
57+
.setClientConfiguration(clientConfiguration)
58+
.setAutoCommitInterval(Duration.ofSeconds(3))
59+
.build();
60+
61+
final Collection<MessageQueue> mqs = consumer.fetchMessageQueues(topic);
62+
consumer.assign(mqs);
63+
64+
do {
65+
final List<MessageView> messages = consumer.poll(Duration.ofSeconds(5));
66+
log.info("Pulled {} message(s)", messages.size());
67+
for (MessageView message : messages) {
68+
log.info("message is pulled, messageId={}", message.getMessageId());
69+
}
70+
} while (true);
71+
// Close the pull consumer when you don't need it anymore.
72+
// consumer.close();
73+
}
74+
}

0 commit comments

Comments
 (0)