Skip to content

Commit 1912401

Browse files
Merge pull request #24 from ReferenceType/develop
Develop
2 parents bacdd7d + 7e5e4ec commit 1912401

File tree

100 files changed

+6428
-645
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

100 files changed

+6428
-645
lines changed

Benchmarks/RelayBenchmark/Program.cs

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -179,31 +179,43 @@ private static void SerializerTest()
179179
Payload = new byte[32]
180180
};
181181
static Stopwatch sw = new Stopwatch();
182+
static long sumsum = 0;
183+
182184
private static void RelayTest()
183185
{
186+
184187
//string ip = "79.52.134.220";
185188
string ip = "127.0.0.1";
186189

187190

188191
var cert = new X509Certificate2("client.pfx", "greenpass");
189192
var scert = new X509Certificate2("server.pfx", "greenpass");
190-
// var server = new SecureProtoRelayServer(20011, scert);
191-
// server.StartServer();
193+
if (Console.ReadLine() == "e")
194+
{
195+
var server = new SecureProtoRelayServer(20020, scert);
196+
server.StartServer();
197+
Task.Run(async () => { while (true) { await Task.Delay(5000); server.GetTcpStatistics(out var generalStats, out _); Console.WriteLine(generalStats.ToString()); } });
198+
199+
}
200+
Console.ReadLine();
192201
//Task.Run(async () => { while (true) { await Task.Delay(10000); server.GetTcpStatistics(out var generalStats, out _); Console.WriteLine(generalStats.ToString()); } });
193202
var clients = new List<RelayClient>();
194-
int numclients = 2;
203+
int numclients = 50;
195204
var pending = new Task[numclients];
205+
Task.Run(async () => { while (true) { await Task.Delay(1000); Console.WriteLine(Interlocked.Exchange(ref sumsum, 0).ToString("N0")); } });
206+
196207
// Parallel.For(0, numclients, (i) =>
197208
for (int i = 0; i < numclients; i++)
198209

199210
{
200-
var client = new RelayClient(cert);
211+
var client = new RelayClient(cert,0);
201212
client.OnMessageReceived += (reply) => ClientMsgReceived(client, reply);
202213
client.OnUdpMessageReceived += (reply) => ClientUdpReceived(client, reply);
203214
//client.OnPeerRegistered += (id) => { /*if (client.sessionId.CompareTo(id) > 0)*/ client.RequestHolePunchAsync(id, 10000, false); };
204215
try
205216
{
206-
pending[i] = client.ConnectAsync(ip, 20011);
217+
pending[i] = client.ConnectAsync(ip, 20020);
218+
// client.StartPingService();
207219
// client.Connect(ip, 20011);
208220
clients.Add(client);
209221
//client.StartPingService();
@@ -230,8 +242,10 @@ private static void RelayTest()
230242
if (peer.Key == Guid.Empty)
231243
throw new Exception();
232244

233-
//var a = client.RequestHolePunchAsync(peer.Key, 10000, false);
234-
//pndg.Add(a);
245+
//var a = client.RequestTcpHolePunchAsync(peer.Key);
246+
//pndg.Add(a);
247+
// var aa = client.RequestHolePunchAsync(peer.Key, 10000, false);
248+
//pndg.Add(aa);
235249
//client.TestHP(peer.Key, 10000, false);
236250
// Console.WriteLine(peer.Key+" cnt=> "+ ++cc);
237251
}
@@ -243,13 +257,15 @@ private static void RelayTest()
243257

244258

245259
Thread.Sleep(1000);
260+
Console.WriteLine("all good");
261+
246262
// Parallel.ForEach(clients, (client) =>
247263
foreach (var client in clients)
248264
{
249265
var testMessage = new MessageEnvelope()
250266
{
251267
Header = "Test",
252-
Payload = new byte[255000]
268+
Payload = new byte[32]
253269
};
254270
for (int i = 0; i < testMessage.PayloadCount; i++)
255271
{
@@ -261,11 +277,12 @@ private static void RelayTest()
261277
foreach (var peer in client.Peers.Keys)
262278
{
263279
//await client.SendRequestAndWaitResponse(peer, testMessage,1000);
264-
//client.SendAsyncMessage(peer, testMessage);
265-
//client.SendUdpMesssage(peer, testMessage);
280+
client.SendAsyncMessage(peer, testMessage);
281+
//client.SendUdpMessage(peer, testMessage);
282+
// client.SendRudpMessage(peer, testMessage);
266283
// client.BroadcastMessage(testMessage);
267284
//client.BroadcastUdpMessage(testMessage);
268-
client.SendRudpMessage(peer,testMessage);
285+
// client.SendRudpMessage(peer,testMessage);
269286
}
270287
}
271288
break;
@@ -276,13 +293,14 @@ private static void RelayTest()
276293

277294
sw.Start();
278295
sw2.Start();
279-
280296
void ClientMsgReceived(RelayClient client, MessageEnvelope reply)
281297
{
298+
//Interlocked.Add(ref sumsum, reply.PayloadCount);
299+
Interlocked.Increment(ref sumsum);
282300
//Interlocked.Increment(ref totMsgCl);
283301
client.SendAsyncMessage(reply.From, reply);
284302
// Console.WriteLine("R " + sw.ElapsedMilliseconds);
285-
sw.Restart();
303+
// sw.Restart();
286304
}
287305

288306

@@ -299,8 +317,8 @@ void ClientUdpReceived(RelayClient client, MessageEnvelope reply)
299317
// var b = (byte)i;
300318
// }
301319
//}
302-
//client.SendUdpMesssage(reply.From, reply);
303-
client.SendRudpMessage(reply.From, reply);
320+
client.SendUdpMessage(reply.From, reply);
321+
// client.SendRudpMessage(reply.From, reply);
304322
return;
305323
if (client == clients[0])
306324
{

Benchmarks/SslBenchmark/Program.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using NetworkLibrary.Components.Statistics;
1+
using NetworkLibrary.Components.Crypto.Certificate;
2+
using NetworkLibrary.Components.Statistics;
23
using NetworkLibrary.TCP.SSL.ByteMessage;
34
using NetworkLibrary.Utils;
45
using System.Diagnostics;
@@ -88,7 +89,7 @@ private static void InitializeClients()
8889
{
8990
clientMessage = new byte[messageSize];
9091
clients = new List<SslByteMessageClient>();
91-
var ccert = new X509Certificate2("client.pfx", "greenpass");
92+
var ccert = CertificateGenerator.GenerateSelfSignedCertificate();//new X509Certificate2("client.pfx", "greenpass");
9293

9394
for (int i = 0; i < numClients; i++)
9495
{
@@ -133,7 +134,7 @@ private static void InitializeServer()
133134
fixedMessage = isFixedMessage ? new byte[fixedMessageSize] : new byte[0];
134135
var scert = new X509Certificate2("server.pfx", "greenpass");
135136

136-
server = new SslByteMessageServer(port, scert);
137+
server = new SslByteMessageServer(port);
137138
server.RemoteCertificateValidationCallback
138139
+= ValidateCertAsClient;
139140

Benchmarks/SslBenchmark/SslBenchmark.csproj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22

33
<PropertyGroup>
44
<OutputType>Exe</OutputType>
5-
<TargetFramework>net7.0</TargetFramework>
5+
<TargetFramework>net8.0</TargetFramework>
66
<ImplicitUsings>enable</ImplicitUsings>
77
<Nullable>enable</Nullable>
8+
<PublishAot>False</PublishAot>
89
</PropertyGroup>
910

1011
<ItemGroup>

Benchmarks/TcpBenchmark/Program.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ private static void InitializeClients()
115115
}
116116
Task.WaitAll(toWait);
117117
Console.WriteLine("All Clients Connected");
118+
Console.WriteLine(server?.SessionCount);
118119

119120
}
120121

Benchmarks/TcpBenchmark/TcpMessageBenchmark.csproj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22

33
<PropertyGroup>
44
<OutputType>Exe</OutputType>
5-
<TargetFramework>net7.0</TargetFramework>
5+
<TargetFramework>net8.0</TargetFramework>
66
<Nullable>enable</Nullable>
77
<Platforms>AnyCPU;ARM32</Platforms>
8+
<PublishAot>False</PublishAot>
89
</PropertyGroup>
910

1011
<ItemGroup>
Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,10 @@
1-
using MessagePack;
2-
using MessagePackNetwork.Components;
1+
using MessagePackNetwork.Components;
32
using NetworkLibrary.MessageProtocol.Fast;
4-
using System;
5-
using System.Collections.Generic;
6-
using System.Linq;
7-
using System.Text;
8-
using System.Threading.Tasks;
93

104
namespace MessagePackNetwork.MessageProtocol
115
{
126
internal class MessagePackMessageClient:GenericMessageClientWrapper<MessagepackSerializer>
137
{
8+
149
}
1510
}

NetworkLibrary/Components/BufferPool.cs

Lines changed: 27 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
using System.Runtime.CompilerServices;
77
using System.Threading;
88
using System.Runtime.InteropServices;
9+
using System.Threading.Tasks;
10+
911
#if NET5_0_OR_GREATER
1012
using System.Runtime.Intrinsics.Arm;
1113
using System.Runtime.Intrinsics.X86;
@@ -24,12 +26,12 @@ namespace NetworkLibrary
2426
*/
2527
public class BufferPool
2628
{
29+
static ConcurrentDictionary<byte[], string> AAA = new ConcurrentDictionary<byte[], string>();
2730
public static bool ForceGCOnCleanup = true;
2831
public static int MaxMemoryBeforeForceGc = 100000000;
2932
public const int MaxBufferSize = 1073741824;
3033
public const int MinBufferSize = 256;
31-
private static readonly ConcurrentBag<WeakReference<byte[]>> weakReferencePool = new ConcurrentBag<WeakReference<byte[]>>();
32-
private static readonly ConcurrentBag<WeakReference<byte[]>>[] bufferBuckets = new ConcurrentBag<WeakReference<byte[]>>[32];
34+
private static readonly ConcurrentBag<byte[]>[] bufferBuckets = new ConcurrentBag<byte[]>[32];
3335
private static SortedDictionary<int, int> bucketCapacityLimits = new SortedDictionary<int, int>()
3436
{
3537
{ 256,10000 },
@@ -63,9 +65,8 @@ public class BufferPool
6365

6466
static BufferPool()
6567
{
66-
Init();
67-
memoryMaintainer = new Thread(MaintainMemory);
68-
memoryMaintainer.Priority = ThreadPriority.Lowest;
68+
Init();
69+
MaintainMemory();
6970
}
7071

7172
/// <summary>
@@ -94,25 +95,24 @@ private static void Init()
9495
//bufferBuckets = new ConcurrentDictionary<int, ConcurrentBag<byte[]>>();
9596
for (int i = 8; i < 31; i++)
9697
{
97-
bufferBuckets[i] = new ConcurrentBag<WeakReference<byte[]>>();
98+
bufferBuckets[i] = new ConcurrentBag<byte[]>();
9899
}
99100
}
100101

101-
private static void MaintainMemory()
102+
private static async void MaintainMemory()
102103
{
103-
var lastTime = process.TotalProcessorTime;
104104
while (true)
105105
{
106-
autoGcHandle.WaitOne();
107-
Thread.Sleep(10000);
108-
var currentProcTime = process.TotalProcessorTime;
109-
var deltaT = (lastTime - currentProcTime).TotalMilliseconds;
110-
lastTime = currentProcTime;
111-
112-
if (deltaT < 100 && process.WorkingSet64 < MaxMemoryBeforeForceGc)
113-
GC.Collect();
106+
await Task.Delay(10000);
107+
for (int i = 8; i < 31; i++)
108+
{
109+
while(bufferBuckets[i].Count> bucketCapacityLimits[GetBucketSize(i)])
110+
{
111+
if(bufferBuckets[i].TryTake(out var buffer))
112+
AAA.TryRemove(buffer, out _);
113+
}
114+
}
114115
}
115-
116116
}
117117

118118

@@ -125,6 +125,7 @@ private static void MaintainMemory()
125125
[MethodImpl(MethodImplOptions.AggressiveInlining)]
126126
public static byte[] RentBuffer(int size)
127127
{
128+
//return new byte[size];
128129
byte[] buffer;
129130
if (MaxBufferSize < size)
130131
throw new InvalidOperationException(
@@ -133,13 +134,10 @@ public static byte[] RentBuffer(int size)
133134

134135
int idx = GetBucketIndex(size);
135136

136-
while (bufferBuckets[idx].TryTake(out WeakReference<byte[]> bufferRef))
137+
if (bufferBuckets[idx].TryTake(out buffer))
137138
{
138-
if (bufferRef.TryGetTarget(out buffer))
139-
{
140-
weakReferencePool.Add(bufferRef);
141-
return buffer;
142-
}
139+
AAA.TryRemove(buffer,out _);
140+
return buffer;
143141
}
144142

145143
buffer = ByteCopy.GetNewArray(GetBucketSize(idx));
@@ -156,15 +154,14 @@ public static void ReturnBuffer(byte[] buffer)
156154
{
157155
if (buffer.Length <= MinBufferSize) return;
158156

159-
int idx = GetBucketIndex(buffer.Length);
160-
if (weakReferencePool.TryTake(out var wr))
157+
if (!AAA.TryAdd(buffer, null))
161158
{
162-
wr.SetTarget(buffer);
163-
bufferBuckets[idx - 1].Add(wr);
164-
159+
MiniLogger.Log(MiniLogger.LogLevel.Error, "Buffer Pool Duplicated return detected");
160+
return;
165161
}
166-
else
167-
bufferBuckets[idx - 1].Add(new WeakReference<byte[]>(buffer));
162+
163+
int idx = GetBucketIndex(buffer.Length);
164+
bufferBuckets[idx - 1].Add(buffer);
168165
buffer = null;
169166
}
170167

NetworkLibrary/Components/ByteMessageReader.cs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Runtime.CompilerServices;
3+
using System.Threading;
34

45
namespace NetworkLibrary.Components
56
{
@@ -16,16 +17,14 @@ public class ByteMessageReader
1617
private int currentExpectedByteLenght;
1718
private int originalCapacity;
1819

19-
private readonly Guid Guid;
2020
public event Action<byte[], int, int> OnMessageReady;
2121

2222
private bool awaitingHeader;
2323

24-
public ByteMessageReader(Guid guid, int bufferSize = 256000)
24+
public ByteMessageReader( int bufferSize = 256000)
2525
{
2626
awaitingHeader = true;
2727
currentExpectedByteLenght = 4;
28-
Guid = guid;
2928

3029
headerBuffer = new byte[HeaderLenght];
3130
originalCapacity = bufferSize;
@@ -227,10 +226,17 @@ private void FreeMemory()
227226
BufferPool.ReturnBuffer(internalBufer);
228227
internalBufer = BufferPool.RentBuffer(originalCapacity);
229228
}
230-
229+
231230
public void ReleaseResources()
232231
{
233-
if (internalBufer != null) { BufferPool.ReturnBuffer(internalBufer); internalBufer = null; }
232+
OnMessageReady = null;
233+
234+
var b = Interlocked.Exchange(ref internalBufer, null);
235+
236+
if (b != null)
237+
{
238+
BufferPool.ReturnBuffer(b);
239+
}
234240
}
235241
#endregion
236242
}

0 commit comments

Comments
 (0)