1
- using Liquid . Base . Interfaces ;
1
+ // Copyright (c) Avanade Inc. All rights reserved.
2
+ // Licensed under the MIT License. See LICENSE in the project root for license information.
3
+
4
+ using System ;
5
+ using System . Collections . Generic ;
6
+ using System . Reflection ;
7
+ using System . Threading . Tasks ;
8
+ using Liquid . Activation ;
2
9
using Liquid . Domain ;
3
10
using Liquid . Domain . Base ;
4
- using Liquid . Activation ;
5
11
using Liquid . Runtime . Configuration . Base ;
6
12
using Liquid . Runtime . Telemetry ;
7
13
using Microsoft . Azure . ServiceBus ;
8
- using System ;
9
- using System . Collections . Generic ;
10
- using System . Reflection ;
11
- using System . Text ;
12
- using System . Threading . Tasks ;
13
14
14
15
namespace Liquid . OnAzure
15
16
{
16
17
/// <summary>
17
- /// Implementation of the communication component between queues and topics of the Azure, this class is specific to azure
18
+ /// Defines an object capable of creating instances of <see cref="IQueueClient"/>.
19
+ /// </summary>
20
+ public interface IQueueClientFactory
21
+ {
22
+ /// <summary>
23
+ /// Creates a new instance of <see cref="IQueueClient"/>.
24
+ /// </summary>
25
+ /// <param name="connectionString">The connection string for the client.</param>
26
+ /// <param name="queueName">The name of the queue.</param>
27
+ /// <param name="receiveMode">The receive mode that the client will connect to the queue.</param>
28
+ /// <returns>A new instance of <see cref="IQueueClient"/>.</returns>
29
+ IQueueClient CreateClient ( string connectionString , string queueName , ReceiveMode receiveMode ) ;
30
+ }
31
+
32
+ /// <summary>
33
+ /// Defines an object capable of creating instances of <see cref="ISubscriptionClient"/>.
34
+ /// </summary>
35
+ public interface ISubscriptionClientFactory
36
+ {
37
+ /// <summary>
38
+ /// Creates a new instance of <see cref="ISubscriptionClient"/>.
39
+ /// </summary>
40
+ /// <param name="connectionString">The connection string for the client.</param>
41
+ /// <param name="topicName">The name of the topic to connect to.</param>
42
+ /// <param name="subscriptionName">Identifies the subscription to this topic.</param>
43
+ /// <param name="receiveMode">The receive mode that the client will connect to the queue.</param>
44
+ /// <returns>A new instance of <see cref="ISubscriptionClient"/>.</returns>
45
+ ISubscriptionClient CreateClient ( string connectionString , string topicName , string subscriptionName , ReceiveMode receiveMode ) ;
46
+ }
47
+
48
+ /// <summary>
49
+ /// Configuration source for <see cref="ServiceBus"/>.
50
+ /// </summary>
51
+ // TODO: should remove this class once we move to .NET configuration system
52
+ public interface IServiceBusConfigurationProvider
53
+ {
54
+ /// <summary>
55
+ /// Gets the configuration for a <see cref="ServiceBus"/>.
56
+ /// </summary>
57
+ /// <returns>
58
+ /// The current configuration for a service bus.
59
+ /// </returns>
60
+ ServiceBusConfiguration GetConfiguration ( ) ;
61
+
62
+ /// <summary>
63
+ /// Gets the configuration for a <see cref="ServiceBus"/>.
64
+ /// </summary>
65
+ /// <param name="connectionName">
66
+ /// Identifies which connection should be retrieved from the file.
67
+ /// </param>
68
+ /// <returns>
69
+ /// The current configuration for a service bus.
70
+ /// </returns>
71
+ ServiceBusConfiguration GetConfiguration ( string connectionName ) ;
72
+ }
73
+
74
+ /// <summary>
75
+ /// Implementation of the communication component between queues and topics of the Azure, this class is specific to azure.
18
76
/// </summary>
19
77
public class ServiceBus : LightWorker , IWorkbenchService
20
78
{
79
+ /// <summary>
80
+ /// Factory used to create a <see cref="IQueueClient"/>.
81
+ /// </summary>
82
+ private readonly IQueueClientFactory _queueClientFactory = new DefaultQueueClientFactory ( ) ;
83
+
84
+ /// <summary>
85
+ /// Factory used to create a <see cref="ISubscriptionClient"/>.
86
+ /// </summary>
87
+ private readonly ISubscriptionClientFactory _subscriptionClientFactory = new DefaultSubscriptionClientFactory ( ) ;
88
+
89
+ /// <summary>
90
+ /// Service that retrives a <see cref="ServiceBusConfiguration"/>.
91
+ /// </summary>
92
+ private readonly IServiceBusConfigurationProvider _configurationProvider = new DefaultServiceBusConfigurationProvider ( ) ;
93
+
94
+ /// <summary>
95
+ /// Initializes a new instance of the <see cref="ServiceBus"/> class.
96
+ /// </summary>
97
+ public ServiceBus ( )
98
+ {
99
+ }
100
+
101
+ /// <summary>
102
+ /// Initializes a new instance of the <see cref="ServiceBus"/> class.
103
+ /// </summary>
104
+ /// <param name="queueClientFactory">
105
+ /// Dependency. Used to obtain new instances of a <see cref="IQueueClient"/>.
106
+ /// </param>
107
+ /// <param name="subscriptionClientFactory">
108
+ /// Dependency. Used to obtain new instances of a <see cref="ISubscriptionClient"/>.
109
+ /// </param>
110
+ /// <param name="configurationProvider">
111
+ /// Dependency. Used to retrieve a configuration for this class.
112
+ /// </param>
113
+ public ServiceBus (
114
+ IQueueClientFactory queueClientFactory ,
115
+ ISubscriptionClientFactory subscriptionClientFactory ,
116
+ IServiceBusConfigurationProvider configurationProvider )
117
+ {
118
+ _queueClientFactory = queueClientFactory ?? throw new ArgumentNullException ( nameof ( queueClientFactory ) ) ;
119
+ _subscriptionClientFactory = subscriptionClientFactory ?? throw new ArgumentNullException ( nameof ( subscriptionClientFactory ) ) ;
120
+ _configurationProvider = configurationProvider ?? throw new ArgumentNullException ( nameof ( configurationProvider ) ) ;
121
+ }
122
+
21
123
/// <summary>
22
124
/// Implementation of the start process queue and process topic. It must be called parent before start processes.
23
125
/// </summary>
@@ -36,16 +138,17 @@ public override void Initialize()
36
138
/// <returns>StringConnection of the ServiceBus</returns>
37
139
private string GetConnection < T > ( KeyValuePair < MethodInfo , T > item )
38
140
{
39
- MethodInfo method = item . Key ;
40
- string connectionKey = GetKeyConnection ( method ) ;
41
- ServiceBusConfiguration config = null ;
141
+ var method = item . Key ;
142
+ var connectionKey = GetKeyConnection ( method ) ;
143
+
144
+ ServiceBusConfiguration config ;
42
145
if ( string . IsNullOrEmpty ( connectionKey ) ) // Load specific settings if provided
43
146
{
44
- config = LightConfigurator . Config < ServiceBusConfiguration > ( $ "{ nameof ( ServiceBus ) } ") ;
147
+ config = _configurationProvider . GetConfiguration ( ) ; // LightConfigurator.Config<ServiceBusConfiguration>($"{nameof(ServiceBus)}");
45
148
}
46
149
else
47
150
{
48
- config = LightConfigurator . Config < ServiceBusConfiguration > ( $ "{ nameof ( ServiceBus ) } _{ connectionKey } ") ;
151
+ config = _configurationProvider . GetConfiguration ( connectionKey ) ; // LightConfigurator.Config<ServiceBusConfiguration>($"{nameof(ServiceBus)}_{connectionKey}");
49
152
}
50
153
51
154
return config . ConnectionString ;
@@ -80,7 +183,7 @@ public void ProcessQueue()
80
183
int takeQuantity = queue . Value . TakeQuantity ;
81
184
82
185
//Register Trace on the telemetry
83
- QueueClient queueReceiver = new QueueClient ( GetConnection ( queue ) , queueName , receiveMode ) ;
186
+ var queueReceiver = _queueClientFactory . CreateClient ( GetConnection ( queue ) , queueName , receiveMode ) ;
84
187
85
188
//Register the method to process receive message
86
189
//The RegisterMessageHandler is validate for all register exist on the queue, without need loop for items
@@ -114,7 +217,7 @@ await queueReceiver.DeadLetterAsync(message.SystemProperties.LockToken,
114
217
{
115
218
Exception moreInfo = new Exception ( $ "Error setting up queue consumption from service bus. See inner exception for details. Message={ exception . Message } ", exception ) ;
116
219
//Use the class instead of interface because tracking exceptions directly is not supposed to be done outside AMAW (i.e. by the business code)
117
- ( ( LightTelemetry ) Workbench . Instance . Telemetry ) . TrackException ( moreInfo ) ;
220
+ ( Workbench . Instance . Telemetry as LightTelemetry ) ? . TrackException ( moreInfo ) ;
118
221
}
119
222
}
120
223
@@ -132,14 +235,18 @@ private void ProcessSubscription()
132
235
string topicName = topic . Value . TopicName ;
133
236
string subscriptName = topic . Value . Subscription ;
134
237
ReceiveMode receiveMode = ReceiveMode . PeekLock ;
238
+
135
239
if ( topic . Value . DeleteAfterRead )
136
240
{
137
241
receiveMode = ReceiveMode . ReceiveAndDelete ;
138
242
}
243
+
139
244
int takeQuantity = topic . Value . TakeQuantity ;
140
245
141
246
//Register Trace on the telemetry
142
- SubscriptionClient subscriptionClient = new SubscriptionClient ( GetConnection ( topic ) , topicName , subscriptName , receiveMode , null ) ;
247
+ var subscriptionClient = _subscriptionClientFactory . CreateClient (
248
+ GetConnection ( topic ) , topicName , subscriptName , receiveMode
249
+ ) ;
143
250
144
251
//Register the method to process receive message
145
252
//The RegisterMessageHandler is validate for all register exist on the queue, without need loop for items
@@ -200,13 +307,57 @@ await subscriptionClient.DeadLetterAsync(message.SystemProperties.LockToken,
200
307
{
201
308
Exception moreInfo = new Exception ( $ "Error setting up subscription consumption from service bus. See inner exception for details. Message={ exception . Message } ", exception ) ;
202
309
//Use the class instead of interface because tracking exceptions directly is not supposed to be done outside AMAW (i.e. by the business code)
203
- ( ( LightTelemetry ) Workbench . Instance . Telemetry ) . TrackException ( moreInfo ) ;
310
+ ( Workbench . Instance . Telemetry as LightTelemetry ) ? . TrackException ( moreInfo ) ;
204
311
}
205
312
}
206
313
207
314
protected override Task ProcessAsync ( )
208
315
{
209
316
throw new NotImplementedException ( ) ;
210
317
}
318
+
319
+ /// <summary>
320
+ /// Default implementation for <see cref="IQueueClientFactory"/>,
321
+ /// creates instances of <see cref="IQueueClient"/>.
322
+ /// </summary>
323
+ private class DefaultQueueClientFactory : IQueueClientFactory
324
+ {
325
+ /// <inheritdoc/>
326
+ public IQueueClient CreateClient ( string connectionString , string queueName , ReceiveMode receiveMode )
327
+ {
328
+ return new QueueClient ( connectionString , queueName , receiveMode ) ;
329
+ }
330
+ }
331
+
332
+ /// <summary>
333
+ /// Default implementation for <see cref="ISubscriptionClientFactory"/>,
334
+ /// creates instances of <see cref="SubscriptionClient"/>.
335
+ /// </summary>
336
+ private class DefaultSubscriptionClientFactory : ISubscriptionClientFactory
337
+ {
338
+ /// <inheritdoc/>
339
+ public ISubscriptionClient CreateClient ( string connectionString , string topicName , string subscriptionName , ReceiveMode mode )
340
+ {
341
+ return new SubscriptionClient ( connectionString , topicName , subscriptionName , mode , null ) ;
342
+ }
343
+ }
344
+
345
+ /// <summary>
346
+ /// Retrieves configuration using <see cref="LightConfigurator"/>.
347
+ /// </summary>
348
+ private class DefaultServiceBusConfigurationProvider : IServiceBusConfigurationProvider
349
+ {
350
+ /// <inheritdoc/>
351
+ public ServiceBusConfiguration GetConfiguration ( )
352
+ {
353
+ return LightConfigurator . Config < ServiceBusConfiguration > ( $ "{ nameof ( ServiceBus ) } ") ;
354
+ }
355
+
356
+ /// <inheritdoc/>
357
+ public ServiceBusConfiguration GetConfiguration ( string connectionKey )
358
+ {
359
+ return LightConfigurator . Config < ServiceBusConfiguration > ( $ "{ nameof ( ServiceBus ) } _{ connectionKey } ") ;
360
+ }
361
+ }
211
362
}
212
363
}
0 commit comments