diff --git a/Akka.sln b/Akka.sln index 4838b6b2d75..294c0c1c586 100644 --- a/Akka.sln +++ b/Akka.sln @@ -279,6 +279,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.TestKit.Xunit2.Tests", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.TestKit.Xunit.Tests", "src\contrib\testkits\Akka.TestKit.Xunit.Tests\Akka.TestKit.Xunit.Tests.csproj", "{F80F41E6-E5C7-4C92-B1CF-42539ECFBE68}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Delivery", "Delivery", "{E0F10060-F4D1-415C-A555-8A0821883F3A}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DeliveryProfile", "src\examples\Cluster\Delivery\DeliveryProfile\DeliveryProfile.csproj", "{33ACE430-9CB2-4DDF-8E77-C993D32F36FD}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -1318,6 +1322,18 @@ Global {F80F41E6-E5C7-4C92-B1CF-42539ECFBE68}.Release|x64.Build.0 = Release|Any CPU {F80F41E6-E5C7-4C92-B1CF-42539ECFBE68}.Release|x86.ActiveCfg = Release|Any CPU {F80F41E6-E5C7-4C92-B1CF-42539ECFBE68}.Release|x86.Build.0 = Release|Any CPU + {33ACE430-9CB2-4DDF-8E77-C993D32F36FD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {33ACE430-9CB2-4DDF-8E77-C993D32F36FD}.Debug|Any CPU.Build.0 = Debug|Any CPU + {33ACE430-9CB2-4DDF-8E77-C993D32F36FD}.Debug|x64.ActiveCfg = Debug|Any CPU + {33ACE430-9CB2-4DDF-8E77-C993D32F36FD}.Debug|x64.Build.0 = Debug|Any CPU + {33ACE430-9CB2-4DDF-8E77-C993D32F36FD}.Debug|x86.ActiveCfg = Debug|Any CPU + {33ACE430-9CB2-4DDF-8E77-C993D32F36FD}.Debug|x86.Build.0 = Debug|Any CPU + {33ACE430-9CB2-4DDF-8E77-C993D32F36FD}.Release|Any CPU.ActiveCfg = Release|Any CPU + {33ACE430-9CB2-4DDF-8E77-C993D32F36FD}.Release|Any CPU.Build.0 = Release|Any CPU + {33ACE430-9CB2-4DDF-8E77-C993D32F36FD}.Release|x64.ActiveCfg = Release|Any CPU + {33ACE430-9CB2-4DDF-8E77-C993D32F36FD}.Release|x64.Build.0 = Release|Any CPU + {33ACE430-9CB2-4DDF-8E77-C993D32F36FD}.Release|x86.ActiveCfg = Release|Any CPU + {33ACE430-9CB2-4DDF-8E77-C993D32F36FD}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -1441,6 +1457,8 @@ Global {337A85B5-4A7C-4883-8634-46E7E52A765F} = {7735F35A-E7B7-44DE-B6FB-C770B53EB69C} {95017C99-E960-44E5-83AD-BF21461DF06F} = {7625FD95-4B2C-4A5B-BDD5-94B1493FAC8E} {F80F41E6-E5C7-4C92-B1CF-42539ECFBE68} = {7625FD95-4B2C-4A5B-BDD5-94B1493FAC8E} + {E0F10060-F4D1-415C-A555-8A0821883F3A} = {C50E1A9E-820C-4E75-AE39-6F96A99AC4A7} + {33ACE430-9CB2-4DDF-8E77-C993D32F36FD} = {E0F10060-F4D1-415C-A555-8A0821883F3A} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {03AD8E21-7507-4E68-A4E9-F4A7E7273164} diff --git a/src/benchmark/Akka.Benchmarks/Akka.Benchmarks.csproj b/src/benchmark/Akka.Benchmarks/Akka.Benchmarks.csproj index 5ec8b6777fe..311372a2fd0 100644 --- a/src/benchmark/Akka.Benchmarks/Akka.Benchmarks.csproj +++ b/src/benchmark/Akka.Benchmarks/Akka.Benchmarks.csproj @@ -13,6 +13,7 @@ + diff --git a/src/benchmark/Akka.Benchmarks/Cluster/Delivery/ClusterDeliveryBenchmark.cs b/src/benchmark/Akka.Benchmarks/Cluster/Delivery/ClusterDeliveryBenchmark.cs new file mode 100644 index 00000000000..03667053205 --- /dev/null +++ b/src/benchmark/Akka.Benchmarks/Cluster/Delivery/ClusterDeliveryBenchmark.cs @@ -0,0 +1,121 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Benchmarks.Configurations; +using Akka.Cluster.Sharding; +using Akka.Cluster.Sharding.Delivery; +using Akka.Configuration; +using Akka.Delivery; +using Akka.Persistence.Delivery; +using Akka.Util; +using BenchmarkDotNet.Attributes; +using FluentAssertions.Extensions; + +namespace Akka.Benchmarks.Cluster.Delivery; + +[Config(typeof(MacroBenchmarkConfig))] +[IterationCount(100)] +public class ClusterDeliveryBenchmark +{ + private static readonly Config Config = + """ + akka.loglevel = WARNING + akka.actor.provider = cluster + akka.persistence.journal.plugin = "akka.persistence.journal.inmem" + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.inmem" + akka.remote.dot-netty.tcp.port = 0 + # akka.reliable-delivery.sharding.producer-controller.buffer-size = 10000 + # akka.reliable-delivery.sharding.consumer-controller.buffer-size = 10000 + # akka.reliable-delivery.consumer-controller.flow-control-window = 1000 + """; + + private ActorSystem _system; + private IActorRef? _producer; + private IActorRef? _region; + private IActorRef? _controller; + private IActorRef? _aggregator; + + private const int MessageCount = 800; + + [Params(false, true)] + public bool UseSingleState; + + [GlobalSetup] + public void GlobalSetup() + { + _system = ActorSystem.Create("BenchmarkSystem", Config); + + // Join cluster + var tcs = new TaskCompletionSource(); + var cluster = Akka.Cluster.Cluster.Get(_system); + cluster.RegisterOnMemberUp(() => + { + tcs.SetResult(); + }); + cluster.Join(cluster.SelfAddress); + tcs.Task.WaitAsync(TimeSpan.FromSeconds(3)).GetAwaiter().GetResult(); + + // Get the test completed aggregator + _aggregator = _system.ActorOf(Props.Create(() => new AggregateActor(MessageCount))); + + // Register the sharding region for later use + _region = ClusterSharding.Get(_system).StartAsync( + typeName: "TestConsumer", + entityPropsFactory: id => ShardingConsumerController.Create( + c => Props.Create(() => new TestConsumerEntity(id, c, _aggregator)), + ShardingConsumerController.Settings.Create(_system)), + settings: ClusterShardingSettings.Create(_system), + messageExtractor: new MessageExtractor()) + .WaitAsync(3.Seconds()).GetAwaiter().GetResult(); + + // Create the ShardingProducerController + _controller = _system.ActorOf( + ShardingProducerController.Create( + producerId: "test-producer", + shardRegion: _region!, + durableQueue: Option.None, + settings: ShardingProducerController.Settings.Create(_system) + ), + "producerController" + ); + + // Create the producer actor + _producer = _system.ActorOf(Props.Create(() => new ProducerActor(_controller, UseSingleState)), "producer"); + + // Debug + var consumerSettings = ConsumerController.Settings.Create(_system); + Console.WriteLine($"ConsumerController.Settings.FlowControlWindow: {consumerSettings.FlowControlWindow}"); + var shardingProducerSettings = ShardingProducerController.Settings.Create(_system); + Console.WriteLine($"ShardingProducerController.Settings.BufferSize: {shardingProducerSettings.BufferSize}"); + var shardingConsumerSettings = ShardingConsumerController.Settings.Create(_system); + Console.WriteLine($"ShardingConsumerController.Settings.BufferSize: {shardingConsumerSettings.BufferSize}"); + } + + [GlobalCleanup] + public void Teardown() + { + _system.Terminate().WaitAsync(30.Seconds()).GetAwaiter().GetResult(); + _aggregator = null; + _producer = null; + _region = null; + _controller = null; + } + + [IterationSetup] + public void IterationSetup() + { + _aggregator.Ask(Reset.Instance).GetAwaiter().GetResult(); + } + + [Benchmark(OperationsPerInvoke = MessageCount)] + public async Task ClusterShardingDeliveryMessageThroughputBenchmark() + { + foreach (var message in Enumerable.Range(0, MessageCount)) + { + _producer.Tell(message); + } + + await _aggregator.Ask(GetCompleted.Instance); + } +} diff --git a/src/benchmark/Akka.Benchmarks/Cluster/Delivery/Shared.cs b/src/benchmark/Akka.Benchmarks/Cluster/Delivery/Shared.cs new file mode 100644 index 00000000000..e60abe5153f --- /dev/null +++ b/src/benchmark/Akka.Benchmarks/Cluster/Delivery/Shared.cs @@ -0,0 +1,188 @@ +using System.Collections.Generic; +using Akka.Actor; +using Akka.Cluster.Sharding; +using Akka.Cluster.Sharding.Delivery; +using Akka.Delivery; +using Akka.Event; + +namespace Akka.Benchmarks.Cluster.Delivery; + +#region Messages + +internal record Job(int Payload); + +internal class GetCompleted +{ + public static readonly GetCompleted Instance = new(); + private GetCompleted() { } +} + +internal class Completed +{ + public static readonly Completed Instance = new(); + private Completed() { } +} + +internal class Reset +{ + public static readonly Reset Instance = new(); + private Reset() { } +} + +internal class Start +{ + public static readonly Start Instance = new(); + private Start() { } +} + +#endregion + +#region Classes + +// The entity actor +internal class TestConsumerEntity : ReceiveActor +{ + private readonly IActorRef _aggregator; + private readonly string _entityId; + private readonly IActorRef _consumerController; + private readonly ILoggingAdapter _log; + + public TestConsumerEntity(string entityId, IActorRef consumerController, IActorRef aggregator) + { + _entityId = entityId; + _consumerController = consumerController; + _aggregator = aggregator; + _log = Context.GetLogger(); + + Receive>(delivery => + { + _aggregator.Tell(Done.Instance); + delivery.ConfirmTo.Tell(ConsumerController.Confirmed.Instance); + }); + } + + protected override void PreStart() + { + _consumerController.Tell(new ConsumerController.Start(Self)); + } +} + +// Message extractor for sharding +internal class MessageExtractor() : HashCodeMessageExtractor(10) +{ + public override string EntityId(object message) => + message is Job cmd ? (cmd.Payload % 3).ToString() : string.Empty; +} + +// The producer actor +internal class ProducerActor : ReceiveActor, IWithStash +{ + private readonly IActorRef _producerController; + private IActorRef _sendNext = ActorRefs.Nobody; + private readonly ILoggingAdapter _log; + + public ProducerActor(IActorRef producerController, bool singleState) + { + _log = Context.GetLogger(); + _producerController = producerController; + if(singleState) + Become(SingleState); + else + Become(Idle); + } + + public IStash Stash { get; set; } = null!; + + protected override void PreStart() + { + _producerController.Tell(new ShardingProducerController.Start(Self)); + } + + private void SingleState() + { + Receive>(next => + { + _sendNext = next.SendNextTo; + Stash.Unstash(); + }); + + Receive(msg => + { + if(ReferenceEquals(_sendNext, ActorRefs.Nobody)) + { + Stash.Stash(); + } + else + { + _sendNext.Tell(new ShardingEnvelope(msg.ToString(), new Job(msg))); + _sendNext = ActorRefs.Nobody; + } + }); + } + + private void Idle() + { + Receive>(next => + { + _sendNext = next.SendNextTo; + Stash.Unstash(); + Become(Active); + }); + + Receive(_ => + { + Stash.Stash(); + }); + } + + private void Active() + { + Receive(msg => + { + Become(Idle); + _sendNext.Tell(new ShardingEnvelope(msg.ToString(), new Job(msg))); + }); + + Receive>(next => + { + _sendNext = next.SendNextTo; + }); + } +} + +internal class AggregateActor : ReceiveActor +{ + private IActorRef? _reportTo; + private readonly int _totalMessageCount; + private int _messageCount; + + public AggregateActor(int messageCount) + { + _totalMessageCount = messageCount; + Receive(_ => + { + _messageCount++; + if (_messageCount < _totalMessageCount || _reportTo == null) + return; + + _reportTo.Tell(Completed.Instance); + }); + Receive(_ => + { + _messageCount = 0; + Sender.Tell(Done.Instance); + }); + Receive(_ => + { + if (_messageCount >= _totalMessageCount) + { + Sender.Tell(Completed.Instance); + return; + } + + _reportTo = Sender; + }); + } +} + +#endregion diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Delivery/ReliableDeliveryShardingSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Delivery/ReliableDeliveryShardingSpec.cs index 4f79a40454e..21d4bd85e90 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Delivery/ReliableDeliveryShardingSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Delivery/ReliableDeliveryShardingSpec.cs @@ -27,7 +27,7 @@ namespace Akka.Cluster.Sharding.Tests.Delivery; public class ReliableDeliveryShardingSpec : TestKit.Xunit2.TestKit { public static Config Configuration = @" - akka.loglevel = DEBUG + akka.loglevel = INFO akka.actor.provider = cluster akka.remote.dot-netty.tcp.port = 0 akka.reliable-delivery.consumer-controller.flow-control-window = 20 @@ -63,7 +63,7 @@ public async Task ReliableDelivery_with_Sharding_must_illustrate_Sharding_usage( var consumerEndProbe = CreateTestProbe(); var region = await ClusterSharding.Get(Sys).StartAsync($"TestConsumer-{_idCount}", _ => ShardingConsumerController.Create(c => - PropsFor(DefaultConsumerDelay, 42, consumerEndProbe.Ref, c), + PropsFor(TimeSpan.Zero, 500, consumerEndProbe.Ref, c), ShardingConsumerController.Settings.Create(Sys)), ClusterShardingSettings.Create(Sys), HashCodeMessageExtractor.Create(10, o => string.Empty, o => o)); @@ -550,7 +550,7 @@ private class TestShardingProducer : ReceiveActor, IWithTimers public TestShardingProducer(IActorRef producerController) { _producerController = producerController; - Idle(0); + Idle(-1); } public ITimerScheduler Timers { get; set; } = null!; @@ -576,21 +576,14 @@ protected override void PreStart() private void Idle(int n) { Receive(_ => { }); // ignore - Receive(next => { Become(() => Active(n + 1, next.SendNextTo)); }); - } - - private void Active(int n, IActorRef sendTo) - { - Receive(_ => + Receive(next => { + _log.Info("TRACK:[ProAct.Rcv-RequestNext,{0},{1}]", next.SendNextTo.Path.Name, DateTime.UtcNow.ToString("mm:ss.ffff")); + n++; var msg = $"msg-{n}"; var entityId = $"entity-{n % 3}"; - _log.Info("Sending {0} to {1}", msg, entityId); - sendTo.Tell(new ShardingEnvelope(entityId, new Job(msg))); - Become(() => Idle(n)); + next.SendNextTo.Tell(new ShardingEnvelope(entityId, new Job(msg))); }); - - Receive(_ => { }); // already active } public sealed class Tick diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingProducerControllerImpl.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingProducerControllerImpl.cs index b2d4df577e5..73473358281 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingProducerControllerImpl.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingProducerControllerImpl.cs @@ -7,6 +7,7 @@ #nullable enable using System; +using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; using System.Threading; @@ -276,6 +277,8 @@ private void OnMsg(EntityId entityId, T msg, Option replyTo, TotalSeq // there is demand, send immediately if (outState.NextTo.HasValue) { + _log.Info("TRACK:[ProducerController.OnMsg-Send,{0},{1},{2}]", msg?.ToString(), entityId, DateTime.UtcNow.ToString("mm:ss.ffff")); + Send(msg, outKey, outState.SeqNr, outState.NextTo.Value); var newUnconfirmed = outState.Unconfirmed.Add(new Unconfirmed(totalSeqNr, outState.SeqNr, replyTo)); @@ -309,6 +312,9 @@ outState with OutStates = CurrentState.OutStates.SetItem(outKey, outState with { Buffered = newBuffered }), ReplyAfterStore = newReplyAfterStore }; + + _log.Info("TRACK:[ProducerController.OnMsg-RequestNext,{0},{1}]", entityId, DateTime.UtcNow.ToString("mm:ss.ffff")); + // send an updated RequestNext to indicate buffer usage CurrentState.Producer.Tell(CreateRequestNext(newS)); CurrentState = newS; @@ -326,6 +332,7 @@ outState with Context.ActorOf( Props.Create(() => new ProducerController(outKey, Option.None, customSend, Settings.ProducerControllerSettings, _timeProvider, null)), Uri.EscapeDataString(entityId)); + _log.Info("TRACK:[ProducerController.OnMsg-Start,{0},{1}]", entityId, DateTime.UtcNow.ToString("mm:ss.ffff")); producer.Tell(new ProducerController.Start(RequestNextAdapter)); CurrentState = CurrentState with { @@ -346,6 +353,8 @@ private ImmutableList> OnAck(OutState outState, long confirmed if (confirmed.Any()) { + _log.Info("TRACK:[ProducerController.OnAck,{0},{1}]", outState.EntityId, DateTime.UtcNow.ToString("mm:ss.ffff")); + foreach (var c in confirmed) { switch (c) @@ -443,6 +452,8 @@ private void ReceiveWrappedRequestNext(WrappedRequestNext w) if (outState.Buffered.Any()) { var buf = outState.Buffered.First(); + _log.Info("TRACK:[ProducerController.ReceiveWrappedRequestNext-Send,{0},{1}]", outState.EntityId, DateTime.UtcNow.ToString("mm:ss.ffff")); + Send(buf.Msg, outKey, outState.SeqNr, next.SendNextTo); var newUnconfirmed2 = newUnconfirmed.Add(new Unconfirmed(buf.TotalSeqNr, outState.SeqNr, buf.ReplyTo)); @@ -469,6 +480,8 @@ outState with }); var newState = CurrentState with { OutStates = newProducers }; + _log.Info("TRACK:[ProducerController.ReceiveWrappedRequestNext-RequestNext,{0},{1}]", outState.EntityId, DateTime.UtcNow.ToString("mm:ss.ffff")); + // send an updated RequestNext CurrentState.Producer.Tell(CreateRequestNext(newState)); CurrentState = newState; @@ -485,6 +498,7 @@ private void ReceiveStart(Start start) { ProducerController.AssertLocalProducer(start.Producer); _log.Debug("Register new Producer [{0}], currentSeqNr [{1}].", start.Producer, CurrentState.CurrentSeqNr); + _log.Info("TRACK:[ProducerController.ReceiveStart-RequestNext,{0},{1}]", "", DateTime.UtcNow.ToString("mm:ss.ffff")); start.Producer.Tell(CreateRequestNext(CurrentState)); CurrentState = CurrentState with { Producer = start.Producer }; } diff --git a/src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj b/src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj index 32c67dcd1ca..4f066eb2efe 100644 --- a/src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj +++ b/src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj @@ -5,6 +5,7 @@ + diff --git a/src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs b/src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs index 45a35c1d26e..aa99b3f356b 100644 --- a/src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs +++ b/src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs @@ -563,6 +563,9 @@ private void Deliver(SequencedMessage seqMsg) var assembledSeqMsg = !seqMsg.Message.IsMessage ? AssembleChunks(previouslyCollectedChunks.Add(seqMsg)) : seqMsg; + + _log.Info("TRACK:[ConsumerController.Deliver-Delivery,{0},{1}]", "", DateTime.UtcNow.ToString("mm:ss.ffff")); + CurrentState.Consumer.Tell(new Delivery(assembledSeqMsg.Message.Message!, Context.Self, seqMsg.ProducerId, seqMsg.SeqNr)); CurrentState = CurrentState.ClearCollectedChunks(); diff --git a/src/examples/Cluster/ClusterSharding/ShoppingCart/TimingExample.cs b/src/examples/Cluster/ClusterSharding/ShoppingCart/TimingExample.cs new file mode 100644 index 00000000000..978da73fa03 --- /dev/null +++ b/src/examples/Cluster/ClusterSharding/ShoppingCart/TimingExample.cs @@ -0,0 +1,253 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2025 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Cluster.Sharding; +using Akka.Cluster.Sharding.Delivery; +using Akka.Configuration; +using Akka.Delivery; +using Akka.Event; +using Akka.Util; +using ShoppingCart; + +namespace Akka.Cluster.Sharding.Examples.ShoppingCart; + +/// +/// Example demonstrating how to use the timing and tracing capabilities +/// of Akka.Cluster.Sharding.Delivery +/// +public class TimingExample +{ + /* + public static async Task Main() + { + var config = ConfigurationFactory.ParseString(@" + akka { + loglevel = DEBUG + actor { + provider = cluster + } + remote { + dot-netty.tcp { + hostname = ""127.0.0.1"" + port = 2551 + } + } + cluster { + seed-nodes = [""akka.tcp://TimingExample@127.0.0.1:2551""] + } + reliable-delivery { + sharding { + producer-controller { + buffer-size = 1000 + internal-ask-timeout = 60s + cleanup-unused-after = 60s + resend-first-unconfirmed-idle-timeout = 10s + } + consumer-controller { + buffer-size = 1000 + allow-bypass = false + } + } + } + }"); + + using var system = ActorSystem.Create("TimingExample", config); + + // Start cluster + var cluster = Cluster.Get(system); + cluster.Join(cluster.SelfAddress); + + // Create ShardRegion with timing-enabled consumer controller + var shardRegion = ClusterSharding.Get(system).Start( + typeName: "ShoppingCart", + entityProps: ShardingConsumerController.Create( + consumerProps: consumerController => Props.Create(() => new ShoppingCart(consumerController)), + settings: ShardingConsumerController.Settings.Create(system)), + settings: ClusterShardingSettings.Create(system), + messageExtractor: new MessageExtractor(10)); + + // Create producer controller + var producerController = system.ActorOf( + ShardingProducerController.Create( + producerId: "timing-producer", + shardRegion: shardRegion, + durableQueue: Option.None, + settings: ShardingProducerController.Settings.Create(system)), + "producer-controller"); + + // Create producer actor + var producer = system.ActorOf(Props.Create(() => new TimingProducer(producerController)), "producer"); + + // Start the producer + producer.Tell(new ShardingProducerController.Start(producer)); + + // Send some messages to test timing + for (int i = 0; i < 5; i++) + { + var item = new CartItem($"item-{i}", $"Product {i}", 10.0m + i); + producer.Tell(new ShardingEnvelope($"user-{i % 3}", item)); + + await Task.Delay(100); // Small delay between messages + } + + // Wait for messages to be processed + await Task.Delay(2000); + + Console.WriteLine("Timing example completed. Check logs for timing information."); + await system.Terminate(); + } + + private static Option<(string, object)> ExtractEntityId(object message) + { + if (message is ShardingEnvelope envelope) + { + return (envelope.EntityId, envelope.Message); + } + return ("", message); + } + + private static string ExtractShardId(object message) + { + if (message is ShardingEnvelope envelope) + { + return envelope.EntityId.GetHashCode().ToString(); + } + return "0"; + } +} + +/// +/// Producer actor that demonstrates timing capabilities +/// +public class TimingProducer : ReceiveActor +{ + private readonly ILoggingAdapter _log = Context.GetLogger(); + private IActorRef? _sendNext; + + public TimingProducer(IActorRef producerController) + { + Receive>(next => + { + _log.Info("Received RequestNext - ready to send messages"); + _sendNext = next.SendNextTo; + Become(Active); + }); + + Receive(envelope => + { + if (_sendNext != null) + { + _log.Info("Sending message to entity [{0}]", envelope.EntityId); + _sendNext.Tell(envelope); + Become(Idle); + } + else + { + Stash.Stash(); + } + }); + } + + private void Active() + { + Receive(envelope => + { + _log.Info("Sending message to entity [{0}]", envelope.EntityId); + _sendNext!.Tell(envelope); + Become(Idle); + }); + + Receive>(next => + { + _sendNext = next.SendNextTo; + }); + } + + private void Idle() + { + Receive(envelope => + { + Stash.Stash(); + }); + + Receive>(next => + { + _sendNext = next.SendNextTo; + Stash.Unstash(); + Become(Active); + }); + } + + public IStash Stash { get; set; } = null!; +} + +/// +/// Shopping cart entity that demonstrates timing capabilities +/// +public class ShoppingCart : ReceiveActor +{ + private readonly ILoggingAdapter _log = Context.GetLogger(); + private readonly IActorRef _consumerController; + + public ShoppingCart(IActorRef consumerController) + { + _consumerController = consumerController; + + Receive>(start => + { + _log.Info("ShoppingCart started - ready to receive messages"); + Context.Watch(start.DeliverTo); + Become(Active); + }); + } + + private void Active() + { + Receive>(delivery => + { + _log.Info("Received delivery for item: {0}", delivery.Message); + + // Simulate some processing time + Task.Delay(50).ContinueWith(_ => + { + delivery.ConfirmTo.Tell(ConsumerController.Confirmed.Instance); + }); + }); + + Receive(terminated => + { + _log.Info("Consumer terminated"); + Context.Stop(Self); + }); + } +} + +/// +/// Simple cart item for demonstration +/// +public class CartItem +{ + public CartItem(string id, string name, decimal price) + { + Id = id; + Name = name; + Price = price; + } + + public string Id { get; } + public string Name { get; } + public decimal Price { get; } + + public override string ToString() + { + return $"CartItem({Id}, {Name}, {Price:C})"; + } +*/ +} \ No newline at end of file diff --git a/src/examples/Cluster/Delivery/DeliveryProfile/DeliveryProfile.csproj b/src/examples/Cluster/Delivery/DeliveryProfile/DeliveryProfile.csproj new file mode 100644 index 00000000000..b366e40ff60 --- /dev/null +++ b/src/examples/Cluster/Delivery/DeliveryProfile/DeliveryProfile.csproj @@ -0,0 +1,19 @@ + + + + Exe + net8.0 + enable + enable + false + + + + + + + + + + + diff --git a/src/examples/Cluster/Delivery/DeliveryProfile/Program.cs b/src/examples/Cluster/Delivery/DeliveryProfile/Program.cs new file mode 100644 index 00000000000..df241acf3ff --- /dev/null +++ b/src/examples/Cluster/Delivery/DeliveryProfile/Program.cs @@ -0,0 +1,102 @@ +using Akka.Actor; +using Akka.Cluster.Sharding; +using Akka.Cluster.Sharding.Delivery; +using Akka.Configuration; +using Akka.Delivery; +using Akka.Util; +using DeliveryProfile; +using JetBrains.Profiler.SelfApi; + +public static class Program +{ + private static readonly Config Config = + """ + akka.loglevel = WARNING + akka.actor.provider = cluster + akka.persistence.journal.plugin = "akka.persistence.journal.inmem" + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.inmem" + akka.remote.dot-netty.tcp.port = 0 + # akka.reliable-delivery.sharding.producer-controller.buffer-size = 10000 + # akka.reliable-delivery.sharding.consumer-controller.buffer-size = 10000 + akka.reliable-delivery.consumer-controller.flow-control-window = 1000 + """; + + private static ActorSystem? _system; + private static IActorRef? _producer; + private static IActorRef? _region; + private static IActorRef? _controller; + private static IActorRef? _aggregator; + + //[Params(3000)] + private const int MessageCount = 3000; + + public static async Task Main(string[] args) + { + /* + await DotTrace.InitAsync(); + var traceConfig = new DotTrace.Config() + .SaveToDir("G:\\dotTraceSnapshots\\ClusterShardingDelivery") + .UseTimelineProfilingType(true); + + DotTrace.Attach(traceConfig); + */ + + + _system = ActorSystem.Create("BenchmarkSystem", Config); + + // Join cluster + var tcs = new TaskCompletionSource(); + var cluster = Akka.Cluster.Cluster.Get(_system); + cluster.RegisterOnMemberUp(() => + { + tcs.SetResult(); + }); + cluster.Join(cluster.SelfAddress); + await tcs.Task; + + // Get the test completed aggregator + _aggregator = _system.ActorOf(Props.Create(() => new AggregateActor(MessageCount))); + + // Register the sharding region for later use + _region = await ClusterSharding.Get(_system).StartAsync( + typeName: "TestConsumer", + entityPropsFactory: id => ShardingConsumerController.Create( + c => Props.Create(() => new TestConsumerEntity(id, c, _aggregator)), + ShardingConsumerController.Settings.Create(_system)), + settings: ClusterShardingSettings.Create(_system), + messageExtractor: new MessageExtractor()); + + // Create the ShardingProducerController + _controller = _system.ActorOf( + ShardingProducerController.Create( + producerId: "test-producer", + shardRegion: _region!, + durableQueue: Option.None, + settings: ShardingProducerController.Settings.Create(_system) + ), + "producerController" + ); + + // Create the producer actor + _producer = _system.ActorOf(Props.Create(() => new ProducerActor(_controller, true)), "producer"); + + // Debug + var consumerSettings = ConsumerController.Settings.Create(_system); + Console.WriteLine($"ConsumerController.Settings.FlowControlWindow: {consumerSettings.FlowControlWindow}"); + var shardingProducerSettings = ShardingProducerController.Settings.Create(_system); + Console.WriteLine($"ShardingProducerController.Settings.BufferSize: {shardingProducerSettings.BufferSize}"); + var shardingConsumerSettings = ShardingConsumerController.Settings.Create(_system); + Console.WriteLine($"ShardingConsumerController.Settings.BufferSize: {shardingConsumerSettings.BufferSize}"); + + // DotTrace.StartCollectingData(); + foreach (var message in Enumerable.Range(0, MessageCount)) + { + _producer.Tell(message); + } + + await _aggregator.Ask(GetCompleted.Instance); + // DotTrace.SaveData(); + + await _system.Terminate(); + } +} \ No newline at end of file diff --git a/src/examples/Cluster/Delivery/DeliveryProfile/Shared.cs b/src/examples/Cluster/Delivery/DeliveryProfile/Shared.cs new file mode 100644 index 00000000000..817410364bd --- /dev/null +++ b/src/examples/Cluster/Delivery/DeliveryProfile/Shared.cs @@ -0,0 +1,188 @@ +using Akka; +using Akka.Actor; +using Akka.Cluster.Sharding; +using Akka.Cluster.Sharding.Delivery; +using Akka.Delivery; +using Akka.Event; + +namespace DeliveryProfile; + +#region Messages + +internal record Job(int Payload); + +internal class GetCompleted +{ + public static readonly GetCompleted Instance = new(); + private GetCompleted() { } +} + +internal class Completed +{ + public static readonly Completed Instance = new(); + private Completed() { } +} + +internal class Reset +{ + public static readonly Reset Instance = new(); + private Reset() { } +} + +internal class Start +{ + public static readonly Start Instance = new(); + private Start() { } +} + +#endregion + +#region Classes + +// The entity actor +internal class TestConsumerEntity : ReceiveActor +{ + private readonly IActorRef _aggregator; + private readonly string _entityId; + private readonly IActorRef _consumerController; + private readonly ILoggingAdapter _log; + + public TestConsumerEntity(string entityId, IActorRef consumerController, IActorRef aggregator) + { + _entityId = entityId; + _consumerController = consumerController; + _aggregator = aggregator; + _log = Context.GetLogger(); + + Receive>(delivery => + { + _aggregator.Tell(Done.Instance); + delivery.ConfirmTo.Tell(ConsumerController.Confirmed.Instance); + }); + } + + protected override void PreStart() + { + _consumerController.Tell(new ConsumerController.Start(Self)); + } +} + +// Message extractor for sharding +internal class MessageExtractor() : HashCodeMessageExtractor(10) +{ + public override string EntityId(object message) => + message is Job cmd ? (cmd.Payload % 3).ToString() : string.Empty; +} + +// The producer actor +internal class ProducerActor : ReceiveActor, IWithStash +{ + private readonly IActorRef _producerController; + private IActorRef _sendNext = ActorRefs.Nobody; + private readonly ILoggingAdapter _log; + + public ProducerActor(IActorRef producerController, bool singleState) + { + _log = Context.GetLogger(); + _producerController = producerController; + if(singleState) + Become(SingleState); + else + Become(Idle); + } + + public IStash Stash { get; set; } = null!; + + protected override void PreStart() + { + _producerController.Tell(new ShardingProducerController.Start(Self)); + } + + private void SingleState() + { + Receive>(next => + { + _sendNext = next.SendNextTo; + Stash.Unstash(); + }); + + Receive(msg => + { + if(ReferenceEquals(_sendNext, ActorRefs.Nobody)) + { + Stash.Stash(); + } + else + { + _sendNext.Tell(new ShardingEnvelope(msg.ToString(), new Job(msg))); + _sendNext = ActorRefs.Nobody; + } + }); + } + + private void Idle() + { + Receive>(next => + { + _sendNext = next.SendNextTo; + Stash.Unstash(); + Become(Active); + }); + + Receive(_ => + { + Stash.Stash(); + }); + } + + private void Active() + { + Receive(msg => + { + Become(Idle); + _sendNext.Tell(new ShardingEnvelope(msg.ToString(), new Job(msg))); + }); + + Receive>(next => + { + _sendNext = next.SendNextTo; + }); + } +} + +internal class AggregateActor : ReceiveActor +{ + private IActorRef? _reportTo; + private readonly int _totalMessageCount; + private int _messageCount; + + public AggregateActor(int messageCount) + { + _totalMessageCount = messageCount; + Receive(_ => + { + _messageCount++; + if (_messageCount < _totalMessageCount || _reportTo == null) + return; + + _reportTo.Tell(Completed.Instance); + }); + Receive(_ => + { + _messageCount = 0; + Sender.Tell(Done.Instance); + }); + Receive(_ => + { + if (_messageCount >= _totalMessageCount) + { + Sender.Tell(Completed.Instance); + return; + } + + _reportTo = Sender; + }); + } +} + +#endregion