Skip to content

Commit 63359a3

Browse files
SQS Support adde to messaging lib
1 parent 73fa831 commit 63359a3

File tree

7 files changed

+929
-3
lines changed

7 files changed

+929
-3
lines changed

SQS/ConfigurationKeys.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// SPDX-FileCopyrightText: © 2021-2022 MONAI Consortium
2+
// SPDX-License-Identifier: Apache License 2.0
3+
4+
namespace Monai.Deploy.Messaging.Configuration
5+
{
6+
internal static class SQSConfigurationKeys
7+
{
8+
public static readonly string AccessKey = "accessKey";
9+
public static readonly string AccessToken = "accessToken";
10+
public static readonly string Region = "region";
11+
public static readonly string WorkflowRequestQueue = "workflowRequestQueue";
12+
public static readonly string ExportRequestQueue = "exportRequestQueue";
13+
public static readonly string BucketName = "bucketName";
14+
public static readonly string Envid = "environmentId";
15+
16+
public static readonly string[] PublisherRequiredKeys = new[] { WorkflowRequestQueue, BucketName };
17+
public static readonly string[] SubscriberRequiredKeys = new[] { ExportRequestQueue, BucketName };
18+
}
19+
}

SQS/Log.cs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
using Microsoft.Extensions.Logging;
2+
3+
namespace Monai.Deploy.Messaging.SQS
4+
{
5+
public static partial class Log
6+
{
7+
internal static readonly string LoggingScopeMessageApplication = "Message ID={0}. Application ID={1}.";
8+
9+
10+
[LoggerMessage(EventId = 10000, Level = LogLevel.Information, Message = "Publishing message {MessageId} to Queue={topic}.")]
11+
public static partial void PublishingToSQS(this ILogger logger, string topic, string MessageId);
12+
13+
[LoggerMessage(EventId = 10001, Level = LogLevel.Information, Message = "{ServiceName} connecting to SQS.")]
14+
public static partial void ConnectingToSQS(this ILogger logger, string serviceName);
15+
16+
[LoggerMessage(EventId = 10002, Level = LogLevel.Information, Message = "Message received from queue {queue}.")]
17+
public static partial void MessageReceivedFromQueue(this ILogger logger, string queue, string topic);
18+
19+
[LoggerMessage(EventId = 10003, Level = LogLevel.Information, Message = "Listening for messages from {endpoint}. Queue={queue}.")]
20+
public static partial void SubscribeToSQSQueue(this ILogger logger, string endpoint, string virtualHost, string exchange, string queue, string topic);
21+
22+
[LoggerMessage(EventId = 10004, Level = LogLevel.Information, Message = "Sending message acknowledgement for message {messageId}.")]
23+
public static partial void SendingAcknowledgement(this ILogger logger, string messageId);
24+
25+
[LoggerMessage(EventId = 10005, Level = LogLevel.Information, Message = "Ackowledge sent for message {messageId}.")]
26+
public static partial void AcknowledgementSent(this ILogger logger, string messageId);
27+
28+
[LoggerMessage(EventId = 10006, Level = LogLevel.Information, Message = "Sending nack message {messageId} and requeuing.")]
29+
public static partial void SendingNAcknowledgement(this ILogger logger, string messageId);
30+
31+
[LoggerMessage(EventId = 10007, Level = LogLevel.Information, Message = "Nack message sent for message {messageId}, requeue={requeue}.")]
32+
public static partial void NAcknowledgementSent(this ILogger logger, string messageId, bool requeue);
33+
34+
[LoggerMessage(EventId = 10008, Level = LogLevel.Information, Message = "Closing connections.")]
35+
public static partial void ClosingConnections(this ILogger logger);
36+
37+
[LoggerMessage(EventId = 10009, Level = LogLevel.Error, Message = "Invalid or corrupted message received: Queue={queueName}, Message ID={messageId}.")]
38+
public static partial void InvalidMessage(this ILogger logger, string queueName, string topic, string messageId, Exception ex);
39+
40+
[LoggerMessage(EventId = 10010, Level = LogLevel.Error, Message = "Exception not handled by the subscriber's callback function: Queue={queueName}, Message ID={messageId}.")]
41+
public static partial void ErrorNotHandledByCallback(this ILogger logger, string queueName, string topic, string messageId, Exception ex);
42+
43+
[LoggerMessage(EventId = 10011, Level = LogLevel.Error, Message = "Creating SQS client.")]
44+
public static partial void CreateSQSClient(this ILogger logger);
45+
46+
[LoggerMessage(EventId = 10012, Level = LogLevel.Error, Message = "{ServiceName} failed to connect to SQS.")]
47+
public static partial void ConnectingToSQSError(this ILogger logger, string serviceName, Exception ex);
48+
}
49+
}

