diff --git a/TestApp/Program.cs b/TestApp/Program.cs index dc26fa8..3f82db1 100644 --- a/TestApp/Program.cs +++ b/TestApp/Program.cs @@ -1,55 +1,75 @@ using Microsoft.Extensions.DependencyInjection; using msgqNET; -using msgqNET.implementations.zmq; +using msgqNET.implementations.msgq.queues; using msgqNET.interfaces; using TestApp; // Set up the dependency injection container -var serviceProvider = new ServiceCollection() - .AddSingleton() - .AddMessaging() - .BuildServiceProvider(); + +internal class Program +{ + public static void Main(string endpoint = "backupManagerSP") + { + var serviceProvider = new ServiceCollection() + .AddSingleton() + .AddMessaging() + .BuildServiceProvider(); // Get the event processor from the service provider -var messagingFactory = serviceProvider.GetRequiredService(); -var context = messagingFactory.CreateContext(); + var messagingFactory = serviceProvider.GetRequiredService(); + var context = messagingFactory.CreateContext(); -// var subsocket = messagingFactory.CreateSubSocket(context, "42979", "127.0.0.1", false, false); -// subsocket.SetTimeout(TimeSpan.FromSeconds(5)); + var subsocket = messagingFactory.CreateSubSocket(context, endpoint, "127.0.0.1"); + subsocket.SetTimeout(TimeSpan.FromSeconds(30)); +// + long counter = 0; +// Event processing loop + while (true) + { + Console.WriteLine($"Processing event {counter++}"); + // pubsocket.SendMessage(new FakeMessage()); + // await Task.Delay(1000); // Sleep for 1 second + var x = subsocket.Receive(); + HexDumper.PrintHexDump(x?.GetData() ?? []); + Console.WriteLine(); + Thread.Sleep(1); + } + + + + +// var pubsocket = messagingFactory.CreatePubSocket(context, "example-endpoint"); +// var subsocket = messagingFactory.CreateSubSocket(context, "example-endpoint"); +// +// var publisherTask = Task.Run(async () => +// { +// long counter = 0; +// // Event processing loop +// while (true) +// { +// pubsocket.SendMessage(new ZmqMessage($"hello world [{counter++}]")); +// await Task.Delay(3000); +// } +// }); +// // -// long counter = 0; -// // Event processing loop +// subsocket.SetTimeout(TimeSpan.FromSeconds(5)); // while (true) // { -// Console.WriteLine($"Processing event {counter++}"); -// // pubsocket.SendMessage(new FakeMessage()); -// // await Task.Delay(1000); // Sleep for 1 second -// var x = subsocket.Receive(); -// HexDumper.PrintHexDump(x?.GetData() ?? []); -// Console.WriteLine(); +// HexDumper.PrintHexDump(subsocket.Receive()?.GetData() ?? []); // } + var pubqueue = new MessageQueuePublisher("backupManagerSP", 10486144); + var queue = new MessageQueueSubscriber("backupManagerSP", 10486144); + var queue2 = new MessageQueueSubscriber("backupManagerSP", 10486144); +// Perform operations... - -var pubsocket = messagingFactory.CreatePubSocket(context, "example-endpoint"); -var subsocket = messagingFactory.CreateSubSocket(context, "example-endpoint"); - -var publisherTask = Task.Run(async () => -{ - long counter = 0; - // Event processing loop - while (true) - { - pubsocket.SendMessage(new ZmqMessage($"hello world [{counter++}]")); - await Task.Delay(3000); + pubqueue.Close(); + queue.Close(); + queue2.Close(); } -}); +} -subsocket.SetTimeout(TimeSpan.FromSeconds(5)); -while (true) -{ - HexDumper.PrintHexDump(subsocket.Receive()?.GetData() ?? []); -} diff --git a/TestApp/TestApp.csproj b/TestApp/TestApp.csproj index 54fdfa2..b4c5e8c 100644 --- a/TestApp/TestApp.csproj +++ b/TestApp/TestApp.csproj @@ -5,17 +5,23 @@ net8.0 enable enable - true + true + true + + + + + diff --git a/msgqNET/MessagingServiceExtensions.cs b/msgqNET/MessagingServiceExtensions.cs index a87898e..8956c63 100644 --- a/msgqNET/MessagingServiceExtensions.cs +++ b/msgqNET/MessagingServiceExtensions.cs @@ -1,5 +1,6 @@ using Microsoft.Extensions.DependencyInjection; using msgqNET.implementations.fake; +using msgqNET.implementations.msgq; using msgqNET.implementations.zmq; using msgqNET.interfaces; @@ -20,13 +21,26 @@ public static IServiceCollection AddMessaging( // Register your specific implementations // You would replace these with your actual implementations - services.AddTransient(); - services.AddTransient(); - services.AddTransient(); + // RegisterZmqSocket(services); + RegisterMsqSocket(services); services.AddTransient(); return services; } + + private static void RegisterZmqSocket(IServiceCollection services) + { + services.AddTransient(); + services.AddTransient(); + services.AddTransient(); + } + + private static void RegisterMsqSocket(IServiceCollection services) + { + services.AddTransient(); + services.AddTransient(); + // services.AddTransient(); + } } public class MessagingOptions diff --git a/msgqNET/implementations/fake/FakeMessage.cs b/msgqNET/implementations/fake/FakeMessage.cs index 4030d72..4f8fba3 100644 --- a/msgqNET/implementations/fake/FakeMessage.cs +++ b/msgqNET/implementations/fake/FakeMessage.cs @@ -16,7 +16,7 @@ public void Init(byte[] data, int size) public void Close() => _data = []; - public int GetSize() => _data.Length; + public long GetSize() => _data.Length; public byte[] GetData() => _data; diff --git a/msgqNET/implementations/msgq/IMsgqMessage.cs b/msgqNET/implementations/msgq/IMsgqMessage.cs new file mode 100644 index 0000000..bba5e83 --- /dev/null +++ b/msgqNET/implementations/msgq/IMsgqMessage.cs @@ -0,0 +1,8 @@ +using msgqNET.interfaces; + +namespace msgqNET.implementations.msgq; + +public interface IMsgqMessage : IMessage +{ + void TakeOwnership(byte[] data, long size); +} \ No newline at end of file diff --git a/msgqNET/implementations/msgq/MsgqContext.cs b/msgqNET/implementations/msgq/MsgqContext.cs new file mode 100644 index 0000000..f15e477 --- /dev/null +++ b/msgqNET/implementations/msgq/MsgqContext.cs @@ -0,0 +1,11 @@ +using msgqNET.interfaces; + +namespace msgqNET.implementations.msgq; + +public class MsgqContext : IContext +{ + public IntPtr GetRawContext() + { + throw new NotImplementedException(); + } +} \ No newline at end of file diff --git a/msgqNET/implementations/msgq/MsgqDeep.cs b/msgqNET/implementations/msgq/MsgqDeep.cs new file mode 100644 index 0000000..ae7d177 --- /dev/null +++ b/msgqNET/implementations/msgq/MsgqDeep.cs @@ -0,0 +1,121 @@ +// using System; +// using System.Diagnostics; +// using System.Threading; +// +// namespace msgqNET.implementations.msgq; +// +// public static class Msgq +// { +// public const int NUM_READERS = 32; // Assuming a constant value +// +// public static void MsgqInitSubscriber(ref MsgqQueue q) +// { +// Debug.Assert(q != null); +// Debug.Assert(q.NumReaders != null); +// +// ulong uid = MsgqGetUid(); +// +// while (true) +// { +// ulong curNumReaders = q.NumReaders.Value; +// ulong newNumReaders = curNumReaders + 1; +// +// // No more slots available. Reset all subscribers to kick out inactive ones +// if (newNumReaders > NUM_READERS) +// { +// Console.WriteLine("Warning, evicting all subscribers!"); +// q.NumReaders.Value = 0; +// +// for (int i = 0; i < NUM_READERS; i++) +// { +// q.ReadValids[i].Value = false; +// +// ulong oldUid = q.ReadUids[i].Value; +// q.ReadUids[i].Value = 0; +// +// // Wake up reader in case they are in a poll +// ThreadSignal((int)(oldUid & 0xFFFFFFFF)); +// } +// +// continue; +// } +// +// // Use atomic compare and exchange to handle race condition where two subscribers start at the same time +// if (Interlocked.CompareExchange(ref q.NumReaders.Value, newNumReaders, curNumReaders) == curNumReaders) +// { +// q.ReaderId = curNumReaders; +// q.ReadUidLocal = uid; +// +// q.ReadValids[curNumReaders].Value = false; +// q.ReadPointers[curNumReaders].Value = 0; +// q.ReadUids[curNumReaders].Value = uid; +// break; +// } +// } +// +// Console.WriteLine($"New subscriber id: {q.ReaderId} uid: {q.ReadUidLocal} {q.Endpoint}"); +// MsgqResetReader(ref q); +// } +// +// // Assuming implementation of GetUid, ThreadSignal, ResetReader and supporting classes +// private static ulong MsgqGetUid() +// { +// // Example UUID generation +// return (ulong)DateTime.Now.Ticks; +// } +// +// private static void ThreadSignal(int uid) +// { +// // Example signal to a specific thread (Placeholder implementation) +// Console.WriteLine($"ThreadSignal called for uid: {uid}"); +// } +// +// private static void MsgqResetReader(ref MsgqQueue q) +// { +// // Reset reader logic (Placeholder implementation) +// Console.WriteLine($"Resetting reader for {q.Endpoint}"); +// } +// } +// +// public class MsgqQueue +// { +// public AtomicUlong NumReaders { get; set; } = new AtomicUlong(); +// public ulong ReaderId { get; set; } +// public ulong ReadUidLocal { get; set; } +// public AtomicBool[] ReadValids { get; set; } = new AtomicBool[Msgq.NUM_READERS]; +// public AtomicUlong[] ReadPointers { get; set; } = new AtomicUlong[Msgq.NUM_READERS]; +// public AtomicUlong[] ReadUids { get; set; } = new AtomicUlong[Msgq.NUM_READERS]; +// public string Endpoint { get; set; } +// +// public MsgqQueue() +// { +// for (int i = 0; i < Msgq.NUM_READERS; i++) +// { +// ReadValids[i] = new AtomicBool(); +// ReadPointers[i] = new AtomicUlong(); +// ReadUids[i] = new AtomicUlong(); +// } +// } +// } +// +// public class AtomicBool +// { +// private int value; +// +// public bool Value +// { +// get => value == 1; +// set => Interlocked.Exchange(ref this.value, value ? 1 : 0); +// } +// } +// +// public class AtomicUlong +// { +// private long value; +// +// public ulong Value +// { +// get => (ulong)Interlocked.Read(ref value); +// set => Interlocked.Exchange(ref this.value, (long)value); +// } +// } \ No newline at end of file diff --git a/msgqNET/implementations/msgq/MsgqMessage.cs b/msgqNET/implementations/msgq/MsgqMessage.cs new file mode 100644 index 0000000..76e0040 --- /dev/null +++ b/msgqNET/implementations/msgq/MsgqMessage.cs @@ -0,0 +1,39 @@ +namespace msgqNET.implementations.msgq; + +public class MsgqMessage : IMsgqMessage +{ + private long _size; + private byte[] _data; + + // Constructor to initialize with size + public MsgqMessage(long size) + { + _size = size; + _data = new byte[size]; + } + + // Constructor to initialize with data and size + public MsgqMessage(byte[] data, long size) + { + _size = size; + _data = new byte[size]; + Array.Copy(data, _data, size); + } + + public void TakeOwnership(byte[] data, long size) + { + _size = size; + _data = data; + } + + public void Close() + { + // Release the data and reset size + _data = []; + _size = 0; + } + + public long GetSize() => _size; + + public byte[] GetData() => _data; +} \ No newline at end of file diff --git a/msgqNET/implementations/msgq/MsgqSubSocket.cs b/msgqNET/implementations/msgq/MsgqSubSocket.cs new file mode 100644 index 0000000..3840838 --- /dev/null +++ b/msgqNET/implementations/msgq/MsgqSubSocket.cs @@ -0,0 +1,33 @@ +using System.Diagnostics; +using msgqNET.implementations.msgq.queues; +using msgqNET.interfaces; + +namespace msgqNET.implementations.msgq; + +public class MsgqSubSocket : ISubSocket +{ + private MessageQueueSubscriber? _queue; + private const int Size = 10486144; + private TimeSpan _timeout = TimeSpan.FromMilliseconds(500); + + public bool Connect(IContext context, string endpoint, string address = "127.0.0.1", bool conflate = false, bool checkEndpoint = true) + { + Debug.Assert(context is not null); + Debug.Assert(address == "127.0.0.1"); + + _queue = new MessageQueueSubscriber(endpoint, Size); + return true; + } + + public void SetTimeout(TimeSpan timeout) => _timeout = timeout; + + public IMessage? Receive(bool nonBlocking = false) + { + return _queue?.Read(nonBlocking); + } + + public IntPtr GetRawSocket() + { + throw new NotImplementedException(); + } +} \ No newline at end of file diff --git a/msgqNET/implementations/msgq/queues/MessageQueue.cs b/msgqNET/implementations/msgq/queues/MessageQueue.cs new file mode 100644 index 0000000..7031a22 --- /dev/null +++ b/msgqNET/implementations/msgq/queues/MessageQueue.cs @@ -0,0 +1,132 @@ +using System.IO.MemoryMappedFiles; +using System.Runtime.InteropServices; +using System.Runtime.Versioning; + +namespace msgqNET.implementations.msgq.queues; + +public abstract class MessageQueue +{ + private static readonly int HeaderSize = Marshal.SizeOf(); + protected const int MaxReaders = 15; + private MemoryMappedFile memoryMappedFile; + protected MemoryMappedViewAccessor accessor; + private string ShmPath => OperatingSystem.IsLinux() ? "/dev/shm/" : "/tmp/"; + + public string Path { get; private set; } + public int Size { get; private set; } + public byte[] Data { get; private set; } + + protected record struct MsgQueueHeader + { + public ulong NumReaders; + public ulong WritePointer; + public ulong WriterUid; + + [MarshalAs(UnmanagedType.ByValArray, SizeConst = MaxReaders)] + public ulong[] ReadPointers; + + [MarshalAs(UnmanagedType.ByValArray, SizeConst = MaxReaders)] + public ulong[] ReadValids; + + [MarshalAs(UnmanagedType.ByValArray, SizeConst = MaxReaders)] + public ulong[] ReaderUIDs; + + public MsgQueueHeader(int numReaders) + { + NumReaders = 0; + WritePointer = 0; + ReadPointers = new ulong[numReaders]; + ReadValids = new ulong[numReaders]; + ReaderUIDs = new ulong[numReaders]; + } + } + + protected MsgQueueHeader Header; + + protected MessageQueue(string path, int size) + { + if (size < 0) + { + Console.WriteLine($"Size must be between 0 and {int.MaxValue}"); + } + +#if RELEASE + if (OperatingSystem.IsLinux() || OperatingSystem.IsMacOS()) + RegisterSigUsr2Handler(); +#endif + + try + { + Path = path; + Size = size; + + // Construct full shared memory path + var fullPath = ShmPath + path; // Adjust for platform-specific paths + + // Create or open the memory-mapped file + memoryMappedFile = MemoryMappedFile.CreateFromFile(fullPath, FileMode.OpenOrCreate, null, size); + + // Create a view accessor for the entire file + accessor = memoryMappedFile.CreateViewAccessor(); + + // Initialize or read the header + Header = ReadHeader(); + + // Map the data portion + Data = new byte[size]; + } + catch (Exception ex) + { + Console.WriteLine($"Failed to initialize message queue: {ex.Message}"); + } + } + + private MsgQueueHeader ReadHeader() + { + // Read the header structure from the memory-mapped file + var headerBytes = new byte[HeaderSize]; + accessor.ReadArray(0, headerBytes, 0, HeaderSize); + + // Deserialize the header + var handle = GCHandle.Alloc(headerBytes, GCHandleType.Pinned); + var result = (MsgQueueHeader)Marshal.PtrToStructure(handle.AddrOfPinnedObject(), typeof(MsgQueueHeader)); + handle.Free(); + + return result; + } + + protected void WriteHeader() + { + // Serialize the header to bytes + var headerBytes = new byte[HeaderSize]; + var handle = GCHandle.Alloc(headerBytes, GCHandleType.Pinned); + Marshal.StructureToPtr(Header, handle.AddrOfPinnedObject(), false); + handle.Free(); + + // Write the header back to the memory-mapped file + accessor.WriteArray(0, headerBytes, 0, HeaderSize); + } + + public void Close() + { + accessor?.Dispose(); + memoryMappedFile?.Dispose(); + } + + /// + /// Registers a handler for the SIGUSR2 signal. + /// + [UnsupportedOSPlatform(nameof(OSPlatform.Windows))] + public static void RegisterSigUsr2Handler() + { + throw new NotImplementedException("sigusr2 handling is yet implemented. Maybe .net doesn't support it."); + // PosixSignalRegistration.Create(PosixSignal.SIGUSR2, context => + // { + // Console.WriteLine("SIGUSR2 received. Handling the signal..."); + // // Implement your signal handling logic here + // context.Cancel = true; // Prevents default termination + // }); + } + + protected static long GetUid() => ((long)Environment.ProcessId << 32) | (uint)Environment.CurrentManagedThreadId; +} \ No newline at end of file diff --git a/msgqNET/implementations/msgq/queues/MessageQueuePublisher.cs b/msgqNET/implementations/msgq/queues/MessageQueuePublisher.cs new file mode 100644 index 0000000..3cca6a1 --- /dev/null +++ b/msgqNET/implementations/msgq/queues/MessageQueuePublisher.cs @@ -0,0 +1,25 @@ +namespace msgqNET.implementations.msgq.queues; + +public class MessageQueuePublisher : MessageQueue +{ + public MessageQueuePublisher(string path, int size) : base(path, size) + { + if (accessor == null) + throw new InvalidOperationException("Message queue not initialized"); + + var uid = GetUid(); + Header.WriterUid = (ulong)uid; + Header.NumReaders = 0; + for (var i = 0; i < MaxReaders; i++) + { + Header.ReadValids[i] = 0; // false + Header.ReaderUIDs[i] = 0; + } + + WriterUidLocal = uid; + WriteHeader(); + } + + public long WriterUidLocal { get; private set; } + public bool ReadConflate { get; private set; } +} \ No newline at end of file diff --git a/msgqNET/implementations/msgq/queues/MessageQueueSubscriber.cs b/msgqNET/implementations/msgq/queues/MessageQueueSubscriber.cs new file mode 100644 index 0000000..da0d8cf --- /dev/null +++ b/msgqNET/implementations/msgq/queues/MessageQueueSubscriber.cs @@ -0,0 +1,171 @@ +namespace msgqNET.implementations.msgq.queues; + +public class MessageQueueSubscriber : MessageQueue +{ + private static readonly long Uid = GetUid(); + private ulong ReaderId { get; set; } + private bool ReadConflate { get; set; } + + // Expose the underlying buffer and pointers through properties. + // Assume Header is an object that holds the shared state. + private ulong WritePointer => Header.WritePointer; + + private ulong ReadPointer + { + get => Header.ReadPointers[ReaderId]; + set => Header.ReadPointers[ReaderId] = value; + } + + private bool ReadValid + { + get => Header.ReadValids[ReaderId] != 0; + set => Header.ReadValids[ReaderId] = value ? 1UL : 0UL; + } + + private ulong ReaderUid + { + get => Header.ReaderUIDs[ReaderId]; + set => Header.ReaderUIDs[ReaderId] = value; + } + + private readonly object _syncLock = new(); + + public MessageQueueSubscriber(string path, int size) : base(path, size) => InitializeSubscriber(); + + private void InitializeSubscriber() + { + if (accessor == null) + throw new InvalidOperationException("Message queue not initialized"); + + while (true) + { + var curNumReaders = Header.NumReaders; + var newNumReaders = curNumReaders + 1; + + // No more slots available. Reset all subscribers to kick out inactive ones + if (newNumReaders > MaxReaders) + { + KickAllReaders(); + continue; + } + + // Use atomic compare and exchange to handle race condition + if (Interlocked.CompareExchange(ref Header.NumReaders, newNumReaders, curNumReaders) != curNumReaders) + continue; + + ReaderId = curNumReaders; + // Start with read_valid = false + ReadValid = false; // false + ReadPointer = 0; + ReaderUid = (ulong)Uid; + + WriteHeader(); + break; + } + + ResetReader(); + } + + /// + /// This method feels like madness. I am a subscriber. Why am I writing to the header (other than my own fields)? + /// + private void KickAllReaders() + { + Header.NumReaders = 0; + + for (var i = 0; i < MaxReaders; i++) + { + Header.ReadValids[i] = 0; // false + Header.ReaderUIDs[i] = 0; + + var oldUid = Header.ReaderUIDs[i]; + +#if RELEASE + // Wake up reader if they are in a poll + if (oldUid != 0) + { + throw new NotImplementedException("ThreadSignal is not implemented because SIGUSR2 handling is not implemented."); + // SignalThread((int)(oldUid & 0xFFFFFFFF)); + } +#endif + } + + WriteHeader(); + } + + + public void ResetReader() + { + lock (_syncLock) + { + // Sync read pointer with write pointer on reset + // Header.ReadPointers[ReaderId] = Header.WritePointer; + // Header.ReadValids[ReaderId] = 0; // mark as invalid, to be revalidated externally if needed + ReadValid = true; + ReadPointer = WritePointer; + WriteHeader(); + } + } + + public MsgqMessage? Read(bool nonBlocking = false) + { + do + { + lock (_syncLock) + { + // Read the message + var readPointer = new Packed64(ReadPointer); + var writePointer = new Packed64(WritePointer); + + if (readPointer == writePointer) + { + Console.WriteLine($"No message to read because read pointer [{ReadPointer}] is equal to write pointer [{WritePointer}]."); + continue; + } + + // Get size + var size = BitConverter.ToInt64(Data, readPointer.ReadPointer); + + // Check if valid + if (!ReadValid) + { + Console.WriteLine("Read pointer is invalid. Resetting reader."); + ResetReader(); + continue; + } + + if (size == -1) + { + Console.WriteLine("Size is -1. Skipping."); + readPointer.CycleCounter++; + continue; + } + + // // crashing is better than passing garbage data to the consumer + // // the size will have weird value if it was overwritten by data accidentally + // assert((uint64_t)size < q->size); + // assert(size > 0); + + // uint32_t new_read_pointer = ALIGN(read_pointer + sizeof(std::int64_t) + size); + var newReadPointer = new Packed64((ulong)(readPointer.ReadPointer + sizeof(ulong) + size)); + if (ReadConflate && writePointer != newReadPointer) + { + Console.WriteLine("Read conflate is enabled. Skipping."); + ReadPointer = newReadPointer.Value; + continue; + } + + var spn = new Span(Data, readPointer.ReadPointer + sizeof(long), (int)size); + var msg = new MsgqMessage(spn.ToArray(), size); + + ReadPointer = newReadPointer.Value; + if (ReadValid) return msg; + + msg.Close(); + ResetReader(); + } + } while (!nonBlocking); + + return null; + } +} \ No newline at end of file diff --git a/msgqNET/implementations/msgq/queues/Packed64.cs b/msgqNET/implementations/msgq/queues/Packed64.cs new file mode 100644 index 0000000..c3c51b4 --- /dev/null +++ b/msgqNET/implementations/msgq/queues/Packed64.cs @@ -0,0 +1,39 @@ +namespace msgqNET.implementations.msgq.queues; + +public struct Packed64(ulong value) : IEquatable +{ + private ulong _value = value; + + // Constructor to initialize with a 64-bit value + + // Getter for the higher 32 bits + public int CycleCounter + { + get => (int)(_value >> 32); // Shift right by 32 bits to get the higher part + set => _value = ((ulong)value << 32) | (_value & 0xFFFFFFFF); // Set the higher part + } + + // Getter for the lower 32 bits + public int ReadPointer + { + get => (int)(_value & 0xFFFFFFFF); + set => _value = ((ulong)CycleCounter << 32) | (ulong)(value & 0xFFFFFFFF); + } + + // Optionally, allow setting the entire 64-bit value + public ulong Value + { + get => _value; + set => _value = value; + } + + public bool Equals(Packed64 other) => _value == other._value; + + public override bool Equals(object? obj) => obj is Packed64 other && Equals(other); + + public override int GetHashCode() => _value.GetHashCode(); + + public static bool operator ==(Packed64 left, Packed64 right) => left.Equals(right); + + public static bool operator !=(Packed64 left, Packed64 right) => !left.Equals(right); +} \ No newline at end of file diff --git a/msgqNET/implementations/zmq/ZmqMessage.cs b/msgqNET/implementations/zmq/ZmqMessage.cs index f386e81..a1b4622 100644 --- a/msgqNET/implementations/zmq/ZmqMessage.cs +++ b/msgqNET/implementations/zmq/ZmqMessage.cs @@ -46,7 +46,7 @@ public void Close() _netMqMessage.Clear(); } - public int GetSize() + public long GetSize() { return _netMqMessage?.First.BufferSize ?? 0; } diff --git a/msgqNET/interfaces/IMessage.cs b/msgqNET/interfaces/IMessage.cs index b2bde3b..79b8ac2 100644 --- a/msgqNET/interfaces/IMessage.cs +++ b/msgqNET/interfaces/IMessage.cs @@ -2,9 +2,9 @@ namespace msgqNET.interfaces; public interface IMessage { - void Init(int size); - void Init(byte[] data, int size); + // void Init(int size); + // void Init(byte[] data, int size); void Close(); - int GetSize(); + long GetSize(); byte[] GetData(); } \ No newline at end of file diff --git a/msgqNET/msgqNET.csproj b/msgqNET/msgqNET.csproj index f90d2c1..b94238d 100644 --- a/msgqNET/msgqNET.csproj +++ b/msgqNET/msgqNET.csproj @@ -12,8 +12,4 @@ - - - -