Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/java_coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ jobs:
java-version: "11"
distribution: "adopt"
- name: Generate coverage report
run: mvn test --file ./java/pom.xml
working-directory: ./java
run: mvn test --file ./pom.xml
- name: Test summary
uses: test-summary/action@v1
with:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,39 @@

import static com.google.common.base.Preconditions.checkNotNull;

import java.time.Duration;
import java.util.Optional;

/**
* Session credentials used in service authentications.
*/
public class SessionCredentials {
private static final Duration EXPIRATION_BUFFER_TIME = Duration.ofSeconds(1);
private final String accessKey;
private final String accessSecret;
private final String securityToken;
private final long expiredTimestampMillis;

public SessionCredentials(String accessKey, String accessSecret, String securityToken,
long expiredTimestampMillis) {
this.accessKey = checkNotNull(accessKey, "accessKey should not be null");
this.accessSecret = checkNotNull(accessSecret, "accessSecret should not be null");
this.securityToken = checkNotNull(securityToken, "securityToken should not be null");
this.expiredTimestampMillis = expiredTimestampMillis;
}

public SessionCredentials(String accessKey, String accessSecret, String securityToken) {
this.accessKey = checkNotNull(accessKey, "accessKey should not be null");
this.accessSecret = checkNotNull(accessSecret, "accessSecret should not be null");
this.securityToken = checkNotNull(securityToken, "securityToken should not be null");
this.expiredTimestampMillis = Long.MAX_VALUE;
}

public SessionCredentials(String accessKey, String accessSecret) {
this.accessKey = checkNotNull(accessKey, "accessKey should not be null");
this.accessSecret = checkNotNull(accessSecret, "accessSecret should not be null");
this.securityToken = null;
this.expiredTimestampMillis = Long.MAX_VALUE;
}

public String getAccessKey() {
Expand All @@ -52,4 +65,8 @@ public String getAccessSecret() {
public Optional<String> tryGetSecurityToken() {
return null == securityToken ? Optional.empty() : Optional.of(securityToken);
}

public boolean expiredSoon() {
return System.currentTimeMillis() + EXPIRATION_BUFFER_TIME.toMillis() > expiredTimestampMillis;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.client.apis.consumer;

import java.util.Set;
import org.apache.rocketmq.client.apis.message.MessageQueue;

public interface TopicMessageQueueChangeListener {
/**
* This method will be invoked in the condition of queue numbers changed, These scenarios occur when the topic is
* expanded or shrunk.
*
* @param topic the topic to listen.
* @param messageQueues latest message queues of the topic.
*/
void onChanged(String topic, Set<MessageQueue> messageQueues);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.client.apis.message;

public interface MessageQueue {
/**
* Topic of the current message queue.
*/
String getTopic();
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public interface ProducerBuilder {
* ArrayList<String> topicList = new ArrayList<>();
* topicList.add("topicA");
* topicList.add("topicB");
* producerBuilder.setTopics(topicList);
* producerBuilder.setTopics(topicList.toArray(new String[0]));
* }</pre>
*
* @param topics topics to send/prepare.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.rocketmq.client.java.exception;

import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.GetOffsetRequest;
import apache.rocketmq.v2.PullMessageRequest;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.Status;
import org.apache.rocketmq.client.apis.ClientException;
Expand Down Expand Up @@ -61,6 +63,11 @@ public static void check(Status status, RpcFuture<?, ?> future) throws ClientExc
case CLIENT_ID_REQUIRED:
case ILLEGAL_POLLING_TIME:
throw new BadRequestException(codeNumber, requestId, statusMessage);
case ILLEGAL_OFFSET:
if (future.getRequest() instanceof PullMessageRequest) {
return;
}
// fall through on purpose.
case UNAUTHORIZED:
throw new UnauthorizedException(codeNumber, requestId, statusMessage);
case PAYMENT_REQUIRED:
Expand All @@ -71,11 +78,19 @@ public static void check(Status status, RpcFuture<?, ?> future) throws ClientExc
if (future.getRequest() instanceof ReceiveMessageRequest) {
return;
}
if (future.getRequest() instanceof PullMessageRequest) {
return;
}
// fall through on purpose.
case NOT_FOUND:
case TOPIC_NOT_FOUND:
case CONSUMER_GROUP_NOT_FOUND:
throw new NotFoundException(codeNumber, requestId, statusMessage);
case OFFSET_NOT_FOUND:
if (future.getRequest() instanceof GetOffsetRequest) {
return;
}
// fall through on purpose.
case PAYLOAD_TOO_LARGE:
case MESSAGE_BODY_TOO_LARGE:
throw new PayloadTooLargeException(codeNumber, requestId, statusMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ public enum MessageHookPoints {
* The hook point of message reception.
*/
RECEIVE,
/**
* The hook point of message pulling.
*/
PULL,
/**
* The hook point of message consumption.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ public void onFailure(Throwable t) {
public void doStats() {
}

private ListenableFuture<TopicRouteData> fetchTopicRoute(final String topic) {
protected ListenableFuture<TopicRouteData> fetchTopicRoute(final String topic) {
final ListenableFuture<TopicRouteData> future0 = fetchTopicRoute0(topic);
final ListenableFuture<TopicRouteData> future = Futures.transformAsync(future0,
topicRouteData -> onTopicRouteDataFetched(topic, topicRouteData), MoreExecutors.directExecutor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,27 @@
import apache.rocketmq.v2.EndTransactionResponse;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
import apache.rocketmq.v2.GetOffsetRequest;
import apache.rocketmq.v2.GetOffsetResponse;
import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.HeartbeatResponse;
import apache.rocketmq.v2.NotifyClientTerminationRequest;
import apache.rocketmq.v2.NotifyClientTerminationResponse;
import apache.rocketmq.v2.PullMessageRequest;
import apache.rocketmq.v2.PullMessageResponse;
import apache.rocketmq.v2.QueryAssignmentRequest;
import apache.rocketmq.v2.QueryAssignmentResponse;
import apache.rocketmq.v2.QueryOffsetRequest;
import apache.rocketmq.v2.QueryOffsetResponse;
import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.QueryRouteResponse;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.SendMessageResponse;
import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.UpdateOffsetRequest;
import apache.rocketmq.v2.UpdateOffsetResponse;
import com.google.common.util.concurrent.AbstractIdleService;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
Expand Down Expand Up @@ -147,6 +155,18 @@ public abstract RpcFuture<AckMessageRequest, AckMessageResponse> ackMessage(Endp
ForwardMessageToDeadLetterQueueResponse> forwardMessageToDeadLetterQueue(Endpoints endpoints,
ForwardMessageToDeadLetterQueueRequest request, Duration duration);

public abstract RpcFuture<PullMessageRequest, List<PullMessageResponse>> pullMessage(Endpoints endpoints,
PullMessageRequest request, Duration duration);

public abstract RpcFuture<UpdateOffsetRequest, UpdateOffsetResponse> updateOffset(Endpoints endpoints,
UpdateOffsetRequest request, Duration duration);

public abstract RpcFuture<GetOffsetRequest, GetOffsetResponse> getOffset(Endpoints endpoints,
GetOffsetRequest request, Duration duration);

public abstract RpcFuture<QueryOffsetRequest, QueryOffsetResponse> queryOffset(Endpoints endpoints,
QueryOffsetRequest request, Duration duration);

/**
* Submit transaction resolution asynchronously, the method ensures no throwable.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,27 @@
import apache.rocketmq.v2.EndTransactionResponse;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
import apache.rocketmq.v2.GetOffsetRequest;
import apache.rocketmq.v2.GetOffsetResponse;
import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.HeartbeatResponse;
import apache.rocketmq.v2.NotifyClientTerminationRequest;
import apache.rocketmq.v2.NotifyClientTerminationResponse;
import apache.rocketmq.v2.PullMessageRequest;
import apache.rocketmq.v2.PullMessageResponse;
import apache.rocketmq.v2.QueryAssignmentRequest;
import apache.rocketmq.v2.QueryAssignmentResponse;
import apache.rocketmq.v2.QueryOffsetRequest;
import apache.rocketmq.v2.QueryOffsetResponse;
import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.QueryRouteResponse;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.SendMessageResponse;
import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.UpdateOffsetRequest;
import apache.rocketmq.v2.UpdateOffsetResponse;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.Metadata;
Expand Down Expand Up @@ -283,8 +291,7 @@ public RpcFuture<AckMessageRequest, AckMessageResponse> ackMessage(Endpoints end

@Override
public RpcFuture<ChangeInvisibleDurationRequest, ChangeInvisibleDurationResponse>
changeInvisibleDuration(Endpoints endpoints, ChangeInvisibleDurationRequest request,
Duration duration) {
changeInvisibleDuration(Endpoints endpoints, ChangeInvisibleDurationRequest request, Duration duration) {
try {
final Metadata metadata = client.sign();
final Context context = new Context(endpoints, metadata);
Expand Down Expand Up @@ -313,6 +320,66 @@ public RpcFuture<AckMessageRequest, AckMessageResponse> ackMessage(Endpoints end
}
}

@Override
public RpcFuture<PullMessageRequest, List<PullMessageResponse>> pullMessage(Endpoints endpoints,
PullMessageRequest request, Duration duration) {
try {
final Metadata metadata = client.sign();
final Context context = new Context(endpoints, metadata);
final RpcClient rpcClient = getRpcClient(endpoints);
final ListenableFuture<List<PullMessageResponse>> future =
rpcClient.pullMessage(metadata, request, asyncWorker, duration);
return new RpcFuture<>(context, request, future);
} catch (Throwable t) {
return new RpcFuture<>(t);
}
}

@Override
public RpcFuture<UpdateOffsetRequest, UpdateOffsetResponse> updateOffset(Endpoints endpoints,
UpdateOffsetRequest request, Duration duration) {
try {
final Metadata metadata = client.sign();
final Context context = new Context(endpoints, metadata);
final RpcClient rpcClient = getRpcClient(endpoints);
final ListenableFuture<UpdateOffsetResponse> future =
rpcClient.updateOffset(metadata, request, asyncWorker, duration);
return new RpcFuture<>(context, request, future);
} catch (Throwable t) {
return new RpcFuture<>(t);
}
}

@Override
public RpcFuture<GetOffsetRequest, GetOffsetResponse> getOffset(Endpoints endpoints, GetOffsetRequest request,
Duration duration) {
try {
final Metadata metadata = client.sign();
final Context context = new Context(endpoints, metadata);
final RpcClient rpcClient = getRpcClient(endpoints);
final ListenableFuture<GetOffsetResponse> future =
rpcClient.getOffset(metadata, request, asyncWorker, duration);
return new RpcFuture<>(context, request, future);
} catch (Throwable t) {
return new RpcFuture<>(t);
}
}

@Override
public RpcFuture<QueryOffsetRequest, QueryOffsetResponse> queryOffset(Endpoints endpoints,
QueryOffsetRequest request, Duration duration) {
try {
final Metadata metadata = client.sign();
final Context context = new Context(endpoints, metadata);
final RpcClient rpcClient = getRpcClient(endpoints);
final ListenableFuture<QueryOffsetResponse> future =
rpcClient.queryOffset(metadata, request, asyncWorker, duration);
return new RpcFuture<>(context, request, future);
} catch (Throwable t) {
return new RpcFuture<>(t);
}
}

@Override
public RpcFuture<EndTransactionRequest, EndTransactionResponse> endTransaction(Endpoints endpoints,
EndTransactionRequest request, Duration duration) {
Expand Down Expand Up @@ -395,9 +462,9 @@ protected void startUp() {
() -> {
try {
log.info("Start to log statistics, clientVersion={}, clientWrapperVersion={}, "
+ "clientEndpoints={}, os description=[{}], java description=[{}], clientId={}",
+ "clientEndpoints={}, os description=[{}], java environment=[{}], clientId={}",
MetadataUtils.getVersion(), MetadataUtils.getWrapperVersion(), client.getEndpoints(),
Utilities.getOsDescription(), Utilities.getJavaDescription(), clientId);
Utilities.getOsDescription(), Utilities.getJavaEnvironmentSummary(), clientId);
client.doStats();
} catch (Throwable t) {
log.error("Exception raised during statistics logging, clientId={}", clientId, t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
public enum ClientType {
PRODUCER,
PUSH_CONSUMER,
SIMPLE_CONSUMER;
SIMPLE_CONSUMER,
PULL_CONSUMER;

public apache.rocketmq.v2.ClientType toProtobuf() {
if (PRODUCER.equals(this)) {
Expand All @@ -32,6 +33,9 @@ public apache.rocketmq.v2.ClientType toProtobuf() {
if (SIMPLE_CONSUMER.equals(this)) {
return apache.rocketmq.v2.ClientType.SIMPLE_CONSUMER;
}
if (PULL_CONSUMER.equals(this)) {
return apache.rocketmq.v2.ClientType.PULL_CONSUMER;
}
return apache.rocketmq.v2.ClientType.CLIENT_TYPE_UNSPECIFIED;
}
}
Loading