1515
1616package software .amazon .awssdk .services .sqs .internal .batchmanager ;
1717
18+ import java .time .Duration ;
1819import java .util .concurrent .CompletableFuture ;
1920import java .util .concurrent .ScheduledExecutorService ;
2021import software .amazon .awssdk .annotations .SdkInternalApi ;
22+ import software .amazon .awssdk .annotations .SdkTestInternalApi ;
2123import software .amazon .awssdk .services .sqs .SqsAsyncClient ;
2224import software .amazon .awssdk .services .sqs .batchmanager .BatchOverrideConfiguration ;
2325import software .amazon .awssdk .services .sqs .batchmanager .SqsAsyncBatchManager ;
3941public final class DefaultSqsAsyncBatchManager implements SqsAsyncBatchManager {
4042 // TODO : update the validation here while implementing this class in next PR
4143 private final SqsAsyncClient client ;
42- private final ScheduledExecutorService scheduledExecutor ;
43- private final BatchOverrideConfiguration overrideConfiguration ;
4444
4545 private final BatchManager <SendMessageRequest , SendMessageResponse , SendMessageBatchResponse > sendMessageBatchManager ;
4646
@@ -53,34 +53,69 @@ public final class DefaultSqsAsyncBatchManager implements SqsAsyncBatchManager {
5353
5454 private DefaultSqsAsyncBatchManager (DefaultBuilder builder ) {
5555 this .client = Validate .notNull (builder .client , "client cannot be null" );
56- this .scheduledExecutor = Validate .notNull (builder .scheduledExecutor , "scheduledExecutor cannot be null" );
57- // TODO : create overrideConfiguration with Default values if null
58- this .overrideConfiguration = builder .overrideConfiguration ;
5956
60- sendMessageBatchManager = null ;
61- deleteMessageBatchManager = null ;
62- changeMessageVisibilityBatchManager = null ;
57+ ScheduledExecutorService scheduledExecutor = builder .scheduledExecutor ;
58+
59+ this .sendMessageBatchManager = BatchManager
60+ .requestBatchManagerBuilder (SendMessageRequest .class , SendMessageResponse .class , SendMessageBatchResponse .class )
61+ .batchFunction (SqsBatchFunctions .sendMessageBatchAsyncFunction (client ))
62+ .responseMapper (SqsBatchFunctions .sendMessageResponseMapper ())
63+ .batchKeyMapper (SqsBatchFunctions .sendMessageBatchKeyMapper ())
64+ .overrideConfiguration (sendMessageConfig (builder .overrideConfiguration )).scheduledExecutor (scheduledExecutor )
65+ .build ();
66+ this .deleteMessageBatchManager = BatchManager
67+ .requestBatchManagerBuilder (DeleteMessageRequest .class , DeleteMessageResponse .class , DeleteMessageBatchResponse .class )
68+ .batchFunction (SqsBatchFunctions .deleteMessageBatchAsyncFunction (client ))
69+ .responseMapper (SqsBatchFunctions .deleteMessageResponseMapper ())
70+ .batchKeyMapper (SqsBatchFunctions .deleteMessageBatchKeyMapper ())
71+ .overrideConfiguration (deleteMessageConfig (builder .overrideConfiguration )).scheduledExecutor (scheduledExecutor )
72+ .build ();
73+ this .changeMessageVisibilityBatchManager = BatchManager
74+ .requestBatchManagerBuilder (ChangeMessageVisibilityRequest .class , ChangeMessageVisibilityResponse .class ,
75+ ChangeMessageVisibilityBatchResponse .class )
76+ .batchFunction (SqsBatchFunctions .changeMessageVisibilityBatchAsyncFunction (client ))
77+ .responseMapper (SqsBatchFunctions .changeMessageVisibilityResponseMapper ())
78+ .batchKeyMapper (SqsBatchFunctions .changeMessageVisibilityBatchKeyMapper ())
79+ .overrideConfiguration (changeMessageVisibilityConfig (builder .overrideConfiguration ))
80+ .scheduledExecutor (scheduledExecutor ).build ();
81+
82+ //TODO : this will be updated while implementing the Receive Message Batch Manager
6383 receiveMessageBatchManager = null ;
6484 }
6585
86+
87+ @ SdkTestInternalApi
88+ public DefaultSqsAsyncBatchManager (
89+ SqsAsyncClient client ,
90+ BatchManager <SendMessageRequest , SendMessageResponse , SendMessageBatchResponse > sendMessageBatchManager ,
91+ BatchManager <DeleteMessageRequest , DeleteMessageResponse , DeleteMessageBatchResponse > deleteMessageBatchManager ,
92+ BatchManager <ChangeMessageVisibilityRequest , ChangeMessageVisibilityResponse ,
93+ ChangeMessageVisibilityBatchResponse > changeMessageVisibilityBatchManager ) {
94+ this .sendMessageBatchManager = sendMessageBatchManager ;
95+ this .deleteMessageBatchManager = deleteMessageBatchManager ;
96+ this .changeMessageVisibilityBatchManager = changeMessageVisibilityBatchManager ;
97+ receiveMessageBatchManager = null ;
98+ this .client = client ;
99+ }
100+
66101 @ Override
67102 public CompletableFuture <SendMessageResponse > sendMessage (SendMessageRequest request ) {
68- return SqsAsyncBatchManager . super . sendMessage (request );
103+ return sendMessageBatchManager . batchRequest (request );
69104 }
70105
71106 @ Override
72107 public CompletableFuture <DeleteMessageResponse > deleteMessage (DeleteMessageRequest request ) {
73- return SqsAsyncBatchManager . super . deleteMessage (request );
108+ return deleteMessageBatchManager . batchRequest (request );
74109 }
75110
76111 @ Override
77112 public CompletableFuture <ChangeMessageVisibilityResponse > changeMessageVisibility (ChangeMessageVisibilityRequest request ) {
78- return SqsAsyncBatchManager . super . changeMessageVisibility (request );
113+ return changeMessageVisibilityBatchManager . batchRequest (request );
79114 }
80115
81116 @ Override
82117 public CompletableFuture <ReceiveMessageResponse > receiveMessage (ReceiveMessageRequest request ) {
83- return SqsAsyncBatchManager . super . receiveMessage (request );
118+ return receiveMessageBatchManager . batchRequest (request );
84119 }
85120
86121 public static SqsAsyncBatchManager .Builder builder () {
@@ -89,6 +124,33 @@ public static SqsAsyncBatchManager.Builder builder() {
89124
90125 @ Override
91126 public void close () {
127+ sendMessageBatchManager .close ();
128+ deleteMessageBatchManager .close ();
129+ changeMessageVisibilityBatchManager .close ();
130+ }
131+
132+ private BatchOverrideConfiguration createConfig (BatchOverrideConfiguration overrideConfiguration ) {
133+ BatchOverrideConfiguration .Builder config = BatchOverrideConfiguration .builder ();
134+ if (overrideConfiguration == null ) {
135+ config .maxBatchItems (10 );
136+ config .maxBatchOpenInMs (Duration .ofMillis (200 ));
137+ } else {
138+ config .maxBatchItems (overrideConfiguration .maxBatchItems ().orElse (10 ));
139+ config .maxBatchOpenInMs (overrideConfiguration .maxBatchOpenInMs ().orElse (Duration .ofMillis (200 )));
140+ }
141+ return config .build ();
142+ }
143+
144+ private BatchOverrideConfiguration sendMessageConfig (BatchOverrideConfiguration overrideConfiguration ) {
145+ return createConfig (overrideConfiguration );
146+ }
147+
148+ private BatchOverrideConfiguration deleteMessageConfig (BatchOverrideConfiguration overrideConfiguration ) {
149+ return createConfig (overrideConfiguration );
150+ }
151+
152+ private BatchOverrideConfiguration changeMessageVisibilityConfig (BatchOverrideConfiguration overrideConfiguration ) {
153+ return createConfig (overrideConfiguration );
92154 }
93155
94156 public static final class DefaultBuilder implements SqsAsyncBatchManager .Builder {
0 commit comments