Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #105 - Support for Binding a channel to multiple topics #280

Merged
merged 1 commit into from
Mar 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ This section lists features in master, available by [AppVeyor](https://ci.appvey

## Master ##
- Added beta Support for a Redis transport
- Support for Binding a channel to multiple topics

## Release 7.2.0 ##
- Support for PostgreSql Message Store (Tarun Pothulapati @Pothulapati)
Expand Down
85 changes: 70 additions & 15 deletions src/Paramore.Brighter.MessagingGateway.RMQ/RmqMessageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ THE SOFTWARE. */
#endregion

using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
Expand All @@ -47,7 +48,7 @@ public class RmqMessageConsumer : RMQMessageGateway, IAmAMessageConsumer
private static readonly Lazy<ILog> _logger = new Lazy<ILog>(LogProvider.For<RmqMessageConsumer>);

private readonly string _queueName;
private readonly string _routingKey;
private readonly RoutingKeys _routingKeys;
private readonly bool _isDurable;
private readonly ushort _preFetchSize;
private const bool AutoAck = false;
Expand All @@ -73,7 +74,33 @@ public RmqMessageConsumer(
: base(connection)
{
_queueName = queueName;
_routingKey = routingKey;
_routingKeys = new RoutingKeys(routingKey);
_isDurable = isDurable;
_preFetchSize = preFetchSize;
IsQueueMirroredAcrossAllNodesInTheCluster = highAvailability;
_messageCreator = new RmqMessageCreator();
}

/// <summary>
/// Initializes a new instance of the <see cref="RMQMessageGateway" /> class.
/// </summary>
/// <param name="connection"></param>
/// <param name="queueName">The queue name.</param>
/// <param name="routingKeys">The routing keys.</param>
/// <param name="isDurable">Is the queue persisted to disk</param>
/// <param name="preFetchSize">0="Don't send me a new message until I?ve finished", 1= "Send me one message at a time", n = number to grab (take care with competing consumers)</param>
/// <param name="highAvailability"></param>
public RmqMessageConsumer(
RmqMessagingGatewayConnection connection,
string queueName,
string[] routingKeys,
bool isDurable,
ushort preFetchSize = 1,
bool highAvailability = false)
: base(connection)
{
_queueName = queueName;
_routingKeys = new RoutingKeys(routingKeys);
_isDurable = isDurable;
_preFetchSize = preFetchSize;
IsQueueMirroredAcrossAllNodesInTheCluster = highAvailability;
Expand Down Expand Up @@ -191,7 +218,7 @@ public void Reject(Message message, bool requeue)
/// <returns>Message.</returns>
public Message Receive(int timeoutInMilliseconds)
{
_logger.Value.DebugFormat("RmqMessageConsumer: Preparing to retrieve next message from queue {0} with routing key {1} via exchange {2} on connection {3}", _queueName, _routingKey, Connection.Exchange.Name, Connection.AmpqUri.GetSanitizedUri());
_logger.Value.DebugFormat("RmqMessageConsumer: Preparing to retrieve next message from queue {0} with routing key {1} via exchange {2} on connection {3}", _queueName, _routingKeys, Connection.Exchange.Name, Connection.AmpqUri.GetSanitizedUri());

var message = new Message();
try
Expand All @@ -203,15 +230,15 @@ public Message Receive(int timeoutInMilliseconds)
message = _messageCreator.CreateMessage(fromQueue);
_logger.Value.InfoFormat(
"RmqMessageConsumer: Received message from queue {0} with routing key {1} via exchange {2} on connection {3}, message: {5}{4}",
_queueName, _routingKey, Connection.Exchange.Name, Connection.AmpqUri.GetSanitizedUri(),
_queueName, _routingKeys, Connection.Exchange.Name, Connection.AmpqUri.GetSanitizedUri(), JsonConvert.SerializeObject(message),
JsonConvert.SerializeObject(message),
Environment.NewLine);
}
else
{
_logger.Value.DebugFormat(
"RmqMessageConsumer: Time out without receiving message from queue {0} with routing key {1} via exchange {2} on connection {3}",
_queueName, _routingKey, Connection.Exchange.Name, Connection.AmpqUri.GetSanitizedUri());
_queueName, _routingKeys, Connection.Exchange.Name, Connection.AmpqUri.GetSanitizedUri());
}
}
catch (EndOfStreamException endOfStreamException)
Expand All @@ -220,7 +247,7 @@ public Message Receive(int timeoutInMilliseconds)
"RmqMessageConsumer: The consumer {4} was canceled, the model closed, or the connection went away. Listening to queue {0} via exchange {1} via exchange {2} on connection {3}",
endOfStreamException,
_queueName,
_routingKey,
_routingKeys,
Connection.Exchange.Name,
Connection.AmpqUri.GetSanitizedUri(),
_consumer.ConsumerTag);
Expand All @@ -232,7 +259,7 @@ public Message Receive(int timeoutInMilliseconds)
"RmqMessageConsumer: There broker was unreachable listening to queue {0} via exchange {1} via exchange {2} on connection {3}",
bue,
_queueName,
_routingKey,
_routingKeys,
Connection.Exchange.Name,
Connection.AmpqUri.GetSanitizedUri());
ResetConnectionToBroker();
Expand All @@ -244,7 +271,7 @@ public Message Receive(int timeoutInMilliseconds)
"RmqMessageConsumer: There connection was already closed when listening to queue {0} via exchange {1} via exchange {2} on connection {3}",
ace,
_queueName,
_routingKey,
_routingKeys,
Connection.Exchange.Name,
Connection.AmpqUri.GetSanitizedUri());
ResetConnectionToBroker();
Expand All @@ -256,7 +283,7 @@ public Message Receive(int timeoutInMilliseconds)
"RmqMessageConsumer: There was an error listening to queue {0} via exchange {1} via exchange {2} on connection {3}",
oie,
_queueName,
_routingKey,
_routingKeys,
Connection.Exchange.Name,
Connection.AmpqUri.GetSanitizedUri());
throw new ChannelFailureException("Error connecting to RabbitMQ, see inner exception for details", oie);
Expand All @@ -266,7 +293,7 @@ public Message Receive(int timeoutInMilliseconds)
_logger.Value.ErrorException("RmqMessageConsumer: The socket timed out whilst listening to queue {0} via exchange {1} via exchange {2} on connection {3}",
te,
_queueName,
_routingKey,
_routingKeys,
Connection.Exchange.Name,
Connection.AmpqUri.GetSanitizedUri());
ResetConnectionToBroker();
Expand All @@ -278,7 +305,7 @@ public Message Receive(int timeoutInMilliseconds)
_logger.Value.ErrorException("RmqMessageConsumer: There was an error listening to queue {0} via exchange {1} via exchange {2} on connection {3}",
nse,
_queueName,
_routingKey,
_routingKeys,
Connection.Exchange.Name,
Connection.AmpqUri.GetSanitizedUri());
throw new ChannelFailureException("Error connecting to RabbitMQ, see inner exception for details", nse);
Expand All @@ -287,14 +314,14 @@ public Message Receive(int timeoutInMilliseconds)
{
_logger.Value.WarnFormat("CIRCUIT BROKEN: RmqMessageConsumer: There was an error listening to queue {0} via exchange {1} via exchange {2} on connection {3}",
_queueName,
_routingKey,
_routingKeys,
Connection.Exchange.Name,
Connection.AmpqUri.GetSanitizedUri());
throw new ChannelFailureException("Error connecting to RabbitMQ, see inner exception for details", bce);
}
catch (Exception exception)
{
_logger.Value.ErrorException("RmqMessageConsumer: There was an error listening to queue {0} via exchange {1} via exchange {2} on connection {3}", exception, _queueName, _routingKey, Connection.Exchange.Name, Connection.AmpqUri.GetSanitizedUri());
_logger.Value.ErrorException("RmqMessageConsumer: There was an error listening to queue {0} via exchange {1} via exchange {2} on connection {3}", exception, _queueName, _routingKeys, Connection.Exchange.Name, Connection.AmpqUri.GetSanitizedUri());
throw;
}

