Skip to content

Commit 73fa831

Browse files
Added SQS plug-in as RMQ alternative.
1 parent f4b1fa4 commit 73fa831

File tree

7 files changed

+577
-0
lines changed

7 files changed

+577
-0
lines changed

src/Messaging/Monai.Deploy.Messaging.csproj

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,10 @@ SPDX-License-Identifier: Apache License 2.0
5050
</ItemGroup>
5151

5252
<ItemGroup>
53+
<PackageReference Include="Amazon.SQS.ExtendedClient" Version="1.5.0" />
5354
<PackageReference Include="Ardalis.GuardClauses" Version="4.0.1" />
55+
<PackageReference Include="AWSSDK.S3" Version="3.7.9.15" />
56+
<PackageReference Include="AWSSDK.SQS" Version="3.7.2.68" />
5457
<PackageReference Include="Microsoft.Extensions.Configuration" Version="6.0.1" />
5558
<PackageReference Include="Microsoft.Extensions.Logging" Version="6.0.0" />
5659
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// SPDX-FileCopyrightText: © 2021-2022 MONAI Consortium
2+
// SPDX-License-Identifier: Apache License 2.0
3+
4+
namespace Monai.Deploy.Messaging.Configuration
5+
{
6+
internal static class SQSConfigurationKeys
7+
{
8+
public static readonly string AccessKey = "accessKey";
9+
public static readonly string AccessToken = "accessToken";
10+
public static readonly string Region = "region";
11+
public static readonly string WorkflowRequestQueue = "workflowRequestQueue";
12+
public static readonly string ExportRequestQueue = "exportRequestQueue";
13+
public static readonly string BucketName = "bucketName";
14+
public static readonly string Envid = "environmentId";
15+
16+
public static readonly string[] PublisherRequiredKeys = new[] { WorkflowRequestQueue, BucketName };
17+
public static readonly string[] SubscriberRequiredKeys = new[] { ExportRequestQueue, BucketName };
18+
}
19+
}

