Skip to content

Commit

Permalink
refactor(java-client): optimize code for meta session to eliminate po…
Browse files Browse the repository at this point in the history
…tential problems (#2186)
  • Loading branch information
empiredan authored Feb 5, 2025
1 parent 8f75b52 commit 89f43c3
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,11 @@

import com.google.common.net.InetAddresses;
import io.netty.channel.EventLoopGroup;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.pegasus.base.error_code;
import org.apache.pegasus.base.rpc_address;
import org.apache.pegasus.operator.client_operator;
Expand All @@ -48,7 +43,7 @@ public MetaSession(
EventLoopGroup g)
throws IllegalArgumentException {
clusterManager = manager;
metaList = new ArrayList<ReplicaSession>();
metaList = new ArrayList<>();

if (addrList.length == 1 && !InetAddresses.isInetAddress(addrList[0])) {
// if the given string is not a valid ip address,
Expand All @@ -59,8 +54,8 @@ public MetaSession(
}
} else {
for (String addr : addrList) {
rpc_address rpcAddr = new rpc_address();
if (rpcAddr.fromString(addr)) {
rpc_address rpcAddr = rpc_address.fromIpPort(addr);
if (rpcAddr != null) {
logger.info("add {} as meta server", addr);
metaList.add(clusterManager.getReplicaSession(rpcAddr));
} else {
Expand All @@ -79,8 +74,9 @@ public MetaSession(
}

public static error_code.error_types getMetaServiceError(client_operator metaQueryOp) {
if (metaQueryOp.rpc_error.errno != error_code.error_types.ERR_OK)
if (metaQueryOp.rpc_error.errno != error_code.error_types.ERR_OK) {
return metaQueryOp.rpc_error.errno;
}

if (metaQueryOp instanceof query_cfg_operator) {
return ((query_cfg_operator) (metaQueryOp)).get_response().getErr().errno;
Expand All @@ -97,17 +93,26 @@ public static error_code.error_types getMetaServiceError(client_operator metaQue
}

public static rpc_address getMetaServiceForwardAddress(client_operator metaQueryOp) {
if (metaQueryOp.rpc_error.errno != error_code.error_types.ERR_OK) return null;
if (metaQueryOp.rpc_error.errno != error_code.error_types.ERR_OK) {
return null;
}

rpc_address addr = null;
if (metaQueryOp instanceof query_cfg_operator) {
query_cfg_operator op = (query_cfg_operator) metaQueryOp;
if (op.get_response().getErr().errno != error_code.error_types.ERR_FORWARD_TO_OTHERS)
if (op.get_response().getErr().errno != error_code.error_types.ERR_FORWARD_TO_OTHERS) {
return null;
}

java.util.List<partition_configuration> partitions = op.get_response().getPartitions();
if (partitions == null || partitions.isEmpty()) return null;
if (partitions == null || partitions.isEmpty()) {
return null;
}

addr = partitions.get(0).getPrimary();
if (addr == null || addr.isInvalid()) return null;
if (addr == null || addr.isInvalid()) {
return null;
}
}

return addr;
Expand All @@ -125,14 +130,7 @@ public final void asyncExecute(client_operator op, Runnable callbackFunc, int ma
}

public final void execute(client_operator op, int maxExecuteCount) {
FutureTask<Void> v =
new FutureTask<Void>(
new Callable<Void>() {
@Override
public Void call() throws Exception {
return null;
}
});
FutureTask<Void> v = new FutureTask<>(() -> null);
asyncExecute(op, v, maxExecuteCount);
while (true) {
try {
Expand All @@ -156,15 +154,7 @@ public final void closeSession() {

private void asyncCall(final MetaRequestRound round) {
round.lastSession.asyncSend(
round.op,
new Runnable() {
@Override
public void run() {
onFinishQueryMeta(round);
}
},
eachQueryTimeoutInMills,
false);
round.op, () -> onFinishQueryMeta(round), eachQueryTimeoutInMills, false);
}

void onFinishQueryMeta(final MetaRequestRound round) {
Expand All @@ -181,9 +171,7 @@ void onFinishQueryMeta(final MetaRequestRound round) {
metaError = getMetaServiceError(op);
if (metaError == error_code.error_types.ERR_SERVICE_NOT_ACTIVE) {
needDelay = true;
needSwitchLeader = false;
} else if (metaError == error_code.error_types.ERR_FORWARD_TO_OTHERS) {
needDelay = false;
needSwitchLeader = true;
forwardAddress = getMetaServiceForwardAddress(op);
} else {
Expand All @@ -192,7 +180,6 @@ void onFinishQueryMeta(final MetaRequestRound round) {
}
} else if (op.rpc_error.errno == error_code.error_types.ERR_SESSION_RESET
|| op.rpc_error.errno == error_code.error_types.ERR_TIMEOUT) {
needDelay = false;
needSwitchLeader = true;
} else {
logger.error("unknown error: {}", op.rpc_error.errno.toString());
Expand All @@ -204,7 +191,7 @@ void onFinishQueryMeta(final MetaRequestRound round) {
"query meta got error, rpc error({}), meta error({}), forward address({}), current leader({}), "
+ "remain retry count({}), need switch leader({}), need delay({})",
op.rpc_error.errno.toString(),
metaError.toString(),
metaError,
forwardAddress,
round.lastSession.name(),
round.maxExecuteCount,
Expand Down Expand Up @@ -253,15 +240,7 @@ void onFinishQueryMeta(final MetaRequestRound round) {
}

void retryQueryMeta(final MetaRequestRound round, boolean needDelay) {
group.schedule(
new Runnable() {
@Override
public void run() {
asyncCall(round);
}
},
needDelay ? 1 : 0,
TimeUnit.SECONDS);
group.schedule(() -> asyncCall(round), needDelay ? 1 : 0, TimeUnit.SECONDS);
}

static final class MetaRequestRound {
Expand Down Expand Up @@ -290,36 +269,46 @@ void resolveHost(String hostPort) throws IllegalArgumentException {
return;
}

Set<rpc_address> newSet = new TreeSet<rpc_address>(Arrays.asList(addrs));
Set<rpc_address> oldSet = new TreeSet<rpc_address>();
for (ReplicaSession meta : metaList) {
oldSet.add(meta.getAddress());
}
Set<rpc_address> oldSet =
metaList.stream()
.map(ReplicaSession::getAddress)
.collect(Collectors.toCollection(TreeSet::new));
Set<rpc_address> newSet = new TreeSet<>(Arrays.asList(addrs));

// fast path: do nothing if meta list is unchanged.
// Do nothing if meta list is unchanged.
if (newSet.equals(oldSet)) {
return;
}

// removed metas
Set<rpc_address> removed = new HashSet<rpc_address>(oldSet);
removed.removeAll(newSet);
for (rpc_address addr : removed) {
logger.info("meta server {} was removed", addr);
for (int i = 0; i < metaList.size(); i++) {
if (metaList.get(i).getAddress().equals(addr)) {
ReplicaSession session = metaList.remove(i);
session.closeSession();
}
// Find the meta servers that should be removed.
Set<rpc_address> removedSet = new HashSet<>(oldSet);
removedSet.removeAll(newSet);

// Iterate over the current meta list: once a meta server is found in the removed set,
// it would be removed from the meta list after its session is closed.
Iterator<ReplicaSession> iterator = metaList.iterator();
while (iterator.hasNext()) {
ReplicaSession session = iterator.next();
rpc_address addr = session.getAddress();
if (!removedSet.contains(addr)) {
// This meta server is not found in the removed set, which means it should just be
// retained.
continue;
}

session.closeSession();
iterator.remove();
logger.info("meta server {} was removed", addr);
}

// newly added metas
Set<rpc_address> added = new HashSet<rpc_address>(newSet);
added.removeAll(oldSet);
for (rpc_address addr : added) {
// Find the meta servers that should be added.
Set<rpc_address> addedSet = new HashSet<>(newSet);
addedSet.removeAll(oldSet);

// Add each new meta servers to the meta list.
for (rpc_address addr : addedSet) {
metaList.add(clusterManager.getReplicaSession(addr));
logger.info("add {} as meta server", addr);
logger.info("meta server {} was added", addr);
}
}

Expand All @@ -328,12 +317,12 @@ List<ReplicaSession> getMetaList() {
return metaList;
}

private ClusterManager clusterManager;
private List<ReplicaSession> metaList;
private final ClusterManager clusterManager;
private final List<ReplicaSession> metaList;
private int curLeader;
private int eachQueryTimeoutInMills;
private int defaultMaxQueryCount;
private EventLoopGroup group;
private final int eachQueryTimeoutInMills;
private final int defaultMaxQueryCount;
private final EventLoopGroup group;
private String hostPort;

private static final org.slf4j.Logger logger =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@
import static org.junit.jupiter.api.Assertions.fail;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import org.apache.commons.lang3.reflect.FieldUtils;
Expand All @@ -45,8 +42,10 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;

public class MetaSessionTest {
private static final Logger logger = org.slf4j.LoggerFactory.getLogger(MetaSessionTest.class);

// "Mockito.when(meta.resolve(("localhost:34601"))).thenReturn(addrs)" is for simulating DNS
// resolution: <localhost:34601>-><addrs>
Expand All @@ -56,23 +55,21 @@ public void before() throws Exception {}

@AfterEach
public void after() throws Exception {
rpc_address addr = new rpc_address();
addr.fromString("127.0.0.1:34602");
Toollet.tryStartServer(addr);
Toollet.tryStartServer(Objects.requireNonNull(rpc_address.fromIpPort("127.0.0.1:34602")));
}

private static void ensureNotLeader(rpc_address addr) {
Toollet.closeServer(addr);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
logger.error("failed while sleeping: ", e);
}
Toollet.tryStartServer(addr);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
logger.error("failed while sleeping: ", e);
}
}

Expand All @@ -87,22 +84,18 @@ public void testMetaConnect() throws Exception {
new ClusterManager(ClientOptions.builder().metaServers(address_list).build());
MetaSession session = manager.getMetaSession();

rpc_address addr = new rpc_address();
addr.fromString("127.0.0.1:34602");
rpc_address addr = Objects.requireNonNull(rpc_address.fromIpPort("127.0.0.1:34602"));
ensureNotLeader(addr);

ArrayList<FutureTask<Void>> callbacks = new ArrayList<FutureTask<Void>>();
ArrayList<FutureTask<Void>> callbacks = new ArrayList<>();
for (int i = 0; i < 1000; ++i) {
query_cfg_request req = new query_cfg_request("temp", new ArrayList<Integer>());
final client_operator op = new query_cfg_operator(new gpid(-1, -1), req);
FutureTask<Void> callback =
new FutureTask<Void>(
new Callable<Void>() {
@Override
public Void call() throws Exception {
assertEquals(error_code.error_types.ERR_OK, op.rpc_error.errno);
return null;
}
new FutureTask<>(
() -> {
assertEquals(error_code.error_types.ERR_OK, op.rpc_error.errno);
return null;
});
callbacks.add(callback);
session.asyncExecute(op, callback, 10);
Expand All @@ -113,7 +106,7 @@ public Void call() throws Exception {
try {
Tools.waitUninterruptable(cb, Integer.MAX_VALUE);
} catch (ExecutionException e) {
e.printStackTrace();
logger.error("failed while waiting for callback", e);
fail();
}
}
Expand Down Expand Up @@ -256,18 +249,10 @@ public void testMetaForwardUnknownPrimary() throws Exception {
FieldUtils.writeField(op, "response", new query_cfg_response(), true);
op.get_response().err = new error_code();
op.get_response().err.errno = error_code.error_types.ERR_FORWARD_TO_OTHERS;
op.get_response().partitions = Arrays.asList(new partition_configuration[1]);
op.get_response().partitions.set(0, new partition_configuration());
op.get_response().partitions = Collections.singletonList(new partition_configuration());
op.get_response().partitions.get(0).primary = rpc_address.fromIpPort("172.0.0.3:34601");
MetaSession.MetaRequestRound round =
new MetaSession.MetaRequestRound(
op,
new Runnable() {
@Override
public void run() {}
},
10,
meta.getMetaList().get(0));
new MetaSession.MetaRequestRound(op, () -> {}, 10, meta.getMetaList().get(0));

// do not retry after a failed QueryMeta.
Mockito.doNothing().when(meta).retryQueryMeta(round, false);
Expand All @@ -292,9 +277,15 @@ public void testDNSResetMetaMaxQueryCount() {

List<ReplicaSession> metaList = metaMock.getMetaList();
metaList.remove(0); // del the "localhost:34601"
metaList.add(manager.getReplicaSession(rpc_address.fromIpPort("172.0.0.1:34602")));
metaList.add(manager.getReplicaSession(rpc_address.fromIpPort("172.0.0.1:34603")));
metaList.add(manager.getReplicaSession(rpc_address.fromIpPort("172.0.0.1:34601")));
metaList.add(
manager.getReplicaSession(
Objects.requireNonNull(rpc_address.fromIpPort("172.0.0.1:34602"))));
metaList.add(
manager.getReplicaSession(
Objects.requireNonNull(rpc_address.fromIpPort("172.0.0.1:34603"))));
metaList.add(
manager.getReplicaSession(
Objects.requireNonNull(rpc_address.fromIpPort("172.0.0.1:34601"))));

rpc_address[] newAddrs = new rpc_address[5];
newAddrs[0] = rpc_address.fromIpPort("137.0.0.1:34602");
Expand Down Expand Up @@ -330,9 +321,15 @@ public void testDNSMetaUnavailable() {
MetaSession metaMock = Mockito.spy(manager.getMetaSession());
List<ReplicaSession> metaList = metaMock.getMetaList();
metaList.clear(); // del the "localhost:34601" resolve right results
metaList.add(manager.getReplicaSession(rpc_address.fromIpPort("172.0.0.1:34602")));
metaList.add(manager.getReplicaSession(rpc_address.fromIpPort("172.0.0.1:34603")));
metaList.add(manager.getReplicaSession(rpc_address.fromIpPort("172.0.0.1:34601")));
metaList.add(
manager.getReplicaSession(
Objects.requireNonNull(rpc_address.fromIpPort("172.0.0.1:34602"))));
metaList.add(
manager.getReplicaSession(
Objects.requireNonNull(rpc_address.fromIpPort("172.0.0.1:34603"))));
metaList.add(
manager.getReplicaSession(
Objects.requireNonNull(rpc_address.fromIpPort("172.0.0.1:34601"))));
rpc_address[] newAddrs =
new rpc_address[] {
rpc_address.fromIpPort("137.0.0.1:34602"),
Expand Down
Loading

0 comments on commit 89f43c3

Please sign in to comment.