Skip to content

Commit f4282f7

Browse files
authored
Merge pull request #893 from tronprotocol/add-test-case
Add test case
2 parents 9c873e7 + b40c827 commit f4282f7

File tree

3 files changed

+450
-28
lines changed

3 files changed

+450
-28
lines changed
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package org.tron.core.net.node;
2+
3+
import io.netty.bootstrap.Bootstrap;
4+
import io.netty.channel.Channel;
5+
import io.netty.channel.ChannelInitializer;
6+
import io.netty.channel.ChannelOption;
7+
import io.netty.channel.DefaultMessageSizeEstimator;
8+
import io.netty.channel.FixedRecvByteBufAllocator;
9+
import io.netty.channel.nio.NioEventLoopGroup;
10+
import io.netty.channel.socket.nio.NioSocketChannel;
11+
import io.netty.handler.codec.ByteToMessageDecoder;
12+
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
13+
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
14+
import io.netty.handler.timeout.ReadTimeoutHandler;
15+
import io.netty.handler.timeout.WriteTimeoutHandler;
16+
import java.io.File;
17+
import java.util.concurrent.ExecutorService;
18+
import java.util.concurrent.Executors;
19+
import java.util.concurrent.TimeUnit;
20+
import lombok.extern.slf4j.Slf4j;
21+
import org.junit.After;
22+
import org.junit.Before;
23+
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
24+
import org.tron.common.application.Application;
25+
import org.tron.common.application.ApplicationFactory;
26+
import org.tron.common.overlay.client.PeerClient;
27+
import org.tron.common.overlay.server.ChannelManager;
28+
import org.tron.common.overlay.server.SyncPool;
29+
import org.tron.common.utils.FileUtil;
30+
import org.tron.core.config.DefaultConfig;
31+
import org.tron.core.config.args.Args;
32+
import org.tron.core.db.Manager;
33+
import org.tron.core.services.RpcApiService;
34+
import org.tron.core.services.WitnessService;
35+
36+
@Slf4j
37+
public abstract class BaseNetTest {
38+
39+
protected static AnnotationConfigApplicationContext context;
40+
protected NodeImpl node;
41+
protected RpcApiService rpcApiService;
42+
protected PeerClient peerClient;
43+
protected ChannelManager channelManager;
44+
protected SyncPool pool;
45+
protected Manager manager;
46+
private Application appT;
47+
48+
private String dbPath;
49+
private String dbDirectory;
50+
private String indexDirectory;
51+
52+
private int port;
53+
54+
private ExecutorService executorService = Executors.newFixedThreadPool(1);
55+
56+
public BaseNetTest(String dbPath, String dbDirectory, String indexDirectory, int port) {
57+
this.dbPath = dbPath;
58+
this.dbDirectory = dbDirectory;
59+
this.indexDirectory = indexDirectory;
60+
this.port = port;
61+
}
62+
63+
@Before
64+
public void init() {
65+
executorService.execute(new Runnable() {
66+
@Override
67+
public void run() {
68+
logger.info("Full node running.");
69+
Args.setParam(
70+
new String[]{
71+
"--output-directory", dbPath,
72+
"--storage-db-directory", dbDirectory,
73+
"--storage-index-directory", indexDirectory
74+
},
75+
"config.conf"
76+
);
77+
Args cfgArgs = Args.getInstance();
78+
cfgArgs.setNodeListenPort(port);
79+
cfgArgs.setNodeDiscoveryEnable(false);
80+
cfgArgs.getSeedNode().getIpList().clear();
81+
cfgArgs.setNeedSyncCheck(false);
82+
cfgArgs.setNodeExternalIp("127.0.0.1");
83+
84+
context = new AnnotationConfigApplicationContext(DefaultConfig.class);
85+
86+
if (cfgArgs.isHelp()) {
87+
logger.info("Here is the help message.");
88+
return;
89+
}
90+
appT = ApplicationFactory.create(context);
91+
rpcApiService = context.getBean(RpcApiService.class);
92+
appT.addService(rpcApiService);
93+
if (cfgArgs.isWitness()) {
94+
appT.addService(new WitnessService(appT));
95+
}
96+
appT.initServices(cfgArgs);
97+
appT.startServices();
98+
99+
node = context.getBean(NodeImpl.class);
100+
peerClient = context.getBean(PeerClient.class);
101+
channelManager = context.getBean(ChannelManager.class);
102+
pool = context.getBean(SyncPool.class);
103+
manager = context.getBean(Manager.class);
104+
NodeDelegate nodeDelegate = new NodeDelegateImpl(manager);
105+
node.setNodeDelegate(nodeDelegate);
106+
pool.init(node);
107+
108+
appT.startup();
109+
rpcApiService.blockUntilShutdown();
110+
}
111+
});
112+
int tryTimes = 1;
113+
while (tryTimes <= 30 && (node == null || peerClient == null
114+
|| channelManager == null || pool == null)) {
115+
try {
116+
logger.info("node:{},peerClient:{},channelManager:{},pool:{}", node, peerClient,
117+
channelManager, pool);
118+
Thread.sleep(1000 * tryTimes);
119+
} catch (InterruptedException e) {
120+
e.printStackTrace();
121+
} finally {
122+
++tryTimes;
123+
}
124+
}
125+
}
126+
127+
protected Channel createClient(ByteToMessageDecoder decoder)
128+
throws InterruptedException {
129+
NioEventLoopGroup group = new NioEventLoopGroup(1);
130+
Bootstrap b = new Bootstrap();
131+
b.group(group).channel(NioSocketChannel.class)
132+
.handler(new ChannelInitializer<Channel>() {
133+
@Override
134+
protected void initChannel(Channel ch) throws Exception {
135+
// limit the size of receiving buffer to 1024
136+
ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(256 * 1024));
137+
ch.config().setOption(ChannelOption.SO_RCVBUF, 256 * 1024);
138+
ch.config().setOption(ChannelOption.SO_BACKLOG, 1024);
139+
ch.pipeline()
140+
.addLast("readTimeoutHandler", new ReadTimeoutHandler(600, TimeUnit.SECONDS))
141+
.addLast("writeTimeoutHandler", new WriteTimeoutHandler(600, TimeUnit.SECONDS));
142+
ch.pipeline().addLast("protoPender", new ProtobufVarint32LengthFieldPrepender());
143+
ch.pipeline().addLast("lengthDecode", new ProtobufVarint32FrameDecoder());
144+
ch.pipeline().addLast("handshakeHandler", decoder);
145+
146+
// be aware of channel closing
147+
ch.closeFuture();
148+
}
149+
}).option(ChannelOption.SO_KEEPALIVE, true)
150+
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60000)
151+
.option(ChannelOption.MESSAGE_SIZE_ESTIMATOR, DefaultMessageSizeEstimator.DEFAULT);
152+
return b.connect("127.0.0.1", port).sync().channel();
153+
}
154+
155+
@After
156+
public void destroy() {
157+
executorService.shutdownNow();
158+
Args.clearParam();
159+
FileUtil.deleteDir(new File(dbPath));
160+
}
161+
}