src/Messaging/SQS/Log.cs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
using Microsoft.Extensions.Logging;
2+
3+
namespace Monai.Deploy.Messaging.SQS
4+
{
5+
public static partial class Log
6+
{
7+
internal static readonly string LoggingScopeMessageApplication = "Message ID={0}. Application ID={1}.";
8+
9+
10+
[LoggerMessage(EventId = 10000, Level = LogLevel.Information, Message = "Publishing message {MessageId} to Queue={topic}.")]
11+
public static partial void PublishingToSQS(this ILogger logger, string topic, string MessageId);
12+
13+
[LoggerMessage(EventId = 10001, Level = LogLevel.Information, Message = "{ServiceName} connecting to SQS.")]
14+
public static partial void ConnectingToSQS(this ILogger logger, string serviceName);
15+
16+
[LoggerMessage(EventId = 10002, Level = LogLevel.Information, Message = "Message received from queue {queue}.")]
17+
public static partial void MessageReceivedFromQueue(this ILogger logger, string queue, string topic);
18+
19+
[LoggerMessage(EventId = 10003, Level = LogLevel.Information, Message = "Listening for messages from {endpoint}. Queue={queue}.")]
20+
public static partial void SubscribeToSQSQueue(this ILogger logger, string endpoint, string virtualHost, string exchange, string queue, string topic);
21+
22+
[LoggerMessage(EventId = 10004, Level = LogLevel.Information, Message = "Sending message acknowledgement for message {messageId}.")]
23+
public static partial void SendingAcknowledgement(this ILogger logger, string messageId);
24+
25+
[LoggerMessage(EventId = 10005, Level = LogLevel.Information, Message = "Ackowledge sent for message {messageId}.")]
26+
public static partial void AcknowledgementSent(this ILogger logger, string messageId);
27+
28+
[LoggerMessage(EventId = 10006, Level = LogLevel.Information, Message = "Sending nack message {messageId} and requeuing.")]
29+
public static partial void SendingNAcknowledgement(this ILogger logger, string messageId);
30+
31+
[LoggerMessage(EventId = 10007, Level = LogLevel.Information, Message = "Nack message sent for message {messageId}, requeue={requeue}.")]
32+
public static partial void NAcknowledgementSent(this ILogger logger, string messageId, bool requeue);
33+
34+
[LoggerMessage(EventId = 10008, Level = LogLevel.Information, Message = "Closing connections.")]
35+
public static partial void ClosingConnections(this ILogger logger);
36+
37+
[LoggerMessage(EventId = 10009, Level = LogLevel.Error, Message = "Invalid or corrupted message received: Queue={queueName}, Message ID={messageId}.")]
38+
public static partial void InvalidMessage(this ILogger logger, string queueName, string topic, string messageId, Exception ex);
39+
40+
[LoggerMessage(EventId = 10010, Level = LogLevel.Error, Message = "Exception not handled by the subscriber's callback function: Queue={queueName}, Message ID={messageId}.")]
41+
public static partial void ErrorNotHandledByCallback(this ILogger logger, string queueName, string topic, string messageId, Exception ex);
42+
43+
[LoggerMessage(EventId = 10011, Level = LogLevel.Error, Message = "Creating SQS client.")]
44+
public static partial void CreateSQSClient(this ILogger logger);
45+
46+
[LoggerMessage(EventId = 10012, Level = LogLevel.Error, Message = "{ServiceName} failed to connect to SQS.")]
47+
public static partial void ConnectingToSQSError(this ILogger logger, string serviceName, Exception ex);
48+
}
49+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
using System.Text.RegularExpressions;
2+
3+
namespace Monai.Deploy.Messaging.SQS
4+
{
5+
internal static class QueueFormatter
6+
{
7+
/// <summary>
8+
/// Returns an aggregate of the the environmentId, queueBasename nd topic as the name of the queue defined in SQS.
9+
/// The returned string is made compliant to SQS naming convention : It will replace non alphanumeric and other characters than "_" and "-", by an hyphen
10+
/// </summary>
11+
/// <param name="environmentId"></param>
12+
/// <param name="queuebasename"></param>
13+
/// <param name="topic"></param>
14+
/// <returns>string</returns>
15+
public static string FormatQueueName(string environmentId, string? queuebasename, string topic)
16+
{
17+
18+
string queue = $"{queuebasename}_{topic}";
19+
20+
if (!environmentId.Equals(String.Empty))
21+
queue = $"{environmentId}_{queue}";
22+
queue = Regex.Replace(queue, "[^a-zA-Z0-9_]", "-");
23+
if (queue.Length > 80)
24+
queue = queue.Substring(0, 80);
25+
return queue;
26+
}
27+
}
28+
}

src/Messaging/SQS/README.MD

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<p align="center">
2+
<img src="https://raw.githubusercontent.com/Project-MONAI/MONAI/dev/docs/images/MONAI-logo-color.png" width="50%" alt='project-monai'>
3+
</p>
4+
5+
💡 If you want to know more about MONAI Deploy WG vision, overall structure, and guidelines, please read [MONAI Deploy](https://github.com/Project-MONAI/monai-deploy) first.
6+
7+
# MONAI Deploy Messaging
8+
Messaging layer for MONAI Deploy clinical data pipelines system.
9+
10+
## Contributing
11+
12+
For guidance on making a contribution to MONAI Deploy Workflow Manager, see the [contributing guidelines](https://github.com/Project-MONAI/monai-deploy/blob/main/CONTRIBUTING.md).
13+
14+
Join the conversation on Twitter [@ProjectMONAI](https://twitter.com/ProjectMONAI) or join our [Slack channel](https://forms.gle/QTxJq3hFictp31UM9).
15+
16+
Ask and answer questions over on [MONAI Deploy Workflow Manager's GitHub Discussions tab](https://github.com/Project-MONAI/monai-deploy-workflow-manager/discussions).
17+
18+
## Links
19+
20+
- Website: <https://monai.io>
21+
- Code: <https://github.com/Project-MONAI/monai-deploy-messaging>
22+
- Project tracker: <https://github.com/Project-MONAI/monai-deploy-messaging/projects>
23+
- Issue tracker: <https://github.com/Project-MONAI/monai-deploy-messaging/issues>
24+
- Test status: <https://github.com/Project-MONAI/monai-deploy-messaging/actions>
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
// SPDX-FileCopyrightText: © 2021-2022 MONAI Consortium
2+
// SPDX-License-Identifier: Apache License 2.0
3+
4+
using System.Globalization;
5+
using System.Text;
6+
using Amazon.S3;
7+
using Amazon.SQS;
8+
using Amazon.SQS.ExtendedClient;
9+
using Amazon.SQS.Model;
10+
using Ardalis.GuardClauses;
11+
using Microsoft.Extensions.Logging;
12+
using Microsoft.Extensions.Options;
13+
using Monai.Deploy.Messaging.Configuration;
14+
15+
namespace Monai.Deploy.Messaging.SQS
16+
{
17+
public class SQSMessagePublisherService : IMessageBrokerPublisherService
18+
{
19+
private const int PersistentDeliveryMode = 2;
20+
21+
private readonly ILogger<SQSMessagePublisherService> _logger;
22+
private readonly string? _accessKey;
23+
private readonly string? _accessToken;
24+
private readonly string _environmentId = string.Empty;
25+
private bool _disposedValue;
26+
27+
28+
public string Name => "AWS SQS Publisher";
29+
private readonly string _queueName;
30+
private readonly string _bucketName;
31+
private readonly AmazonSQSClient? _sqsClient;
32+
private readonly AmazonS3Client? _s3Client;
33+
private readonly AmazonSQSExtendedClient? _sqSExtendedClient;
34+
35+
public SQSMessagePublisherService(IOptions<MessageBrokerServiceConfiguration> options,
36+
ILogger<SQSMessagePublisherService> logger)
37+
{
38+
Guard.Against.Null(options, nameof(options));
39+
40+
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
41+
42+
var configuration = options.Value;
43+
ValidateConfiguration(configuration);
44+
45+
46+
//This 2 config entries are mandatory.
47+
_queueName = configuration.PublisherSettings[SQSConfigurationKeys.WorkflowRequestQueue];
48+
_bucketName = configuration.PublisherSettings[SQSConfigurationKeys.BucketName];
49+
50+
51+
if (configuration.PublisherSettings.ContainsKey(SQSConfigurationKeys.AccessKey))
52+
{
53+
_logger.LogInformation("accessKey found in configuration.");
54+
_accessKey = configuration.PublisherSettings[SQSConfigurationKeys.AccessKey];
55+
}
56+
57+
58+
if (configuration.PublisherSettings.ContainsKey(SQSConfigurationKeys.AccessToken))
59+
{
60+
_logger.LogInformation("accessToken found in configuration.");
61+
_accessToken = configuration.PublisherSettings[SQSConfigurationKeys.AccessToken];
62+
}
63+
64+
if (configuration.PublisherSettings.ContainsKey(SQSConfigurationKeys.Envid))
65+
_environmentId = configuration.PublisherSettings[SQSConfigurationKeys.Envid];
66+
67+
try
68+
{
69+
_logger.ConnectingToSQS(Name);
70+
71+
if (!(_accessKey is null) && !(_accessToken is null))
72+
{
73+
_logger.LogInformation("Assuming IAM user as found in the configuration file.");
74+
_sqsClient = new AmazonSQSClient(_accessKey, _accessToken);
75+
_s3Client = new AmazonS3Client(_accessKey, _accessToken);
76+
}
77+
else
78+
{
79+
_logger.LogInformation("Attempting to assume local AWS credentials.");
80+
_sqsClient = new AmazonSQSClient();
81+
_s3Client = new AmazonS3Client();
82+
}
83+
84+
_sqSExtendedClient = new AmazonSQSExtendedClient(_sqsClient,
85+
new ExtendedClientConfiguration().WithLargePayloadSupportEnabled(_s3Client, _bucketName));
86+
87+
88+
89+
}
90+
catch (Amazon.SQS.AmazonSQSException Ex)
91+
{
92+
_logger.ConnectingToSQSError(Name, Ex);
93+
}
94+
}
95+
96+
private void ValidateConfiguration(MessageBrokerServiceConfiguration configuration)
97+
{
98+
Guard.Against.Null(configuration, nameof(configuration));
99+
Guard.Against.Null(configuration.PublisherSettings, nameof(configuration.PublisherSettings));
100+
101+
foreach (var key in ConfigurationKeys.PublisherRequiredKeys)
102+
{
103+
if (!configuration.PublisherSettings.ContainsKey(key))
104+
{
105+
throw new ConfigurationException($"{Name} is missing configuration for {key}.");
106+
}
107+
}
108+
}
109+
110+
public Task Publish(string topic, Monai.Deploy.Messaging.Messages.Message message)
111+
{
112+
113+
Guard.Against.NullOrWhiteSpace(topic, nameof(topic));
114+
Guard.Against.Null(message, nameof(message));
115+
116+
117+
using var loggerScope = _logger.BeginScope(string.Format(CultureInfo.InvariantCulture, Log.LoggingScopeMessageApplication, message.MessageId, message.ApplicationId));
118+
_logger.PublishingToSQS(topic, message.MessageId);
119+
var sendMessageRequest = new SendMessageRequest();
120+
121+
Dictionary<string, MessageAttributeValue> MessageAttributes = new Dictionary<string, MessageAttributeValue>();
122+
MessageAttributeValue messageIdAttribute = new MessageAttributeValue();
123+
messageIdAttribute.DataType = "String";
124+
messageIdAttribute.StringValue = message.MessageId;
125+
MessageAttributes.Add("MessageId", messageIdAttribute);
126+
127+
MessageAttributeValue ContentTypeAttribute = new MessageAttributeValue();
128+
ContentTypeAttribute.DataType = "String";
129+
ContentTypeAttribute.StringValue = message.ContentType;
130+
MessageAttributes.Add("ContentType", ContentTypeAttribute);
131+
132+
133+
MessageAttributeValue ApplicationIdAttribute = new MessageAttributeValue();
134+
ApplicationIdAttribute.DataType = "String";
135+
ApplicationIdAttribute.StringValue = message.MessageId;
136+
MessageAttributes.Add("ApplicationId", ApplicationIdAttribute);
137+
138+
sendMessageRequest.MessageAttributes = MessageAttributes;
139+
140+
141+
Console.WriteLine("Message information : ");
142+
Console.WriteLine(message);
143+
Console.WriteLine(message.Body);
144+
Console.WriteLine(message.Body.Length);
145+
146+
147+
string queueName = QueueFormatter.FormatQueueName(_environmentId, _queueName, topic);
148+
_logger.LogDebug($"Attempting to create or subscribe to {queueName}");
149+
150+
var queueAttributes = new Dictionary<string, string>();
151+
152+
queueAttributes.Add("KmsMasterKeyId", "alias/aws/sqs");
153+
var request = new CreateQueueRequest
154+
{
155+
Attributes = queueAttributes,
156+
QueueName = queueName
157+
};
158+
159+
CreateQueueResponse createQueueResponse = new CreateQueueResponse();
160+
try
161+
{
162+
createQueueResponse = _sqSExtendedClient.CreateQueueAsync(request).Result;
163+
}
164+
catch (Exception ex)
165+
{
166+
_logger.LogDebug($"The queue could not be created or subscribed to: {ex.Message}");
167+
}
168+
169+
sendMessageRequest.QueueUrl = createQueueResponse.QueueUrl;
170+
171+
172+
173+
sendMessageRequest.MessageBody = Encoding.UTF8.GetString(message.Body, 0, message.Body.Length);
174+
175+
try
176+
{
177+
SendMessageResponse sqsresp = _sqSExtendedClient.SendMessageAsync(sendMessageRequest).Result;
178+
}
179+
catch(Exception e)
180+
{
181+
_logger.LogError($"The message could not be posted to the queue {queueName} : \n {e.Message}");
182+
}
183+
184+
185+
return Task.CompletedTask;
186+
}
187+
188+
189+
protected virtual void Dispose(bool disposing)
190+
{
191+
if (!_disposedValue)
192+
{
193+
if (disposing)
194+
{
195+
// Dispose any managed objects
196+
}
197+
198+
_disposedValue = true;
199+
}
200+
}
201+
202+
public void Dispose()
203+
{
204+
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
205+
Dispose(disposing: true);
206+
GC.SuppressFinalize(this);
207+
}
208+
}
209+
}

0 commit comments

Comments
 (0)