2323import static software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .STREAMING_SIGNED_PAYLOAD_TRAILER ;
2424import static software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .STREAMING_UNSIGNED_PAYLOAD_TRAILER ;
2525import static software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .X_AMZ_CONTENT_SHA256 ;
26- import static software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .X_AMZ_DECODED_CONTENT_LENGTH ;
2726import static software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .X_AMZ_TRAILER ;
2827import static software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerUtils .moveContentLength ;
2928
3231import java .util .ArrayList ;
3332import java .util .Collections ;
3433import java .util .List ;
35- import java .util .Optional ;
36- import java .util .concurrent .CompletableFuture ;
3734import org .reactivestreams .Publisher ;
3835import software .amazon .awssdk .annotations .SdkInternalApi ;
3936import software .amazon .awssdk .checksums .SdkChecksum ;
4037import software .amazon .awssdk .checksums .spi .ChecksumAlgorithm ;
4138import software .amazon .awssdk .http .ContentStreamProvider ;
4239import software .amazon .awssdk .http .Header ;
4340import software .amazon .awssdk .http .SdkHttpRequest ;
44- import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .AsyncChunkEncodedPayload ;
4541import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChecksumTrailerProvider ;
4642import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChunkedEncodedInputStream ;
47- import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChunkedEncodedPayload ;
48- import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChunkedEncodedPublisher ;
4943import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .SigV4ChunkExtensionProvider ;
5044import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .SigV4TrailerProvider ;
51- import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .SyncChunkEncodedPayload ;
5245import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .TrailerProvider ;
46+ import software .amazon .awssdk .http .auth .aws .internal .signer .io .ChecksumInputStream ;
5347import software .amazon .awssdk .http .auth .aws .internal .signer .io .ResettableContentStreamProvider ;
54- import software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerUtils ;
5548import software .amazon .awssdk .http .auth .spi .signer .PayloadChecksumStore ;
5649import software .amazon .awssdk .utils .BinaryUtils ;
5750import software .amazon .awssdk .utils .Logger ;
@@ -86,140 +79,81 @@ public static Builder builder() {
8679
8780 @ Override
8881 public ContentStreamProvider sign (ContentStreamProvider payload , V4RequestSigningResult requestSigningResult ) {
82+ SdkHttpRequest .Builder request = requestSigningResult .getSignedRequest ();
83+
84+ String checksum = request .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
85+ () -> new IllegalArgumentException (X_AMZ_CONTENT_SHA256 + " must be set!" )
86+ );
87+
8988 ChunkedEncodedInputStream .Builder chunkedEncodedInputStreamBuilder = ChunkedEncodedInputStream
9089 .builder ()
9190 .inputStream (payload .newStream ())
9291 .chunkSize (chunkSize )
9392 .header (chunk -> Integer .toHexString (chunk .remaining ()).getBytes (StandardCharsets .UTF_8 ));
9493
95- SyncChunkEncodedPayload chunkedPayload = new SyncChunkEncodedPayload (chunkedEncodedInputStreamBuilder );
96- signCommon (chunkedPayload , requestSigningResult );
97-
98- return new ResettableContentStreamProvider (chunkedEncodedInputStreamBuilder ::build );
99- }
100-
101- @ Override
102- public Publisher <ByteBuffer > signAsync (Publisher <ByteBuffer > payload , V4RequestSigningResult requestSigningResult ) {
103- ChunkedEncodedPublisher .Builder chunkedStreamBuilder = ChunkedEncodedPublisher .builder ()
104- .publisher (payload )
105- .chunkSize (chunkSize )
106- .addEmptyTrailingChunk (true );
107-
108- AsyncChunkEncodedPayload chunkedPayload = new AsyncChunkEncodedPayload (chunkedStreamBuilder );
109- signCommon (chunkedPayload , requestSigningResult );
110-
111- return chunkedStreamBuilder .build ();
112- }
113-
114- private void signCommon (ChunkedEncodedPayload payload , V4RequestSigningResult requestSigningResult ) {
115- preExistingTrailers .forEach (t -> payload .addTrailer (() -> t ));
116-
117- SdkHttpRequest .Builder request = requestSigningResult .getSignedRequest ();
118-
119- payload .decodedContentLength (request .firstMatchingHeader (X_AMZ_DECODED_CONTENT_LENGTH )
120- .map (Long ::parseLong )
121- .orElseThrow (() -> {
122- String msg = String .format ("Expected header '%s' to be present" ,
123- X_AMZ_DECODED_CONTENT_LENGTH );
124- return new RuntimeException (msg );
125- }));
126-
127- String checksum = request .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
128- () -> new IllegalArgumentException (X_AMZ_CONTENT_SHA256 + " must be set!" )
129- );
94+ preExistingTrailers .forEach (trailer -> chunkedEncodedInputStreamBuilder .addTrailer (() -> trailer ));
13095
13196 switch (checksum ) {
13297 case STREAMING_SIGNED_PAYLOAD : {
13398 RollingSigner rollingSigner = new RollingSigner (requestSigningResult .getSigningKey (),
13499 requestSigningResult .getSignature ());
135- payload .addExtension (new SigV4ChunkExtensionProvider (rollingSigner , credentialScope ));
100+ chunkedEncodedInputStreamBuilder .addExtension (new SigV4ChunkExtensionProvider (rollingSigner , credentialScope ));
136101 break ;
137102 }
138103 case STREAMING_UNSIGNED_PAYLOAD_TRAILER :
139- setupChecksumTrailerIfNeeded (payload );
104+ setupChecksumTrailerIfNeeded (chunkedEncodedInputStreamBuilder );
140105 break ;
141106 case STREAMING_SIGNED_PAYLOAD_TRAILER : {
142- setupChecksumTrailerIfNeeded (payload );
143107 RollingSigner rollingSigner = new RollingSigner (requestSigningResult .getSigningKey (),
144108 requestSigningResult .getSignature ());
145- payload .addExtension (new SigV4ChunkExtensionProvider (rollingSigner , credentialScope ));
146- payload .addTrailer (
147- new SigV4TrailerProvider (payload .trailers (), rollingSigner , credentialScope )
109+ chunkedEncodedInputStreamBuilder .addExtension (new SigV4ChunkExtensionProvider (rollingSigner , credentialScope ));
110+ setupChecksumTrailerIfNeeded (chunkedEncodedInputStreamBuilder );
111+ chunkedEncodedInputStreamBuilder .addTrailer (
112+ new SigV4TrailerProvider (chunkedEncodedInputStreamBuilder .trailers (), rollingSigner , credentialScope )
148113 );
149114 break ;
150115 }
151116 default :
152117 throw new UnsupportedOperationException ();
153118 }
154- }
155119
156- @ Override
157- public void beforeSigning (SdkHttpRequest .Builder request , ContentStreamProvider payload ) {
158- long encodedContentLength = 0 ;
159- long contentLength = SignerUtils .computeAndMoveContentLength (request , payload );
160- setupPreExistingTrailers (request );
161-
162- // pre-existing trailers
163- encodedContentLength = calculateEncodedContentLength (request , contentLength );
164-
165- if (checksumAlgorithm != null ) {
166- String checksumHeaderName = checksumHeaderName (checksumAlgorithm );
167- request .appendHeader (X_AMZ_TRAILER , checksumHeaderName );
168- }
169- request .putHeader (Header .CONTENT_LENGTH , Long .toString (encodedContentLength ));
170- request .appendHeader (CONTENT_ENCODING , AWS_CHUNKED );
120+ return new ResettableContentStreamProvider (chunkedEncodedInputStreamBuilder ::build );
171121 }
172122
173123 @ Override
174- public CompletableFuture <Pair <SdkHttpRequest .Builder , Optional <Publisher <ByteBuffer >>>> beforeSigningAsync (
175- SdkHttpRequest .Builder request , Publisher <ByteBuffer > payload ) {
176- return moveContentLength (request , payload )
177- .thenApply (p -> {
178- SdkHttpRequest .Builder requestBuilder = p .left ();
179- setupPreExistingTrailers (requestBuilder );
180-
181- long decodedContentLength = requestBuilder .firstMatchingHeader (X_AMZ_DECODED_CONTENT_LENGTH )
182- .map (Long ::parseLong )
183- // should not happen, this header is added by moveContentLength
184- .orElseThrow (() -> new RuntimeException (X_AMZ_DECODED_CONTENT_LENGTH
185- + " header not present" ));
186-
187- long encodedContentLength = calculateEncodedContentLength (request , decodedContentLength );
188-
189- if (checksumAlgorithm != null ) {
190- String checksumHeaderName = checksumHeaderName (checksumAlgorithm );
191- request .appendHeader (X_AMZ_TRAILER , checksumHeaderName );
192- }
193- request .putHeader (Header .CONTENT_LENGTH , Long .toString (encodedContentLength ));
194- request .appendHeader (CONTENT_ENCODING , AWS_CHUNKED );
195- return Pair .of (requestBuilder , p .right ());
196- });
124+ public Publisher <ByteBuffer > signAsync (Publisher <ByteBuffer > payload , V4RequestSigningResult requestSigningResult ) {
125+ // TODO(sra-identity-and-auth): implement this first and remove addFlexibleChecksumInTrailer logic in HttpChecksumStage
126+ throw new UnsupportedOperationException ();
197127 }
198128
199- private long calculateEncodedContentLength (SdkHttpRequest .Builder requestBuilder , long decodedContentLength ) {
129+ @ Override
130+ public void beforeSigning (SdkHttpRequest .Builder request , ContentStreamProvider payload ) {
200131 long encodedContentLength = 0 ;
132+ long contentLength = moveContentLength (request , payload );
133+ setupPreExistingTrailers (request );
201134
135+ // pre-existing trailers
202136 encodedContentLength += calculateExistingTrailersLength ();
203137
204- String checksum = requestBuilder .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
138+ String checksum = request .firstMatchingHeader (X_AMZ_CONTENT_SHA256 ).orElseThrow (
205139 () -> new IllegalArgumentException (X_AMZ_CONTENT_SHA256 + " must be set!" )
206140 );
207141
208142 switch (checksum ) {
209143 case STREAMING_SIGNED_PAYLOAD : {
210144 long extensionsLength = 81 ; // ;chunk-signature:<sigv4 hex signature, 64 bytes>
211- encodedContentLength += calculateChunksLength (decodedContentLength , extensionsLength );
145+ encodedContentLength += calculateChunksLength (contentLength , extensionsLength );
212146 break ;
213147 }
214148 case STREAMING_UNSIGNED_PAYLOAD_TRAILER :
215149 if (checksumAlgorithm != null ) {
216150 encodedContentLength += calculateChecksumTrailerLength (checksumHeaderName (checksumAlgorithm ));
217151 }
218- encodedContentLength += calculateChunksLength (decodedContentLength , 0 );
152+ encodedContentLength += calculateChunksLength (contentLength , 0 );
219153 break ;
220154 case STREAMING_SIGNED_PAYLOAD_TRAILER : {
221155 long extensionsLength = 81 ; // ;chunk-signature:<sigv4 hex signature, 64 bytes>
222- encodedContentLength += calculateChunksLength (decodedContentLength , extensionsLength );
156+ encodedContentLength += calculateChunksLength (contentLength , extensionsLength );
223157 if (checksumAlgorithm != null ) {
224158 encodedContentLength += calculateChecksumTrailerLength (checksumHeaderName (checksumAlgorithm ));
225159 }
@@ -233,7 +167,12 @@ private long calculateEncodedContentLength(SdkHttpRequest.Builder requestBuilder
233167 // terminating \r\n
234168 encodedContentLength += 2 ;
235169
236- return encodedContentLength ;
170+ if (checksumAlgorithm != null ) {
171+ String checksumHeaderName = checksumHeaderName (checksumAlgorithm );
172+ request .appendHeader (X_AMZ_TRAILER , checksumHeaderName );
173+ }
174+ request .putHeader (Header .CONTENT_LENGTH , Long .toString (encodedContentLength ));
175+ request .appendHeader (CONTENT_ENCODING , AWS_CHUNKED );
237176 }
238177
239178 /**
@@ -317,7 +256,12 @@ private long calculateChecksumTrailerLength(String checksumHeaderName) {
317256 return lengthInBytes + 2 ;
318257 }
319258
320- private void setupChecksumTrailerIfNeeded (ChunkedEncodedPayload payload ) {
259+ /**
260+ * Add the checksum as a trailer to the chunk-encoded stream.
261+ * <p>
262+ * If the checksum-algorithm is not present, then nothing is done.
263+ */
264+ private void setupChecksumTrailerIfNeeded (ChunkedEncodedInputStream .Builder builder ) {
321265 if (checksumAlgorithm == null ) {
322266 return ;
323267 }
@@ -329,17 +273,20 @@ private void setupChecksumTrailerIfNeeded(ChunkedEncodedPayload payload) {
329273 if (cachedChecksum != null ) {
330274 LOG .debug (() -> String .format ("Cached payload checksum available for algorithm %s: %s. Using cached value" ,
331275 checksumAlgorithm .algorithmId (), checksumHeaderName ));
332- payload .addTrailer (() -> Pair .of (checksumHeaderName , Collections .singletonList (cachedChecksum )));
276+ builder .addTrailer (() -> Pair .of (checksumHeaderName , Collections .singletonList (cachedChecksum )));
333277 return ;
334278 }
335279
336280 SdkChecksum sdkChecksum = fromChecksumAlgorithm (checksumAlgorithm );
281+ ChecksumInputStream checksumInputStream = new ChecksumInputStream (
282+ builder .inputStream (),
283+ Collections .singleton (sdkChecksum )
284+ );
337285
338286 TrailerProvider checksumTrailer =
339287 new ChecksumTrailerProvider (sdkChecksum , checksumHeaderName , checksumAlgorithm , payloadChecksumStore );
340288
341- payload .checksumPayload (sdkChecksum );
342- payload .addTrailer (checksumTrailer );
289+ builder .inputStream (checksumInputStream ).addTrailer (checksumTrailer );
343290 }
344291
345292 private String getCachedChecksum () {
0 commit comments