src/test/java/org/tron/core/net/node/BroadTest.java

Lines changed: 10 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.util.concurrent.ConcurrentHashMap;
99
import java.util.concurrent.ExecutorService;
1010
import lombok.extern.slf4j.Slf4j;
11+
import org.apache.commons.collections4.MapUtils;
1112
import org.junit.After;
1213
import org.junit.Assert;
1314
import org.junit.Before;
@@ -18,6 +19,7 @@
1819
import org.tron.common.overlay.client.PeerClient;
1920
import org.tron.common.overlay.discover.Node;
2021
import org.tron.common.overlay.message.Message;
22+
import org.tron.common.overlay.server.Channel;
2123
import org.tron.common.overlay.server.ChannelManager;
2224
import org.tron.common.overlay.server.MessageQueue;
2325
import org.tron.common.overlay.server.SyncPool;
@@ -27,6 +29,7 @@
2729
import org.tron.core.capsule.BlockCapsule;
2830
import org.tron.core.config.DefaultConfig;
2931
import org.tron.core.config.args.Args;
32+
import org.tron.core.db.ByteArrayWrapper;
3033
import org.tron.core.db.Manager;
3134
import org.tron.core.net.message.BlockMessage;
3235
import org.tron.core.net.message.MessageTypes;
@@ -242,10 +245,6 @@ private void prepare() {
242245
ReflectUtils.setFieldValue(node, "isAdvertiseActive", false);
243246
ReflectUtils.setFieldValue(node, "isFetchActive", false);
244247

245-
// ScheduledExecutorService mainWorker = ReflectUtils
246-
// .getFieldValue(channelManager, "mainWorker");
247-
// mainWorker.shutdownNow();
248-
249248
Node node = new Node(
250249
"enode://e437a4836b77ad9d9ffe73ee782ef2614e6d8370fcf62191a6e488276e23717147073a7ce0b444d485fff5a0c34c4577251a7a990cf80d8542e21b95aa8c5e6c@127.0.0.1:17889");
251250
new Thread(new Runnable() {
@@ -254,30 +253,13 @@ public void run() {
254253
peerClient.connect(node.getHost(), node.getPort(), node.getHexId());
255254
}
256255
}).start();
257-
Thread.sleep(5000);
258-
// List<Channel> newChanelList = ReflectUtils.getFieldValue(channelManager, "newPeers");
259-
// int tryTimes = 0;
260-
// while (CollectionUtils.isEmpty(newChanelList) && ++tryTimes < 10) {
261-
// Thread.sleep(1000);
262-
// }
263-
// logger.info("newChanelList size : {}", newChanelList.size());
264-
265-
// Field activePeersField = channelManager.getClass().getDeclaredField("activePeers");
266-
// activePeersField.setAccessible(true);
267-
// Map<ByteArrayWrapper, Channel> activePeersMap = (Map<ByteArrayWrapper, Channel>) activePeersField
268-
// .get(channelManager);
269-
//
270-
// Field apField = pool.getClass().getDeclaredField("activePeers");
271-
// apField.setAccessible(true);
272-
// List<PeerConnection> activePeers = (List<PeerConnection>) apField.get(pool);
273-
274-
// for (Channel channel : newChanelList) {
275-
// activePeersMap.put(channel.getNodeIdWrapper(), channel);
276-
// activePeers.add((PeerConnection) channel);
277-
// }
278-
// apField.set(pool, activePeers);
279-
// activePeersField.set(channelManager, activePeersMap);
280-
//
256+
Thread.sleep(2000);
257+
Map<ByteArrayWrapper, Channel> activePeers = ReflectUtils
258+
.getFieldValue(channelManager, "activePeers");
259+
int tryTimes = 0;
260+
while (MapUtils.isEmpty(activePeers) && ++tryTimes < 10) {
261+
Thread.sleep(1000);
262+
}
281263
go = true;
282264
} catch (Exception e) {
283265
e.printStackTrace();

0 commit comments

Comments
 (0)