Skip to content

Commit f29b586

Browse files
authored
Merge pull request #700 from tronprotocol/develop
merge develop
2 parents aed6daf + 27ab5ef commit f29b586

24 files changed

+324
-151
lines changed

src/main/java/org/tron/common/overlay/client/PeerClient.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,27 @@
22

33
import io.netty.bootstrap.Bootstrap;
44
import io.netty.channel.ChannelFuture;
5+
import io.netty.channel.ChannelFutureListener;
56
import io.netty.channel.ChannelOption;
67
import io.netty.channel.DefaultMessageSizeEstimator;
78
import io.netty.channel.EventLoopGroup;
89
import io.netty.channel.nio.NioEventLoopGroup;
910
import io.netty.channel.socket.nio.NioSocketChannel;
11+
import java.util.concurrent.ThreadFactory;
12+
import java.util.concurrent.atomic.AtomicInteger;
1013
import org.slf4j.Logger;
1114
import org.slf4j.LoggerFactory;
1215
import org.springframework.beans.factory.annotation.Autowired;
1316
import org.springframework.context.ApplicationContext;
1417
import org.springframework.context.annotation.Lazy;
1518
import org.springframework.stereotype.Component;
19+
import org.tron.common.overlay.discover.Node;
20+
import org.tron.common.overlay.discover.NodeHandler;
21+
import org.tron.common.overlay.message.ReasonCode;
1622
import org.tron.common.overlay.server.TronChannelInitializer;
1723
import org.tron.core.config.args.Args;
1824
import org.tron.core.net.node.NodeImpl;
1925

20-
import java.io.IOException;
21-
import java.util.concurrent.ThreadFactory;
22-
import java.util.concurrent.atomic.AtomicInteger;
23-
2426
@Component
2527
public class PeerClient {
2628

@@ -54,6 +56,20 @@ public void connect(String host, int port, String remoteId) {
5456
}
5557
}
5658

