1+ /*
2+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License").
5+ * You may not use this file except in compliance with the License.
6+ * A copy of the License is located at
7+ *
8+ * http://aws.amazon.com/apache2.0
9+ *
10+ * or in the "license" file accompanying this file. This file is distributed
11+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+ * express or implied. See the License for the specific language governing
13+ * permissions and limitations under the License.
14+ */
15+
16+ package software .amazon .awssdk .services .sqs .internal .batchmanager ;
17+
18+ import java .util .ArrayList ;
19+ import java .util .List ;
20+ import java .util .Optional ;
21+ import java .util .concurrent .CompletableFuture ;
22+ import java .util .concurrent .ScheduledExecutorService ;
23+ import java .util .stream .Collectors ;
24+ import software .amazon .awssdk .annotations .SdkInternalApi ;
25+ import software .amazon .awssdk .awscore .AwsRequestOverrideConfiguration ;
26+ import software .amazon .awssdk .awscore .exception .AwsErrorDetails ;
27+ import software .amazon .awssdk .services .sqs .SqsAsyncClient ;
28+ import software .amazon .awssdk .services .sqs .batchmanager .BatchOverrideConfiguration ;
29+ import software .amazon .awssdk .services .sqs .model .BatchResultErrorEntry ;
30+ import software .amazon .awssdk .services .sqs .model .ChangeMessageVisibilityBatchRequest ;
31+ import software .amazon .awssdk .services .sqs .model .ChangeMessageVisibilityBatchRequestEntry ;
32+ import software .amazon .awssdk .services .sqs .model .ChangeMessageVisibilityBatchResponse ;
33+ import software .amazon .awssdk .services .sqs .model .ChangeMessageVisibilityBatchResultEntry ;
34+ import software .amazon .awssdk .services .sqs .model .ChangeMessageVisibilityRequest ;
35+ import software .amazon .awssdk .services .sqs .model .ChangeMessageVisibilityResponse ;
36+ import software .amazon .awssdk .services .sqs .model .SqsException ;
37+ import software .amazon .awssdk .utils .Either ;
38+
39+ @ SdkInternalApi
40+ public class ChangeMessageVisibilityBatchManager extends RequestBatchManager <ChangeMessageVisibilityRequest ,
41+ ChangeMessageVisibilityResponse ,
42+ ChangeMessageVisibilityBatchResponse > {
43+
44+ private final SqsAsyncClient sqsAsyncClient ;
45+
46+ protected ChangeMessageVisibilityBatchManager (BatchOverrideConfiguration overrideConfiguration ,
47+ ScheduledExecutorService scheduledExecutor ,
48+ SqsAsyncClient sqsAsyncClient ) {
49+ super (overrideConfiguration , scheduledExecutor );
50+ this .sqsAsyncClient = sqsAsyncClient ;
51+ }
52+
53+ private static ChangeMessageVisibilityBatchRequest createChangeMessageVisibilityBatchRequest (
54+ List <IdentifiableMessage <ChangeMessageVisibilityRequest >> identifiedRequests , String batchKey ) {
55+ List <ChangeMessageVisibilityBatchRequestEntry > entries = identifiedRequests
56+ .stream ()
57+ .map (identifiedRequest -> createChangeMessageVisibilityBatchRequestEntry (identifiedRequest .id (),
58+ identifiedRequest .message ()))
59+ .collect (Collectors .toList ());
60+ // Since requests are batched together according to a combination of their queueUrl and overrideConfiguration,
61+ // all requests must have the same overrideConfiguration so it is sufficient to retrieve it from the first
62+ // request.
63+ Optional <AwsRequestOverrideConfiguration > overrideConfiguration = identifiedRequests .get (0 ).message ()
64+ .overrideConfiguration ();
65+ return overrideConfiguration .map (
66+ config -> ChangeMessageVisibilityBatchRequest .builder ()
67+ .queueUrl (batchKey )
68+ .overrideConfiguration (config )
69+ .entries (entries )
70+ .build ())
71+ .orElseGet (() -> ChangeMessageVisibilityBatchRequest .builder ()
72+ .queueUrl (batchKey )
73+ .entries (entries )
74+ .build ());
75+ }
76+
77+ private static ChangeMessageVisibilityBatchRequestEntry createChangeMessageVisibilityBatchRequestEntry (
78+ String id ,
79+ ChangeMessageVisibilityRequest request ) {
80+ return ChangeMessageVisibilityBatchRequestEntry .builder ().id (id ).receiptHandle (request .receiptHandle ())
81+ .visibilityTimeout (request .visibilityTimeout ()).build ();
82+ }
83+
84+ private static IdentifiableMessage <ChangeMessageVisibilityResponse > createChangeMessageVisibilityResponse (
85+ ChangeMessageVisibilityBatchResultEntry successfulEntry , ChangeMessageVisibilityBatchResponse batchResponse ) {
86+ String key = successfulEntry .id ();
87+ ChangeMessageVisibilityResponse .Builder builder = ChangeMessageVisibilityResponse .builder ();
88+ if (batchResponse .responseMetadata () != null ) {
89+ builder .responseMetadata (batchResponse .responseMetadata ());
90+ }
91+ if (batchResponse .sdkHttpResponse () != null ) {
92+ builder .sdkHttpResponse (batchResponse .sdkHttpResponse ());
93+ }
94+ ChangeMessageVisibilityResponse response = builder .build ();
95+ return new IdentifiableMessage <>(key , response );
96+ }
97+
98+ private static IdentifiableMessage <Throwable > changeMessageVisibilityCreateThrowable (BatchResultErrorEntry failedEntry ) {
99+ String key = failedEntry .id ();
100+ AwsErrorDetails errorDetailsBuilder = AwsErrorDetails .builder ().errorCode (failedEntry .code ())
101+ .errorMessage (failedEntry .message ()).build ();
102+ Throwable response = SqsException .builder ().awsErrorDetails (errorDetailsBuilder ).build ();
103+ return new IdentifiableMessage <>(key , response );
104+ }
105+
106+
107+
108+ @ Override
109+ protected CompletableFuture <ChangeMessageVisibilityBatchResponse > batchAndSend (
110+ List <IdentifiableMessage <ChangeMessageVisibilityRequest >> identifiedRequests , String batchKey ) {
111+ ChangeMessageVisibilityBatchRequest batchRequest = createChangeMessageVisibilityBatchRequest (identifiedRequests ,
112+ batchKey );
113+ return sqsAsyncClient .changeMessageVisibilityBatch (batchRequest );
114+ }
115+
116+ @ Override
117+ protected String getBatchKey (ChangeMessageVisibilityRequest request ) {
118+ return request .overrideConfiguration ().map (overrideConfig -> request .queueUrl () + overrideConfig .hashCode ())
119+ .orElseGet (request ::queueUrl );
120+ }
121+
122+ @ Override
123+ protected List <Either <IdentifiableMessage <ChangeMessageVisibilityResponse >,
124+ IdentifiableMessage <Throwable >>> mapBatchResponse (ChangeMessageVisibilityBatchResponse batchResponse ) {
125+
126+ List <Either <IdentifiableMessage <ChangeMessageVisibilityResponse >, IdentifiableMessage <Throwable >>> mappedResponses =
127+ new ArrayList <>();
128+ batchResponse .successful ().forEach (
129+ batchResponseEntry -> {
130+ IdentifiableMessage <ChangeMessageVisibilityResponse > response = createChangeMessageVisibilityResponse (
131+ batchResponseEntry , batchResponse );
132+ mappedResponses .add (Either .left (response ));
133+ });
134+ batchResponse .failed ().forEach (batchResponseEntry -> {
135+ IdentifiableMessage <Throwable > response = changeMessageVisibilityCreateThrowable (batchResponseEntry );
136+ mappedResponses .add (Either .right (response ));
137+ });
138+ return mappedResponses ;
139+
140+ }
141+ }
0 commit comments