
Commit b66a4e9

HADOOP-19569. Move put operations from the store to the multipart service, now StoreWriter
All upload operations are now in the MultipartIO service, which has been renamed StoreWriter and moved to the package org.apache.hadoop.fs.s3a.impl.write to match. For completeness, deletion should also go into this class or an adjacent one.
1 parent f03e1b9 commit b66a4e9

16 files changed (+322, -301 lines)
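
As orientation, here is a hedged sketch of the write surface this commit consolidates. The method names follow the call sites changed in the diff hunks below, but every type is a simplified stand-in rather than the real S3A class; treat it as a reading aid, not the actual StoreWriter declaration.

import java.io.File;
import java.io.IOException;
import java.util.Iterator;

// Hypothetical, self-contained sketch of the operations this commit
// routes through getStore().getStoreWriter(). All types are stand-ins.
interface StoreWriterSketch {

  // Transfer-manager managed async PUT of a file; returns a handle
  // the caller later waits on.
  UploadHandle putObject(String key, File file) throws IOException;

  // Direct PUT, bypassing the transfer manager (e.g. directory markers).
  void putObjectDirect(String key, byte[] body) throws IOException;

  // Block until an async upload completes; failures surface here.
  void waitForUploadCompletion(String key, UploadHandle handle) throws IOException;

  // Multipart housekeeping carried over from the old MultipartIO service.
  Iterator<String> listMultipartUploads(String prefix, int maxKeys) throws IOException;
  void abortMultipartUpload(String uploadId) throws IOException;
  long abortOutstandingMultipartUploads(long ageSeconds, String prefix, int maxKeys)
      throws IOException;

  // Stand-in for the UploadInfo handle returned by the async PUT.
  interface UploadHandle { }
}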

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 9 additions & 7 deletions
@@ -1419,7 +1419,7 @@ private void initMultipartUploads(Configuration conf) throws IOException {
         Duration.ofSeconds(DEFAULT_PURGE_EXISTING_MULTIPART_AGE),
         TimeUnit.SECONDS,
         Duration.ZERO);
-    getStore().getMultipartIO().abortOutstandingMultipartUploads(
+    getStore().getStoreWriter().abortOutstandingMultipartUploads(
         purgeDuration.getSeconds(),
         "",
         maxKeys,
@@ -2626,12 +2626,12 @@ private long abortMultipartUploadsUnderPrefix(StoreContext storeContext,
     span.activate();
     // this deactivates the audit span somehow
     final RemoteIterator<MultipartUpload> uploads =
-        getStore().getMultipartIO().listMultipartUploads(storeContext, prefix, maxKeys);
+        getStore().getStoreWriter().listMultipartUploads(storeContext, prefix, maxKeys);
     // so reactivate it.
     span.activate();
     return foreach(uploads, upload ->
         invoker.retry("Aborting multipart commit", upload.key(), true, () ->
-            getStore().getMultipartIO().abortMultipartUpload(upload)));
+            getStore().getStoreWriter().abortMultipartUpload(upload)));
   }

   /**
@@ -3206,7 +3206,8 @@ public PutObjectRequest.Builder newPutObjectRequestBuilder(String key,
   @Retries.OnceRaw
   public UploadInfo putObject(PutObjectRequest putObjectRequest, File file,
       ProgressableProgressListener listener) throws IOException {
-    return getStore().putObject(putObjectRequest, file, listener);
+    return getStore().getStoreWriter()
+        .putObject(putObjectRequest, file, listener);
   }

   /**
@@ -4074,7 +4075,8 @@ PutObjectResponse executePut(
     ProgressableProgressListener listener =
         new ProgressableProgressListener(store, key, progress);
     UploadInfo info = putObject(putObjectRequest, file, listener);
-    PutObjectResponse result = getStore().waitForUploadCompletion(key, info).response();
+    PutObjectResponse result = getStore().getStoreWriter()
+        .waitForUploadCompletion(key, info).response();
     listener.uploadCompleted(info.getFileUpload());
     return result;
   }
@@ -4496,7 +4498,7 @@ private void createEmptyObject(final String objectName, PutObjectOptions putOpti
         new byte[0], 0, 0, null);

     invoker.retry("PUT 0-byte object ", objectName, true,
-        () -> getStore().putObjectDirect(
+        () -> getStore().getStoreWriter().putObjectDirect(
             getRequestFactory().newDirectoryMarkerRequest(objectName).build(),
             putOptions,
             uploadData,
@@ -5054,7 +5056,7 @@ public RemoteIterator<MultipartUpload> listUploads(@Nullable String prefix)
     // span is picked up retained in the listing.
     try (AuditSpan span = createSpan(MULTIPART_UPLOAD_LIST.getSymbol(),
         prefix, null)) {
-      return getStore().getMultipartIO()
+      return getStore().getStoreWriter()
           .listMultipartUploads(createStoreContext(), prefix, maxKeys);
     }
   }
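
The executePut hunk above keeps the existing two-step shape: start the transfer-manager upload, then block in the store writer, which is where an upload failure finally surfaces (per the waitForUploadCompletion javadoc this commit relocates). Below is a minimal stand-alone sketch of that shape, using plain CompletableFuture and hypothetical names in place of the S3A types.

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical sketch of the start-async-PUT-then-wait pattern used by
// executePut(); names and types are stand-ins, not the S3A ones.
public final class AsyncPutSketch {

  // Stand-in for the transfer-manager managed async PUT: returns
  // immediately with a future the caller must later wait on.
  static CompletableFuture<String> startUpload(String key) {
    return CompletableFuture.supplyAsync(() -> "etag-for-" + key);
  }

  // Mirrors the role of waitForUploadCompletion(): blocking here is
  // where any failure of the async upload is finally raised.
  static String waitForUploadCompletion(String key,
      CompletableFuture<String> upload) throws IOException {
    try {
      return upload.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("interrupted waiting for " + key, e);
    } catch (ExecutionException e) {
      throw new IOException("upload of " + key + " failed", e.getCause());
    }
  }

  public static void main(String[] args) throws IOException {
    CompletableFuture<String> upload = startUpload("dir/file");
    System.out.println("completed: " + waitForUploadCompletion("dir/file", upload));
  }
}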

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java

Lines changed: 6 additions & 77 deletions
@@ -24,7 +24,6 @@
 import java.time.Duration;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.CancellationException;

 import software.amazon.awssdk.core.ResponseInputStream;
 import software.amazon.awssdk.core.exception.SdkException;
@@ -34,23 +33,18 @@
 import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
 import software.amazon.awssdk.services.s3.model.GetObjectResponse;
 import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
-import software.amazon.awssdk.services.s3.model.PutObjectRequest;
-import software.amazon.awssdk.services.s3.model.PutObjectResponse;
-import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload;

 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.PathCapabilities;
 import org.apache.hadoop.fs.s3a.api.RequestFactory;
 import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
 import org.apache.hadoop.fs.s3a.impl.ClientManager;
 import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteException;
-import org.apache.hadoop.fs.s3a.impl.MultipartIO;
-import org.apache.hadoop.fs.s3a.impl.MultipartIOService;
-import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
+import org.apache.hadoop.fs.s3a.impl.StoreWriterService;
+import org.apache.hadoop.fs.s3a.impl.write.StoreWriter;
 import org.apache.hadoop.fs.s3a.impl.S3AFileSystemOperations;
 import org.apache.hadoop.fs.s3a.impl.StoreContext;
 import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamFactory;
@@ -118,11 +112,11 @@ public interface S3AStore extends
   ClientManager clientManager();

   /**
-   * Get the Multipart IO operations.
-   * @return an instance of multipart IO.
+   * Get the store writer operations.
+   * @return an instance of StoreWriterService.
    */
-  default MultipartIOService getMultipartIO() {
-    return lookupService(MultipartIO.MULTIPART_IO, MultipartIOService.class);
+  default StoreWriterService getStoreWriter() {
+    return lookupService(StoreWriter.STORE_WRITER, StoreWriterService.class);
   }

   /**
@@ -321,71 +315,6 @@ void deleteObjectAtPath(
       boolean isFile)
       throws SdkException, UncheckedIOException;

-  /**
-   * Start a transfer-manager managed async PUT of an object,
-   * incrementing the put requests and put bytes
-   * counters.
-   * <p>
-   * It does not update the other counters,
-   * as existing code does that as progress callbacks come in.
-   * Byte length is calculated from the file length, or, if there is no
-   * file, from the content length of the header.
-   * <p>
-   * Because the operation is async, any stream supplied in the request
-   * must reference data (files, buffers) which stay valid until the upload
-   * completes.
-   * Retry policy: N/A: the transfer manager is performing the upload.
-   * Auditing: must be inside an audit span.
-   * @param putObjectRequest the request
-   * @param file the file to be uploaded
-   * @param listener the progress listener for the request
-   * @return the upload initiated
-   * @throws IOException if transfer manager creation failed.
-   */
-  @Retries.OnceRaw
-  UploadInfo putObject(
-      PutObjectRequest putObjectRequest,
-      File file,
-      ProgressableProgressListener listener) throws IOException;
-
-  /**
-   * PUT an object directly (i.e. not via the transfer manager).
-   * Byte length is calculated from the file length, or, if there is no
-   * file, from the content length of the header.
-   *
-   * Retry Policy: none.
-   * Auditing: must be inside an audit span.
-   * <i>Important: this call will close any input stream in the request.</i>
-   * @param putObjectRequest the request
-   * @param putOptions put object options
-   * @param uploadData data to be uploaded
-   * @param durationTrackerFactory factory for duration tracking
-   * @return the upload initiated
-   * @throws SdkException on problems
-   */
-  @VisibleForTesting
-  @Retries.OnceRaw("For PUT; post-PUT actions are RetryExceptionsSwallowed")
-  PutObjectResponse putObjectDirect(PutObjectRequest putObjectRequest,
-      PutObjectOptions putOptions,
-      S3ADataBlocks.BlockUploadData uploadData,
-      DurationTrackerFactory durationTrackerFactory)
-      throws SdkException;
-
-  /**
-   * Wait for an upload to complete.
-   * If the upload (or its result collection) failed, this is where
-   * the failure is raised as an AWS exception.
-   * Calls {@link S3AStore#incrementPutCompletedStatistics(boolean, long)}
-   * to update the statistics.
-   * @param key destination key
-   * @param uploadInfo upload to wait for
-   * @return the upload result
-   * @throws IOException IO failure
-   * @throws CancellationException if the wait() was cancelled
-   */
-  @Retries.OnceTranslated
-  CompletedFileUpload waitForUploadCompletion(String key, UploadInfo uploadInfo)
-      throws IOException;

   /**
    * Get the directory allocator.
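
The getStoreWriter() hunk above follows the store's service-lookup pattern: a default method on the interface narrows a named child service to its concrete type via lookupService. Below is a self-contained sketch of that pattern; everything apart from the shape of getStoreWriter() is hypothetical.

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the typed service-lookup pattern behind
// S3AStore.getStoreWriter(); all names are illustrative stand-ins.
public final class ServiceLookupSketch {

  interface StoreWriter {
    void putObject(String key, byte[] data);
  }

  interface Store {
    <T> T lookupService(String name, Class<T> clazz);

    // Typed accessor mirroring the default method added in this commit.
    default StoreWriter getStoreWriter() {
      return lookupService("StoreWriter", StoreWriter.class);
    }
  }

  static final class StoreImpl implements Store {
    private final Map<String, Object> services = new HashMap<>();

    StoreImpl() {
      // Register a trivial writer under its service name.
      services.put("StoreWriter",
          (StoreWriter) (key, data) ->
              System.out.println("PUT " + key + " (" + data.length + " bytes)"));
    }

    @Override
    public <T> T lookupService(String name, Class<T> clazz) {
      return clazz.cast(services.get(name));
    }
  }

  public static void main(String[] args) {
    Store store = new StoreImpl();
    store.getStoreWriter().putObject("dir/file", new byte[16]);
  }
}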

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/IORateLimiting.java

Lines changed: 0 additions & 1 deletion
@@ -22,7 +22,6 @@
 
 /**
  * Interface for specific rate limiting of read and write operations.
- * {@see org.apache.hadoop.util.RateLimiting}.
  */
 public interface IORateLimiting {
 