Skip to content

Commit

Permalink
Merge pull request #280 from DevJonny/issue-105
Browse files Browse the repository at this point in the history
Fix #105 - Support for Binding a channel to multiple topics
  • Loading branch information
iancooper authored Mar 28, 2018
2 parents 6996b50 + fe86821 commit c3ec4e1
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 24 deletions.
1 change: 1 addition & 0 deletions release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ This section lists features in master, available by [AppVeyor](https://ci.appvey

## Master ##
- Added beta Support for a Redis transport
- Support for Binding a channel to multiple topics

## Release 7.2.0 ##
- Support for PostgreSql Message Store (Tarun Pothulapati @Pothulapati)
Expand Down
85 changes: 70 additions & 15 deletions src/Paramore.Brighter.MessagingGateway.RMQ/RmqMessageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ THE SOFTWARE. */
#endregion

using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
Expand All @@ -47,7 +48,7 @@ public class RmqMessageConsumer : RMQMessageGateway, IAmAMessageConsumer
private static readonly Lazy<ILog> _logger = new Lazy<ILog>(LogProvider.For<RmqMessageConsumer>);

private readonly string _queueName;
private readonly string _routingKey;
private readonly RoutingKeys _routingKeys;
private readonly bool _isDurable;
private readonly ushort _preFetchSize;
private const bool AutoAck = false;
Expand All @@ -73,7 +74,33 @@ public RmqMessageConsumer(
: base(connection)
{
_queueName = queueName;
_routingKey = routingKey;
_routingKeys = new RoutingKeys(routingKey);
_isDurable = isDurable;
_preFetchSize = preFetchSize;
IsQueueMirroredAcrossAllNodesInTheCluster = highAvailability;
_messageCreator = new RmqMessageCreator();
}

/// <summary>
/// Initializes a new instance of the <see cref="RMQMessageGateway" /> class.
/// </summary>
/// <param name="connection"></param>
/// <param name="queueName">The queue name.</param>
/// <param name="routingKeys">The routing keys.</param>
/// <param name="isDurable">Is the queue persisted to disk</param>
/// <param name="preFetchSize">0="Don't send me a new message until I?ve finished", 1= "Send me one message at a time", n = number to grab (take care with competing consumers)</param>
/// <param name="highAvailability"></param>
public RmqMessageConsumer(
RmqMessagingGatewayConnection connection,
string queueName,
string[] routingKeys,
bool isDurable,
ushort preFetchSize = 1,
bool highAvailability = false)
: base(connection)
{
_queueName = queueName;
_routingKeys = new RoutingKeys(routingKeys);
_isDurable = isDurable;
_preFetchSize = preFetchSize;
IsQueueMirroredAcrossAllNodesInTheCluster = highAvailability;
Expand Down Expand Up @@ -191,7 +218,7 @@ public void Reject(Message message, bool requeue)
/// <returns>Message.</returns>
public Message Receive(int timeoutInMilliseconds)
{
_logger.Value.DebugFormat("RmqMessageConsumer: Preparing to retrieve next message from queue {0} with routing key {1} via exchange {2} on connection {3}", _queueName, _routingKey, Connection.Exchange.Name, Connection.AmpqUri.GetSanitizedUri());
_logger.Value.DebugFormat("RmqMessageConsumer: Preparing to retrieve next message from queue {0} with routing key {1} via exchange {2} on connection {3}", _queueName, _routingKeys, Connection.Exchange.Name, Connection.AmpqUri.GetSanitizedUri());

var message = new Message();
try
Expand All @@ -203,15 +230,15 @@ public Message Receive(int timeoutInMilliseconds)
message = _messageCreator.CreateMessage(fromQueue);
_logger.Value.InfoFormat(
"RmqMessageConsumer: Received message from queue {0} with routing key {1} via exchange {2} on connection {3}, message: {5}{4}",
_queueName, _routingKey, Connection.Exchange.Name, Connection.AmpqUri.GetSanitizedUri(),
_queueName, _routingKeys, Connection.Exchange.Name, Connection.AmpqUri.GetSanitizedUri(), JsonConvert.SerializeObject(message),
JsonConvert.SerializeObject(message),
Environment.NewLine);
}
else
{
_logger.Value.DebugFormat(
"RmqMessageConsumer: Time out without receiving message from queue {0} with routing key {1} via exchange {2} on connection {3}",
_queueName, _routingKey, Connection.Exchange.Name, Connection.AmpqUri.GetSanitizedUri());
_queueName, _routingKeys, Connection.Exchange.Name, Connection.AmpqUri.GetSanitizedUri());
}
}
catch (EndOfStreamException endOfStreamException)
Expand All @@ -220,7 +247,7 @@ public Message Receive(int timeoutInMilliseconds)
"RmqMessageConsumer: The consumer {4} was canceled, the model closed, or the connection went away. Listening to queue {0} via exchange {1} via exchange {2} on connection {3}",
endOfStreamException,
_queueName,
_routingKey,
_routingKeys,
Connection.Exchange.Name,
Connection.AmpqUri.GetSanitizedUri(),
_consumer.ConsumerTag);
Expand All @@ -232,7 +259,7 @@ public Message Receive(int timeoutInMilliseconds)
"RmqMessageConsumer: There broker was unreachable listening to queue {0} via exchange {1} via exchange {2} on connection {3}",
bue,
_queueName,
_routingKey,
_routingKeys,
Connection.Exchange.Name,
Connection.AmpqUri.GetSanitizedUri());
ResetConnectionToBroker();
Expand All @@ -244,7 +271,7 @@ public Message Receive(int timeoutInMilliseconds)
"RmqMessageConsumer: There connection was already closed when listening to queue {0} via exchange {1} via exchange {2} on connection {3}",
ace,
_queueName,
_routingKey,
_routingKeys,
Connection.Exchange.Name,
Connection.AmpqUri.GetSanitizedUri());
ResetConnectionToBroker();
Expand All @@ -256,7 +283,7 @@ public Message Receive(int timeoutInMilliseconds)
"RmqMessageConsumer: There was an error listening to queue {0} via exchange {1} via exchange {2} on connection {3}",
oie,
_queueName,
_routingKey,
_routingKeys,
Connection.Exchange.Name,
Connection.AmpqUri.GetSanitizedUri());
throw new ChannelFailureException("Error connecting to RabbitMQ, see inner exception for details", oie);
Expand All @@ -266,7 +293,7 @@ public Message Receive(int timeoutInMilliseconds)
_logger.Value.ErrorException("RmqMessageConsumer: The socket timed out whilst listening to queue {0} via exchange {1} via exchange {2} on connection {3}",
te,
_queueName,
_routingKey,
_routingKeys,
Connection.Exchange.Name,
Connection.AmpqUri.GetSanitizedUri());
ResetConnectionToBroker();
Expand All @@ -278,7 +305,7 @@ public Message Receive(int timeoutInMilliseconds)
_logger.Value.ErrorException("RmqMessageConsumer: There was an error listening to queue {0} via exchange {1} via exchange {2} on connection {3}",
nse,
_queueName,
_routingKey,
_routingKeys,
Connection.Exchange.Name,
Connection.AmpqUri.GetSanitizedUri());
throw new ChannelFailureException("Error connecting to RabbitMQ, see inner exception for details", nse);
Expand All @@ -287,14 +314,14 @@ public Message Receive(int timeoutInMilliseconds)
{
_logger.Value.WarnFormat("CIRCUIT BROKEN: RmqMessageConsumer: There was an error listening to queue {0} via exchange {1} via exchange {2} on connection {3}",
_queueName,
_routingKey,
_routingKeys,
Connection.Exchange.Name,
Connection.AmpqUri.GetSanitizedUri());
throw new ChannelFailureException("Error connecting to RabbitMQ, see inner exception for details", bce);
}
catch (Exception exception)
{
_logger.Value.ErrorException("RmqMessageConsumer: There was an error listening to queue {0} via exchange {1} via exchange {2} on connection {3}", exception, _queueName, _routingKey, Connection.Exchange.Name, Connection.AmpqUri.GetSanitizedUri());
_logger.Value.ErrorException("RmqMessageConsumer: There was an error listening to queue {0} via exchange {1} via exchange {2} on connection {3}", exception, _queueName, _routingKeys, Connection.Exchange.Name, Connection.AmpqUri.GetSanitizedUri());
throw;
}

