Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/EventStore.Core/ClusterVNodeStartup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ public void Configure(IApplicationBuilder app) {
_configureNode(app);

var internalDispatcher = new InternalDispatcherEndpoint(_mainQueue, _httpMessageHandler);
_mainBus.Subscribe(internalDispatcher);
_mainBus.Subscribe<SystemMessage.SystemInit>(internalDispatcher);
_mainBus.Subscribe<HttpMessage.PurgeTimedOutRequests>(internalDispatcher);

app = app.Map("/health", _statusCheck.Configure)
// AuthenticationMiddleware uses _httpAuthenticationProviders and assigns
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using EventStore.Common.Log;
using EventStore.Core.Bus;
using EventStore.Core.Messages;
using EventStore.Core.Messaging;
Expand All @@ -13,25 +11,34 @@

namespace EventStore.Core.Services.Transport.Http
{
public class InternalDispatcherEndpoint : IHandle<HttpMessage.PurgeTimedOutRequests> {
public class InternalDispatcherEndpoint :
IHandle<SystemMessage.SystemInit>,
IHandle<HttpMessage.PurgeTimedOutRequests> {

private static readonly ILogger Log = Serilog.Log.ForContext<AuthorizationMiddleware>();
private readonly IPublisher _inputBus;
private readonly IPublisher _output;
private readonly MultiQueuedHandler _requestsMultiHandler;
private static readonly TimeSpan UpdateInterval = TimeSpan.FromSeconds(1);
private readonly IEnvelope _publishEnvelope;
public InternalDispatcherEndpoint(IPublisher inputBus, MultiQueuedHandler requestsMultiHandler) {
private readonly Message _schedulePurge;

_inputBus = inputBus;
public InternalDispatcherEndpoint(IPublisher output, MultiQueuedHandler requestsMultiHandler) {
_output = output;
_requestsMultiHandler = requestsMultiHandler;
_publishEnvelope = new PublishEnvelope(inputBus);

_schedulePurge = TimerMessage.Schedule.Create(
UpdateInterval,
new PublishEnvelope(output),
new HttpMessage.PurgeTimedOutRequests());

}

public void Handle(SystemMessage.SystemInit _) {
_output.Publish(_schedulePurge);
}

public void Handle(HttpMessage.PurgeTimedOutRequests message) {

_requestsMultiHandler.PublishToAll(message);

_inputBus.Publish(
TimerMessage.Schedule.Create(
UpdateInterval, _publishEnvelope, message));
_output.Publish(_schedulePurge);
}

public Task InvokeAsync(HttpContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace EventStore.Core.Services.Transport.Http {
public class KestrelHttpService : IHttpService,
IHandle<SystemMessage.SystemInit>,
IHandle<SystemMessage.BecomeShuttingDown> {
private static readonly TimeSpan UpdateInterval = TimeSpan.FromSeconds(1);

private static readonly ILogger Log = Serilog.Log.ForContext<KestrelHttpService>();

public ServiceAccessibility Accessibility => _accessibility;
Expand Down Expand Up @@ -101,6 +101,7 @@ public void RegisterAction(ControllerAction action, Action<HttpEntityManager, Ur
public static void CreateAndSubscribePipeline(ISubscriber bus) {
var requestProcessor = new AuthenticatedHttpRequestProcessor();
bus.Subscribe<AuthenticatedHttpRequestMessage>(requestProcessor);
bus.Subscribe<HttpMessage.PurgeTimedOutRequests>(requestProcessor);
}
}
}