diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index 1e63bf0..1fd856d 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -1,8 +1,3 @@ - - Fixes # . ### Description @@ -16,7 +11,5 @@ A few sentences describing the changes proposed in this pull request. - [ ] Non-breaking change (fix or new feature that would not break existing functionality). - [ ] Breaking change (fix or new feature that would cause existing functionality to change). - [ ] New tests added to cover the changes. -- [ ] All tests passed locally by running `./src/run-tests-in-docker.sh`. +- [ ] All tests passed locally. - [ ] [Documentation comments](https://docs.microsoft.com/en-us/dotnet/csharp/language-reference/language-specification/documentation-comments) included/updated. -- [ ] User guide updated. -- [ ] I have updated the changelog diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e186736..4c1d0ca 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,6 +9,11 @@ on: # Allows you to run this workflow manually from the Actions tab workflow_dispatch: + inputs: + nuget: + type: boolean + default: false + description: Publish to NuGet.org env: BUILD_CONFIG: "Release" @@ -206,6 +211,7 @@ jobs: retention-days: 30 publish: + name: Publish to GitHub Packages runs-on: ubuntu-latest needs: [build, unit-test] steps: @@ -223,10 +229,11 @@ jobs: source-url: https://nuget.pkg.github.com/Project-MONAI/index.json - name: Publish to GitHub - run: dotnet nuget push ${{ steps.download.outputs.download-path }}/artifact/*.nupkg + run: dotnet nuget push ${{ steps.download.outputs.download-path }}/artifact/*.nupkg --skip-duplicate release: - if: ${{ contains(github.ref, 'refs/heads/main') ||contains(github.head_ref, 'release/') }} + name: Official Release to NuGet.org + if: ${{ github.event.inputs.nuget || contains(github.ref, 'refs/heads/release') }} runs-on: ubuntu-latest needs: [build, unit-test] env: @@ -268,8 +275,8 @@ jobs: name: Release v${{ env.MAJORMINORPATCH }} - name: Publish release with GitReleaseManager - uses: gittools/actions/gitreleasemanager/publish@v0.9.13 if: ${{ contains(github.ref, 'refs/heads/main') }} + uses: gittools/actions/gitreleasemanager/publish@v0.9.13 with: token: ${{ secrets.GITHUB_TOKEN }} owner: ${{ steps.repo.outputs._0 }} @@ -277,8 +284,8 @@ jobs: tagName: ${{ env.MAJORMINORPATCH }} - name: Close release with GitReleaseManager - uses: gittools/actions/gitreleasemanager/close@v0.9.13 if: ${{ contains(github.ref, 'refs/heads/main') }} + uses: gittools/actions/gitreleasemanager/close@v0.9.13 with: token: ${{ secrets.GITHUB_TOKEN }} owner: ${{ steps.repo.outputs._0 }} diff --git a/GitVersion.yml b/GitVersion.yml index ab2a9a2..a35b3cd 100644 --- a/GitVersion.yml +++ b/GitVersion.yml @@ -1,17 +1,57 @@ -# SPDX-FileCopyrightText: © 2022 MONAI Consortium +# SPDX-FileCopyrightText: © 2022 MONAI Consortium # SPDX-License-Identifier: Apache License 2.0 assembly-versioning-scheme: MajorMinorPatchTag -mode: ContinuousDelivery +mode: ContinuousDeployment branches: main: tag: '' + mode: ContinuousDelivery + increment: Patch + prevent-increment-of-merged-branch-version: true + track-merge-target: false + source-branches: [ 'release' ] + tracks-release-branches: false + is-release-branch: false + is-mainline: true + pre-release-weight: 55000 release: tag: rc + regex: ^releases?[/-] + mode: ContinuousDeployment + increment: None + prevent-increment-of-merged-branch-version: true + track-merge-target: false + source-branches: [ 'main', 'release' ] + tracks-release-branches: false + is-release-branch: true + is-mainline: false + pre-release-weight: 30000 feature: tag: alpha.{BranchName} + regex: ^features?[/-] + mode: ContinuousDeployment + increment: Inherit + prevent-increment-of-merged-branch-version: false + track-merge-target: false + source-branches: [ 'main', 'release', 'feature' ] + tracks-release-branches: false + is-release-branch: false + is-mainline: false + pre-release-weight: 30000 pull-request: tag: pr + regex: ^(pull|pull\-requests|pr)[/-] + mode: ContinuousDeployment + increment: Inherit + prevent-increment-of-merged-branch-version: false + tag-number-pattern: '[/-](?\d+)[-/]' + track-merge-target: false + source-branches: [ 'main', 'release', 'feature' ] + tracks-release-branches: false + is-release-branch: false + is-mainline: false + pre-release-weight: 30000 ignore: sha: [] diff --git a/src/Messaging/API/IMessageBrokerPublisherService.cs b/src/Messaging/API/IMessageBrokerPublisherService.cs index 73bfb17..a36ac8f 100644 --- a/src/Messaging/API/IMessageBrokerPublisherService.cs +++ b/src/Messaging/API/IMessageBrokerPublisherService.cs @@ -5,7 +5,7 @@ namespace Monai.Deploy.Messaging { - public interface IMessageBrokerPublisherService + public interface IMessageBrokerPublisherService : IDisposable { /// /// Gets or sets the name of the storage service. diff --git a/src/Messaging/API/IMessageBrokerSubscriberService.cs b/src/Messaging/API/IMessageBrokerSubscriberService.cs index 6d14844..240d867 100644 --- a/src/Messaging/API/IMessageBrokerSubscriberService.cs +++ b/src/Messaging/API/IMessageBrokerSubscriberService.cs @@ -6,7 +6,7 @@ namespace Monai.Deploy.Messaging { - public interface IMessageBrokerSubscriberService + public interface IMessageBrokerSubscriberService : IDisposable { /// /// Gets or sets the name of the storage service. @@ -14,15 +14,49 @@ public interface IMessageBrokerSubscriberService string Name { get; } /// - /// Subscribe to a message topic & queue. + /// Subscribe to a message topic & queue and executes messageReceivedCallback for every message that is received. /// Either provide a topic, a queue or both. + /// A queue is generated if the name of the queue is not provided. /// - /// Name of the topic to subscribe to + /// Topic/routing key to bind to /// Name of the queue to consume /// Action to be performed when message is received - /// Number of unacknowledged messages to receive at once. Defaults to 0. + /// Number of unacknowledged messages to receive at once. Defaults to 0. void Subscribe(string topic, string queue, Action messageReceivedCallback, ushort prefetchCount = 0); + /// + /// Subscribe to a message topic & queue and executes messageReceivedCallback for every message that is received. + /// Either provide a topic, a queue or both. + /// A queue is generated if the name of the queue is not provided. + /// + /// Topics/routing keys to bind to + /// Name of the queue to consume + /// Action to be performed when message is received + /// Number of unacknowledged messages to receive at once. Defaults to 0. + void Subscribe(string[] topics, string queue, Action messageReceivedCallback, ushort prefetchCount = 0); + + /// + /// Subscribe to a message topic & queue and executes messageReceivedCallback asynchronously for every message that is received. + /// Either provide a topic, a queue or both. + /// A queue is generated if the name of the queue is not provided. + /// + /// Topic/routing key to bind to + /// Name of the queue to consume + /// to be performed when message is received + /// Number of unacknowledged messages to receive at once. Defaults to 0. + void SubscribeAsync(string topic, string queue, Func messageReceivedCallback, ushort prefetchCount = 0); + + /// + /// Subscribe to a message topic & queue and executes messageReceivedCallback asynchronously for every message that is received. + /// Either provide a topic, a queue or both. + /// A queue is generated if the name of the queue is not provided. + /// + /// Topics/routing keys to bind to + /// Name of the queue to consume + /// to be performed when message is received + /// Number of unacknowledged messages to receive at once. Defaults to 0. + void SubscribeAsync(string[] topics, string queue, Func messageReceivedCallback, ushort prefetchCount = 0); + /// /// Acknowledge receiving of a message with the given token. /// @@ -30,9 +64,10 @@ public interface IMessageBrokerSubscriberService void Acknowledge(MessageBase message); /// - /// Rejects a messags. + /// Rejects a message. /// /// Message to be rejected. - void Reject(MessageBase message); + /// Determines if the message should be requeued. + void Reject(MessageBase message, bool requeue = true); } } diff --git a/src/Messaging/AssemblyInfo.cs b/src/Messaging/AssemblyInfo.cs deleted file mode 100644 index f6fa0f8..0000000 --- a/src/Messaging/AssemblyInfo.cs +++ /dev/null @@ -1,13 +0,0 @@ -//------------------------------------------------------------------------------ -// -// This code was generated by GitVersion. -// -// You can modify this code as we will not overwrite it when re-executing GitVersion -// -//------------------------------------------------------------------------------ - -using System.Reflection; - -[assembly: AssemblyFileVersion("0.1.0.0")] -[assembly: AssemblyVersion("0.1.0.0")] -[assembly: AssemblyInformationalVersion("0.1.0+0.Branch.main.Sha.ae43f4c7111e09d2a34d881c6704a2dea81d9155")] diff --git a/src/Messaging/Common/Credentials.cs b/src/Messaging/Common/Credentials.cs new file mode 100644 index 0000000..f14c980 --- /dev/null +++ b/src/Messaging/Common/Credentials.cs @@ -0,0 +1,38 @@ +// SPDX-FileCopyrightText: © 2022 MONAI Consortium +// SPDX-License-Identifier: Apache License 2.0 + +using System.ComponentModel.DataAnnotations; +using Newtonsoft.Json; + +namespace Monai.Deploy.Messaging.Common +{ + public class Credentials + { + /// + /// Gets or sets the access key or user name of the credentials pair. + /// + [JsonProperty(PropertyName = "access_key")] + [Required] + public string AccessKey { get; set; } + + /// + /// Gets or sets the access token or password of the credentials pair. + /// + [JsonProperty(PropertyName = "access_token")] + [Required] + public string AccessToken { get; set; } + + /// + /// Gets or sets the session token of the credentials pair. + /// + [JsonProperty(PropertyName = "session_token")] + public string SessionToken { get; set; } + + public Credentials() + { + AccessKey = string.Empty; + AccessToken = string.Empty; + SessionToken = string.Empty; + } + } +} diff --git a/src/Messaging/Common/Log.cs b/src/Messaging/Common/Log.cs index dfb4098..0bc9795 100644 --- a/src/Messaging/Common/Log.cs +++ b/src/Messaging/Common/Log.cs @@ -30,10 +30,16 @@ public static partial class Log [LoggerMessage(EventId = 10006, Level = LogLevel.Information, Message = "Sending nack message {messageId} and requeuing.")] public static partial void SendingNAcknowledgement(this ILogger logger, string messageId); - [LoggerMessage(EventId = 10007, Level = LogLevel.Information, Message = "Nack message sent for message {messageId}.")] - public static partial void NAcknowledgementSent(this ILogger logger, string messageId); + [LoggerMessage(EventId = 10007, Level = LogLevel.Information, Message = "Nack message sent for message {messageId}, requeue={requeue}.")] + public static partial void NAcknowledgementSent(this ILogger logger, string messageId, bool requeue); - [LoggerMessage(EventId = 10008, Level = LogLevel.Information, Message = "Closing connection.")] - public static partial void ClosingConnection(this ILogger logger); + [LoggerMessage(EventId = 10008, Level = LogLevel.Information, Message = "Closing connections.")] + public static partial void ClosingConnections(this ILogger logger); + + [LoggerMessage(EventId = 10009, Level = LogLevel.Error, Message = "Invalid or corrupted message received: Queue={queueName}, Topic={topic}, Message ID={messageId}.")] + public static partial void InvalidMessage(this ILogger logger, string queueName, string topic, string messageId, Exception ex); + + [LoggerMessage(EventId = 10010, Level = LogLevel.Error, Message = "Exception not handled by the subscriber's callback function: Queue={queueName}, Topic={topic}, Message ID={messageId}.")] + public static partial void ErrorNotHandledByCallback(this ILogger logger, string queueName, string topic, string messageId, Exception ex); } } diff --git a/src/Messaging/Common/MessageConversionException.cs b/src/Messaging/Common/MessageConversionException.cs new file mode 100644 index 0000000..3f5dac6 --- /dev/null +++ b/src/Messaging/Common/MessageConversionException.cs @@ -0,0 +1,27 @@ +// SPDX-FileCopyrightText: © 2022 MONAI Consortium +// SPDX-License-Identifier: Apache License 2.0 +// +using System.Runtime.Serialization; + +namespace Monai.Deploy.Messaging.Common +{ + [Serializable] + public class MessageConversionException : Exception + { + public MessageConversionException() + { + } + + public MessageConversionException(string message) : base(message) + { + } + + public MessageConversionException(string message, Exception innerException) : base(message, innerException) + { + } + + protected MessageConversionException(SerializationInfo info, StreamingContext context) : base(info, context) + { + } + } +} diff --git a/src/Messaging/Common/MessageValidationException.cs b/src/Messaging/Common/MessageValidationException.cs new file mode 100644 index 0000000..9be16e2 --- /dev/null +++ b/src/Messaging/Common/MessageValidationException.cs @@ -0,0 +1,31 @@ +// SPDX-FileCopyrightText: © 2022 MONAI Consortium +// SPDX-License-Identifier: Apache License 2.0 + +using System.ComponentModel.DataAnnotations; +using System.Runtime.Serialization; + +namespace Monai.Deploy.Messaging.Common +{ + [Serializable] + public class MessageValidationException : Exception + { + public MessageValidationException(List errors) + : base(FormatMessage(errors)) + { + } + + protected MessageValidationException(SerializationInfo info, StreamingContext context) : base(info, context) + { + } + + private static string FormatMessage(List errors) + { + if (errors is null || errors.Count == 0) + { + return "Invalid message."; + } + + return $"Invalid message: {string.Join(',', errors.Select(p => $"{p.ErrorMessage} Path: {string.Join(',', p.MemberNames)}."))}"; + } + } +} diff --git a/src/Messaging/Common/Storage.cs b/src/Messaging/Common/Storage.cs new file mode 100644 index 0000000..424f4f3 --- /dev/null +++ b/src/Messaging/Common/Storage.cs @@ -0,0 +1,67 @@ +// SPDX-FileCopyrightText: © 2022 MONAI Consortium +// SPDX-License-Identifier: Apache License 2.0 + +using System.ComponentModel.DataAnnotations; +using Newtonsoft.Json; + +namespace Monai.Deploy.Messaging.Common +{ + public class Storage : ICloneable + { + /// + /// Gets or sets the name of the artifact. + /// For Argo, name of the artifact used in the template. + /// + [JsonProperty(PropertyName = "name")] + [Required] + public string Name { get; set; } + + /// + /// Gets or sets the endpoint of the storage service. + /// + [JsonProperty(PropertyName = "endpoint")] + [Required] + public string Endpoint { get; set; } + + /// + /// Gets or sets credentials for accessing the storage service. + /// + [JsonProperty(PropertyName = "credentials")] + public Credentials? Credentials { get; set; } + + /// + /// Gets or sets name of the bucket. + /// + [JsonProperty(PropertyName = "bucket")] + [Required] + public string Bucket { get; set; } + + /// + /// Gets or sets whether the connection should be secured or not. + /// + [JsonProperty(PropertyName = "secured_connection")] + public bool SecuredConnection { get; set; } + + /// + /// Gets or sets the optional relative root path to the data. + /// + [JsonProperty(PropertyName = "relative_root_path")] + [Required] + public string RelativeRootPath { get; set; } + + public Storage() + { + Name = string.Empty; + Endpoint = string.Empty; + Credentials = null; + Bucket = string.Empty; + SecuredConnection = false; + RelativeRootPath = string.Empty; + } + + public object Clone() + { + return MemberwiseClone(); + } + } +} diff --git a/src/Messaging/Configuration/ConfigurationException.cs b/src/Messaging/Configuration/ConfigurationException.cs index 1da9311..3fc53b6 100644 --- a/src/Messaging/Configuration/ConfigurationException.cs +++ b/src/Messaging/Configuration/ConfigurationException.cs @@ -8,18 +8,10 @@ namespace Monai.Deploy.Messaging.Configuration [Serializable] public class ConfigurationException : Exception { - public ConfigurationException() - { - } - public ConfigurationException(string? message) : base(message) { } - public ConfigurationException(string? message, Exception? innerException) : base(message, innerException) - { - } - protected ConfigurationException(SerializationInfo info, StreamingContext context) : base(info, context) { } diff --git a/src/Messaging/Configuration/ConfigurationKeys.cs b/src/Messaging/Configuration/ConfigurationKeys.cs index 367b25c..0bed43a 100644 --- a/src/Messaging/Configuration/ConfigurationKeys.cs +++ b/src/Messaging/Configuration/ConfigurationKeys.cs @@ -11,7 +11,8 @@ internal static class ConfigurationKeys public static readonly string VirtualHost = "virtualHost"; public static readonly string Exchange = "exchange"; public static readonly string ExportRequestQueue = "exportRequestQueue"; - + public static readonly string UseSSL = "useSSL"; + public static readonly string Port = "port"; public static readonly string[] PublisherRequiredKeys = new[] { EndPoint, Username, Password, VirtualHost, Exchange }; public static readonly string[] SubscriberRequiredKeys = new[] { EndPoint, Username, Password, VirtualHost, Exchange, ExportRequestQueue }; } diff --git a/src/Messaging/Events/EventBase.cs b/src/Messaging/Events/EventBase.cs new file mode 100644 index 0000000..a108ba2 --- /dev/null +++ b/src/Messaging/Events/EventBase.cs @@ -0,0 +1,129 @@ +// SPDX-FileCopyrightText: © 2022 MONAI Consortium +// SPDX-License-Identifier: Apache License 2.0 + +using System.Collections; +using System.ComponentModel.DataAnnotations; +using Ardalis.GuardClauses; +using Monai.Deploy.Messaging.Common; + +namespace Monai.Deploy.Messaging.Events +{ + public abstract class EventBase + { + /// + /// Validates the message with all properties recursively. + /// Throws on error. + /// + public virtual void Validate() + { + var validationContextItems = new Dictionary(); + var validationResults = new List(); + if (!TryValidateRecusively(this, validationContextItems, validationResults, new HashSet(), GetType().Name)) + { + throw new MessageValidationException(validationResults); + } + } + + private bool TryValidateRecusively(T instance, + IDictionary validationContextItems, + List validationResults, + ISet validatedObjects, + string propertyPath) + { + Guard.Against.Null(instance, nameof(instance)); + Guard.Against.Null(validationContextItems, nameof(validationContextItems)); + Guard.Against.Null(validationResults, nameof(validationResults)); + Guard.Against.Null(validatedObjects, nameof(validatedObjects)); + Guard.Against.NullOrWhiteSpace(propertyPath, nameof(propertyPath)); + + if (validatedObjects.Contains(instance)) + { + return true; + } + validatedObjects.Add(instance); + + var result = Validator.TryValidateObject(instance, new ValidationContext(instance, null, validationContextItems), validationResults, true); + + var properties = instance.GetType().GetProperties().Where(prop => prop.CanRead && prop.GetIndexParameters().Length == 0).ToList(); + + foreach (var property in properties) + { + if (property.PropertyType == typeof(string) || property.PropertyType.IsValueType) continue; + + var value = instance.GetType().GetProperty(property.Name)?.GetValue(instance, null); + + if (value == null) + { + continue; + } + + result &= TryValidateProperty(validationContextItems, validationResults, validatedObjects, value, $"{propertyPath}.{property.Name}"); + } + + return result; + } + + private bool TryValidateProperty(IDictionary validationContextItems, + List validationResults, + ISet validatedObjects, + object? value, + string propertyPath) + { + Guard.Against.Null(validationContextItems, nameof(validationContextItems)); + Guard.Against.Null(validationResults, nameof(validationResults)); + Guard.Against.Null(validatedObjects, nameof(validatedObjects)); + Guard.Against.NullOrWhiteSpace(propertyPath, nameof(propertyPath)); + + var result = true; + + if (value is IEnumerable enumerable && + !TryValidateEnumerableRecursively(enumerable, validationContextItems, validationResults, validatedObjects, propertyPath)) + { + result = false; + } + + var nestedValidationResults = new List(); + if (!TryValidateRecusively(value, validationContextItems, nestedValidationResults, validatedObjects, propertyPath)) + { + result = false; + foreach (var validationResult in nestedValidationResults) + { + validationResults.Add(new ValidationResult(validationResult.ErrorMessage, validationResult.MemberNames.Select(p => propertyPath + '.' + p))); + } + } + + return result; + } + + private bool TryValidateEnumerableRecursively(IEnumerable enumerable, + IDictionary validationContextItems, + IList validationResults, + ISet validatedObjects, + string propertyPath) + { + Guard.Against.Null(enumerable, nameof(enumerable)); + Guard.Against.Null(validationContextItems, nameof(validationContextItems)); + Guard.Against.Null(validationResults, nameof(validationResults)); + Guard.Against.Null(validatedObjects, nameof(validatedObjects)); + Guard.Against.NullOrWhiteSpace(propertyPath, nameof(propertyPath)); + + var result = true; + foreach (var enumObj in enumerable) + { + if (enumObj is not null) + { + var nestedValidationResults = new List(); + if (!TryValidateRecusively(enumObj, validationContextItems, nestedValidationResults, validatedObjects, propertyPath)) + { + result = false; + foreach (var validationResult in nestedValidationResults) + { + validationResults.Add(new ValidationResult(validationResult.ErrorMessage, validationResult.MemberNames.Select(p => $"{propertyPath}.{p}"))); + } + } + } + } + return result; + } + } +} diff --git a/src/Messaging/Messages/ExportCompleteMessage.cs b/src/Messaging/Events/ExportCompleteEvent.cs similarity index 77% rename from src/Messaging/Messages/ExportCompleteMessage.cs rename to src/Messaging/Events/ExportCompleteEvent.cs index d907676..12859da 100644 --- a/src/Messaging/Messages/ExportCompleteMessage.cs +++ b/src/Messaging/Events/ExportCompleteEvent.cs @@ -1,50 +1,50 @@ // SPDX-FileCopyrightText: © 2021-2022 MONAI Consortium // SPDX-License-Identifier: Apache License 2.0 +using System.ComponentModel.DataAnnotations; using Ardalis.GuardClauses; using Newtonsoft.Json; using Newtonsoft.Json.Converters; -namespace Monai.Deploy.Messaging.Messages +namespace Monai.Deploy.Messaging.Events { - public enum ExportStatus - { - Success = 0, - Failure, - PartialFailure, - Unknown - } - - public class ExportCompleteMessage + public class ExportCompleteEvent : EventBase { /// /// Gets or sets the workflow ID generated by the Workflow Manager. /// + [JsonProperty(PropertyName = "workflow_id")] + [Required] public string WorkflowId { get; set; } = default!; /// /// Gets or sets the export task ID generated by the Workflow Manager. /// + [JsonProperty(PropertyName = "export_task_id")] + [Required] public string ExportTaskId { get; set; } = default!; /// /// Gets or sets the state of the export task. /// + [JsonProperty(PropertyName = "status")] [JsonConverter(typeof(StringEnumConverter))] + [Required] public ExportStatus Status { get; set; } /// /// Gets or sets error messages, if any, when exporting. /// + [JsonProperty(PropertyName = "message")] public string Message { get; set; } = default!; [JsonConstructor] - public ExportCompleteMessage() + public ExportCompleteEvent() { Status = ExportStatus.Unknown; } - public ExportCompleteMessage(ExportRequestMessage exportRequest) + public ExportCompleteEvent(ExportRequestEvent exportRequest) { Guard.Against.Null(exportRequest, nameof(exportRequest)); diff --git a/src/Messaging/Messages/ExportRequestMessage.cs b/src/Messaging/Events/ExportRequestEvent.cs similarity index 79% rename from src/Messaging/Messages/ExportRequestMessage.cs rename to src/Messaging/Events/ExportRequestEvent.cs index bff2b3d..72207d5 100644 --- a/src/Messaging/Messages/ExportRequestMessage.cs +++ b/src/Messaging/Events/ExportRequestEvent.cs @@ -1,23 +1,32 @@ // SPDX-FileCopyrightText: © 2021-2022 MONAI Consortium // SPDX-License-Identifier: Apache License 2.0 -namespace Monai.Deploy.Messaging.Messages +using System.ComponentModel.DataAnnotations; +using Newtonsoft.Json; + +namespace Monai.Deploy.Messaging.Events { - public class ExportRequestMessage + public class ExportRequestEvent : EventBase { /// /// Gets or sets the workflow ID generated by the Workflow Manager. /// + [JsonProperty(PropertyName = "workflow_id")] + [Required] public string WorkflowId { get; set; } = default!; /// /// Gets or sets the export task ID generated by the Workflow Manager. /// + [JsonProperty(PropertyName = "export_task_id")] + [Required] public string ExportTaskId { get; set; } = default!; /// /// Gets or sets a list of files to be exported. /// + [JsonProperty(PropertyName = "files")] + [Required, MinLength(1)] public IEnumerable Files { get; set; } = default!; /// @@ -25,6 +34,8 @@ public class ExportRequestMessage /// For DIMSE, the named DICOM destination. /// For ACR, the Transaction ID in the original inference request. /// + [JsonProperty(PropertyName = "destination")] + [Required] public string Destination { get; set; } = default!; /// @@ -32,6 +43,8 @@ public class ExportRequestMessage /// For DIMSE, the correlation ID is the UUID associated with the first DICOM association received. /// For ACR, use the Transaction ID in the original request. /// + [JsonProperty(PropertyName = "correlation_id")] + [Required] public string CorrelationId { get; set; } = default!; /// @@ -63,9 +76,9 @@ public bool IsCompleted /// /// Gets or sets error messages related to this export task. /// - public List ErrorMessages { get; init; } + public List ErrorMessages { get; private set; } - public ExportRequestMessage() + public ExportRequestEvent() { ErrorMessages = new List(); } diff --git a/src/Messaging/Events/ExportStatus.cs b/src/Messaging/Events/ExportStatus.cs new file mode 100644 index 0000000..3788574 --- /dev/null +++ b/src/Messaging/Events/ExportStatus.cs @@ -0,0 +1,13 @@ +// SPDX-FileCopyrightText: © 2022 MONAI Consortium +// SPDX-License-Identifier: Apache License 2.0 + +namespace Monai.Deploy.Messaging.Events +{ + public enum ExportStatus + { + Success = 0, + Failure, + PartialFailure, + Unknown + } +} diff --git a/src/Messaging/Events/FailureReason.cs b/src/Messaging/Events/FailureReason.cs new file mode 100644 index 0000000..a34605b --- /dev/null +++ b/src/Messaging/Events/FailureReason.cs @@ -0,0 +1,16 @@ +// SPDX-FileCopyrightText: © 2022 MONAI Consortium +// SPDX-License-Identifier: Apache License 2.0 + +namespace Monai.Deploy.Messaging.Events +{ + public enum FailureReason + { + None, + Unknown, + RunnerNotSupported, + InvalidMessage, + PluginError, + ExternalServiceError, + TimedOut, + } +} diff --git a/src/Messaging/Events/TaskCallbackEvent.cs b/src/Messaging/Events/TaskCallbackEvent.cs new file mode 100644 index 0000000..5e0f3b8 --- /dev/null +++ b/src/Messaging/Events/TaskCallbackEvent.cs @@ -0,0 +1,70 @@ +// SPDX-FileCopyrightText: © 2022 MONAI Consortium +// SPDX-License-Identifier: Apache License 2.0 + +using System.ComponentModel.DataAnnotations; +using Monai.Deploy.Messaging.Common; +using Newtonsoft.Json; + +namespace Monai.Deploy.Messaging.Events +{ + public class TaskCallbackEvent : EventBase + { + /// + /// Gets or sets the ID representing the instance of the workflow. + /// + [JsonProperty(PropertyName = "workflow_instance_id")] + [Required] + public string WorkflowInstanceId { get; set; } + + /// + /// Gets or sets the ID representing the instance of the Task. + /// + [Required] + [JsonProperty(PropertyName = "task_id")] + public string TaskId { get; set; } + + /// + /// Gets or sets the execution ID representing the instance of the task. + /// + [JsonProperty(PropertyName = "execution_id")] + [Required] + public string ExecutionId { get; set; } + + /// + /// Gets or sets the correlation ID. + /// + [JsonProperty(PropertyName = "correlation_id")] + [Required] + public string CorrelationId { get; set; } + + /// + /// Gets or sets the identity provided by the external service. + /// + [JsonProperty(PropertyName = "identity")] + [Required, MaxLength(63)] + public string Identity { get; set; } + + /// + /// Gets or sets any metadata generated by the task, including any output generated. + /// + [JsonProperty(PropertyName = "metadata")] + public Dictionary Metadata { get; set; } + + /// + /// Gets or sets the output storage information. + /// + [JsonProperty(PropertyName = "outputs")] + public List Outputs { get; set; } + + public TaskCallbackEvent() + { + WorkflowInstanceId = String.Empty; + TaskId = String.Empty; + ExecutionId = String.Empty; + CorrelationId = String.Empty; + Identity = String.Empty; + Metadata = new Dictionary(); + Outputs = new List(); + } + } +} diff --git a/src/Messaging/Events/TaskDispatchEvent.cs b/src/Messaging/Events/TaskDispatchEvent.cs new file mode 100644 index 0000000..c4cf450 --- /dev/null +++ b/src/Messaging/Events/TaskDispatchEvent.cs @@ -0,0 +1,112 @@ +// SPDX-FileCopyrightText: © 2022 MONAI Consortium +// SPDX-License-Identifier: Apache License 2.0 + +using System.ComponentModel.DataAnnotations; +using Monai.Deploy.Messaging.Common; +using Newtonsoft.Json; +using Newtonsoft.Json.Converters; + +namespace Monai.Deploy.Messaging.Events +{ + public class TaskDispatchEvent : EventBase + { + /// + /// Gets or sets the ID representing the instance of the workflow. + /// + [JsonProperty(PropertyName = "workflow_instance_id")] + [Required] + public string WorkflowInstanceId { get; set; } + + /// + /// Gets or sets the ID representing the instance of the Task. + /// + [Required] + [JsonProperty(PropertyName = "task_id")] + public string TaskId { get; set; } + + /// + /// Gets or sets the execution ID representing the instance of the task. + /// + [JsonProperty(PropertyName = "execution_id")] + [Required] + public string ExecutionId { get; set; } + + /// + /// Gets or sets the payload ID of the current workflow instance. + /// + [JsonProperty(PropertyName = "payload_id")] + [Required] + public string PayloadId { get; set; } + + /// + /// Gets or sets the correlation ID. + /// + [JsonProperty(PropertyName = "correlation_id")] + [Required] + public string CorrelationId { get; set; } + + /// + /// Gets or sets the type of plug-in the task is associated with. + /// + [JsonProperty(PropertyName = "type")] + [Required] + public string TaskPluginType { get; set; } + + /// + /// Gets or sets the task execution arguments. + /// + [JsonProperty(PropertyName = "task_plugin_arguments")] + public Dictionary TaskPluginArguments { get; set; } + + /// + /// Gets or set the status of the task. + /// + [JsonProperty(PropertyName = "status")] + [JsonConverter(typeof(StringEnumConverter))] + [Required] + public TaskExecutionStatus Status { get; set; } + + /// + /// Gets or sets the input storage information. + /// + [JsonProperty(PropertyName = "inputs")] + [Required, MinLength(1, ErrorMessage = "At least input is required.")] + public List Inputs { get; set; } + + /// + /// Gets or sets the output storage information. + /// + [JsonProperty(PropertyName = "outputs")] + [Required] + public List Outputs { get; set; } + + /// + /// Gets or sets the intermediate storage information. + /// + [JsonProperty(PropertyName = "intermediate_storage")] + [Required] + public Storage IntermediateStorage { get; set; } + + /// + /// Gets or sets any metadata relevant to the task. + /// + [JsonProperty(PropertyName = "metadata")] + public Dictionary Metadata { get; set; } + + public TaskDispatchEvent() + { + WorkflowInstanceId = string.Empty; + TaskId = string.Empty; + ExecutionId = string.Empty; + CorrelationId = string.Empty; + PayloadId = string.Empty; + TaskPluginType = string.Empty; + TaskPluginArguments = new Dictionary(); + Status = TaskExecutionStatus.Unknown; + Inputs = new List(); + Outputs = new List(); + IntermediateStorage = null!; + Metadata = new Dictionary(); + } + } +} diff --git a/src/Messaging/Events/TaskExecutionStatus.cs b/src/Messaging/Events/TaskExecutionStatus.cs new file mode 100644 index 0000000..b002187 --- /dev/null +++ b/src/Messaging/Events/TaskExecutionStatus.cs @@ -0,0 +1,17 @@ +// SPDX-FileCopyrightText: © 2022 MONAI Consortium +// SPDX-License-Identifier: Apache License 2.0 + +namespace Monai.Deploy.Messaging.Events +{ + public enum TaskExecutionStatus + { + Unknown, + Created, + Dispatched, + Accepted, + Succeeded, + Failed, + Canceled, + Exported + } +} diff --git a/src/Messaging/Events/TaskUpdateEvent.cs b/src/Messaging/Events/TaskUpdateEvent.cs new file mode 100644 index 0000000..aeae7cf --- /dev/null +++ b/src/Messaging/Events/TaskUpdateEvent.cs @@ -0,0 +1,88 @@ +// SPDX-FileCopyrightText: © 2022 MONAI Consortium +// SPDX-License-Identifier: Apache License 2.0 + +using System.ComponentModel.DataAnnotations; +using Monai.Deploy.Messaging.Common; +using Newtonsoft.Json; +using Newtonsoft.Json.Converters; + +namespace Monai.Deploy.Messaging.Events +{ + public class TaskUpdateEvent : EventBase + { + /// + /// Gets or sets the ID representing the instance of the workflow. + /// + [JsonProperty(PropertyName = "workflow_instance_id")] + [Required] + public string WorkflowInstanceId { get; set; } + + /// + /// Gets or sets the ID representing the instance of the Task. + /// + [Required] + [JsonProperty(PropertyName = "task_id")] + public string TaskId { get; set; } + + /// + /// Gets or sets the execution ID representing the instance of the task. + /// + [JsonProperty(PropertyName = "execution_id")] + [Required] + public string ExecutionId { get; set; } + + /// + /// Gets or sets the correlation ID. + /// + [JsonProperty(PropertyName = "correlation_id")] + [Required] + public string CorrelationId { get; set; } + + /// + /// Gets or set the status of the task. + /// + [JsonProperty(PropertyName = "status")] + [JsonConverter(typeof(StringEnumConverter))] + [Required] + public TaskExecutionStatus Status { get; set; } + + /// + /// Gets or set the failure reason of the task. + /// + [JsonProperty(PropertyName = "reason")] + [JsonConverter(typeof(StringEnumConverter))] + [Required] + public FailureReason Reason { get; set; } + + /// + /// Gets or set any additional (error) message related to the task. + /// + [JsonProperty(PropertyName = "message")] + public string Message { get; set; } + + /// + /// Gets or sets any output artifacts relevent to the output of the task. + /// + [JsonProperty(PropertyName = "outputs")] + public List Outputs { get; set; } + + /// + /// Gets or sets any metadata relevant to the output of the task. + /// + [JsonProperty(PropertyName = "metadata")] + public Dictionary Metadata { get; set; } + + public TaskUpdateEvent() + { + WorkflowInstanceId = String.Empty; + TaskId = String.Empty; + ExecutionId = String.Empty; + CorrelationId = String.Empty; + Status = TaskExecutionStatus.Unknown; + Reason = FailureReason.None; + Message = String.Empty; + Metadata = new Dictionary(); + Outputs = new List(); + } + } +} diff --git a/src/Messaging/Messages/WorkflowRequestMessage.cs b/src/Messaging/Events/WorkflowRequestEvent.cs similarity index 88% rename from src/Messaging/Messages/WorkflowRequestMessage.cs rename to src/Messaging/Events/WorkflowRequestEvent.cs index 9b96a1a..94d47a0 100644 --- a/src/Messaging/Messages/WorkflowRequestMessage.cs +++ b/src/Messaging/Events/WorkflowRequestEvent.cs @@ -1,12 +1,13 @@ // SPDX-FileCopyrightText: © 2021-2022 MONAI Consortium // SPDX-License-Identifier: Apache License 2.0 +using System.ComponentModel.DataAnnotations; using Monai.Deploy.Messaging.Common; using Newtonsoft.Json; -namespace Monai.Deploy.Messaging.Messages +namespace Monai.Deploy.Messaging.Events { - public class WorkflowRequestMessage + public class WorkflowRequestEvent : EventBase { private readonly List _payload; @@ -14,6 +15,7 @@ public class WorkflowRequestMessage /// Gets or sets the ID of the payload which is also used as the root path of the payload. /// [JsonProperty(PropertyName = "payload_id")] + [Required] public Guid PayloadId { get; set; } /// @@ -26,6 +28,7 @@ public class WorkflowRequestMessage /// Gets or sets number of files in the payload. /// [JsonProperty(PropertyName = "file_count")] + [Required] public int FileCount { get; set; } /// @@ -33,12 +36,14 @@ public class WorkflowRequestMessage /// For an ACR inference request, the correlation ID is the Transaction ID in the original request. /// [JsonProperty(PropertyName = "correlation_id")] + [Required] public string CorrelationId { get; set; } = default!; /// /// Gets or set the name of the bucket where the files in are stored. /// [JsonProperty(PropertyName = "bucket")] + [Required] public string Bucket { get; set; } = default!; /// @@ -46,6 +51,7 @@ public class WorkflowRequestMessage /// For an ACR inference request, the transaction ID. /// [JsonProperty(PropertyName = "calling_aetitle")] + [Required] public string CallingAeTitle { get; set; } = default!; /// @@ -59,15 +65,17 @@ public class WorkflowRequestMessage /// Gets or sets the time the data was received. /// [JsonProperty(PropertyName = "timestamp")] + [Required] public DateTime Timestamp { get; set; } /// /// Gets or sets a list of files and metadata files in this request. /// [JsonProperty(PropertyName = "payload")] + [Required, MinLength(1, ErrorMessage = "At least one file is required.")] public IReadOnlyList Payload { get => _payload; } - public WorkflowRequestMessage() + public WorkflowRequestEvent() { _payload = new List(); Workflows = new List(); diff --git a/src/Messaging/InternalVisible.cs b/src/Messaging/InternalVisible.cs new file mode 100644 index 0000000..9426699 --- /dev/null +++ b/src/Messaging/InternalVisible.cs @@ -0,0 +1,3 @@ +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("Monai.Deploy.Messaging.Test")] diff --git a/src/Messaging/Messages/JsonMessage.cs b/src/Messaging/Messages/JsonMessage.cs index 928b9a3..a84b743 100644 --- a/src/Messaging/Messages/JsonMessage.cs +++ b/src/Messaging/Messages/JsonMessage.cs @@ -13,7 +13,7 @@ public sealed class JsonMessage : MessageBase /// /// Body of the message. /// - public T Body { get; init; } + public T Body { get; private set; } public JsonMessage(T body, string applicationId, @@ -24,7 +24,7 @@ public JsonMessage(T body, Guid.NewGuid().ToString(), applicationId, correlationId, - DateTime.UtcNow, + DateTimeOffset.UtcNow, deliveryTag) { } @@ -34,7 +34,7 @@ public JsonMessage(T body, string messageId, string applicationId, string correlationId, - DateTime creationDateTime, + DateTimeOffset creationDateTime, string deliveryTag) : base(messageId, messageDescription, MediaTypeNames.Application.Json, applicationId, correlationId, creationDateTime) { diff --git a/src/Messaging/Messages/Message.cs b/src/Messaging/Messages/Message.cs index 4a8c996..144d0ba 100644 --- a/src/Messaging/Messages/Message.cs +++ b/src/Messaging/Messages/Message.cs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache License 2.0 using System.Text; +using Monai.Deploy.Messaging.Common; using Newtonsoft.Json; namespace Monai.Deploy.Messaging.Messages @@ -11,7 +12,7 @@ public sealed class Message : MessageBase /// /// Body of the message. /// - public byte[] Body { get; init; } + public byte[] Body { get; private set; } public Message(byte[] body, string messageDescription, @@ -19,7 +20,7 @@ public Message(byte[] body, string applicationId, string contentType, string correlationId, - DateTime creationDateTime, + DateTimeOffset creationDateTime, string deliveryTag = "") : base(messageId, messageDescription, contentType, applicationId, correlationId, creationDateTime) { @@ -34,8 +35,33 @@ public Message(byte[] body, /// Instance of T or null if data cannot be deserialized. public T ConvertTo() { - var json = Encoding.UTF8.GetString(Body); - return JsonConvert.DeserializeObject(json)!; + try + { + var json = Encoding.UTF8.GetString(Body); + return JsonConvert.DeserializeObject(json)!; + } + catch(Exception ex) + { + throw new MessageConversionException($"Error converting message to type {typeof(T)}", ex); + } + } + + /// + /// Converts the Message into a JsonMessageT. + /// + /// Type to convert to + /// Instance of JsonMessageT or null if data cannot be deserialized. + public JsonMessage ConvertToJsonMessage() + { + try + { + var body = ConvertTo(); + return new JsonMessage(body, MessageDescription, MessageId, ApplicationId, CorrelationId, CreationDateTime, DeliveryTag); + } + catch (Exception ex) + { + throw new MessageConversionException($"Error converting message to type {typeof(T)}", ex); + } } } } diff --git a/src/Messaging/Messages/MessageBase.cs b/src/Messaging/Messages/MessageBase.cs index 3cbefa7..e206599 100644 --- a/src/Messaging/Messages/MessageBase.cs +++ b/src/Messaging/Messages/MessageBase.cs @@ -11,47 +11,47 @@ public abstract class MessageBase /// UUID for the message formatted with hyphens. /// xxxxxxxx-xxxx-Mxxx-Nxxx-xxxxxxxxxxxx /// - public string MessageId { get; init; } + public string MessageId { get; private set; } /// /// A short description of the type serialized in the message body. /// - public string MessageDescription { get; init; } + public string MessageDescription { get; private set; } /// /// Content or MIME type of the message body. /// - public string ContentType { get; init; } + public string ContentType { get; private set; } /// /// UUID of the application, in this case, the Informatics Gateway. /// The UUID of Informatics Gateway is 16988a78-87b5-4168-a5c3-2cfc2bab8e54. /// - public string ApplicationId { get; init; } + public string ApplicationId { get; private set; } /// /// Correlation ID of the message. /// For DIMSE connections, the ID generated during association is used. /// For ACR inference requests, the Transaction ID provided in the request is used. /// - public string CorrelationId { get; init; } + public string CorrelationId { get; private set; } /// /// Datetime the message is created. /// - public DateTime CreationDateTime { get; init; } + public DateTimeOffset CreationDateTime { get; private set; } /// /// Gets or set the delivery tag/acknoweldge token for the message. /// - public string DeliveryTag { get; init; } + public string DeliveryTag { get; protected set; } protected MessageBase(string messageId, string messageDescription, string contentType, string applicationId, string correlationId, - DateTime creationDateTime) + DateTimeOffset creationDateTime) { Guard.Against.NullOrWhiteSpace(messageId, nameof(messageId)); Guard.Against.NullOrWhiteSpace(messageDescription, nameof(messageDescription)); diff --git a/src/Messaging/Monai.Deploy.Messaging.csproj b/src/Messaging/Monai.Deploy.Messaging.csproj index d6931a7..f1bf8cd 100644 --- a/src/Messaging/Monai.Deploy.Messaging.csproj +++ b/src/Messaging/Monai.Deploy.Messaging.csproj @@ -5,7 +5,8 @@ SPDX-License-Identifier: Apache License 2.0 - net6.0 + netstandard2.1 + latest enable enable false @@ -37,6 +38,10 @@ SPDX-License-Identifier: Apache License 2.0 + + + + True @@ -45,10 +50,14 @@ SPDX-License-Identifier: Apache License 2.0 + + + + \ No newline at end of file diff --git a/src/Messaging/RabbitMq/IServiceCollectionExtension.cs b/src/Messaging/RabbitMq/IServiceCollectionExtension.cs new file mode 100644 index 0000000..bd6f9f5 --- /dev/null +++ b/src/Messaging/RabbitMq/IServiceCollectionExtension.cs @@ -0,0 +1,17 @@ +// SPDX-FileCopyrightText: © 2022 MONAI Consortium +// SPDX-License-Identifier: Apache License 2.0 + +using Microsoft.Extensions.DependencyInjection; + +namespace Monai.Deploy.Messaging.RabbitMq +{ + public static class IServiceCollectionExtension + { + public static IServiceCollection UseRabbitMq(this IServiceCollection services) + { + services.AddSingleton(); + + return services; + } + } +} diff --git a/src/Messaging/RabbitMq/RabbitMqConnectionFactory.cs b/src/Messaging/RabbitMq/RabbitMqConnectionFactory.cs new file mode 100644 index 0000000..9cdda71 --- /dev/null +++ b/src/Messaging/RabbitMq/RabbitMqConnectionFactory.cs @@ -0,0 +1,140 @@ +// SPDX-FileCopyrightText: © 2022 MONAI Consortium +// SPDX-License-Identifier: Apache License 2.0 + +using System.Collections.Concurrent; +using System.Security.Cryptography; +using System.Text; +using Ardalis.GuardClauses; +using Microsoft.Extensions.Logging; +using Monai.Deploy.Messaging.Common; +using RabbitMQ.Client; +using System.Net.Security; + +namespace Monai.Deploy.Messaging.RabbitMq +{ + public interface IRabbitMqConnectionFactory + { + /// + /// Creates a new channel for RabbitMQ client. + /// THe connection factory maintains a single connection to the specified + /// hostName, username, password, and virtualHost combination. + /// + /// Host name + /// User name + /// Password + /// Virtual host + /// Encrypt communication + /// Port Number + /// Instance of . + IModel CreateChannel(string hostName, string username, string password, string virtualHost, string useSSL, string portnumber); + } + + public class RabbitMqConnectionFactory : IRabbitMqConnectionFactory, IDisposable + { + private readonly ConcurrentDictionary> _connectionFactoriess; + private readonly ConcurrentDictionary> _connections; + private readonly ILogger _logger; + private bool _disposedValue; + + public RabbitMqConnectionFactory(ILogger logger) + { + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _connectionFactoriess = new ConcurrentDictionary>(); + _connections = new ConcurrentDictionary>(); + } + + public IModel CreateChannel(string hostName, string username, string password, string virtualHost, string useSSL, string portnumber ) + { + Guard.Against.NullOrWhiteSpace(hostName, nameof(hostName)); + Guard.Against.NullOrWhiteSpace(username, nameof(username)); + Guard.Against.NullOrWhiteSpace(password, nameof(password)); + Guard.Against.NullOrWhiteSpace(virtualHost, nameof(virtualHost)); + + + var key = $"{hostName}{username}{HashPassword(password)}{virtualHost}"; + + var connection = _connections.AddOrUpdate(key, + x => + { + return CreatConnection(hostName, username, password, virtualHost, key, useSSL, portnumber); + }, + (updateKey, updateConnection) => + { + if (updateConnection.Value.IsOpen) + { + return updateConnection; + } + else + { + return CreatConnection(hostName, username, password, virtualHost, key, useSSL, portnumber); + } + }); + + return connection.Value.CreateModel(); + } + + private Lazy CreatConnection(string hostName, string username, string password, string virtualHost, string key, string useSSL, string portnumber) + { + int port; + Boolean SslEnabled; + Boolean.TryParse(useSSL, out SslEnabled); + if (!Int32.TryParse(portnumber, out port)) + { + port = SslEnabled ? 5671 : 5672; // 5671 is default port for SSL/TLS , 5672 is default port for PLAIN. + } + + SslOption sslOptions = new SslOption + { + Enabled = SslEnabled, + ServerName = hostName, + AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateChainErrors | SslPolicyErrors.RemoteCertificateNotAvailable + }; + + var connectionFactory = _connectionFactoriess.GetOrAdd(key, y => new Lazy(() => new ConnectionFactory() + { + HostName = hostName, + UserName = username, + Password = password, + VirtualHost = virtualHost, + Ssl = sslOptions, + Port = port + })); + + return new Lazy(() => connectionFactory.Value.CreateConnection()); + } + + private object HashPassword(string password) + { + Guard.Against.NullOrWhiteSpace(password, nameof(password)); + var sha256 = SHA256.Create(); + var hash = sha256.ComputeHash(Encoding.UTF8.GetBytes(password)); + return hash.Select(x => x.ToString("x2")); + } + + protected virtual void Dispose(bool disposing) + { + if (!_disposedValue) + { + if (disposing) + { + _logger.ClosingConnections(); + foreach (var connection in _connections.Values) + { + connection.Value.Close(); + } + _connections.Clear(); + _connectionFactoriess.Clear(); + } + + _disposedValue = true; + } + } + + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + } +} diff --git a/src/Messaging/RabbitMq/RabbitMqMessagePublisherService.cs b/src/Messaging/RabbitMq/RabbitMqMessagePublisherService.cs index d22397e..0fdf5ec 100644 --- a/src/Messaging/RabbitMq/RabbitMqMessagePublisherService.cs +++ b/src/Messaging/RabbitMq/RabbitMqMessagePublisherService.cs @@ -12,40 +12,49 @@ namespace Monai.Deploy.Messaging.RabbitMq { - public class RabbitMqMessagePublisherService : IMessageBrokerPublisherService, IDisposable + public class RabbitMqMessagePublisherService : IMessageBrokerPublisherService { + private const int PersistentDeliveryMode = 2; + private readonly ILogger _logger; + private readonly IRabbitMqConnectionFactory _rabbitMqConnectionFactory; private readonly string _endpoint; + private readonly string _username; + private readonly string _password; private readonly string _virtualHost; private readonly string _exchange; - private readonly IConnection _connection; + private readonly string _useSSL = string.Empty; + private readonly string _portNumber = string.Empty; private bool _disposedValue; public string Name => "Rabbit MQ Publisher"; public RabbitMqMessagePublisherService(IOptions options, - ILogger logger) + ILogger logger, + IRabbitMqConnectionFactory rabbitMqConnectionFactory) { Guard.Against.Null(options, nameof(options)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _rabbitMqConnectionFactory = rabbitMqConnectionFactory ?? throw new ArgumentNullException(nameof(rabbitMqConnectionFactory)); var configuration = options.Value; ValidateConfiguration(configuration); _endpoint = configuration.PublisherSettings[ConfigurationKeys.EndPoint]; - var username = configuration.PublisherSettings[ConfigurationKeys.Username]; - var password = configuration.PublisherSettings[ConfigurationKeys.Password]; - _virtualHost = configuration.SubscriberSettings[ConfigurationKeys.VirtualHost]; - _exchange = configuration.SubscriberSettings[ConfigurationKeys.Exchange]; + _username = configuration.PublisherSettings[ConfigurationKeys.Username]; + _password = configuration.PublisherSettings[ConfigurationKeys.Password]; + _virtualHost = configuration.PublisherSettings[ConfigurationKeys.VirtualHost]; + _exchange = configuration.PublisherSettings[ConfigurationKeys.Exchange]; - var connectionFactory = new ConnectionFactory() - { - HostName = _endpoint, - UserName = username, - Password = password, - VirtualHost = _virtualHost - }; - _connection = connectionFactory.CreateConnection(); + + if (configuration.PublisherSettings.ContainsKey(ConfigurationKeys.UseSSL)) + _useSSL = configuration.PublisherSettings[ConfigurationKeys.UseSSL]; + + + if (configuration.PublisherSettings.ContainsKey(ConfigurationKeys.Port)) + _portNumber = configuration.PublisherSettings[ConfigurationKeys.Port]; + + } private void ValidateConfiguration(MessageBrokerServiceConfiguration configuration) @@ -71,13 +80,8 @@ public Task Publish(string topic, Message message) _logger.PublshingRabbitMq(_endpoint, _virtualHost, _exchange, topic); - using var channel = _connection.CreateModel(); - channel.ExchangeDeclare(_exchange, ExchangeType.Topic); - - var propertiesDictionary = new Dictionary - { - { "CreationDateTime", message.CreationDateTime.ToString("o") } - }; + using var channel = _rabbitMqConnectionFactory.CreateChannel(_endpoint, _username, _password, _virtualHost , _useSSL , _portNumber); + channel.ExchangeDeclare(_exchange, ExchangeType.Topic, durable: true, autoDelete: false); var properties = channel.CreateBasicProperties(); properties.Persistent = true; @@ -85,9 +89,10 @@ public Task Publish(string topic, Message message) properties.MessageId = message.MessageId; properties.AppId = message.ApplicationId; properties.CorrelationId = message.CorrelationId; - properties.DeliveryMode = 2; + properties.DeliveryMode = PersistentDeliveryMode; + properties.Type = message.MessageDescription; + properties.Timestamp = new AmqpTimestamp(message.CreationDateTime.ToUnixTimeSeconds()); - properties.Headers = propertiesDictionary; channel.BasicPublish(exchange: _exchange, routingKey: topic, basicProperties: properties, @@ -100,11 +105,9 @@ protected virtual void Dispose(bool disposing) { if (!_disposedValue) { - if (disposing && _connection != null) + if (disposing) { - _logger.ClosingConnection(); - _connection.Close(); - _connection.Dispose(); + // Dispose any managed objects } _disposedValue = true; @@ -118,4 +121,4 @@ public void Dispose() GC.SuppressFinalize(this); } } -} +} \ No newline at end of file diff --git a/src/Messaging/RabbitMq/RabbitMqMessageSubscriberService.cs b/src/Messaging/RabbitMq/RabbitMqMessageSubscriberService.cs index dcd6207..6e7e34b 100644 --- a/src/Messaging/RabbitMq/RabbitMqMessageSubscriberService.cs +++ b/src/Messaging/RabbitMq/RabbitMqMessageSubscriberService.cs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache License 2.0 using System.Globalization; -using System.Text; using Ardalis.GuardClauses; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -14,20 +13,22 @@ namespace Monai.Deploy.Messaging.RabbitMq { - public class RabbitMqMessageSubscriberService : IMessageBrokerSubscriberService, IDisposable + public class RabbitMqMessageSubscriberService : IMessageBrokerSubscriberService { private readonly ILogger _logger; private readonly string _endpoint; private readonly string _virtualHost; private readonly string _exchange; - private readonly IConnection _connection; + private readonly string _useSSL = string.Empty; + private readonly string _portNumber = string.Empty; private readonly IModel _channel; private bool _disposedValue; public string Name => "Rabbit MQ Subscriber"; public RabbitMqMessageSubscriberService(IOptions options, - ILogger logger) + ILogger logger, + IRabbitMqConnectionFactory rabbitMqConnectionFactory) { Guard.Against.Null(options, nameof(options)); @@ -41,18 +42,19 @@ public RabbitMqMessageSubscriberService(IOptions messageReceivedCallback, ushort prefetchCount = 0) + => Subscribe(new string[] { topic }, queue, messageReceivedCallback, prefetchCount); + + public void Subscribe(string[] topics, string queue, Action messageReceivedCallback, ushort prefetchCount = 0) { - Guard.Against.NullOrWhiteSpace(topic, nameof(topic)); + Guard.Against.Null(topics, nameof(topics)); Guard.Against.Null(messageReceivedCallback, nameof(messageReceivedCallback)); var queueDeclareResult = _channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false); - _channel.QueueBind(queueDeclareResult.QueueName, _exchange, topic); + BindToRoutingKeys(topics, queueDeclareResult.QueueName); var consumer = new EventingBasicConsumer(_channel); consumer.Received += (model, eventArgs) => { using var loggerScope = _logger.BeginScope(string.Format(CultureInfo.InvariantCulture, Log.LoggingScopeMessageApplication, eventArgs.BasicProperties.MessageId, eventArgs.BasicProperties.AppId)); - _logger.MessageReceivedFromQueue(queueDeclareResult.QueueName, topic); - - var messageReceivedEventArgs = new MessageReceivedEventArgs( - new Message( - body: eventArgs.Body.ToArray(), - messageDescription: topic, - messageId: eventArgs.BasicProperties.MessageId, - applicationId: eventArgs.BasicProperties.AppId, - contentType: eventArgs.BasicProperties.ContentType, - correlationId: eventArgs.BasicProperties.CorrelationId, - creationDateTime: DateTime.Parse(Encoding.UTF8.GetString((byte[])eventArgs.BasicProperties.Headers["CreationDateTime"]), CultureInfo.InvariantCulture), - deliveryTag: eventArgs.DeliveryTag.ToString(CultureInfo.InvariantCulture)), - new CancellationToken()); - - messageReceivedCallback(messageReceivedEventArgs); + _logger.MessageReceivedFromQueue(queueDeclareResult.QueueName, eventArgs.RoutingKey); + + MessageReceivedEventArgs messageReceivedEventArgs; + try + { + messageReceivedEventArgs = CreateMessage(eventArgs.RoutingKey, eventArgs); + } + catch (Exception ex) + { + _logger.InvalidMessage(queueDeclareResult.QueueName, eventArgs.RoutingKey, eventArgs.BasicProperties.MessageId, ex); + + _logger.SendingNAcknowledgement(eventArgs.BasicProperties.MessageId); + _channel.BasicNack(eventArgs.DeliveryTag, multiple: false, requeue: false); + _logger.NAcknowledgementSent(eventArgs.BasicProperties.MessageId, false); + return; + } + + try + { + messageReceivedCallback(messageReceivedEventArgs); + } + catch (Exception ex) + { + _logger.ErrorNotHandledByCallback(queueDeclareResult.QueueName, eventArgs.RoutingKey, eventArgs.BasicProperties.MessageId, ex); + } + }; + _channel.BasicQos(0, prefetchCount, false); + _channel.BasicConsume(queueDeclareResult.QueueName, false, consumer); + _logger.SubscribeToRabbitMqQueue(_endpoint, _virtualHost, _exchange, queueDeclareResult.QueueName, string.Join(',', topics)); + } + + public void SubscribeAsync(string topic, string queue, Func messageReceivedCallback, ushort prefetchCount = 0) + => SubscribeAsync(new string[] { topic }, queue, messageReceivedCallback, prefetchCount); + + public void SubscribeAsync(string[] topics, string queue, Func messageReceivedCallback, ushort prefetchCount = 0) + { + Guard.Against.Null(topics, nameof(topics)); + Guard.Against.Null(messageReceivedCallback, nameof(messageReceivedCallback)); + + var queueDeclareResult = _channel.QueueDeclare(queue: queue, durable: true, exclusive: false, autoDelete: false); + BindToRoutingKeys(topics, queueDeclareResult.QueueName); + + var consumer = new EventingBasicConsumer(_channel); + consumer.Received += async (model, eventArgs) => + { + using var loggerScope = _logger.BeginScope(string.Format(CultureInfo.InvariantCulture, Log.LoggingScopeMessageApplication, eventArgs.BasicProperties.MessageId, eventArgs.BasicProperties.AppId)); + + _logger.MessageReceivedFromQueue(queueDeclareResult.QueueName, eventArgs.RoutingKey); + + MessageReceivedEventArgs messageReceivedEventArgs; + try + { + messageReceivedEventArgs = CreateMessage(eventArgs.RoutingKey, eventArgs); + } + catch (Exception ex) + { + _logger.InvalidMessage(queueDeclareResult.QueueName, eventArgs.RoutingKey, eventArgs.BasicProperties.MessageId, ex); + + _logger.SendingNAcknowledgement(eventArgs.BasicProperties.MessageId); + _channel.BasicNack(eventArgs.DeliveryTag, multiple: false, requeue: false); + _logger.NAcknowledgementSent(eventArgs.BasicProperties.MessageId, false); + return; + } + try + { + await messageReceivedCallback(messageReceivedEventArgs); + } + catch (Exception ex) + { + _logger.ErrorNotHandledByCallback(queueDeclareResult.QueueName, eventArgs.RoutingKey, eventArgs.BasicProperties.MessageId, ex); + } }; _channel.BasicQos(0, prefetchCount, false); _channel.BasicConsume(queueDeclareResult.QueueName, false, consumer); - _logger.SubscribeToRabbitMqQueue(_endpoint, _virtualHost, _exchange, queueDeclareResult.QueueName, topic); + _logger.SubscribeToRabbitMqQueue(_endpoint, _virtualHost, _exchange, queueDeclareResult.QueueName, string.Join(',', topics)); } public void Acknowledge(MessageBase message) @@ -113,24 +174,23 @@ public void Acknowledge(MessageBase message) _logger.AcknowledgementSent(message.MessageId); } - public void Reject(MessageBase message) + public void Reject(MessageBase message, bool requeue = true) { Guard.Against.Null(message, nameof(message)); _logger.SendingNAcknowledgement(message.MessageId); - _channel.BasicNack(ulong.Parse(message.DeliveryTag, CultureInfo.InvariantCulture), multiple: false, requeue: true); - _logger.NAcknowledgementSent(message.MessageId); + _channel.BasicNack(ulong.Parse(message.DeliveryTag, CultureInfo.InvariantCulture), multiple: false, requeue: requeue); + _logger.NAcknowledgementSent(message.MessageId, requeue); } protected virtual void Dispose(bool disposing) { if (!_disposedValue) { - if (disposing && _connection is not null) + if (disposing) { - _logger.ClosingConnection(); - _connection.Close(); - _connection.Dispose(); + _channel.Close(); + _channel.Dispose(); } _disposedValue = true; @@ -143,5 +203,46 @@ public void Dispose() Dispose(disposing: true); GC.SuppressFinalize(this); } + + private void BindToRoutingKeys(string[] topics, string queue) + { + Guard.Against.Null(topics, nameof(topics)); + Guard.Against.NullOrWhiteSpace(queue, nameof(queue)); + + foreach (var topic in topics) + { + if (!string.IsNullOrEmpty(topic)) + { + _channel.QueueBind(queue, _exchange, topic); + } + } + } + + private static MessageReceivedEventArgs CreateMessage(string topic, BasicDeliverEventArgs eventArgs) + { + Guard.Against.NullOrWhiteSpace(topic, nameof(topic)); + Guard.Against.Null(eventArgs, nameof(eventArgs)); + + Guard.Against.Null(eventArgs.Body, nameof(eventArgs.Body)); + Guard.Against.Null(eventArgs.BasicProperties, nameof(eventArgs.BasicProperties)); + Guard.Against.Null(eventArgs.BasicProperties.MessageId, nameof(eventArgs.BasicProperties.MessageId)); + Guard.Against.Null(eventArgs.BasicProperties.AppId, nameof(eventArgs.BasicProperties.AppId)); + Guard.Against.Null(eventArgs.BasicProperties.ContentType, nameof(eventArgs.BasicProperties.ContentType)); + Guard.Against.Null(eventArgs.BasicProperties.CorrelationId, nameof(eventArgs.BasicProperties.CorrelationId)); + Guard.Against.Null(eventArgs.BasicProperties.Timestamp, nameof(eventArgs.BasicProperties.Timestamp)); + Guard.Against.Null(eventArgs.DeliveryTag, nameof(eventArgs.DeliveryTag)); + + return new MessageReceivedEventArgs( + new Message( + body: eventArgs.Body.ToArray(), + messageDescription: eventArgs.BasicProperties.Type, + messageId: eventArgs.BasicProperties.MessageId, + applicationId: eventArgs.BasicProperties.AppId, + contentType: eventArgs.BasicProperties.ContentType, + correlationId: eventArgs.BasicProperties.CorrelationId, + creationDateTime: DateTimeOffset.FromUnixTimeSeconds(eventArgs.BasicProperties.Timestamp.UnixTime), + deliveryTag: eventArgs.DeliveryTag.ToString(CultureInfo.InvariantCulture)), + CancellationToken.None); + } } -} +} \ No newline at end of file diff --git a/src/Messaging/SQS/ConfigurationKeys.cs b/src/Messaging/SQS/ConfigurationKeys.cs new file mode 100644 index 0000000..e759b05 --- /dev/null +++ b/src/Messaging/SQS/ConfigurationKeys.cs @@ -0,0 +1,19 @@ +// SPDX-FileCopyrightText: © 2021-2022 MONAI Consortium +// SPDX-License-Identifier: Apache License 2.0 + +namespace Monai.Deploy.Messaging.Configuration +{ + internal static class SqsConfigurationKeys + { + public static readonly string AccessKey = "accessKey"; + public static readonly string AccessToken = "accessToken"; + public static readonly string Region = "region"; + public static readonly string WorkflowRequestQueue = "workflowRequestQueue"; + public static readonly string ExportRequestQueue = "exportRequestQueue"; + public static readonly string BucketName = "bucketName"; + public static readonly string Envid = "environmentId"; + + public static readonly string[] PublisherRequiredKeys = new[] { WorkflowRequestQueue, BucketName }; + public static readonly string[] SubscriberRequiredKeys = new[] { ExportRequestQueue, BucketName }; + } +} diff --git a/src/Messaging/SQS/Log.cs b/src/Messaging/SQS/Log.cs new file mode 100644 index 0000000..9a989c8 --- /dev/null +++ b/src/Messaging/SQS/Log.cs @@ -0,0 +1,49 @@ +using Microsoft.Extensions.Logging; + +namespace Monai.Deploy.Messaging.SQS +{ + public static partial class Log + { + internal static readonly string LoggingScopeMessageApplication = "Message ID={0}. Application ID={1}."; + + + [LoggerMessage(EventId = 10000, Level = LogLevel.Information, Message = "Publishing message {MessageId} to Queue={topic}.")] + public static partial void PublishingToSQS(this ILogger logger, string topic, string MessageId); + + [LoggerMessage(EventId = 10001, Level = LogLevel.Information, Message = "{ServiceName} connecting to SQS.")] + public static partial void ConnectingToSQS(this ILogger logger, string serviceName); + + [LoggerMessage(EventId = 10002, Level = LogLevel.Information, Message = "Message received from queue {queue}.")] + public static partial void MessageReceivedFromQueue(this ILogger logger, string queue); + + [LoggerMessage(EventId = 10003, Level = LogLevel.Information, Message = "Listening for messages from {endpoint}. Queue={queue}.")] + public static partial void SubscribeToSQSQueue(this ILogger logger, string endpoint, string queue); + + [LoggerMessage(EventId = 10004, Level = LogLevel.Information, Message = "Sending message acknowledgement for message {messageId}.")] + public static partial void SendingAcknowledgement(this ILogger logger, string messageId); + + [LoggerMessage(EventId = 10005, Level = LogLevel.Information, Message = "Ackowledge sent for message {messageId}.")] + public static partial void AcknowledgementSent(this ILogger logger, string messageId); + + [LoggerMessage(EventId = 10006, Level = LogLevel.Information, Message = "Sending nack message {messageId} and requeuing.")] + public static partial void SendingNAcknowledgement(this ILogger logger, string messageId); + + [LoggerMessage(EventId = 10007, Level = LogLevel.Information, Message = "Nack message sent for message {messageId}, requeue={requeue}.")] + public static partial void NAcknowledgementSent(this ILogger logger, string messageId, bool requeue); + + [LoggerMessage(EventId = 10008, Level = LogLevel.Information, Message = "Closing connections.")] + public static partial void ClosingConnections(this ILogger logger); + + [LoggerMessage(EventId = 10009, Level = LogLevel.Error, Message = "Invalid or corrupted message received: Queue={queueName}, Message ID={messageId}.")] + public static partial void InvalidMessage(this ILogger logger, string queueName, string messageId, Exception ex); + + [LoggerMessage(EventId = 10010, Level = LogLevel.Error, Message = "Exception not handled by the subscriber's callback function: Queue={queueName}, Message ID={messageId}.")] + public static partial void ErrorNotHandledByCallback(this ILogger logger, string queueName, string messageId, Exception ex); + + [LoggerMessage(EventId = 10011, Level = LogLevel.Error, Message = "Creating SQS client.")] + public static partial void CreateSQSClient(this ILogger logger); + + [LoggerMessage(EventId = 10012, Level = LogLevel.Error, Message = "{ServiceName} failed to connect to SQS.")] + public static partial void ConnectingToSQSError(this ILogger logger, string serviceName, Exception ex); + } +} diff --git a/src/Messaging/SQS/QueueFormatter.cs b/src/Messaging/SQS/QueueFormatter.cs new file mode 100644 index 0000000..4c70821 --- /dev/null +++ b/src/Messaging/SQS/QueueFormatter.cs @@ -0,0 +1,28 @@ +using System.Text.RegularExpressions; + +namespace Monai.Deploy.Messaging.SQS +{ + internal static class QueueFormatter + { + /// + /// Returns an aggregate of the the environmentId, queueBasename nd topic as the name of the queue defined in SQS. + /// The returned string is made compliant to SQS naming convention : It will replace non alphanumeric and other characters than "_" and "-", by an hyphen + /// + /// + /// + /// + /// string + public static string FormatQueueName(string environmentId, string? queuebasename, string topic) + { + + string queue = $"{queuebasename}_{topic}"; + + if (!string.IsNullOrEmpty(environmentId)) + queue = $"{environmentId}_{queue}"; + queue = Regex.Replace(queue, "[^a-zA-Z0-9_]", "-"); + if (queue.Length > 80) + queue = queue.Substring(0, 80); + return queue; + } + } +} diff --git a/src/Messaging/SQS/README.MD b/src/Messaging/SQS/README.MD new file mode 100644 index 0000000..09d24f1 --- /dev/null +++ b/src/Messaging/SQS/README.MD @@ -0,0 +1,200 @@ +

