Skip to content

msgq implementation #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
90 changes: 55 additions & 35 deletions TestApp/Program.cs
Original file line number Diff line number Diff line change
@@ -1,55 +1,75 @@
using Microsoft.Extensions.DependencyInjection;
using msgqNET;
using msgqNET.implementations.zmq;
using msgqNET.implementations.msgq.queues;
using msgqNET.interfaces;
using TestApp;

// Set up the dependency injection container
var serviceProvider = new ServiceCollection()
.AddSingleton<IEventProcessor, EventProcessor>()
.AddMessaging()
.BuildServiceProvider();

internal class Program
{
public static void Main(string endpoint = "backupManagerSP")
{
var serviceProvider = new ServiceCollection()
.AddSingleton<IEventProcessor, EventProcessor>()
.AddMessaging()
.BuildServiceProvider();

// Get the event processor from the service provider
var messagingFactory = serviceProvider.GetRequiredService<IMessagingFactory>();
var context = messagingFactory.CreateContext();
var messagingFactory = serviceProvider.GetRequiredService<IMessagingFactory>();
var context = messagingFactory.CreateContext();


// var subsocket = messagingFactory.CreateSubSocket(context, "42979", "127.0.0.1", false, false);
// subsocket.SetTimeout(TimeSpan.FromSeconds(5));
var subsocket = messagingFactory.CreateSubSocket(context, endpoint, "127.0.0.1");
subsocket.SetTimeout(TimeSpan.FromSeconds(30));
//
long counter = 0;
// Event processing loop
while (true)
{
Console.WriteLine($"Processing event {counter++}");
// pubsocket.SendMessage(new FakeMessage());
// await Task.Delay(1000); // Sleep for 1 second
var x = subsocket.Receive();
HexDumper.PrintHexDump(x?.GetData() ?? []);
Console.WriteLine();
Thread.Sleep(1);
}




// var pubsocket = messagingFactory.CreatePubSocket(context, "example-endpoint");
// var subsocket = messagingFactory.CreateSubSocket(context, "example-endpoint");
//
// var publisherTask = Task.Run(async () =>
// {
// long counter = 0;
// // Event processing loop
// while (true)
// {
// pubsocket.SendMessage(new ZmqMessage($"hello world [{counter++}]"));
// await Task.Delay(3000);
// }
// });
//
//
// long counter = 0;
// // Event processing loop
// subsocket.SetTimeout(TimeSpan.FromSeconds(5));
// while (true)
// {
// Console.WriteLine($"Processing event {counter++}");
// // pubsocket.SendMessage(new FakeMessage());
// // await Task.Delay(1000); // Sleep for 1 second
// var x = subsocket.Receive();
// HexDumper.PrintHexDump(x?.GetData() ?? []);
// Console.WriteLine();
// HexDumper.PrintHexDump(subsocket.Receive()?.GetData() ?? []);
// }

var pubqueue = new MessageQueuePublisher("backupManagerSP", 10486144);
var queue = new MessageQueueSubscriber("backupManagerSP", 10486144);
var queue2 = new MessageQueueSubscriber("backupManagerSP", 10486144);

// Perform operations...


var pubsocket = messagingFactory.CreatePubSocket(context, "example-endpoint");
var subsocket = messagingFactory.CreateSubSocket(context, "example-endpoint");

var publisherTask = Task.Run(async () =>
{
long counter = 0;
// Event processing loop
while (true)
{
pubsocket.SendMessage(new ZmqMessage($"hello world [{counter++}]"));
await Task.Delay(3000);
pubqueue.Close();
queue.Close();
queue2.Close();
}
});
}


subsocket.SetTimeout(TimeSpan.FromSeconds(5));
while (true)
{
HexDumper.PrintHexDump(subsocket.Receive()?.GetData() ?? []);
}
8 changes: 7 additions & 1 deletion TestApp/TestApp.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,23 @@
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<PublishAot>true</PublishAot>
<!-- <PublishAot>true</PublishAot>-->
<InvariantGlobalization>true</InvariantGlobalization>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="9.0.0" />
<PackageReference Include="System.CommandLine.DragonFruit" Version="0.4.0-alpha.22272.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\msgqNET\msgqNET.csproj" />
</ItemGroup>

<ItemGroup>
<Folder Include="publish\" />
</ItemGroup>

</Project>
20 changes: 17 additions & 3 deletions msgqNET/MessagingServiceExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Microsoft.Extensions.DependencyInjection;
using msgqNET.implementations.fake;
using msgqNET.implementations.msgq;
using msgqNET.implementations.zmq;
using msgqNET.interfaces;

Expand All @@ -20,13 +21,26 @@ public static IServiceCollection AddMessaging(

// Register your specific implementations
// You would replace these with your actual implementations
services.AddTransient<IContext, ZmqContext>();
services.AddTransient<ISubSocket, ZmqSubSocket>();
services.AddTransient<IPubSocket, ZmqPubSocket>();
// RegisterZmqSocket(services);
RegisterMsqSocket(services);
services.AddTransient<IPoller, FakePoller>();

return services;
}

private static void RegisterZmqSocket(IServiceCollection services)
{
services.AddTransient<IContext, ZmqContext>();
services.AddTransient<ISubSocket, ZmqSubSocket>();
services.AddTransient<IPubSocket, ZmqPubSocket>();
}

private static void RegisterMsqSocket(IServiceCollection services)
{
services.AddTransient<IContext, MsgqContext>();
services.AddTransient<ISubSocket, MsgqSubSocket>();
// services.AddTransient<IPubSocket, Msgq>();
}
}