Expand All @@ -313,7 +340,7 @@ protected virtual void CreateConsumer()

_logger.Value.InfoFormat("RmqMessageConsumer: Created consumer with for queue {0} with routing key {1} via exchange {2} on connection {3}",
_queueName,
_routingKey,
_routingKeys,
Connection.Exchange.Name,
Connection.AmpqUri.GetSanitizedUri()
);
Expand All @@ -336,7 +363,10 @@ private void EnsureChannelBind()

Channel.QueueDeclare(_queueName, _isDurable, false, false, SetQueueArguments());

Channel.QueueBind(_queueName, Connection.Exchange.Name, _routingKey);
foreach (var key in _routingKeys)
{
Channel.QueueBind(_queueName, Connection.Exchange.Name, key);
}
}

private Dictionary<string, object> SetConsumerArguments()
Expand Down Expand Up @@ -389,6 +419,31 @@ private void CancelConsumer()

_consumer = null;
}
}
}

internal class RoutingKeys : IEnumerable<string>
{
private readonly IEnumerable<string> _routingKeys;

public RoutingKeys(params string[] routingKeys)
{
_routingKeys = routingKeys;
}

public IEnumerator<string> GetEnumerator()
{
return _routingKeys.GetEnumerator();
}

IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}

public override string ToString()
{
return $"[{string.Join(", ", _routingKeys)}]";
}
}
}
26 changes: 17 additions & 9 deletions tests/Paramore.Brighter.Tests/MessagingGateway/rmq/TestHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ THE SOFTWARE. */
#endregion