+project-monai +

+ +💡 If you want to know more about MONAI Deploy WG vision, overall structure, and guidelines, please read [MONAI Deploy](https://github.com/Project-MONAI/monai-deploy) first. + +# MONAI Deploy Messaging SQS Plug-In +AWS SQS plugin for the messaging layer of MONAI Deploy. + +## Information: +This plugin can be used as a messaging mechanism alternative to RabbitMQ, and allows MONAI Deploy to integrate with AWS Simple Queue Service. Unlike RabbitMQ, SQS does not have a concept of vhost, exchange and routing key; instead the plugin will create one specific queue for each vhost, exchange and routing key combibation. + + + + + + + + + + + + + + + + + +
+ RMQ plugin + + SQS plugin + Description +
vhostenvironmentIdThe vhost namespace is replaced be the parameter environmentId. This parmater is used as a prefix for each queue name, allowing mutliple MONAI deploy installations on the same AWS account with not queue name conflict.
exchange and routing keyworkflowRequestQueue and exportRequestQueueThese parmaeters control the name of the queues for each purpose. Each queue name will be suffixed with the name routing key.
+ +* Queue names generated by the plugin have the following name convention : [environmentId]_[RequestQueue]_[routingKey]. +* Queue names will be less or equal to 80 characters long. The name will be truncated if the combinatino of environmentId, RequestQueue and routingKey exceeds this limit. +* Any non-alphanumerical character other than "-" and "\_" found in the environmentId, RequestQueue, parameters or routing key variables will be replaced by "\_". +* The queues are created automatically upon the 1st message publication/subscription if they do not already exist. +* Because MONAI Deploy can generate payloads greater than 256KBytes, the plugin leverages the SQS extended client, allowing payload up to 2GB in size. The use of this specific client mandates the usage of an S3 bucket as transient store for the messages to be held until their delivery to the queue subscriber. More information is available below in this documentation about IAM privileges, SQS and S3 bucket requirements. + + + +## Plugin activation + +Because MONAI Deploy plugin management work is still in progress (as of 06/13/2022), plugins cannot be loaded at run-time. Instead this SQS pluging can be activated by doing small code changes to the MONAI Informatic Gateway project https://github.com/Project-MONAI/monai-deploy-informatics-gateway, in the `Program.cs` file and recompiling. + +In the declaration for `CreateHostBuilder`, locate the following code block: + + services.UseRabbitMq(); + services.AddSingleton(); + services.AddSingleton(implementationFactory => + { + var options = implementationFactory.GetService>(); + var serviceProvider = implementationFactory.GetService(); + var logger = implementationFactory.GetService>(); + return serviceProvider.LocateService(logger, options.Value.Messaging.PublisherServiceAssemblyName); + }); + + services.AddSingleton(); + services.AddSingleton(implementationFactory => + { + var options = implementationFactory.GetService>(); + var serviceProvider = implementationFactory.GetService(); + var logger = implementationFactory.GetService>(); + return serviceProvider.LocateService(logger, options.Value.Messaging.SubscriberServiceAssemblyName); + }); + + + +and alter it by commenting the line `services.UseRabbitMq();`, replace `services.AddSingleton();` by `services.AddSingleton();` and the line `services.AddSingleton();` by `services.AddSingleton();`. The code block should now look like this : + + + //services.UseRabbitMq(); + services.AddSingleton(); + services.AddSingleton(implementationFactory => + { + var options = implementationFactory.GetService>(); + var serviceProvider = implementationFactory.GetService(); + var logger = implementationFactory.GetService>(); + return serviceProvider.LocateService(logger, options.Value.Messaging.PublisherServiceAssemblyName); + }); + + services.AddSingleton(); + services.AddSingleton(implementationFactory => + { + var options = implementationFactory.GetService>(); + var serviceProvider = implementationFactory.GetService(); + var logger = implementationFactory.GetService>(); + return serviceProvider.LocateService(logger, options.Value.Messaging.SubscriberServiceAssemblyName); + }); + +## MONAI Informatic Gateway Configuration + +The plugin is configured in the Messaging section of `appsettings.json` / `appsettings.Development.json` : + +```json + "messaging": { + "publisherServiceAssemblyName": "Monai.Deploy.Messaging.SQS.SQSMessagePublisherService, Monai.Deploy.Messaging", + "subscriberServiceAssemblyName": "Monai.Deploy.Messaging.SQS.SqsMessageSubscriberService, Monai.Deploy.Messaging", + "publisherSettings": { + "bucketName": "monai-minio", + "workflowRequestQueue": "workflow_tasks", + "environmentId": "monai-1", + "accessKey": "ASDFGHJKLADF123456789", + "accessToken": "QwErTyUiOpAsDonMB88W1mcCCwQdePe8X27SEu1S" + }, + "subscriberSettings": { + "exportRequestQueue": "export_tasks", + "bucketName": "monai-minio", + "environmentId": "monai-1", + "accessKey": "ASDFGHJKLADF123456789", + "accessToken": "QwErTyUiOpAsDonMB88W1mcCCwQdePe8X27SEu1S" + } + }, +``` + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
bucketNameS3 bucket used to store the messages temporarily until the subscriber gets it.
workflowRequestQueueQueue prefix for the workflow requests ( MIG -> Workflow Manager ). This parameter is only useful in the PublisherSettings section.
exportRequestQueueQueue prefix for the export requests ( Workflow Manager -> MIG ). This parameter is only useful in the SubscriberSettings section.
bucketNameS3 bucket used to store the messages temporarily until the subscriber gets it.
environmentIdA prefix used to identify the MONAI environment. This allows to create multiple environments within a single AWS account without queue name conflict. This parameter allows for a configuration comparable to the vhost concept in RabbitMq.
accessKeyAWS IAM user access key. This parameter is optional. If not present the plugin will fallback to local credentials, then EC2 role. If this parameter is used the parameter accessToken is also required. Refer to the section IAM privileges for IAM configuration.
accessTokenAWS IAM user access token. This parameter is only required when the parameter accesskey is provided.
+ +## IAM Privileges + +For the plugin to function a set of specific privileges need to be provided. The permissions can be associated either with an IAM user ( by using accessKey and accessToken in the configuratio file ) or by running MONAI on an EC2 instance associated with an IAM Role granted for the following privileges: + +Replace the tags below in the below policy as follow : + +[AWS_Account] : The AWS Account ID ( numeric ) +[EnvironmentId] : The Environment Id use int Subscriber and Publisher settings of the Messaging seciton of `appsettings.json` / `appsettings.Development.json`. +[BucketName] : the name of the S3 bucket used to store the messages temporarily. The same bucket as the one used to store incoming DICOM objects can be used if desired. ( see the AWS S3 plugin to use S3 natively with MONAI Deploy ) + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "VisualEditor0", + "Effect": "Allow", + "Action": [ + "sqs:DeleteMessage", + "s3:PutObject", + "s3:GetObject", + "sqs:GetQueueUrl", + "sqs:ReceiveMessage", + "sqs:SendMessage", + "sqs:GetQueueAttributes", + "sqs:CreateQueue", + "s3:DeleteObject", + "sqs:SetQueueAttributes" + ], + "Resource": [ + "arn:aws:sqs:*:[AWS_Account]:[EnvironmentId]", + "arn:aws:s3:::[BucketName]/*" + ] + } + ] +} +``` + +## Contributing + +For guidance on making a contribution to MONAI Deploy Workflow Manager, see the [contributing guidelines](https://github.com/Project-MONAI/monai-deploy/blob/main/CONTRIBUTING.md). + +Join the conversation on Twitter [@ProjectMONAI](https://twitter.com/ProjectMONAI) or join our [Slack channel](https://forms.gle/QTxJq3hFictp31UM9). + +Ask and answer questions over on [MONAI Deploy Workflow Manager's GitHub Discussions tab](https://github.com/Project-MONAI/monai-deploy-workflow-manager/discussions). + +## Links + +- Website: +- Code: +- Project tracker: +- Issue tracker: +- Test status: diff --git a/src/Messaging/SQS/SQSMessagePublisherService.cs b/src/Messaging/SQS/SQSMessagePublisherService.cs new file mode 100644 index 0000000..071431e --- /dev/null +++ b/src/Messaging/SQS/SQSMessagePublisherService.cs @@ -0,0 +1,200 @@ +// SPDX-FileCopyrightText: © 2021-2022 MONAI Consortium +// SPDX-License-Identifier: Apache License 2.0 + +using System.Globalization; +using System.Text; +using Amazon.S3; +using Amazon.SQS; +using Amazon.SQS.ExtendedClient; +using Amazon.SQS.Model; +using Ardalis.GuardClauses; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Monai.Deploy.Messaging.Configuration; + +namespace Monai.Deploy.Messaging.SQS +{ + public class SqsMessagePublisherService : IMessageBrokerPublisherService + { + + private readonly ILogger _logger; + private readonly string? _accessKey; + private readonly string? _accessToken; + private readonly string _environmentId = string.Empty; + private bool _disposedValue; + + + public string Name => "AWS SQS Publisher"; + private readonly string _queueName; + private readonly AmazonSQSClient _sqsClient; + private readonly AmazonS3Client _s3Client; + private readonly AmazonSQSExtendedClient _sqSExtendedClient; + + public SqsMessagePublisherService(IOptions options, + ILogger logger) + { + Guard.Against.Null(options, nameof(options)); + + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + + var configuration = options.Value; + ValidateConfiguration(configuration); + + _queueName = configuration.PublisherSettings[SqsConfigurationKeys.WorkflowRequestQueue]; + string bucketName = configuration.PublisherSettings[SqsConfigurationKeys.BucketName]; + + + if (configuration.PublisherSettings.ContainsKey(SqsConfigurationKeys.AccessKey)) + { + _logger.LogInformation("accessKey found in configuration."); + _accessKey = configuration.PublisherSettings[SqsConfigurationKeys.AccessKey]; + } + + + if (configuration.PublisherSettings.ContainsKey(SqsConfigurationKeys.AccessToken)) + { + _logger.LogInformation("accessToken found in configuration."); + _accessToken = configuration.PublisherSettings[SqsConfigurationKeys.AccessToken]; + } + + if (configuration.PublisherSettings.ContainsKey(SqsConfigurationKeys.Envid)) + _environmentId = configuration.PublisherSettings[SqsConfigurationKeys.Envid]; + + + _logger.ConnectingToSQS(Name); + + if (!(_accessKey is null) && !(_accessToken is null)) + { + _logger.LogInformation("Assuming IAM user as found in the configuration file."); + _sqsClient = new AmazonSQSClient(_accessKey, _accessToken); + _s3Client = new AmazonS3Client(_accessKey, _accessToken); + } + else + { + _logger.LogInformation("Attempting to assume local AWS credentials."); + _sqsClient = new AmazonSQSClient(); + _s3Client = new AmazonS3Client(); + } + + _sqSExtendedClient = new AmazonSQSExtendedClient(_sqsClient, + new ExtendedClientConfiguration().WithLargePayloadSupportEnabled(_s3Client, bucketName)); + + + } + + private void ValidateConfiguration(MessageBrokerServiceConfiguration configuration) + { + Guard.Against.Null(configuration, nameof(configuration)); + Guard.Against.Null(configuration.PublisherSettings, nameof(configuration.PublisherSettings)); + + foreach (var key in ConfigurationKeys.PublisherRequiredKeys) + { + if (!configuration.PublisherSettings.ContainsKey(key)) + { + throw new ConfigurationException($"{Name} is missing configuration for {key}."); + } + } + } + + public Task Publish(string topic, Monai.Deploy.Messaging.Messages.Message message) + { + + Guard.Against.NullOrWhiteSpace(topic, nameof(topic)); + Guard.Against.Null(message, nameof(message)); + + + using var loggerScope = _logger.BeginScope(string.Format(CultureInfo.InvariantCulture, Log.LoggingScopeMessageApplication, message.MessageId, message.ApplicationId)); + _logger.PublishingToSQS(topic, message.MessageId); + var sendMessageRequest = new SendMessageRequest(); + + Dictionary MessageAttributes = new Dictionary(); + MessageAttributeValue messageIdAttribute = new MessageAttributeValue(); + messageIdAttribute.DataType = "String"; + messageIdAttribute.StringValue = message.MessageId; + MessageAttributes.Add("MessageId", messageIdAttribute); + + MessageAttributeValue ContentTypeAttribute = new MessageAttributeValue(); + ContentTypeAttribute.DataType = "String"; + ContentTypeAttribute.StringValue = message.ContentType; + MessageAttributes.Add("ContentType", ContentTypeAttribute); + + + MessageAttributeValue ApplicationIdAttribute = new MessageAttributeValue(); + ApplicationIdAttribute.DataType = "String"; + ApplicationIdAttribute.StringValue = message.MessageId; + MessageAttributes.Add("ApplicationId", ApplicationIdAttribute); + + sendMessageRequest.MessageAttributes = MessageAttributes; + + + Console.WriteLine("Message information : "); + Console.WriteLine(message); + Console.WriteLine(message.Body); + Console.WriteLine(message.Body.Length); + + + string queueName = QueueFormatter.FormatQueueName(_environmentId, _queueName, topic); + _logger.LogDebug($"Attempting to create or subscribe to {queueName}"); + + var queueAttributes = new Dictionary(); + + queueAttributes.Add("KmsMasterKeyId", "alias/aws/sqs"); + var request = new CreateQueueRequest + { + Attributes = queueAttributes, + QueueName = queueName + }; + + CreateQueueResponse createQueueResponse = new CreateQueueResponse(); + try + { + createQueueResponse = _sqSExtendedClient.CreateQueueAsync(request).Result; + } + catch (Exception ex) + { + _logger.LogDebug($"The queue could not be created or subscribed to: {ex.Message}"); + } + + sendMessageRequest.QueueUrl = createQueueResponse.QueueUrl; + + + + sendMessageRequest.MessageBody = Encoding.UTF8.GetString(message.Body, 0, message.Body.Length); + + try + { + _sqSExtendedClient.SendMessageAsync(sendMessageRequest).Wait(); + } + catch (Exception e) + { + _logger.LogError($"The message could not be posted to the queue {queueName} : \n {e.Message}"); + } + + + return Task.CompletedTask; + } + + + protected virtual void Dispose(bool disposing) + { + if (!_disposedValue) + { + if (disposing) + { + _sqSExtendedClient.Dispose(); + _s3Client.Dispose(); + _sqsClient.Dispose(); + } + + _disposedValue = true; + } + } + + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + } +} diff --git a/src/Messaging/SQS/SQSMessageSubscriberService.cs b/src/Messaging/SQS/SQSMessageSubscriberService.cs new file mode 100644 index 0000000..a20e0ed --- /dev/null +++ b/src/Messaging/SQS/SQSMessageSubscriberService.cs @@ -0,0 +1,253 @@ +// SPDX-FileCopyrightText: © 2021-2022 MONAI Consortium +// SPDX-License-Identifier: Apache License 2.0 + +using System.Text; +using Amazon.S3; +using Amazon.SQS; +using Amazon.SQS.ExtendedClient; +using Amazon.SQS.Model; +using Ardalis.GuardClauses; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Monai.Deploy.Messaging.Common; +using Monai.Deploy.Messaging.Configuration; +using Monai.Deploy.Messaging.Messages; +using Newtonsoft.Json.Linq; + +namespace Monai.Deploy.Messaging.SQS +{ + public class SqsMessageSubscriberService : IMessageBrokerSubscriberService + { + private readonly ILogger _logger; + private bool _disposedValue; + private readonly string? _accessKey; + private readonly string? _accessToken; + private readonly string? _queueName; + private readonly string _environmentId = string.Empty; + + private readonly AmazonSQSClient _sqsClient; + private readonly AmazonS3Client _s3Client; + private readonly AmazonSQSExtendedClient _sqSExtendedClient; + + public string Name => "AWS SQS Subscriber"; + + public SqsMessageSubscriberService(IOptions options, + ILogger logger) + { + Guard.Against.Null(options, nameof(options)); + + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + + var configuration = options.Value; + ValidateConfiguration(configuration); + _queueName = configuration.SubscriberSettings[SqsConfigurationKeys.ExportRequestQueue]; + string bucketName = configuration.SubscriberSettings[SqsConfigurationKeys.BucketName]; + + + if (configuration.SubscriberSettings.ContainsKey(SqsConfigurationKeys.AccessKey)) + _accessKey = configuration.SubscriberSettings[SqsConfigurationKeys.AccessKey]; + + + if (configuration.SubscriberSettings.ContainsKey(SqsConfigurationKeys.AccessToken)) + _accessToken = configuration.SubscriberSettings[SqsConfigurationKeys.AccessToken]; + + + if (configuration.SubscriberSettings.ContainsKey(SqsConfigurationKeys.Envid)) + _environmentId = configuration.SubscriberSettings[SqsConfigurationKeys.Envid]; + + + _logger.ConnectingToSQS(Name); + + if (!(_accessKey is null) && !(_accessToken is null)) + { + _logger.LogInformation("Assuming IAM user as found in the configuration file."); + _sqsClient = new AmazonSQSClient(_accessKey, _accessToken); + _s3Client = new AmazonS3Client(_accessKey, _accessToken); + } + else + { + _logger.LogInformation("Attempting to assume local AWS credentials."); + _sqsClient = new AmazonSQSClient(); + _s3Client = new AmazonS3Client(); + } + + _sqSExtendedClient = new AmazonSQSExtendedClient(_sqsClient, + new ExtendedClientConfiguration().WithLargePayloadSupportEnabled(_s3Client, bucketName)); + + + } + + + + private void ValidateConfiguration(MessageBrokerServiceConfiguration configuration) + { + Guard.Against.Null(configuration, nameof(configuration)); + Guard.Against.Null(configuration.SubscriberSettings, nameof(configuration.SubscriberSettings)); + + foreach (var key in SqsConfigurationKeys.SubscriberRequiredKeys) + { + if (!configuration.SubscriberSettings.ContainsKey(key)) + { + throw new ConfigurationException($"{Name} is missing configuration for {key}."); + } + } + } + + public void Subscribe(string topic, string queue, Action messageReceivedCallback, ushort prefetchCount = 0) + => Subscribe(new string[] { topic }, queue, messageReceivedCallback, prefetchCount); + + public void Subscribe(string[] topics, string queue, Action messageReceivedCallback, ushort prefetchCount = 0) + { + Guard.Against.Null(topics, nameof(topics)); + Guard.Against.Null(messageReceivedCallback, nameof(messageReceivedCallback)); + + foreach (string topic in topics) + { + Task.Run(() => + { + QueueRunner(topic, messageReceivedCallback); + }); + } + + } + + + private void QueueRunner(string topic, Action messageReceivedCallback) + { + try + { + string queueName = QueueFormatter.FormatQueueName(_environmentId, _queueName, topic); + _logger.LogDebug($"Attempting to create or subscribe to {queueName}"); + + var queueAttributes = new Dictionary(); + + queueAttributes.Add("KmsMasterKeyId", "alias/aws/sqs"); + var request = new CreateQueueRequest + { + Attributes = queueAttributes, + QueueName = queueName + }; + + CreateQueueResponse createQueueResponse = _sqSExtendedClient.CreateQueueAsync(request).Result; + + while (true) + { + List AttributesList = new List(); + AttributesList.Add("*"); + + var messageResponse = _sqSExtendedClient.ReceiveMessageAsync(new ReceiveMessageRequest + { + QueueUrl = createQueueResponse.QueueUrl, + WaitTimeSeconds = 2, + AttributeNames = new List { "All" }, + MessageAttributeNames = new List { "All" } + }).Result; + var messages = messageResponse.Messages; + + if (messages.Any()) + { + foreach (var msg in messages) + { + _logger.Log(LogLevel.Debug, $"Message {msg.MessageId} received from SQS."); + MessageReceivedEventArgs messageReceivedEventArgs = CreateMessage(msg); + try + { + _logger.AcknowledgementSent(msg.MessageId); + _sqSExtendedClient.DeleteMessageAsync(new DeleteMessageRequest { QueueUrl = createQueueResponse.QueueUrl, ReceiptHandle = msg.ReceiptHandle }).Wait(); + messageReceivedCallback(messageReceivedEventArgs); + } + catch (Exception ex) + { + _logger.Log(LogLevel.Error, ex.Message); + } + } + } + } + } + catch (Exception ex) + { + _logger.LogDebug(ex.Message); + } + } + + public void SubscribeAsync(string topic, string queue, Func messageReceivedCallback, ushort prefetchCount = 0) + => SubscribeAsync(new string[] { topic }, queue, messageReceivedCallback, prefetchCount); + + public void SubscribeAsync(string[] topics, string queue, Func messageReceivedCallback, ushort prefetchCount = 0) + { + throw new NotImplementedException(); + + } + + public void Acknowledge(MessageBase message) + { + //No Acknowleddgement necessary with SQS. To delete the processed message is sufficient. + } + + public void Reject(MessageBase message, bool requeue = true) + { + Guard.Against.Null(message, nameof(message)); + + } + + protected virtual void Dispose(bool disposing) + { + if (!_disposedValue) + { + if (disposing) + { + _sqSExtendedClient.Dispose(); + _s3Client.Dispose(); + _sqsClient.Dispose(); + } + _disposedValue = true; + } + } + + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + + + private static MessageReceivedEventArgs CreateMessage(Amazon.SQS.Model.Message msg) + { + + Guard.Against.Null(msg, nameof(msg)); + Guard.Against.Null(msg.Body, nameof(msg.Body)); + Guard.Against.Null(msg.Attributes, nameof(msg.Attributes)); + Guard.Against.Null(msg.MessageId, nameof(msg.MessageId)); + + JObject bodyobj = JObject.Parse(msg.Body); + string messageId = string.Empty; + string correlationId = string.Empty; + + + JToken? messageIdtoken = bodyobj["MessageId"]; + if (messageIdtoken != null) + messageId = messageIdtoken.ToString(); + + JToken? correlationIdtoken = bodyobj["correlation_id"]; + if (correlationIdtoken != null) + correlationId = correlationIdtoken.ToString(); + + string contentType = msg.MessageAttributes["ContentType"].ToString(); + DateTimeOffset SentTimestamp = DateTimeOffset.FromUnixTimeMilliseconds(Int64.Parse(msg.Attributes["SentTimestamp"])); + + + return new MessageReceivedEventArgs( + new Monai.Deploy.Messaging.Messages.Message( + body: Encoding.UTF8.GetBytes(msg.Body), + messageDescription: contentType, + messageId: messageId, + applicationId: msg.Attributes["SenderId"], + contentType: contentType, + correlationId: correlationId, + creationDateTime: SentTimestamp, + deliveryTag: msg.ReceiptHandle) + , CancellationToken.None); + } + } +} diff --git a/src/Messaging/Test/DummyTest.cs b/src/Messaging/Test/DummyTest.cs deleted file mode 100644 index 1e16c86..0000000 --- a/src/Messaging/Test/DummyTest.cs +++ /dev/null @@ -1,13 +0,0 @@ -using Xunit; - -namespace Monai.Deploy.Messaging.Test -{ - public class DummyTest - { - [Fact] - public void ToBeDeleted() - { - Assert.True(true); - } - } -} diff --git a/src/Messaging/Test/EventBaseTest.cs b/src/Messaging/Test/EventBaseTest.cs new file mode 100644 index 0000000..c082d01 --- /dev/null +++ b/src/Messaging/Test/EventBaseTest.cs @@ -0,0 +1,81 @@ +// SPDX-FileCopyrightText: © 2022 MONAI Consortium +// SPDX-License-Identifier: Apache License 2.0 + +using System.Collections.Generic; +using System.ComponentModel.DataAnnotations; +using Monai.Deploy.Messaging.Common; +using Monai.Deploy.Messaging.Events; +using Xunit; + +namespace Monai.Deploy.Messaging.Test +{ + internal class StringClass : EventBase + { + [Required] + public string? StringField { get; set; } + } + + internal class NestedStringClass : EventBase + { + [Required] + public StringClass? NestedStringField { get; set; } + } + + internal class NestedStringCollectionClass : EventBase + { + [Required, MinLength(1)] + public IList? NestedStrings { get; set; } + } + + public class EventBaseTest + { + [Fact(DisplayName = "Validates StringClass")] + public void ValidatesStringField() + { + var obj = new StringClass(); + var validationException = Assert.Throws(() => obj.Validate()); + Assert.Equal($"Invalid message: The {nameof(obj.StringField)} field is required. Path: {nameof(obj.StringField)}.", validationException.Message); + + obj.StringField = "hello"; + var exception = Record.Exception(() => obj.Validate()); + Assert.Null(exception); + } + + [Fact(DisplayName = "Validates NestedStringClass")] + public void ValidatesNestedStringField() + { + var obj = new NestedStringClass(); + var validationException = Assert.Throws(() => obj.Validate()); + Assert.Equal($"Invalid message: The {nameof(obj.NestedStringField)} field is required. Path: {nameof(obj.NestedStringField)}.", validationException.Message); + + obj.NestedStringField = new StringClass(); + validationException = Assert.Throws(() => obj.Validate()); + Assert.Equal($"Invalid message: The {nameof(obj.NestedStringField.StringField)} field is required. Path: {nameof(NestedStringClass)}.{nameof(obj.NestedStringField)}.{nameof(obj.NestedStringField.StringField)}.", validationException.Message); + + obj.NestedStringField.StringField = "hello"; + var exception = Record.Exception(() => obj.Validate()); + Assert.Null(exception); + } + + [Fact(DisplayName = "Validates NestedStringCollectionClass")] + public void ValidatesNestedStringCollectionClassField() + { + var obj = new NestedStringCollectionClass(); + var validationException = Assert.Throws(() => obj.Validate()); + Assert.Equal($"Invalid message: The {nameof(obj.NestedStrings)} field is required. Path: {nameof(obj.NestedStrings)}.", validationException.Message); + + obj.NestedStrings = new List(); + validationException = Assert.Throws(() => obj.Validate()); + Assert.Equal($"Invalid message: The field {nameof(obj.NestedStrings)} must be a string or array type with a minimum length of '1'. Path: {nameof(obj.NestedStrings)}.", validationException.Message); + + var stringClass = new StringClass(); + obj.NestedStrings = new List() { stringClass }; + validationException = Assert.Throws(() => obj.Validate()); + Assert.Equal($"Invalid message: The {nameof(stringClass.StringField)} field is required. Path: {nameof(NestedStringCollectionClass)}.{nameof(obj.NestedStrings)}.{nameof(StringClass.StringField)}.", validationException.Message); + + stringClass.StringField = "Hello World!"; + var exception = Record.Exception(() => obj.Validate()); + Assert.Null(exception); + } + } +} diff --git a/src/Messaging/Test/ExportCompleteEventTest.cs b/src/Messaging/Test/ExportCompleteEventTest.cs new file mode 100644 index 0000000..6dcae2f --- /dev/null +++ b/src/Messaging/Test/ExportCompleteEventTest.cs @@ -0,0 +1,67 @@ +// SPDX-FileCopyrightText: © 2022 MONAI Consortium +// SPDX-License-Identifier: Apache License 2.0 + +using System; +using System.Collections.Generic; +using Monai.Deploy.Messaging.Common; +using Monai.Deploy.Messaging.Events; +using Xunit; + +namespace Monai.Deploy.Messaging.Test +{ + public class ExportCompleteEventTest + { + [Theory(DisplayName = "Shall generate ExportCompleteMessageTest from ExportRequestMessage")] + [InlineData(1, 0, ExportStatus.Success)] + [InlineData(0, 5, ExportStatus.Failure)] + [InlineData(3, 3, ExportStatus.PartialFailure)] + public void ShallGenerateExportCompleteMessageTestFromExportRequestMessage(int successded, int fialure, ExportStatus status) + { + var exportRequestMessage = new ExportRequestEvent + { + CorrelationId = Guid.NewGuid().ToString(), + DeliveryTag = Guid.NewGuid().ToString(), + Destination = Guid.NewGuid().ToString(), + ExportTaskId = Guid.NewGuid().ToString(), + MessageId = Guid.NewGuid().ToString(), + WorkflowId = Guid.NewGuid().ToString(), + SucceededFiles = successded, + FailedFiles = fialure, + }; + exportRequestMessage.Files = new List() + { + Guid.NewGuid().ToString(), + Guid.NewGuid().ToString(), + Guid.NewGuid().ToString(), + Guid.NewGuid().ToString(), + Guid.NewGuid().ToString(), + }; + + var errors = new List() + { + Guid.NewGuid().ToString(), + Guid.NewGuid().ToString(), + Guid.NewGuid().ToString(), + Guid.NewGuid().ToString(), + Guid.NewGuid().ToString(), + }; + + exportRequestMessage.AddErrorMessages(errors); + + var exportCompleteMessage = new ExportCompleteEvent(exportRequestMessage); + + Assert.Equal(exportRequestMessage.WorkflowId, exportCompleteMessage.WorkflowId); + Assert.Equal(exportRequestMessage.ExportTaskId, exportCompleteMessage.ExportTaskId); + Assert.Equal(string.Join(System.Environment.NewLine, errors), exportCompleteMessage.Message); + Assert.Equal(status, exportCompleteMessage.Status); + } + + [Fact(DisplayName = "Validation shall throw on error")] + public void ValidationShallThrowOnError() + { + var exportCompleteEvent = new ExportCompleteEvent(); + + Assert.Throws(() => exportCompleteEvent.Validate()); + } + } +} diff --git a/src/Messaging/Test/JsonMessageTest.cs b/src/Messaging/Test/JsonMessageTest.cs new file mode 100644 index 0000000..1bab7b7 --- /dev/null +++ b/src/Messaging/Test/JsonMessageTest.cs @@ -0,0 +1,81 @@ +// SPDX-FileCopyrightText: © 2022 MONAI Consortium +// SPDX-License-Identifier: Apache License 2.0 + +using System; +using Monai.Deploy.Messaging.Common; +using Monai.Deploy.Messaging.Messages; +using Xunit; + +namespace Monai.Deploy.Messaging.Test +{ + public class DummyTypeOne + { + public string? MyProperty { get; set; } + } + + public class DummyTypeTwo + { + public int MyProperty { get; set; } + } + + public class JsonMessageTest + { + [Fact(DisplayName = "Convert throws on different type")] + public void ConvertsThrowsError() + { + var data = new DummyTypeOne { MyProperty = "hello world" }; + var jsonMessage = new JsonMessage(data, Guid.NewGuid().ToString(), Guid.NewGuid().ToString(), Guid.NewGuid().ToString()); + var message = jsonMessage.ToMessage(); + + + Assert.Throws(() => message.ConvertTo()); + Assert.Throws(() => message.ConvertToJsonMessage()); + } + [Fact(DisplayName = "Converts JsonMessage to Message")] + public void ConvertsJsonMessageToMessage() + { + var expected = "hello world"; + var jsonMessage = new JsonMessage(expected, Guid.NewGuid().ToString(), Guid.NewGuid().ToString(), Guid.NewGuid().ToString()); + var message = jsonMessage.ToMessage(); + + Assert.Equal(jsonMessage.ApplicationId, message.ApplicationId); + Assert.Equal(jsonMessage.CreationDateTime, message.CreationDateTime); + Assert.Equal(jsonMessage.ContentType, message.ContentType); + Assert.Equal(jsonMessage.CorrelationId, message.CorrelationId); + Assert.Equal(jsonMessage.DeliveryTag, message.DeliveryTag); + Assert.Equal(jsonMessage.MessageDescription, message.MessageDescription); + Assert.Equal(jsonMessage.MessageId, message.MessageId); + + var result = message.ConvertTo(); + + Assert.Equal(expected, result); + } + + [Fact(DisplayName = "Converts Message to JsonMessage")] + public void ConvertsMessageToJsonMessage() + { + var expected = "hello world"; + var jsonMessage = new JsonMessage(expected, Guid.NewGuid().ToString(), Guid.NewGuid().ToString(), Guid.NewGuid().ToString()); + var message = jsonMessage.ToMessage(); + + Assert.Equal(jsonMessage.ApplicationId, message.ApplicationId); + Assert.Equal(jsonMessage.CreationDateTime, message.CreationDateTime); + Assert.Equal(jsonMessage.ContentType, message.ContentType); + Assert.Equal(jsonMessage.CorrelationId, message.CorrelationId); + Assert.Equal(jsonMessage.DeliveryTag, message.DeliveryTag); + Assert.Equal(jsonMessage.MessageDescription, message.MessageDescription); + Assert.Equal(jsonMessage.MessageId, message.MessageId); + + var result = message.ConvertToJsonMessage(); + + Assert.Equal(expected, result.Body); + Assert.Equal(jsonMessage.ApplicationId, result.ApplicationId); + Assert.Equal(jsonMessage.CreationDateTime, result.CreationDateTime); + Assert.Equal(jsonMessage.ContentType, result.ContentType); + Assert.Equal(jsonMessage.CorrelationId, result.CorrelationId); + Assert.Equal(jsonMessage.DeliveryTag, result.DeliveryTag); + Assert.Equal(jsonMessage.MessageDescription, result.MessageDescription); + Assert.Equal(jsonMessage.MessageId, result.MessageId); + } + } +} diff --git a/src/Messaging/Test/Monai.Deploy.Messaging.Test.csproj b/src/Messaging/Test/Monai.Deploy.Messaging.Test.csproj index 9f77f5f..d3833e5 100644 --- a/src/Messaging/Test/Monai.Deploy.Messaging.Test.csproj +++ b/src/Messaging/Test/Monai.Deploy.Messaging.Test.csproj @@ -9,6 +9,7 @@ + runtime; build; native; contentfiles; analyzers; buildtransitive @@ -20,4 +21,8 @@ + + + + diff --git a/src/Messaging/Test/RabbitMq/RabbitMqMessagePublisherServiceTest.cs b/src/Messaging/Test/RabbitMq/RabbitMqMessagePublisherServiceTest.cs new file mode 100644 index 0000000..dedb976 --- /dev/null +++ b/src/Messaging/Test/RabbitMq/RabbitMqMessagePublisherServiceTest.cs @@ -0,0 +1,82 @@ +// SPDX-FileCopyrightText: © 2022 MONAI Consortium +// SPDX-License-Identifier: Apache License 2.0 + +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Monai.Deploy.Messaging.Configuration; +using Monai.Deploy.Messaging.Messages; +using Monai.Deploy.Messaging.RabbitMq; +using Moq; +using RabbitMQ.Client; +using Xunit; + +namespace Monai.Deploy.Messaging.Test.RabbitMq +{ + public class RabbitMqMessagePublisherServiceTest + { + private readonly IOptions _options; + private readonly Mock> _logger; + private readonly Mock _connectionFactory; + private readonly Mock _model; + + public RabbitMqMessagePublisherServiceTest() + { + _options = Options.Create(new MessageBrokerServiceConfiguration()); + _logger = new Mock>(); + _connectionFactory = new Mock(); + _model = new Mock(); + + _connectionFactory.Setup(p => p.CreateChannel(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(),It.IsAny(),It.IsAny())) + .Returns(_model.Object); + } + + [Fact(DisplayName = "Fails to validate when required keys are missing")] + public void FailsToValidateWhenRequiredKeysAreMissing() + { + Assert.Throws(() => new RabbitMqMessagePublisherService(_options, _logger.Object, _connectionFactory.Object)); + } + + [Fact(DisplayName = "Publishes a message")] + public async Task PublishesAMessage() + { + _options.Value.PublisherSettings.Add(ConfigurationKeys.EndPoint, "endpoint"); + _options.Value.PublisherSettings.Add(ConfigurationKeys.Username, "username"); + _options.Value.PublisherSettings.Add(ConfigurationKeys.Password, "password"); + _options.Value.PublisherSettings.Add(ConfigurationKeys.VirtualHost, "virtual-host"); + _options.Value.PublisherSettings.Add(ConfigurationKeys.Exchange, "exchange"); + + var basicProperties = new Mock(); + _model.Setup(p => p.CreateBasicProperties()).Returns(basicProperties.Object); + _model.Setup(p => p.BasicPublish( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny>())); + + var service = new RabbitMqMessagePublisherService(_options, _logger.Object, _connectionFactory.Object); + + var jsonMessage = new JsonMessage("hello world", Guid.NewGuid().ToString(), Guid.NewGuid().ToString()); + var message = jsonMessage.ToMessage(); + await service.Publish("topic", message).ConfigureAwait(false); + + basicProperties.VerifySet(p => p.Persistent = true); + basicProperties.VerifySet(p => p.ContentType = jsonMessage.ContentType); + basicProperties.VerifySet(p => p.MessageId = jsonMessage.MessageId); + basicProperties.VerifySet(p => p.AppId = jsonMessage.ApplicationId); + basicProperties.VerifySet(p => p.CorrelationId = jsonMessage.CorrelationId); + basicProperties.VerifySet(p => p.DeliveryMode = 2); + + _model.Verify(p => p.BasicPublish( + It.Is(p => p.Equals("exchange")), + It.Is(p => p.Equals("topic")), + false, + It.Is(p => p.Equals(basicProperties.Object)), + It.IsAny>()), Times.Once()); + + _model.Verify(p => p.Dispose(), Times.Once()); + } + } +} diff --git a/src/Messaging/Test/RabbitMq/RabbitMqMessageSubscriberServiceTest.cs b/src/Messaging/Test/RabbitMq/RabbitMqMessageSubscriberServiceTest.cs new file mode 100644 index 0000000..4c9c0d6 --- /dev/null +++ b/src/Messaging/Test/RabbitMq/RabbitMqMessageSubscriberServiceTest.cs @@ -0,0 +1,189 @@ +// SPDX-FileCopyrightText: © 2022 MONAI Consortium +// SPDX-License-Identifier: Apache License 2.0 + +using System; +using System.Collections.Generic; +using System.Globalization; +using System.Text; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Monai.Deploy.Messaging.Configuration; +using Monai.Deploy.Messaging.Messages; +using Monai.Deploy.Messaging.RabbitMq; +using Moq; +using RabbitMQ.Client; +using Xunit; + +namespace Monai.Deploy.Messaging.Test.RabbitMq +{ + public class RabbitMqMessageSubscriberServiceTest + { + private readonly IOptions _options; + private readonly Mock> _logger; + private readonly Mock _connectionFactory; + private readonly Mock _model; + + public RabbitMqMessageSubscriberServiceTest() + { + _options = Options.Create(new MessageBrokerServiceConfiguration()); + _logger = new Mock>(); + _connectionFactory = new Mock(); + _model = new Mock(); + + _connectionFactory.Setup(p => p.CreateChannel(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(_model.Object); + + } + + [Fact(DisplayName = "Fails to validate when required keys are missing")] + public void FailsToValidateWhenRequiredKeysAreMissing() + { + Assert.Throws(() => new RabbitMqMessageSubscriberService(_options, _logger.Object, _connectionFactory.Object)); + } + + [Fact(DisplayName = "Cleanup connections on Dispose")] + public void CleanupOnDispose() + { + _options.Value.SubscriberSettings.Add(ConfigurationKeys.EndPoint, "endpoint"); + _options.Value.SubscriberSettings.Add(ConfigurationKeys.Username, "username"); + _options.Value.SubscriberSettings.Add(ConfigurationKeys.Password, "password"); + _options.Value.SubscriberSettings.Add(ConfigurationKeys.VirtualHost, "virtual-host"); + _options.Value.SubscriberSettings.Add(ConfigurationKeys.Exchange, "exchange"); + _options.Value.SubscriberSettings.Add(ConfigurationKeys.ExportRequestQueue, "export-request-queue"); + + var service = new RabbitMqMessageSubscriberService(_options, _logger.Object, _connectionFactory.Object); + service.Dispose(); + + _model.Verify(p => p.Close(), Times.Once()); + _model.Verify(p => p.Dispose(), Times.Once()); + } + + [Fact(DisplayName = "Subscribes to a topic")] + public void SubscribesToATopic() + { + _options.Value.SubscriberSettings.Add(ConfigurationKeys.EndPoint, "endpoint"); + _options.Value.SubscriberSettings.Add(ConfigurationKeys.Username, "username"); + _options.Value.SubscriberSettings.Add(ConfigurationKeys.Password, "password"); + _options.Value.SubscriberSettings.Add(ConfigurationKeys.VirtualHost, "virtual-host"); + _options.Value.SubscriberSettings.Add(ConfigurationKeys.Exchange, "exchange"); + _options.Value.SubscriberSettings.Add(ConfigurationKeys.ExportRequestQueue, "export-request-queue"); + + var jsonMessage = new JsonMessage("hello world", Guid.NewGuid().ToString(), Guid.NewGuid().ToString(), "1"); + var message = jsonMessage.ToMessage(); + var basicProperties = new Mock(); + basicProperties.SetupGet(p => p.MessageId).Returns(jsonMessage.MessageId); + basicProperties.SetupGet(p => p.AppId).Returns(jsonMessage.ApplicationId); + basicProperties.SetupGet(p => p.ContentType).Returns(jsonMessage.ContentType); + basicProperties.SetupGet(p => p.CorrelationId).Returns(jsonMessage.CorrelationId); + basicProperties.SetupGet(p => p.Headers["CreationDateTime"]).Returns(Encoding.UTF8.GetBytes(jsonMessage.CreationDateTime.ToString("o", System.Globalization.CultureInfo.InvariantCulture))); + + _model.Setup(p => p.QueueDeclare( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny>())) + .Returns(new QueueDeclareOk("queue-name", 1, 1)); + _model.Setup(p => p.QueueBind( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny>())); + _model.Setup(p => p.BasicQos( + It.IsAny(), + It.IsAny(), + It.IsAny())); + _model.Setup(p => p.BasicConsume( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny>(), + It.IsAny())) + .Callback, IBasicConsumer>( + (queue, autoAck, tag, noLocal, exclusive, args, consumer) => + { + consumer.HandleBasicDeliver(tag, Convert.ToUInt64(jsonMessage.DeliveryTag, CultureInfo.InvariantCulture), false, "exchange", "topic", basicProperties.Object, new ReadOnlyMemory(message.Body)); + }); + + var service = new RabbitMqMessageSubscriberService(_options, _logger.Object, _connectionFactory.Object); + + service.Subscribe("topic", "queue", (args) => + { + Assert.Equal(message.ApplicationId, args.Message.ApplicationId); + Assert.Equal(message.ContentType, args.Message.ContentType); + Assert.Equal(message.MessageId, args.Message.MessageId); + Assert.Equal(message.CreationDateTime.ToUniversalTime(), args.Message.CreationDateTime.ToUniversalTime()); + Assert.Equal(message.DeliveryTag, args.Message.DeliveryTag); + Assert.Equal("topic", args.Message.MessageDescription); + Assert.Equal(message.MessageId, args.Message.MessageId); + Assert.Equal(message.Body, args.Message.Body); + }); + + service.SubscribeAsync("topic", "queue", async (args) => + { + await System.Threading.Tasks.Task.Run(() => + { + Assert.Equal(message.ApplicationId, args.Message.ApplicationId); + Assert.Equal(message.ContentType, args.Message.ContentType); + Assert.Equal(message.MessageId, args.Message.MessageId); + Assert.Equal(message.CreationDateTime.ToUniversalTime(), args.Message.CreationDateTime.ToUniversalTime()); + Assert.Equal(message.DeliveryTag, args.Message.DeliveryTag); + Assert.Equal("topic", args.Message.MessageDescription); + Assert.Equal(message.MessageId, args.Message.MessageId); + Assert.Equal(message.Body, args.Message.Body); + }).ConfigureAwait(false); + }); + } + + [Fact(DisplayName = "Acknowledge a message")] + public void AcknowledgeAMessage() + { + _options.Value.SubscriberSettings.Add(ConfigurationKeys.EndPoint, "endpoint"); + _options.Value.SubscriberSettings.Add(ConfigurationKeys.Username, "username"); + _options.Value.SubscriberSettings.Add(ConfigurationKeys.Password, "password"); + _options.Value.SubscriberSettings.Add(ConfigurationKeys.VirtualHost, "virtual-host"); + _options.Value.SubscriberSettings.Add(ConfigurationKeys.Exchange, "exchange"); + _options.Value.SubscriberSettings.Add(ConfigurationKeys.ExportRequestQueue, "export-request-queue"); + + var jsonMessage = new JsonMessage("hello world", Guid.NewGuid().ToString(), Guid.NewGuid().ToString(), "1"); + var message = jsonMessage.ToMessage(); + + _model.Setup(p => p.BasicAck( + It.IsAny(), + It.IsAny())); + + var service = new RabbitMqMessageSubscriberService(_options, _logger.Object, _connectionFactory.Object); + + service.Acknowledge(message); + + _model.Verify(p => p.BasicAck(1, false), Times.Once()); + } + + [Fact(DisplayName = "Reject a message")] + public void RejectAMessage() + { + _options.Value.SubscriberSettings.Add(ConfigurationKeys.EndPoint, "endpoint"); + _options.Value.SubscriberSettings.Add(ConfigurationKeys.Username, "username"); + _options.Value.SubscriberSettings.Add(ConfigurationKeys.Password, "password"); + _options.Value.SubscriberSettings.Add(ConfigurationKeys.VirtualHost, "virtual-host"); + _options.Value.SubscriberSettings.Add(ConfigurationKeys.Exchange, "exchange"); + _options.Value.SubscriberSettings.Add(ConfigurationKeys.ExportRequestQueue, "export-request-queue"); + + var jsonMessage = new JsonMessage("hello world", Guid.NewGuid().ToString(), Guid.NewGuid().ToString(), "1"); + var message = jsonMessage.ToMessage(); + + _model.Setup(p => p.BasicNack( + It.IsAny(), + It.IsAny(), + It.IsAny())); + + var service = new RabbitMqMessageSubscriberService(_options, _logger.Object, _connectionFactory.Object); + + service.Reject(message); + + _model.Verify(p => p.BasicNack(1, false, true), Times.Once()); + } + } +} diff --git a/src/Messaging/Test/TaskCallbackEventTest.cs b/src/Messaging/Test/TaskCallbackEventTest.cs new file mode 100644 index 0000000..ca670a2 --- /dev/null +++ b/src/Messaging/Test/TaskCallbackEventTest.cs @@ -0,0 +1,39 @@ +// SPDX-FileCopyrightText: © 2022 MONAI Consortium +// SPDX-License-Identifier: Apache License 2.0 + +using System; +using Monai.Deploy.Messaging.Common; +using Monai.Deploy.Messaging.Events; +using Xunit; + +namespace Monai.Deploy.Messaging.Test +{ + public class TaskCallbackEventTest + { + [Fact(DisplayName = "Validation throws on error")] + public void ValidationThrowsOnError() + { + var runnerComplete = new TaskCallbackEvent(); + Assert.Throws(() => runnerComplete.Validate()); + + runnerComplete.WorkflowInstanceId = Guid.NewGuid().ToString(); + Assert.Throws(() => runnerComplete.Validate()); + + runnerComplete.TaskId = Guid.NewGuid().ToString(); + Assert.Throws(() => runnerComplete.Validate()); + + runnerComplete.ExecutionId = Guid.NewGuid().ToString(); + Assert.Throws(() => runnerComplete.Validate()); + + runnerComplete.CorrelationId = Guid.NewGuid().ToString(); + Assert.Throws(() => runnerComplete.Validate()); + + runnerComplete.Identity = "1234567890123456789012345678901234567890123456789012345678901234567890"; + Assert.Throws(() => runnerComplete.Validate()); + + runnerComplete.Identity = "123456789012345678901234567890123456789012345678901234567890123"; + var exception = Record.Exception(() => runnerComplete.Validate()); + Assert.Null(exception); + } + } +} diff --git a/src/Messaging/Test/TaskDispatchEventTest.cs b/src/Messaging/Test/TaskDispatchEventTest.cs new file mode 100644 index 0000000..5841c69 --- /dev/null +++ b/src/Messaging/Test/TaskDispatchEventTest.cs @@ -0,0 +1,100 @@ +// SPDX-FileCopyrightText: © 2022 MONAI Consortium +// SPDX-License-Identifier: Apache License 2.0 + +using System; +using System.Collections.Generic; +using Monai.Deploy.Messaging.Common; +using Monai.Deploy.Messaging.Events; +using Xunit; + +namespace Monai.Deploy.Messaging.Test +{ + public class TaskDispatchEventTest + { + [Fact(DisplayName = "Validation throws on error")] + public void ValidationThrowsOnError() + { + var taskDispatchEvent = new TaskDispatchEvent(); + Assert.Throws(() => taskDispatchEvent.Validate()); + + taskDispatchEvent.WorkflowInstanceId = Guid.NewGuid().ToString(); + Assert.Throws(() => taskDispatchEvent.Validate()); + + taskDispatchEvent.ExecutionId = Guid.NewGuid().ToString(); + Assert.Throws(() => taskDispatchEvent.Validate()); + + taskDispatchEvent.PayloadId = Guid.NewGuid().ToString(); + Assert.Throws(() => taskDispatchEvent.Validate()); + + taskDispatchEvent.TaskId = Guid.NewGuid().ToString(); + Assert.Throws(() => taskDispatchEvent.Validate()); + + taskDispatchEvent.CorrelationId = Guid.NewGuid().ToString(); + Assert.Throws(() => taskDispatchEvent.Validate()); + + taskDispatchEvent.TaskPluginType = Guid.NewGuid().ToString(); + Assert.Throws(() => taskDispatchEvent.Validate()); + + taskDispatchEvent.Inputs = new List(); + Assert.Throws(() => taskDispatchEvent.Validate()); + + var input = new Storage(); + taskDispatchEvent.Inputs.Add(input); + Assert.Throws(() => taskDispatchEvent.Validate()); + + taskDispatchEvent.Outputs = new List(); + Assert.Throws(() => taskDispatchEvent.Validate()); + + var output = new Storage(); + taskDispatchEvent.Outputs.Add(output); + Assert.Throws(() => taskDispatchEvent.Validate()); + + output.Name = "name"; + Assert.Throws(() => taskDispatchEvent.Validate()); + + output.Endpoint = "endpoint"; + Assert.Throws(() => taskDispatchEvent.Validate()); + + // Skip settings credentials for output, this shall not throw given that is not required + + output.Bucket = "bucket"; + Assert.Throws(() => taskDispatchEvent.Validate()); + + output.RelativeRootPath = "path"; + Assert.Throws(() => taskDispatchEvent.Validate()); + + var intermediate = new Storage(); + taskDispatchEvent.IntermediateStorage = intermediate; + Assert.Throws(() => taskDispatchEvent.Validate()); + + intermediate.Name = "name"; + Assert.Throws(() => taskDispatchEvent.Validate()); + + intermediate.Endpoint = "endpoint"; + Assert.Throws(() => taskDispatchEvent.Validate()); + + intermediate.Bucket = "bucket"; + Assert.Throws(() => taskDispatchEvent.Validate()); + + intermediate.RelativeRootPath = "path"; + Assert.Throws(() => taskDispatchEvent.Validate()); + + input.Name = "name"; + input.Endpoint = "endpoint"; + input.Bucket = "bucket"; + input.RelativeRootPath = "path"; + + var exception = Record.Exception(() => taskDispatchEvent.Validate()); + Assert.Null(exception); + + // Let's set the credentials for input, this should throw validation exception given that it's no longer null + input.Credentials = new Credentials(); + Assert.Throws(() => taskDispatchEvent.Validate()); + + input.Credentials.AccessKey = "key"; + input.Credentials.AccessToken = "token"; + exception = Record.Exception(() => taskDispatchEvent.Validate()); + Assert.Null(exception); + } + } +} diff --git a/src/Messaging/Test/TaskUpdateEventTest.cs b/src/Messaging/Test/TaskUpdateEventTest.cs new file mode 100644 index 0000000..4777d13 --- /dev/null +++ b/src/Messaging/Test/TaskUpdateEventTest.cs @@ -0,0 +1,33 @@ +// SPDX-FileCopyrightText: © 2022 MONAI Consortium +// SPDX-License-Identifier: Apache License 2.0 + +using System; +using Monai.Deploy.Messaging.Common; +using Monai.Deploy.Messaging.Events; +using Xunit; + +namespace Monai.Deploy.Messaging.Test +{ + public class TaskUpdateEventTest + { + [Fact(DisplayName = "Validation throws on error")] + public void ValidationThrowsOnError() + { + var taskDispatchEvent = new TaskUpdateEvent(); + Assert.Throws(() => taskDispatchEvent.Validate()); + + taskDispatchEvent.WorkflowInstanceId = Guid.NewGuid().ToString(); + Assert.Throws(() => taskDispatchEvent.Validate()); + + taskDispatchEvent.ExecutionId = Guid.NewGuid().ToString(); + Assert.Throws(() => taskDispatchEvent.Validate()); + + taskDispatchEvent.TaskId = Guid.NewGuid().ToString(); + Assert.Throws(() => taskDispatchEvent.Validate()); + + taskDispatchEvent.CorrelationId = Guid.NewGuid().ToString(); + var exception = Record.Exception(() => taskDispatchEvent.Validate()); + Assert.Null(exception); + } + } +} diff --git a/src/Messaging/Test/WorkflowRequestMessageTest.cs b/src/Messaging/Test/WorkflowRequestMessageTest.cs new file mode 100644 index 0000000..faf911d --- /dev/null +++ b/src/Messaging/Test/WorkflowRequestMessageTest.cs @@ -0,0 +1,40 @@ +// SPDX-FileCopyrightText: © 2022 MONAI Consortium +// SPDX-License-Identifier: Apache License 2.0 + +using System; +using System.Collections.Generic; +using Monai.Deploy.Messaging.Common; +using Monai.Deploy.Messaging.Events; +using Xunit; + +namespace Monai.Deploy.Messaging.Test +{ + public class WorkflowRequestMessageTest + { + [Fact(DisplayName = "Converts JSONMessage to Message")] + public void ConvertsJsonMessageToMessage() + { + var input = new WorkflowRequestEvent() + { + Bucket = Guid.NewGuid().ToString(), + CalledAeTitle = Guid.NewGuid().ToString(), + CallingAeTitle = Guid.NewGuid().ToString(), + CorrelationId = Guid.NewGuid().ToString(), + FileCount = 10, + PayloadId = Guid.NewGuid(), + Timestamp = DateTime.Now, + Workflows = new List { Guid.NewGuid().ToString() } + }; + + var files = new List() + { + new BlockStorageInfo{ Path =Guid.NewGuid().ToString(), Metadata=Guid.NewGuid().ToString() }, + new BlockStorageInfo{ Path =Guid.NewGuid().ToString(), Metadata=Guid.NewGuid().ToString() }, + }; + + input.AddFiles(files); + + Assert.Equal(files, input.Payload); + } + } +}