public class MessagingOptions
Expand Down
2 changes: 1 addition & 1 deletion msgqNET/implementations/fake/FakeMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public void Init(byte[] data, int size)

public void Close() => _data = [];

public int GetSize() => _data.Length;
public long GetSize() => _data.Length;

public byte[] GetData() => _data;

Expand Down
8 changes: 8 additions & 0 deletions msgqNET/implementations/msgq/IMsgqMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
using msgqNET.interfaces;

namespace msgqNET.implementations.msgq;

public interface IMsgqMessage : IMessage
{
void TakeOwnership(byte[] data, long size);
}
11 changes: 11 additions & 0 deletions msgqNET/implementations/msgq/MsgqContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using msgqNET.interfaces;

namespace msgqNET.implementations.msgq;

public class MsgqContext : IContext
{
public IntPtr GetRawContext()
{
throw new NotImplementedException();
}
}
121 changes: 121 additions & 0 deletions msgqNET/implementations/msgq/MsgqDeep.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// using System;
// using System.Diagnostics;
// using System.Threading;
//
// namespace msgqNET.implementations.msgq;
//
// public static class Msgq
// {
// public const int NUM_READERS = 32; // Assuming a constant value
//
// public static void MsgqInitSubscriber(ref MsgqQueue q)
// {
// Debug.Assert(q != null);
// Debug.Assert(q.NumReaders != null);
//
// ulong uid = MsgqGetUid();
//
// while (true)
// {
// ulong curNumReaders = q.NumReaders.Value;
// ulong newNumReaders = curNumReaders + 1;
//
// // No more slots available. Reset all subscribers to kick out inactive ones
// if (newNumReaders > NUM_READERS)
// {
// Console.WriteLine("Warning, evicting all subscribers!");
// q.NumReaders.Value = 0;
//
// for (int i = 0; i < NUM_READERS; i++)
// {
// q.ReadValids[i].Value = false;
//
// ulong oldUid = q.ReadUids[i].Value;
// q.ReadUids[i].Value = 0;
//
// // Wake up reader in case they are in a poll
// ThreadSignal((int)(oldUid & 0xFFFFFFFF));
// }
//
// continue;
// }
//
// // Use atomic compare and exchange to handle race condition where two subscribers start at the same time
// if (Interlocked.CompareExchange(ref q.NumReaders.Value, newNumReaders, curNumReaders) == curNumReaders)
// {
// q.ReaderId = curNumReaders;
// q.ReadUidLocal = uid;
//
// q.ReadValids[curNumReaders].Value = false;
// q.ReadPointers[curNumReaders].Value = 0;
// q.ReadUids[curNumReaders].Value = uid;
// break;
// }
// }
//
// Console.WriteLine($"New subscriber id: {q.ReaderId} uid: {q.ReadUidLocal} {q.Endpoint}");
// MsgqResetReader(ref q);
// }
//
// // Assuming implementation of GetUid, ThreadSignal, ResetReader and supporting classes
// private static ulong MsgqGetUid()
// {
// // Example UUID generation
// return (ulong)DateTime.Now.Ticks;
// }
//
// private static void ThreadSignal(int uid)
// {
// // Example signal to a specific thread (Placeholder implementation)
// Console.WriteLine($"ThreadSignal called for uid: {uid}");
// }
//
// private static void MsgqResetReader(ref MsgqQueue q)
// {
// // Reset reader logic (Placeholder implementation)
// Console.WriteLine($"Resetting reader for {q.Endpoint}");
// }
// }
//
// public class MsgqQueue
// {
// public AtomicUlong NumReaders { get; set; } = new AtomicUlong();
// public ulong ReaderId { get; set; }
// public ulong ReadUidLocal { get; set; }
// public AtomicBool[] ReadValids { get; set; } = new AtomicBool[Msgq.NUM_READERS];
// public AtomicUlong[] ReadPointers { get; set; } = new AtomicUlong[Msgq.NUM_READERS];
// public AtomicUlong[] ReadUids { get; set; } = new AtomicUlong[Msgq.NUM_READERS];
// public string Endpoint { get; set; }
//
// public MsgqQueue()
// {
// for (int i = 0; i < Msgq.NUM_READERS; i++)
// {
// ReadValids[i] = new AtomicBool();
// ReadPointers[i] = new AtomicUlong();
// ReadUids[i] = new AtomicUlong();
// }
// }
// }
//
// public class AtomicBool
// {
// private int value;
//
// public bool Value
// {
// get => value == 1;
// set => Interlocked.Exchange(ref this.value, value ? 1 : 0);
// }
// }
//
// public class AtomicUlong
// {
// private long value;
//
// public ulong Value
// {
// get => (ulong)Interlocked.Read(ref value);
// set => Interlocked.Exchange(ref this.value, (long)value);
// }
// }
39 changes: 39 additions & 0 deletions msgqNET/implementations/msgq/MsgqMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
namespace msgqNET.implementations.msgq;

public class MsgqMessage : IMsgqMessage
{
private long _size;
private byte[] _data;

// Constructor to initialize with size
public MsgqMessage(long size)
{
_size = size;
_data = new byte[size];
}

// Constructor to initialize with data and size
public MsgqMessage(byte[] data, long size)
{
_size = size;
_data = new byte[size];
Array.Copy(data, _data, size);
}

public void TakeOwnership(byte[] data, long size)
{
_size = size;
_data = data;
}

public void Close()
{
// Release the data and reset size
_data = [];
_size = 0;
}

public long GetSize() => _size;

public byte[] GetData() => _data;
}
Loading