59+
public ChannelFuture connectAsync(NodeHandler nodeHandler, boolean discoveryMode) {
60+
Node node = nodeHandler.getNode();
61+
return connectAsync(node.getHost(), node.getPort(), node.getHexId(), discoveryMode)
62+
.addListener((ChannelFutureListener) future -> {
63+
if (!future.isSuccess()) {
64+
logger.error("connect to {}:{} fail,cause:{}", node.getHost(), node.getPort(),
65+
future.cause().getMessage());
66+
nodeHandler.getNodeStatistics().nodeDisconnectedLocal(ReasonCode.CONNECT_FAIL);
67+
nodeHandler.getNodeStatistics().notifyDisconnect();
68+
future.channel().close();
69+
}
70+
});
71+
}
72+
5773
public ChannelFuture connectAsync(String host, int port, String remoteId, boolean discoveryMode) {
5874

5975
logger.info("connect peer {} {} {}", host, port, remoteId);

src/main/java/org/tron/common/overlay/discover/NodeStatistics.java

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@
2121
import static java.lang.Math.min;
2222

2323
import java.util.concurrent.atomic.AtomicLong;
24-
2524
import org.tron.common.overlay.message.ReasonCode;
2625

2726
public class NodeStatistics {
2827

2928
public final static int REPUTATION_PREDEFINED = 100000;
3029
public final static long TOO_MANY_PEERS_PENALIZE_TIMEOUT = 60 * 1000;
30+
private static final long CLEAR_CYCLE_TIME = 60 * 60 * 1000;
3131

3232
public class StatHandler {
3333

@@ -92,7 +92,7 @@ private int getSessionFairReputation() {
9292
int discoverReput = 0;
9393

9494
discoverReput +=
95-
min(discoverInPong.get(), 1) * (discoverOutPing.get() == discoverInPong.get() ? 50 : 1);
95+
min(discoverInPong.get(), 1) * (discoverOutPing.get() == discoverInPong.get() ? 51 : 1);
9696
discoverReput += min(discoverInNeighbours.get(), 10) * 10;
9797
discoverReput += min(discoverInFind.get(), 50);
9898

@@ -117,7 +117,10 @@ private int getSessionFairReputation() {
117117
}
118118
}
119119
}
120-
return discoverReput + 10 * reput;
120+
int score =
121+
discoverReput + 10 * reput - (int) Math.pow(2, disconnectTimes) * (disconnectTimes > 0 ? 10
122+
: 0);
123+
return score > 0 ? score : 0;
121124
}
122125

123126
public int getReputation() {
@@ -146,7 +149,15 @@ public boolean isReputationPenalized() {
146149
return true;
147150
}
148151

149-
return tronLastLocalDisconnectReason == ReasonCode.NULL_IDENTITY ||
152+
if (lastDisconnectedTime > 0
153+
&& (System.currentTimeMillis() - lastDisconnectedTime) > CLEAR_CYCLE_TIME) {
154+
tronLastLocalDisconnectReason = null;
155+
tronLastRemoteDisconnectReason = null;
156+
disconnectTimes = 0;
157+
persistedReputation = 0;
158+
}
159+
160+
if (tronLastLocalDisconnectReason == ReasonCode.NULL_IDENTITY ||
150161
tronLastRemoteDisconnectReason == ReasonCode.NULL_IDENTITY ||
151162
tronLastLocalDisconnectReason == ReasonCode.INCOMPATIBLE_PROTOCOL ||
152163
tronLastRemoteDisconnectReason == ReasonCode.INCOMPATIBLE_PROTOCOL ||
@@ -163,26 +174,38 @@ public boolean isReputationPenalized() {
163174
tronLastLocalDisconnectReason == ReasonCode.INCOMPATIBLE_VERSION ||
164175
tronLastRemoteDisconnectReason == ReasonCode.INCOMPATIBLE_VERSION ||
165176
tronLastLocalDisconnectReason == ReasonCode.INCOMPATIBLE_CHAIN ||
166-
tronLastRemoteDisconnectReason == ReasonCode.INCOMPATIBLE_CHAIN;
177+
tronLastRemoteDisconnectReason == ReasonCode.INCOMPATIBLE_CHAIN ||
178+
tronLastRemoteDisconnectReason == ReasonCode.SYNC_FAIL ||
179+
tronLastLocalDisconnectReason == ReasonCode.SYNC_FAIL) {
180+
persistedReputation = 0;
181+
return true;
182+
}
183+
return false;
167184
}
168185

169186
public boolean isPenalized() {
170187
return tronLastLocalDisconnectReason == ReasonCode.NULL_IDENTITY ||
171-
tronLastRemoteDisconnectReason == ReasonCode.NULL_IDENTITY ||
172-
tronLastLocalDisconnectReason == ReasonCode.BAD_PROTOCOL ||
173-
tronLastRemoteDisconnectReason == ReasonCode.BAD_PROTOCOL;
188+
tronLastRemoteDisconnectReason == ReasonCode.NULL_IDENTITY ||
189+
tronLastLocalDisconnectReason == ReasonCode.BAD_PROTOCOL ||
190+
tronLastRemoteDisconnectReason == ReasonCode.BAD_PROTOCOL ||
191+
tronLastLocalDisconnectReason == ReasonCode.SYNC_FAIL ||
192+
tronLastRemoteDisconnectReason == ReasonCode.SYNC_FAIL;
174193
}
175194

176195
public void nodeDisconnectedRemote(ReasonCode reason) {
177196
lastDisconnectedTime = System.currentTimeMillis();
178197
tronLastRemoteDisconnectReason = reason;
179-
disconnectTimes++;
180198
}
181199

182200
public void nodeDisconnectedLocal(ReasonCode reason) {
183201
lastDisconnectedTime = System.currentTimeMillis();
184202
tronLastLocalDisconnectReason = reason;
203+
}
204+
205+
public void notifyDisconnect() {
206+
lastDisconnectedTime = System.currentTimeMillis();
185207
disconnectTimes++;
208+
persistedReputation = persistedReputation / 2;
186209
}
187210

188211
public boolean wasDisconnected() {

src/main/java/org/tron/common/overlay/message/Message.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,9 @@
99
import org.tron.common.utils.Sha256Hash;
1010
import org.tron.core.net.message.MessageTypes;
1111

12-
1312
public abstract class Message {
1413

15-
protected static final Logger logger = LoggerFactory.getLogger("Net");
14+
protected static final Logger logger = LoggerFactory.getLogger("Message");
1615

1716
protected boolean unpacked;
1817
protected byte[] data;
@@ -32,7 +31,6 @@ public Message(byte type, byte[] packed) {
3231
unpacked = false;
3332
}
3433

35-
3634
public ByteBuf getSendData(){
3735
return Unpooled.wrappedBuffer(ArrayUtils.add(this.getData(), 0 ,type));
3836
}
@@ -41,13 +39,25 @@ public Sha256Hash getMessageId() {
4139
return Sha256Hash.of(getData());
4240
}
4341

44-
public abstract byte[] getData();
42+
public byte[] getData(){
43+
return this.data;
44+
}
45+
46+
public MessageTypes getType(){
47+
return MessageTypes.fromByte(this.type);
48+
}
49+
50+
public abstract Class<?> getAnswerMessage();
4551

52+
@Override
4653
public String toString() {
4754
return "[Message Type: " + getType() + ", Message Hash: " + getMessageId() + "]";
4855
}
4956

50-
public abstract Class<?> getAnswerMessage();
57+
@Override
58+
public int hashCode() {
59+
return Arrays.hashCode(data);
60+
}
5161

5262
@Override
5363
public boolean equals(Object o) {
@@ -60,12 +70,4 @@ public boolean equals(Object o) {
6070
Message message = (Message) o;
6171
return Arrays.equals(data, message.data);
6272
}
63-
64-
@Override
65-
public int hashCode() {
66-
return Arrays.hashCode(data);
67-
}
68-
69-
public abstract MessageTypes getType();
70-
7173
}

src/main/java/org/tron/common/overlay/message/MessageFactory.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@
1616

1717
public abstract class MessageFactory {
1818

19-
public static String ERR_NO_SUCH_MSG = "No such message";
20-
public static String ERR_PARSE_FAILED = "parse message failed";
21-
2219
protected abstract Message create(byte[] data) throws Exception;
2320

2421
}

src/main/java/org/tron/common/overlay/message/ReasonCode.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ public enum ReasonCode {
8989

9090
TIME_OUT(0x20),
9191

92+
CONNECT_FAIL(0x21),
93+
9294
/**
9395
* [0xFF] Reason not specified
9496
*/

src/main/java/org/tron/common/overlay/server/Channel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ public void processException(Throwable throwable){
162162
if (throwable instanceof ReadTimeoutException){
163163
logger.error("Read timeout, {}", address);
164164
}else if(baseThrowable instanceof P2pException){
165-
logger.error("type: {}, info: {}, {}", ((P2pException) throwable).getType(), errMsg, address);
165+
logger.error("type: {}, info: {}, {}", ((P2pException) baseThrowable).getType(), errMsg, address);
166166
}else if (errMsg != null && errMsg.contains("Connection reset by peer")){
167167
logger.error("{}, {}", errMsg, address);
168168
}else {

src/main/java/org/tron/common/overlay/server/ChannelManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ public void notifyDisconnect(Channel channel) {
150150
|| channel.getChannelHandlerContext().channel() == null) {
151151
return;
152152
}
153+
channel.getNodeStatistics().notifyDisconnect();
153154
InetSocketAddress socketAddress = (InetSocketAddress) channel.getChannelHandlerContext()
154155
.channel().remoteAddress();
155156
recentlyDisconnected.put(socketAddress.getAddress(), new Date());

src/main/java/org/tron/common/overlay/server/MessageQueue.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public class MessageQueue {
2626

2727
private static final Logger logger = LoggerFactory.getLogger("MessageQueue");
2828

29-
private boolean sendMsgFlag = false;
29+
private volatile boolean sendMsgFlag = false;
3030

3131
private Thread sendMsgThread;
3232

@@ -70,7 +70,7 @@ public void activate(ChannelHandlerContext ctx) {
7070
Message msg = msgQueue.take();
7171
ctx.writeAndFlush(msg.getSendData()).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
7272
}catch (Exception e) {
73-
logger.error("send message failed, {}, error info: {}", ctx.channel().remoteAddress(), e.getMessage());
73+
logger.error("Send message failed, {}, error info: {}", ctx.channel().remoteAddress(), e.getMessage());
7474
}
7575
}
7676
});
@@ -102,6 +102,15 @@ public void close() {
102102
sendMsgFlag = false;
103103
if(sendTask != null && !sendTask.isCancelled()){
104104
sendTask.cancel(false);
105+
sendTask = null;
106+
}
107+
if (sendMsgThread != null){
108+
try{
109+
sendMsgThread.join(20);
110+
sendMsgThread = null;
111+
}catch (Exception e){
112+
logger.warn("Join send thread failed, peer {}", ctx.channel().remoteAddress());
113+
}
105114
}
106115
}
107116

src/main/java/org/tron/common/overlay/server/SyncPool.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,7 @@ private void fillUp() {
102102
nodesInUse.add(nodeManager.getPublicHomeNode().getHexId());
103103

104104
List<NodeHandler> newNodes = nodeManager.getNodes(new NodeSelector(nodesInUse), lackSize);
105-
newNodes.forEach(n -> peerClient.connectAsync(n.getNode().getHost(), n.getNode().getPort(),
106-
n.getNode().getHexId(), false));
105+
newNodes.forEach(n -> peerClient.connectAsync(n, false));
107106
}
108107

109108
// for test only

src/main/java/org/tron/common/utils/ByteUtil.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -155,10 +155,6 @@ public static int byteArrayToInt(byte[] b) {
155155
return new BigInteger(1, b).intValue();
156156
}
157157

158-
public static boolean isNullOrZeroArray(byte[] array) {
159-
return (array == null) || (array.length == 0);
160-
}
161-
162158
public static boolean isSingleZero(byte[] array) {
163159
return (array.length == 1 && array[0] == 0);
164160
}

0 commit comments

Comments
 (0)