using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Paramore.Brighter.MessagingGateway.RMQ;
Expand All @@ -49,19 +50,27 @@ public TestRMQListener(RmqMessagingGatewayConnection connection, string channelN
_channel.QueueBind(_channelName, connection.Exchange.Name, _channelName);
}

public BasicGetResult Listen(int waitForMilliseconds = 0, bool suppressDisposal = false)
public TestRMQListener(RmqMessagingGatewayConnection connection, string channelName, params string[] routingKeys)
{
_channelName = channelName;
_connectionFactory = new ConnectionFactory { Uri = connection.AmpqUri.Uri.ToString() };
_connection = _connectionFactory.CreateConnection();
_channel = _connection.CreateModel();
_channel.DeclareExchangeForConnection(connection);
_channel.QueueDeclare(_channelName, false, false, false, null);

foreach (var routingKey in routingKeys)
_channel.QueueBind(_channelName, connection.Exchange.Name, routingKey);
}

public BasicGetResult Listen(int waitForMilliseconds = 0, bool suppressDisposal = false, bool ack = true)
{
try
{
if (waitForMilliseconds > 0)
Task.Delay(waitForMilliseconds).Wait();
Task.Delay(waitForMilliseconds).Wait();

var result = _channel.BasicGet(_channelName, true);
if (result != null)
{
_channel.BasicAck(result.DeliveryTag, false);
return result;
}
return _channel.BasicGet(_channelName, false);
}
finally
{
Expand All @@ -73,7 +82,6 @@ public BasicGetResult Listen(int waitForMilliseconds = 0, bool suppressDisposal
if (_connection.IsOpen) _connection.Dispose();
}
}
return null;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using System;
using FluentAssertions;
using Paramore.Brighter.MessagingGateway.RMQ;
using Paramore.Brighter.MessagingGateway.RMQ.MessagingGatewayConfiguration;
using Xunit;

namespace Paramore.Brighter.Tests.MessagingGateway.RMQ
{
[Trait("Category", "RMQ")]
public class RmqMessageConsumerMultipleTopicTests : IDisposable
{
private readonly IAmAMessageProducer _messageProducer;
private readonly IAmAMessageConsumer _messageConsumer;
private readonly Message _messageTopic1, _messageTopic2;
private readonly TestRMQListener _client;

public RmqMessageConsumerMultipleTopicTests()
{
_messageTopic1 = new Message(new MessageHeader(Guid.NewGuid(), "test1", MessageType.MT_COMMAND), new MessageBody("test content for topic test 1"));
_messageTopic2 = new Message(new MessageHeader(Guid.NewGuid(), "test2", MessageType.MT_COMMAND), new MessageBody("test content for topic test 2"));

var rmqConnection = new RmqMessagingGatewayConnection
{
AmpqUri = new AmqpUriSpecification(new Uri("amqp://guest:guest@localhost:5672/%2f")),
Exchange = new Exchange("paramore.brighter.exchange")
};

var topics = new[] {_messageTopic1.Header.Topic, _messageTopic2.Header.Topic};

_messageProducer = new RmqMessageProducer(rmqConnection);
_messageConsumer = new RmqMessageConsumer(rmqConnection, "Multiple.Topic.Queue", topics, false, 1, false);
_messageConsumer.Purge();

_client = new TestRMQListener(rmqConnection, "Multiple.Topic.Queue", topics);
}

[Fact]
public void When_reading_a_message_from_a_channel_with_multiple_topics()
{
_messageProducer.Send(_messageTopic1);
_messageProducer.Send(_messageTopic2);

var topic1Result = _client.Listen(suppressDisposal: true);
var topic2Result = _client.Listen();


// should_received_a_message_from_test1_with_same_topic_and_body
topic1Result.RoutingKey.Should().Be(_messageTopic1.Header.Topic);
topic1Result.Body.Should().BeEquivalentTo(_messageTopic1.Body.Value);

// should_received_a_message_from_test2_with_same_topic_and_body
topic2Result.RoutingKey.Should().Be(_messageTopic2.Header.Topic);
topic2Result.Body.Should().BeEquivalentTo(_messageTopic2.Body.Value);
}

public void Dispose()
{
_messageConsumer.Purge();
_messageProducer.Dispose();
}
}
}

0 comments on commit c3ec4e1

Please sign in to comment.