SQS/QueueFormatter.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
using System.Text.RegularExpressions;
2+
3+
namespace Monai.Deploy.Messaging.SQS
4+
{
5+
internal static class QueueFormatter
6+
{
7+
/// <summary>
8+
/// Returns an aggregate of the the environmentId, queueBasename nd topic as the name of the queue defined in SQS.
9+
/// The returned string is made compliant to SQS naming convention : It will replace non alphanumeric and other characters than "_" and "-", by an hyphen
10+
/// </summary>
11+
/// <param name="environmentId"></param>
12+
/// <param name="queuebasename"></param>
13+
/// <param name="topic"></param>
14+
/// <returns>string</returns>
15+
public static string FormatQueueName(string environmentId, string? queuebasename, string topic)
16+
{
17+
18+
string queue = $"{queuebasename}_{topic}";
19+
20+
if (!environmentId.Equals(String.Empty))
21+
queue = $"{environmentId}_{queue}";
22+
queue = Regex.Replace(queue, "[^a-zA-Z0-9_]", "-");
23+
if (queue.Length > 80)
24+
queue = queue.Substring(0, 80);
25+
return queue;
26+
}
27+
}
28+
}

SQS/README.MD

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
<p align="center">
2+
<img src="https://raw.githubusercontent.com/Project-MONAI/MONAI/dev/docs/images/MONAI-logo-color.png" width="50%" alt='project-monai'>
3+
</p>
4+
5+
💡 If you want to know more about MONAI Deploy WG vision, overall structure, and guidelines, please read [MONAI Deploy](https://github.com/Project-MONAI/monai-deploy) first.
6+
7+
# MONAI Deploy Messaging SQS Plug-In
8+
AWS SQS plugin for the messaging layer of MONAI Deploy.
9+
10+
## Information:
11+
This plugin can be used as a messaging mechanism alternative to RabbitMQ, and allows MONAI Deploy to integrate with AWS Simple Queue Service. Unlike RabbitMQ, SQS does not have a concept of vhost, exchange and routing key; instead the plugin will create one specific queue for each vhost, exchange and routing key combibation.
12+
13+
<table>
14+
<tr>
15+
<td>
16+
<b>RMQ plugin</b>
17+
</td>
18+
<td>
19+
<b>SQS plugin</b>
20+
</td>
21+
<td><b>Description</b>
22+
</td>
23+
</tr>
24+
<tr>
25+
<td>vhost</td>
26+
<td>environmentId</td>
27+
<td>The vhost namespace is replaced be the parameter environmentId. This parmater is used as a prefix for each queue name, allowing mutliple MONAI deploy installations on the same AWS account with not queue name conflict.</td>
28+
</tr>
29+
<tr>
30+
<td>exchange and routing key</td>
31+
<td>workflowRequestQueue and exportRequestQueue</td>
32+
<td>These parmaeters control the name of the queues for each purpose. Each queue name will be suffixed with the name routing key.</td>
33+
</tr>
34+
</table>
35+
36+
* Queue names generated by the plugin have the following name convention : [environmentId]_[RequestQueue]_[routingKey].
37+
* Queue names will be less or equal to 80 characters long. The name will be truncated if the combinatino of environmentId, RequestQueue and routingKey exceeds this limit.
38+
* Any non-alphanumerical character other than "-" and "\_" found in the environmentId, RequestQueue, parameters or routing key variables will be replaced by "\_".
39+
* The queues are created automatically upon the 1st message publication/subscription if they do not already exist.
40+
* Because MONAI Deploy can generate payloads greater than 256KBytes, the plugin leverages the SQS extended client, allowing payload up to 2GB in size. The use of this specific client mandates the usage of an S3 bucket as transient store for the messages to be held until their delivery to the queue subscriber. More information is available below in this documentation about IAM privileges, SQS and S3 bucket requirements.
41+
42+
43+
44+
## Plugin activation
45+
46+
Because MONAI Deploy plugin management work is still in progress (as of 06/13/2022), plugins cannot be loaded at run-time. Instead this SQS pluging can be activated by doing small code changes to the MONAI Informatic Gateway project https://github.com/Project-MONAI/monai-deploy-informatics-gateway, in the `Program.cs` file and recompiling.
47+
48+
In the declaration for `CreateHostBuilder`, locate the following code block:
49+
50+
services.UseRabbitMq();
51+
services.AddSingleton<RabbitMqMessagePublisherService>();
52+
services.AddSingleton<IMessageBrokerPublisherService>(implementationFactory =>
53+
{
54+
var options = implementationFactory.GetService<IOptions<InformaticsGatewayConfiguration>>();
55+
var serviceProvider = implementationFactory.GetService<IServiceProvider>();
56+
var logger = implementationFactory.GetService<ILogger<Program>>();
57+
return serviceProvider.LocateService<IMessageBrokerPublisherService>(logger, options.Value.Messaging.PublisherServiceAssemblyName);
58+
});
59+
60+
services.AddSingleton<RabbitMqMessageSubscriberService>();
61+
services.AddSingleton<IMessageBrokerSubscriberService>(implementationFactory =>
62+
{
63+
var options = implementationFactory.GetService<IOptions<InformaticsGatewayConfiguration>>();
64+
var serviceProvider = implementationFactory.GetService<IServiceProvider>();
65+
var logger = implementationFactory.GetService<ILogger<Program>>();
66+
return serviceProvider.LocateService<IMessageBrokerSubscriberService>(logger, options.Value.Messaging.SubscriberServiceAssemblyName);
67+
});
68+
69+
70+
71+
and alter it by commenting the line `services.UseRabbitMq();`, replace `services.AddSingleton<RabbitMqMessagePublisherService>();` by `services.AddSingleton<SqsMessagePublisherService>();` and the line `services.AddSingleton<RabbitMqMessageSubscriberService>();` by `services.AddSingleton<SqsMessageSubscriberService>();`. The code block should now look like this :
72+
73+
74+
//services.UseRabbitMq();
75+
services.AddSingleton<SqsMessagePublisherService>();
76+
services.AddSingleton<IMessageBrokerPublisherService>(implementationFactory =>
77+
{
78+
var options = implementationFactory.GetService<IOptions<InformaticsGatewayConfiguration>>();
79+
var serviceProvider = implementationFactory.GetService<IServiceProvider>();
80+
var logger = implementationFactory.GetService<ILogger<Program>>();
81+
return serviceProvider.LocateService<IMessageBrokerPublisherService>(logger, options.Value.Messaging.PublisherServiceAssemblyName);
82+
});
83+
84+
services.AddSingleton<SqsMessageSubscriberService>();
85+
services.AddSingleton<IMessageBrokerSubscriberService>(implementationFactory =>
86+
{
87+
var options = implementationFactory.GetService<IOptions<InformaticsGatewayConfiguration>>();
88+
var serviceProvider = implementationFactory.GetService<IServiceProvider>();
89+
var logger = implementationFactory.GetService<ILogger<Program>>();
90+
return serviceProvider.LocateService<IMessageBrokerSubscriberService>(logger, options.Value.Messaging.SubscriberServiceAssemblyName);
91+
});
92+
93+
## MONAI Informatic Gateway Configuration
94+
95+
The plugin is configured in the Messaging section of `appsettings.json` / `appsettings.Development.json` :
96+
97+
```json
98+
"messaging": {
99+
"publisherServiceAssemblyName": "Monai.Deploy.Messaging.SQS.SQSMessagePublisherService, Monai.Deploy.Messaging",
100+
"subscriberServiceAssemblyName": "Monai.Deploy.Messaging.SQS.SqsMessageSubscriberService, Monai.Deploy.Messaging",
101+
"publisherSettings": {
102+
"bucketName": "monai-minio",
103+
"workflowRequestQueue": "workflow_tasks",
104+
"environmentId": "monai-1",
105+
"accessKey": "ASDFGHJKLADF123456789",
106+
"accessToken": "QwErTyUiOpAsDonMB88W1mcCCwQdePe8X27SEu1S"
107+
},
108+
"subscriberSettings": {
109+
"exportRequestQueue": "export_tasks",
110+
"bucketName": "monai-minio",
111+
"environmentId": "monai-1",
112+
"accessKey": "ASDFGHJKLADF123456789",
113+
"accessToken": "QwErTyUiOpAsDonMB88W1mcCCwQdePe8X27SEu1S"
114+
}
115+
},
116+
```
117+
<table>
118+
<tr>
119+
<td>bucketName</td>
120+
<td>S3 bucket used to store the messages temporarily until the subscriber gets it.</td>
121+
</tr>
122+
<tr>
123+
<td>workflowRequestQueue</td>
124+
<td>Queue prefix for the workflow requests ( MIG -> Workflow Manager ). This parameter is only useful in the PublisherSettings section.</td>
125+
</tr>
126+
<tr>
127+
<td>exportRequestQueue</td>
128+
<td>Queue prefix for the export requests ( Workflow Manager -> MIG ). This parameter is only useful in the SubscriberSettings section.</td>
129+
</tr>
130+
<tr>
131+
<td>bucketName</td>
132+
<td>S3 bucket used to store the messages temporarily until the subscriber gets it.</td>
133+
</tr>
134+
<tr>
135+
<td>environmentId</td>
136+
<td>A prefix used to identify the MONAI environment. This allows to create multiple environments within a single AWS account without queue name conflict. This parameter allows for a configuration comparable to the vhost concept in RabbitMq.</td>
137+
</tr>
138+
<tr>
139+
<td>accessKey</td>
140+
<td>AWS IAM user access key. This parameter is optional. If not present the plugin will fallback to local credentials, then EC2 role. If this parameter is used the parameter accessToken is also required. Refer to the section IAM privileges for IAM configuration.</td>
141+
</tr>
142+
<tr>
143+
<td>accessToken</td>
144+
<td>AWS IAM user access token. This parameter is only required when the parameter accesskey is provided.</td>
145+
</tr>
146+
</table>
147+
148+
## IAM Privileges
149+
150+
For the plugin to function a set of specific privileges need to be provided. The permissions can be associated either with an IAM user ( by using accessKey and accessToken in the configuratio file ) or by running MONAI on an EC2 instance associated with an IAM Role granted for the following privileges:
151+
152+
Replace the tags below in the below policy as follow :
153+
154+
[AWS_Account] : The AWS Account ID ( numeric )
155+
[EnvironmentId] : The Environment Id use int Subscriber and Publisher settings of the Messaging seciton of `appsettings.json` / `appsettings.Development.json`.
156+
[BucketName] : the name of the S3 bucket used to store the messages temporarily. The same bucket as the one used to store incoming DICOM objects can be used if desired. ( see the AWS S3 plugin to use S3 natively with MONAI Deploy )
157+
158+
```json
159+
{
160+
"Version": "2012-10-17",
161+
"Statement": [
162+
{
163+
"Sid": "VisualEditor0",
164+
"Effect": "Allow",
165+
"Action": [
166+
"sqs:DeleteMessage",
167+
"s3:PutObject",
168+
"s3:GetObject",
169+
"sqs:GetQueueUrl",
170+
"sqs:ReceiveMessage",
171+
"sqs:SendMessage",
172+
"sqs:GetQueueAttributes",
173+
"sqs:CreateQueue",
174+
"s3:DeleteObject",
175+
"sqs:SetQueueAttributes"
176+
],
177+
"Resource": [
178+
"arn:aws:sqs:*:[AWS_Account]:[EnvironmentId]",
179+
"arn:aws:s3:::[BucketName]/*"
180+
]
181+
}
182+
]
183+
}
184+
```
185+
186+
## Contributing
187+
188+
For guidance on making a contribution to MONAI Deploy Workflow Manager, see the [contributing guidelines](https://github.com/Project-MONAI/monai-deploy/blob/main/CONTRIBUTING.md).
189+
190+
Join the conversation on Twitter [@ProjectMONAI](https://twitter.com/ProjectMONAI) or join our [Slack channel](https://forms.gle/QTxJq3hFictp31UM9).
191+
192+
Ask and answer questions over on [MONAI Deploy Workflow Manager's GitHub Discussions tab](https://github.com/Project-MONAI/monai-deploy-workflow-manager/discussions).
193+
194+
## Links
195+
196+
- Website: <https://monai.io>
197+
- Code: <https://github.com/Project-MONAI/monai-deploy-messaging>
198+
- Project tracker: <https://github.com/Project-MONAI/monai-deploy-messaging/projects>
199+
- Issue tracker: <https://github.com/Project-MONAI/monai-deploy-messaging/issues>
200+
- Test status: <https://github.com/Project-MONAI/monai-deploy-messaging/actions>

0 commit comments

Comments
 (0)