Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions Akka.sln
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.TestKit.Xunit2.Tests",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.TestKit.Xunit.Tests", "src\contrib\testkits\Akka.TestKit.Xunit.Tests\Akka.TestKit.Xunit.Tests.csproj", "{F80F41E6-E5C7-4C92-B1CF-42539ECFBE68}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Delivery", "Delivery", "{E0F10060-F4D1-415C-A555-8A0821883F3A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DeliveryProfile", "src\examples\Cluster\Delivery\DeliveryProfile\DeliveryProfile.csproj", "{33ACE430-9CB2-4DDF-8E77-C993D32F36FD}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -1318,6 +1322,18 @@ Global
{F80F41E6-E5C7-4C92-B1CF-42539ECFBE68}.Release|x64.Build.0 = Release|Any CPU
{F80F41E6-E5C7-4C92-B1CF-42539ECFBE68}.Release|x86.ActiveCfg = Release|Any CPU
{F80F41E6-E5C7-4C92-B1CF-42539ECFBE68}.Release|x86.Build.0 = Release|Any CPU
{33ACE430-9CB2-4DDF-8E77-C993D32F36FD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{33ACE430-9CB2-4DDF-8E77-C993D32F36FD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{33ACE430-9CB2-4DDF-8E77-C993D32F36FD}.Debug|x64.ActiveCfg = Debug|Any CPU
{33ACE430-9CB2-4DDF-8E77-C993D32F36FD}.Debug|x64.Build.0 = Debug|Any CPU
{33ACE430-9CB2-4DDF-8E77-C993D32F36FD}.Debug|x86.ActiveCfg = Debug|Any CPU
{33ACE430-9CB2-4DDF-8E77-C993D32F36FD}.Debug|x86.Build.0 = Debug|Any CPU
{33ACE430-9CB2-4DDF-8E77-C993D32F36FD}.Release|Any CPU.ActiveCfg = Release|Any CPU
{33ACE430-9CB2-4DDF-8E77-C993D32F36FD}.Release|Any CPU.Build.0 = Release|Any CPU
{33ACE430-9CB2-4DDF-8E77-C993D32F36FD}.Release|x64.ActiveCfg = Release|Any CPU
{33ACE430-9CB2-4DDF-8E77-C993D32F36FD}.Release|x64.Build.0 = Release|Any CPU
{33ACE430-9CB2-4DDF-8E77-C993D32F36FD}.Release|x86.ActiveCfg = Release|Any CPU
{33ACE430-9CB2-4DDF-8E77-C993D32F36FD}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -1441,6 +1457,8 @@ Global
{337A85B5-4A7C-4883-8634-46E7E52A765F} = {7735F35A-E7B7-44DE-B6FB-C770B53EB69C}
{95017C99-E960-44E5-83AD-BF21461DF06F} = {7625FD95-4B2C-4A5B-BDD5-94B1493FAC8E}
{F80F41E6-E5C7-4C92-B1CF-42539ECFBE68} = {7625FD95-4B2C-4A5B-BDD5-94B1493FAC8E}
{E0F10060-F4D1-415C-A555-8A0821883F3A} = {C50E1A9E-820C-4E75-AE39-6F96A99AC4A7}
{33ACE430-9CB2-4DDF-8E77-C993D32F36FD} = {E0F10060-F4D1-415C-A555-8A0821883F3A}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {03AD8E21-7507-4E68-A4E9-F4A7E7273164}
Expand Down
1 change: 1 addition & 0 deletions src/benchmark/Akka.Benchmarks/Akka.Benchmarks.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\contrib\cluster\Akka.Cluster.Sharding\Akka.Cluster.Sharding.csproj" />
<ProjectReference Include="..\..\contrib\cluster\Akka.DistributedData\Akka.DistributedData.csproj" />
<ProjectReference Include="..\..\contrib\serializers\Akka.Serialization.Hyperion\Akka.Serialization.Hyperion.csproj" />
<ProjectReference Include="..\..\core\Akka.Cluster\Akka.Cluster.csproj" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Benchmarks.Configurations;
using Akka.Cluster.Sharding;
using Akka.Cluster.Sharding.Delivery;
using Akka.Configuration;
using Akka.Delivery;
using Akka.Persistence.Delivery;
using Akka.Util;
using BenchmarkDotNet.Attributes;
using FluentAssertions.Extensions;

namespace Akka.Benchmarks.Cluster.Delivery;

[Config(typeof(MacroBenchmarkConfig))]
[IterationCount(100)]
public class ClusterDeliveryBenchmark
{
private static readonly Config Config =
"""
akka.loglevel = WARNING
akka.actor.provider = cluster
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.inmem"
akka.remote.dot-netty.tcp.port = 0
# akka.reliable-delivery.sharding.producer-controller.buffer-size = 10000
# akka.reliable-delivery.sharding.consumer-controller.buffer-size = 10000
# akka.reliable-delivery.consumer-controller.flow-control-window = 1000
""";

private ActorSystem _system;
private IActorRef? _producer;
private IActorRef? _region;
private IActorRef? _controller;
private IActorRef? _aggregator;

private const int MessageCount = 800;

[Params(false, true)]
public bool UseSingleState;

[GlobalSetup]
public void GlobalSetup()
{
_system = ActorSystem.Create("BenchmarkSystem", Config);

// Join cluster
var tcs = new TaskCompletionSource();
var cluster = Akka.Cluster.Cluster.Get(_system);
cluster.RegisterOnMemberUp(() =>
{
tcs.SetResult();
});
cluster.Join(cluster.SelfAddress);
tcs.Task.WaitAsync(TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();

// Get the test completed aggregator
_aggregator = _system.ActorOf(Props.Create(() => new AggregateActor(MessageCount)));

// Register the sharding region for later use
_region = ClusterSharding.Get(_system).StartAsync(
typeName: "TestConsumer",
entityPropsFactory: id => ShardingConsumerController.Create<Job>(
c => Props.Create(() => new TestConsumerEntity(id, c, _aggregator)),
ShardingConsumerController.Settings.Create(_system)),
settings: ClusterShardingSettings.Create(_system),
messageExtractor: new MessageExtractor())
.WaitAsync(3.Seconds()).GetAwaiter().GetResult();

// Create the ShardingProducerController
_controller = _system.ActorOf(
ShardingProducerController.Create<Job>(
producerId: "test-producer",
shardRegion: _region!,
durableQueue: Option<Props>.None,
settings: ShardingProducerController.Settings.Create(_system)
),
"producerController"
);

// Create the producer actor
_producer = _system.ActorOf(Props.Create(() => new ProducerActor(_controller, UseSingleState)), "producer");

// Debug
var consumerSettings = ConsumerController.Settings.Create(_system);
Console.WriteLine($"ConsumerController.Settings.FlowControlWindow: {consumerSettings.FlowControlWindow}");
var shardingProducerSettings = ShardingProducerController.Settings.Create(_system);
Console.WriteLine($"ShardingProducerController.Settings.BufferSize: {shardingProducerSettings.BufferSize}");
var shardingConsumerSettings = ShardingConsumerController.Settings.Create(_system);
Console.WriteLine($"ShardingConsumerController.Settings.BufferSize: {shardingConsumerSettings.BufferSize}");
}

[GlobalCleanup]
public void Teardown()
{
_system.Terminate().WaitAsync(30.Seconds()).GetAwaiter().GetResult();
_aggregator = null;
_producer = null;
_region = null;
_controller = null;
}

[IterationSetup]
public void IterationSetup()
{
_aggregator.Ask<Done>(Reset.Instance).GetAwaiter().GetResult();
}

[Benchmark(OperationsPerInvoke = MessageCount)]
public async Task ClusterShardingDeliveryMessageThroughputBenchmark()
{
foreach (var message in Enumerable.Range(0, MessageCount))
{
_producer.Tell(message);
}

await _aggregator.Ask<Completed>(GetCompleted.Instance);
}
}
188 changes: 188 additions & 0 deletions src/benchmark/Akka.Benchmarks/Cluster/Delivery/Shared.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
using System.Collections.Generic;
using Akka.Actor;
using Akka.Cluster.Sharding;
using Akka.Cluster.Sharding.Delivery;
using Akka.Delivery;
using Akka.Event;

namespace Akka.Benchmarks.Cluster.Delivery;

#region Messages

internal record Job(int Payload);

internal class GetCompleted
{
public static readonly GetCompleted Instance = new();
private GetCompleted() { }
}

internal class Completed
{
public static readonly Completed Instance = new();
private Completed() { }
}

internal class Reset
{
public static readonly Reset Instance = new();
private Reset() { }
}

internal class Start
{
public static readonly Start Instance = new();
private Start() { }
}

#endregion

#region Classes

// The entity actor
internal class TestConsumerEntity : ReceiveActor
{
private readonly IActorRef _aggregator;
private readonly string _entityId;
private readonly IActorRef _consumerController;
private readonly ILoggingAdapter _log;

public TestConsumerEntity(string entityId, IActorRef consumerController, IActorRef aggregator)
{
_entityId = entityId;
_consumerController = consumerController;
_aggregator = aggregator;
_log = Context.GetLogger();

Receive<ConsumerController.Delivery<Job>>(delivery =>
{
_aggregator.Tell(Done.Instance);
delivery.ConfirmTo.Tell(ConsumerController.Confirmed.Instance);
});
}

protected override void PreStart()
{
_consumerController.Tell(new ConsumerController.Start<Job>(Self));
}
}

// Message extractor for sharding
internal class MessageExtractor() : HashCodeMessageExtractor(10)
{
public override string EntityId(object message) =>
message is Job cmd ? (cmd.Payload % 3).ToString() : string.Empty;
}

// The producer actor
internal class ProducerActor : ReceiveActor, IWithStash
{
private readonly IActorRef _producerController;
private IActorRef _sendNext = ActorRefs.Nobody;
private readonly ILoggingAdapter _log;

public ProducerActor(IActorRef producerController, bool singleState)
{
_log = Context.GetLogger();
_producerController = producerController;
if(singleState)
Become(SingleState);
else
Become(Idle);
}

public IStash Stash { get; set; } = null!;

protected override void PreStart()
{
_producerController.Tell(new ShardingProducerController.Start<Job>(Self));
}

private void SingleState()
{
Receive<ShardingProducerController.RequestNext<Job>>(next =>
{
_sendNext = next.SendNextTo;
Stash.Unstash();
});

Receive<int>(msg =>
{
if(ReferenceEquals(_sendNext, ActorRefs.Nobody))
{
Stash.Stash();
}
else
{
_sendNext.Tell(new ShardingEnvelope(msg.ToString(), new Job(msg)));
_sendNext = ActorRefs.Nobody;
}
});
}

private void Idle()
{
Receive<ShardingProducerController.RequestNext<Job>>(next =>
{
_sendNext = next.SendNextTo;
Stash.Unstash();
Become(Active);
});

Receive<int>(_ =>
{
Stash.Stash();
});
}

private void Active()
{
Receive<int>(msg =>
{
Become(Idle);
_sendNext.Tell(new ShardingEnvelope(msg.ToString(), new Job(msg)));
});

Receive<ShardingProducerController.RequestNext<Job>>(next =>
{
_sendNext = next.SendNextTo;
});
}
}

internal class AggregateActor : ReceiveActor
{
private IActorRef? _reportTo;
private readonly int _totalMessageCount;
private int _messageCount;

public AggregateActor(int messageCount)
{
_totalMessageCount = messageCount;
Receive<Done>(_ =>
{
_messageCount++;
if (_messageCount < _totalMessageCount || _reportTo == null)
return;

_reportTo.Tell(Completed.Instance);
});
Receive<Reset>(_ =>
{
_messageCount = 0;
Sender.Tell(Done.Instance);
});
Receive<GetCompleted>(_ =>
{
if (_messageCount >= _totalMessageCount)
{
Sender.Tell(Completed.Instance);
return;
}

_reportTo = Sender;
});
}
}

#endregion
Loading
Loading