Expand All @@ -313,7 +340,7 @@ protected virtual void CreateConsumer()

_logger.Value.InfoFormat("RmqMessageConsumer: Created consumer with for queue {0} with routing key {1} via exchange {2} on connection {3}",
_queueName,
_routingKey,
_routingKeys,
Connection.Exchange.Name,
Connection.AmpqUri.GetSanitizedUri()
);
Expand All @@ -336,7 +363,10 @@ private void EnsureChannelBind()

Channel.QueueDeclare(_queueName, _isDurable, false, false, SetQueueArguments());

Channel.QueueBind(_queueName, Connection.Exchange.Name, _routingKey);
foreach (var key in _routingKeys)
{
Channel.QueueBind(_queueName, Connection.Exchange.Name, key);
}
}

private Dictionary<string, object> SetConsumerArguments()
Expand Down Expand Up @@ -389,6 +419,31 @@ private void CancelConsumer()

_consumer = null;
}
}
}

internal class RoutingKeys : IEnumerable<string>
{
private readonly IEnumerable<string> _routingKeys;

public RoutingKeys(params string[] routingKeys)
{
_routingKeys = routingKeys;
}

public IEnumerator<string> GetEnumerator()
{
return _routingKeys.GetEnumerator();
}

IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}

public override string ToString()
{
return $"[{string.Join(", ", _routingKeys)}]";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ THE SOFTWARE. */
#endregion

using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Paramore.Brighter.MessagingGateway.RMQ;
Expand All @@ -49,19 +50,27 @@ public TestRMQListener(RmqMessagingGatewayConnection connection, string channelN
_channel.QueueBind(_channelName, connection.Exchange.Name, _channelName);
}

public BasicGetResult Listen(int waitForMilliseconds = 0, bool suppressDisposal = false)
public TestRMQListener(RmqMessagingGatewayConnection connection, string channelName, params string[] routingKeys)
{
_channelName = channelName;
_connectionFactory = new ConnectionFactory { Uri = connection.AmpqUri.Uri.ToString() };
_connection = _connectionFactory.CreateConnection();
_channel = _connection.CreateModel();
_channel.DeclareExchangeForConnection(connection);
_channel.QueueDeclare(_channelName, false, false, false, null);

foreach (var routingKey in routingKeys)
_channel.QueueBind(_channelName, connection.Exchange.Name, routingKey);
}

public BasicGetResult Listen(int waitForMilliseconds = 0, bool suppressDisposal = false, bool ack = true)
{
try
{
if (waitForMilliseconds > 0)
Task.Delay(waitForMilliseconds).Wait();
Task.Delay(waitForMilliseconds).Wait();

var result = _channel.BasicGet(_channelName, true);
if (result != null)
{
_channel.BasicAck(result.DeliveryTag, false);
return result;
}
return _channel.BasicGet(_channelName, false);
}
finally
{
Expand All @@ -73,7 +82,6 @@ public BasicGetResult Listen(int waitForMilliseconds = 0, bool suppressDisposal
if (_connection.IsOpen) _connection.Dispose();
}
}
return null;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using System;
using FluentAssertions;
using Paramore.Brighter.MessagingGateway.RMQ;
using Paramore.Brighter.MessagingGateway.RMQ.MessagingGatewayConfiguration;
using Xunit;

namespace Paramore.Brighter.Tests.MessagingGateway.RMQ
{
[Trait("Category", "RMQ")]
public class RmqMessageConsumerMultipleTopicTests : IDisposable
{
private readonly IAmAMessageProducer _messageProducer;
private readonly IAmAMessageConsumer _messageConsumer;
private readonly Message _messageTopic1, _messageTopic2;
private readonly TestRMQListener _client;

public RmqMessageConsumerMultipleTopicTests()
{
_messageTopic1 = new Message(new MessageHeader(Guid.NewGuid(), "test1", MessageType.MT_COMMAND), new MessageBody("test content for topic test 1"));
_messageTopic2 = new Message(new MessageHeader(Guid.NewGuid(), "test2", MessageType.MT_COMMAND), new MessageBody("test content for topic test 2"));

var rmqConnection = new RmqMessagingGatewayConnection
{
AmpqUri = new AmqpUriSpecification(new Uri("amqp://guest:guest@localhost:5672/%2f")),
Exchange = new Exchange("paramore.brighter.exchange")
};

var topics = new[] {_messageTopic1.Header.Topic, _messageTopic2.Header.Topic};

_messageProducer = new RmqMessageProducer(rmqConnection);
_messageConsumer = new RmqMessageConsumer(rmqConnection, "Multiple.Topic.Queue", topics, false, 1, false);
_messageConsumer.Purge();

_client = new TestRMQListener(rmqConnection, "Multiple.Topic.Queue", topics);
}

[Fact]
public void When_reading_a_message_from_a_channel_with_multiple_topics()
{
_messageProducer.Send(_messageTopic1);
_messageProducer.Send(_messageTopic2);

var topic1Result = _client.Listen(suppressDisposal: true);
var topic2Result = _client.Listen();


// should_received_a_message_from_test1_with_same_topic_and_body
topic1Result.RoutingKey.Should().Be(_messageTopic1.Header.Topic);
topic1Result.Body.Should().BeEquivalentTo(_messageTopic1.Body.Value);

// should_received_a_message_from_test2_with_same_topic_and_body
topic2Result.RoutingKey.Should().Be(_messageTopic2.Header.Topic);
topic2Result.Body.Should().BeEquivalentTo(_messageTopic2.Body.Value);
}

public void Dispose()
{
_messageConsumer.Purge();
_messageProducer.Dispose();
}
}
}