diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java
index 5c90e4bd2d601..01679a2564e48 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java
@@ -130,21 +130,20 @@ public static BlockingThreadPoolExecutorService newInstance(
     slower than enqueueing. */
     final BlockingQueue<Runnable> workQueue =
         new LinkedBlockingQueue<>(waitingTasks + activeTasks);
+    final InnerExecutorRejection rejection = new InnerExecutorRejection();
     ThreadPoolExecutor eventProcessingExecutor =
         new ThreadPoolExecutor(activeTasks, activeTasks, keepAliveTime, unit,
             workQueue, newDaemonThreadFactory(prefixName),
-            new RejectedExecutionHandler() {
-              @Override
-              public void rejectedExecution(Runnable r,
-                  ThreadPoolExecutor executor) {
-                // This is not expected to happen.
-                LOG.error("Could not submit task to executor {}",
-                    executor.toString());
-              }
-            });
+            rejection);
     eventProcessingExecutor.allowCoreThreadTimeOut(true);
-    return new BlockingThreadPoolExecutorService(waitingTasks + activeTasks,
-        eventProcessingExecutor);
+    final BlockingThreadPoolExecutorService service =
+        new BlockingThreadPoolExecutorService(waitingTasks + activeTasks,
+        eventProcessingExecutor);
+    rejection.setDelegate((r, executor) -> {
+      service.shutdown();
+    });
+
+    return service;
   }
 
   /**
@@ -164,4 +163,28 @@ public String toString() {
         .append('}');
     return sb.toString();
   }
+
+  private static class InnerExecutorRejection implements RejectedExecutionHandler {
+
+    private RejectedExecutionHandler delegate;
+
+    private RejectedExecutionHandler getDelegate() {
+      return delegate;
+    }
+
+    private void setDelegate(final RejectedExecutionHandler delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public void rejectedExecution(Runnable r,
+        ThreadPoolExecutor executor) {
+      // This is not expected to happen.
+      LOG.error("Could not submit task to executor {}",
+          executor.toString());
+      if (getDelegate() != null) {
+        delegate.rejectedExecution(r, executor);
+      }
+    }
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java
index c4c11e57b3720..29208821b6d6f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java
@@ -31,6 +31,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -127,6 +128,7 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
 
   @Override
   public <T> Future<T> submit(Callable<T> task) {
+    rejectWhenShutdown();
     try (DurationTracker ignored =
         trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
       queueingPermits.acquire();
@@ -139,6 +141,7 @@ public <T> Future<T> submit(Callable<T> task) {
 
   @Override
   public <T> Future<T> submit(Runnable task, T result) {
+    rejectWhenShutdown();
     try (DurationTracker ignored =
         trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
       queueingPermits.acquire();
@@ -151,6 +154,7 @@ public <T> Future<T> submit(Runnable task, T result) {
 
   @Override
   public Future<?> submit(Runnable task) {
+    rejectWhenShutdown();
     try (DurationTracker ignored =
         trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
       queueingPermits.acquire();
@@ -163,6 +167,7 @@ public Future<?> submit(Runnable task) {
 
   @Override
   public void execute(Runnable command) {
+    rejectWhenShutdown();
     try (DurationTracker ignored =
         trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
       queueingPermits.acquire();
@@ -208,6 +213,16 @@ public String toString() {
     return sb.toString();
   }
 
+  /**
+   * Raise an exception if invoked when the executor is shut down.
+   * @throws RejectedExecutionException if the executor is shut down.
+   */
+  private void rejectWhenShutdown() throws RejectedExecutionException {
+    if (isShutdown()) {
+      throw new RejectedExecutionException("ExecutorService is shutdown");
+    }
+  }
+
   /**
    * Releases a permit after the task is executed.
*/ @@ -222,6 +237,7 @@ class RunnableWithPermitRelease implements Runnable { @Override public void run() { try { + rejectWhenShutdown(); delegatee.run(); } finally { queueingPermits.release(); @@ -244,6 +260,7 @@ class CallableWithPermitRelease implements Callable { @Override public T call() throws Exception { try { + rejectWhenShutdown(); return delegatee.call(); } finally { queueingPermits.release(); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestBlockingThreadPoolExecutorService.java similarity index 75% rename from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java rename to hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestBlockingThreadPoolExecutorService.java index baaa6eb37a4fe..dec24b612c63c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestBlockingThreadPoolExecutorService.java @@ -16,15 +16,12 @@ * limitations under the License. */ -package org.apache.hadoop.fs.s3a; - -import org.apache.hadoop.util.BlockingThreadPoolExecutorService; -import org.apache.hadoop.util.SemaphoredDelegatingExecutor; -import org.apache.hadoop.util.StopWatch; +package org.apache.hadoop.util; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,18 +29,21 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import org.apache.hadoop.test.AbstractHadoopTestBase; -import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** - * Basic test for S3A's blocking executor service. + * Test for the blocking executor service. */ -@Timeout(60) -public class ITestBlockingThreadPoolExecutorService { +public class TestBlockingThreadPoolExecutorService extends AbstractHadoopTestBase { private static final Logger LOG = LoggerFactory.getLogger( - ITestBlockingThreadPoolExecutorService.class); + TestBlockingThreadPoolExecutorService.class); private static final int NUM_ACTIVE_TASKS = 4; private static final int NUM_WAITING_TASKS = 2; @@ -61,6 +61,7 @@ public static void afterClass() throws Exception { ensureDestroyed(); } + /** * Basic test of running one trivial task. 
*/ @@ -68,8 +69,7 @@ public static void afterClass() throws Exception { public void testSubmitCallable() throws Exception { ensureCreated(); Future f = tpe.submit(callableSleeper); - Integer v = f.get(); - assertEquals(SOME_VALUE, v); + Assertions.assertThat(f.get()).isEqualTo(SOME_VALUE); } /** @@ -90,9 +90,9 @@ public void testSubmitRunnable() throws Exception { protected void verifyQueueSize(ExecutorService executorService, int expectedQueueSize) { CountDownLatch latch = new CountDownLatch(1); - for (int i = 0; i < expectedQueueSize; i++) { - executorService.submit(new LatchedSleeper(latch)); - } + IntStream.range(0, expectedQueueSize) + .mapToObj(i -> new LatchedSleeper(latch)) + .forEach(executorService::submit); StopWatch stopWatch = new StopWatch().start(); latch.countDown(); executorService.submit(sleeper); @@ -120,6 +120,27 @@ public void testChainedQueue() throws Throwable { verifyQueueSize(wrapper, size); } + @Test + public void testShutdownQueueRejectsOperations() throws Throwable { + ensureCreated(); + tpe.shutdown(); + try { + Assertions.assertThat(tpe.isShutdown()) + .describedAs("%s should be shutdown", tpe) + .isTrue(); + // runnable + intercept(RejectedExecutionException.class, () -> + tpe.submit(failToRun)); + // callable + intercept(RejectedExecutionException.class, () -> + tpe.submit(() -> 0)); + intercept(RejectedExecutionException.class, () -> + tpe.execute(failToRun)); + } finally { + tpe = null; + } + } + // Helper functions, etc. private void assertDidBlock(StopWatch sw) { @@ -132,27 +153,26 @@ private void assertDidBlock(StopWatch sw) { } } - private Runnable sleeper = new Runnable() { - @Override - public void run() { - String name = Thread.currentThread().getName(); - try { - Thread.sleep(TASK_SLEEP_MSEC); - } catch (InterruptedException e) { - LOG.info("Thread {} interrupted.", name); - Thread.currentThread().interrupt(); - } - } + private Runnable failToRun = () -> { + throw new RuntimeException("Failed to Run"); }; - private Callable callableSleeper = new Callable() { - @Override - public Integer call() throws Exception { - sleeper.run(); - return SOME_VALUE; + private Runnable sleeper = () -> { + String name = Thread.currentThread().getName(); + try { + Thread.sleep(TASK_SLEEP_MSEC); + } catch (InterruptedException e) { + LOG.info("Thread {} interrupted.", name); + Thread.currentThread().interrupt(); } }; + private Callable callableSleeper = () -> { + sleeper.run(); + return SOME_VALUE; + }; + + private class LatchedSleeper implements Runnable { private final CountDownLatch latch; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/MultipartUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/MultipartUtils.java deleted file mode 100644 index b2057c211da7b..0000000000000 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/MultipartUtils.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a; - -import java.io.IOException; -import java.util.ListIterator; -import java.util.NoSuchElementException; -import javax.annotation.Nullable; - -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.ListMultipartUploadsRequest; -import software.amazon.awssdk.services.s3.model.ListMultipartUploadsResponse; -import software.amazon.awssdk.services.s3.model.MultipartUpload; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.fs.s3a.api.RequestFactory; -import org.apache.hadoop.fs.s3a.impl.StoreContext; -import org.apache.hadoop.fs.store.audit.AuditSpan; - -import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_LIST; -import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation; - - -/** - * MultipartUtils upload-specific functions for use by S3AFileSystem and Hadoop - * CLI. - * The Audit span active when - * {@link #listMultipartUploads(StoreContext, S3Client, String, int)} - * was invoked is retained for all subsequent operations. - */ -public final class MultipartUtils { - - private static final Logger LOG = - LoggerFactory.getLogger(MultipartUtils.class); - - /** Not instantiated. */ - private MultipartUtils() { } - - /** - * List outstanding multipart uploads. - * Package private: S3AFileSystem and tests are the users of this. - * - * @param storeContext store context - * @param s3 AmazonS3 client to use. - * @param prefix optional key prefix to narrow search. If null then whole - * bucket will be searched. - * @param maxKeys maximum batch size to request at a time from S3. - * @return an iterator of matching uploads - */ - static RemoteIterator listMultipartUploads( - final StoreContext storeContext, - S3Client s3, - @Nullable String prefix, - int maxKeys) - throws IOException { - return new MultipartUtils.UploadIterator(storeContext, - s3, - maxKeys, - prefix); - } - - /** - * Simple RemoteIterator wrapper for AWS `listMultipartUpload` API. - * Iterates over batches of multipart upload metadata listings. - * All requests are in the StoreContext's active span - * at the time the iterator was constructed. - */ - static class ListingIterator implements - RemoteIterator { - - private final String prefix; - - private final RequestFactory requestFactory; - - private final int maxKeys; - private final S3Client s3; - private final Invoker invoker; - - private final AuditSpan auditSpan; - - private final StoreContext storeContext; - - /** - * Most recent listing results. - */ - private ListMultipartUploadsResponse listing; - - /** - * Indicator that this is the first listing. - */ - private boolean firstListing = true; - - /** - * Count of list calls made. 
- */ - private int listCount = 0; - - ListingIterator(final StoreContext storeContext, - S3Client s3, - @Nullable String prefix, - int maxKeys) throws IOException { - this.storeContext = storeContext; - this.s3 = s3; - this.requestFactory = storeContext.getRequestFactory(); - this.maxKeys = maxKeys; - this.prefix = prefix; - this.invoker = storeContext.getInvoker(); - this.auditSpan = storeContext.getActiveAuditSpan(); - - // request the first listing. - requestNextBatch(); - } - - /** - * Iterator has data if it is either is the initial iteration, or - * the last listing obtained was incomplete. - * @throws IOException not thrown by this implementation. - */ - @Override - public boolean hasNext() throws IOException { - if (listing == null) { - // shouldn't happen, but don't trust AWS SDK - return false; - } else { - return firstListing || listing.isTruncated(); - } - } - - /** - * Get next listing. First call, this returns initial set (possibly - * empty) obtained from S3. Subsequent calls my block on I/O or fail. - * @return next upload listing. - * @throws IOException if S3 operation fails. - * @throws NoSuchElementException if there are no more uploads. - */ - @Override - @Retries.RetryTranslated - public ListMultipartUploadsResponse next() throws IOException { - if (firstListing) { - firstListing = false; - } else { - if (listing == null || !listing.isTruncated()) { - // nothing more to request: fail. - throw new NoSuchElementException("No more uploads under " + prefix); - } - // need to request a new set of objects. - requestNextBatch(); - } - return listing; - } - - @Override - public String toString() { - return "Upload iterator: prefix " + prefix - + "; list count " + listCount - + "; upload count " + listing.uploads().size() - + "; isTruncated=" + listing.isTruncated(); - } - - @Retries.RetryTranslated - private void requestNextBatch() throws IOException { - try (AuditSpan span = auditSpan.activate()) { - ListMultipartUploadsRequest.Builder requestBuilder = requestFactory - .newListMultipartUploadsRequestBuilder(prefix); - if (!firstListing) { - requestBuilder.keyMarker(listing.nextKeyMarker()); - requestBuilder.uploadIdMarker(listing.nextUploadIdMarker()); - } - requestBuilder.maxUploads(maxKeys); - - ListMultipartUploadsRequest request = requestBuilder.build(); - - LOG.debug("[{}], Requesting next {} uploads prefix {}, " + - "next key {}, next upload id {}", listCount, maxKeys, prefix, - request.keyMarker(), request.uploadIdMarker()); - listCount++; - - listing = invoker.retry("listMultipartUploads", prefix, true, - trackDurationOfOperation(storeContext.getInstrumentation(), - OBJECT_MULTIPART_UPLOAD_LIST.getSymbol(), - () -> s3.listMultipartUploads(requestBuilder.build()))); - LOG.debug("Listing found {} upload(s)", - listing.uploads().size()); - LOG.debug("New listing state: {}", this); - } - } - } - - /** - * Iterator over multipart uploads. Similar to - * {@link org.apache.hadoop.fs.s3a.Listing.FileStatusListingIterator}, but - * iterates over pending uploads instead of existing objects. - */ - public static class UploadIterator - implements RemoteIterator { - - /** - * Iterator for issuing new upload list requests from - * where the previous one ended. - */ - private ListingIterator lister; - /** Current listing: the last upload listing we fetched. */ - private ListMultipartUploadsResponse listing; - /** Iterator over the current listing. */ - private ListIterator batchIterator; - - /** - * Construct an iterator to list uploads under a path. 
- * @param storeContext store context - * @param s3 s3 client - * @param maxKeys max # of keys to list per batch - * @param prefix prefix - * @throws IOException listing failure. - */ - @Retries.RetryTranslated - public UploadIterator( - final StoreContext storeContext, - S3Client s3, - int maxKeys, - @Nullable String prefix) - throws IOException { - - lister = new ListingIterator(storeContext, s3, prefix, - maxKeys); - requestNextBatch(); - } - - @Override - public boolean hasNext() throws IOException { - return (batchIterator.hasNext() || requestNextBatch()); - } - - @Override - public MultipartUpload next() throws IOException { - if (!hasNext()) { - throw new NoSuchElementException(); - } - return batchIterator.next(); - } - - private boolean requestNextBatch() throws IOException { - if (lister.hasNext()) { - listing = lister.next(); - batchIterator = listing.uploads().listIterator(); - return batchIterator.hasNext(); - } - return false; - } - } -} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index a45ed720b9ce3..e33db9c4c2766 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -25,13 +25,9 @@ import java.io.UncheckedIOException; import java.net.URI; import java.nio.file.AccessDeniedException; -import java.text.DateFormat; -import java.text.SimpleDateFormat; import java.time.Duration; -import java.time.Instant; import java.util.ArrayList; import java.util.Collections; -import java.util.Date; import java.util.EnumSet; import java.util.Iterator; import java.util.List; @@ -53,19 +49,13 @@ import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; -import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest; import software.amazon.awssdk.services.s3.model.HeadBucketRequest; import software.amazon.awssdk.services.s3.model.HeadBucketResponse; import software.amazon.awssdk.services.s3.model.MultipartUpload; -import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; -import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; -import software.amazon.awssdk.services.s3.model.ListMultipartUploadsRequest; import software.amazon.awssdk.services.s3.model.ListObjectsRequest; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.awscore.exception.AwsServiceException; -import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.model.CopyObjectRequest; import software.amazon.awssdk.services.s3.model.CopyObjectResponse; import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; @@ -77,8 +67,6 @@ import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.services.s3.model.S3Object; import software.amazon.awssdk.services.s3.model.StorageClass; -import software.amazon.awssdk.services.s3.model.UploadPartRequest; -import software.amazon.awssdk.services.s3.model.UploadPartResponse; import software.amazon.awssdk.transfer.s3.model.CompletedCopy; import software.amazon.awssdk.transfer.s3.model.Copy; import software.amazon.awssdk.transfer.s3.model.CopyRequest; 
@@ -146,7 +134,6 @@ import org.apache.hadoop.fs.s3a.impl.StoreContext; import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder; import org.apache.hadoop.fs.s3a.impl.StoreContextFactory; -import org.apache.hadoop.fs.s3a.impl.UploadContentProviders; import org.apache.hadoop.fs.s3a.impl.CSEUtils; import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType; import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters; @@ -253,7 +240,6 @@ import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_OVERWRITE; import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_PERFORMANCE; import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket; -import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.CONTENT_TYPE_OCTET_STREAM; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AP_REQUIRED_EXCEPTION; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.ARN_BUCKET_OPTION; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT; @@ -272,7 +258,6 @@ import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.pairedTrackerFactory; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation; -import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier; import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; import static org.apache.hadoop.util.Preconditions.checkArgument; import static org.apache.hadoop.util.RateLimitingFactory.unlimitedRate; @@ -1363,9 +1348,12 @@ public FlagSet getPerformanceFlags() { /** * Get the store for low-level operations. + * This is absolutely not for external access; it's a single method + * to ease use throughout internal code. * @return the store the S3A FS is working through. */ - private S3AStore getStore() { + @InterfaceAudience.Private + public S3AStore getStore() { return store; } @@ -1430,7 +1418,11 @@ private void initMultipartUploads(Configuration conf) throws IOException { Duration.ofSeconds(DEFAULT_PURGE_EXISTING_MULTIPART_AGE), TimeUnit.SECONDS, Duration.ZERO); - abortOutstandingMultipartUploads(purgeDuration.getSeconds()); + getStore().getStoreWriter().abortOutstandingMultipartUploads( + purgeDuration.getSeconds(), + "", + maxKeys, + createStoreContext()); } catch (AccessDeniedException e) { instrumentation.errorIgnored(); LOG.debug("Failed to purge multipart uploads against {}," + @@ -1439,34 +1431,6 @@ private void initMultipartUploads(Configuration conf) throws IOException { } } - /** - * Abort all outstanding MPUs older than a given age. 
- * @param seconds time in seconds - * @throws IOException on any failure, other than 403 "permission denied" - */ - @Retries.RetryTranslated - public void abortOutstandingMultipartUploads(long seconds) - throws IOException { - Preconditions.checkArgument(seconds >= 0); - Instant purgeBefore = - Instant.now().minusSeconds(seconds); - LOG.debug("Purging outstanding multipart uploads older than {}", - purgeBefore); - invoker.retry("Purging multipart uploads", bucket, true, - () -> { - RemoteIterator uploadIterator = - MultipartUtils.listMultipartUploads(createStoreContext(), - getS3Client(), null, maxKeys); - - while (uploadIterator.hasNext()) { - MultipartUpload upload = uploadIterator.next(); - if (upload.initiated().compareTo(purgeBefore) < 0) { - abortMultipartUpload(upload); - } - } - }); - } - /** * Return the protocol scheme for the FileSystem. * @@ -1933,31 +1897,6 @@ private ObjectInputStreamCallbacks createInputStreamCallbacks( } - /** - * Callbacks for WriteOperationHelper. - */ - private final class WriteOperationHelperCallbacksImpl - implements WriteOperationHelper.WriteOperationHelperCallbacks { - - @Override - @Retries.OnceRaw - public CompleteMultipartUploadResponse completeMultipartUpload( - CompleteMultipartUploadRequest request) { - return getStore().completeMultipartUpload(request); - } - - @Override - @Retries.OnceRaw - public UploadPartResponse uploadPart( - final UploadPartRequest request, - final RequestBody body, - final DurationTrackerFactory durationTrackerFactory) - throws AwsServiceException, UncheckedIOException { - return getStore().uploadPart(request, body, durationTrackerFactory); - } - - } - /** * Create the read context for reading from the referenced file, * using FS state as well as the status. @@ -2222,21 +2161,6 @@ private FSDataOutputStream innerCreateFile( new S3ABlockOutputStream(builder), null); } - /** - * Create a Write Operation Helper with the current active span. - * All operations made through this helper will activate the - * span before execution. - * - * This class permits other low-level operations against the store. - * It is unstable and - * only intended for code with intimate knowledge of the object store. - * If using this, be prepared for changes even on minor point releases. - * @return a new helper. - */ - @InterfaceAudience.Private - public WriteOperationHelper getWriteOperationHelper() { - return createWriteOperationHelper(getActiveAuditSpan()); - } /** * Create a Write Operation Helper with the given span. @@ -2247,12 +2171,11 @@ public WriteOperationHelper getWriteOperationHelper() { */ @InterfaceAudience.Private public WriteOperationHelper createWriteOperationHelper(AuditSpan auditSpan) { - return new WriteOperationHelper(this, - getConf(), - statisticsContext, + return new WriteOperationHelper( getAuditSpanSource(), auditSpan, - new WriteOperationHelperCallbacksImpl()); + getStore().createWriteOperationHelperCallbacks() + ); } /** @@ -2702,12 +2625,12 @@ private long abortMultipartUploadsUnderPrefix(StoreContext storeContext, span.activate(); // this deactivates the audit span somehow final RemoteIterator uploads = - listUploadsUnderPrefix(storeContext, prefix); + getStore().getStoreWriter().listMultipartUploads(storeContext, prefix, maxKeys); // so reactivate it. 
span.activate(); return foreach(uploads, upload -> invoker.retry("Aborting multipart commit", upload.key(), true, () -> - abortMultipartUpload(upload))); + getStore().getStoreWriter().abortMultipartUpload(upload))); } /** @@ -2939,14 +2862,7 @@ protected void incrementGauge(Statistic statistic, long count) { * @param ex exception. */ public void operationRetried(Exception ex) { - if (isThrottleException(ex)) { - LOG.debug("Request throttled"); - incrementStatistic(STORE_IO_THROTTLED); - statisticsContext.addValueToQuantiles(STORE_IO_THROTTLE_RATE, 1); - } else { - incrementStatistic(STORE_IO_RETRY); - incrementStatistic(IGNORED_ERRORS); - } + getStore().operationRetried(ex); } /** @@ -3178,7 +3094,9 @@ public void incrementWriteOperations() { * Increments the {@code OBJECT_DELETE_REQUESTS} and write * operation statistics. * This call does not create any mock parent entries. - * + *

+ * This method MUST NOT check for the fs being open, because + * it will be called during any exit cleanup. * Retry policy: retry untranslated; delete considered idempotent. * @param key key to blob to delete. * @throws SdkException problems working with S3 @@ -3287,128 +3205,8 @@ public PutObjectRequest.Builder newPutObjectRequestBuilder(String key, @Retries.OnceRaw public UploadInfo putObject(PutObjectRequest putObjectRequest, File file, ProgressableProgressListener listener) throws IOException { - return getStore().putObject(putObjectRequest, file, listener); - } - - /** - * PUT an object directly (i.e. not via the transfer manager). - * Byte length is calculated from the file length, or, if there is no - * file, from the content length of the header. - * - * Retry Policy: none. - * Auditing: must be inside an audit span. - * Important: this call will close any input stream in the request. - * @param putObjectRequest the request - * @param putOptions put object options - * @param uploadData data to be uploaded - * @param durationTrackerFactory factory for duration tracking - * @return the upload initiated - * @throws SdkException on problems - */ - @VisibleForTesting - @Retries.OnceRaw("For PUT; post-PUT actions are RetryExceptionsSwallowed") - PutObjectResponse putObjectDirect(PutObjectRequest putObjectRequest, - PutObjectOptions putOptions, - S3ADataBlocks.BlockUploadData uploadData, - DurationTrackerFactory durationTrackerFactory) - throws SdkException { - - long len = getPutRequestLength(putObjectRequest); - LOG.debug("PUT {} bytes to {}", len, putObjectRequest.key()); - incrementPutStartStatistics(len); - final UploadContentProviders.BaseContentProvider provider = - uploadData.getContentProvider(); - try { - PutObjectResponse response = - trackDurationOfSupplier(nonNullDurationTrackerFactory(durationTrackerFactory), - OBJECT_PUT_REQUESTS.getSymbol(), - () -> getS3Client().putObject(putObjectRequest, - RequestBody.fromContentProvider( - provider, - provider.getSize(), - CONTENT_TYPE_OCTET_STREAM))); - incrementPutCompletedStatistics(true, len); - return response; - } catch (SdkException e) { - incrementPutCompletedStatistics(false, len); - throw e; - } - } - - /** - * Get the length of the PUT, verifying that the length is known. - * @param putObjectRequest a request bound to a file or a stream. - * @return the request length - * @throws IllegalArgumentException if the length is negative - */ - private long getPutRequestLength(PutObjectRequest putObjectRequest) { - long len = putObjectRequest.contentLength(); - - Preconditions.checkState(len >= 0, "Cannot PUT object of unknown length"); - return len; - } - - /** - * Upload part of a multi-partition file. - * Increments the write and put counters. - * Important: this call does not close any input stream in the body. - * - * Retry Policy: none. - * @param durationTrackerFactory duration tracker factory for operation - * @param request the upload part request. - * @param body the request body. - * @return the result of the operation. 
- * @throws AwsServiceException on problems - */ - @Retries.OnceRaw - UploadPartResponse uploadPart(UploadPartRequest request, RequestBody body, - final DurationTrackerFactory durationTrackerFactory) - throws AwsServiceException { - long len = request.contentLength(); - incrementPutStartStatistics(len); - try { - UploadPartResponse uploadPartResponse = trackDurationOfSupplier( - nonNullDurationTrackerFactory(durationTrackerFactory), - MULTIPART_UPLOAD_PART_PUT.getSymbol(), () -> - getS3Client().uploadPart(request, body)); - incrementPutCompletedStatistics(true, len); - return uploadPartResponse; - } catch (AwsServiceException e) { - incrementPutCompletedStatistics(false, len); - throw e; - } - } - - /** - * At the start of a put/multipart upload operation, update the - * relevant counters. - * - * @param bytes bytes in the request. - */ - protected void incrementPutStartStatistics(long bytes) { - getStore().incrementPutStartStatistics(bytes); - } - - /** - * At the end of a put/multipart upload operation, update the - * relevant counters and gauges. - * - * @param success did the operation succeed? - * @param bytes bytes in the request. - */ - protected void incrementPutCompletedStatistics(boolean success, long bytes) { - getStore().incrementPutCompletedStatistics(success, bytes); - } - - /** - * Callback for use in progress callbacks from put/multipart upload events. - * Increments those statistics which are expected to be updated during - * the ongoing upload operation. - * @param key key to file that is being written (for logging) - * @param bytes bytes successfully uploaded. - */ - protected void incrementPutProgressStatistics(String key, long bytes) { - getStore().incrementPutProgressStatistics(key, bytes); + return getStore().getStoreWriter() + .putObject(putObjectRequest, file, listener); } /** @@ -4276,7 +4074,8 @@ PutObjectResponse executePut( ProgressableProgressListener listener = new ProgressableProgressListener(store, key, progress); UploadInfo info = putObject(putObjectRequest, file, listener); - PutObjectResponse result = getStore().waitForUploadCompletion(key, info).response(); + PutObjectResponse result = getStore().getStoreWriter() + .waitForUploadCompletion(key, info).response(); listener.uploadCompleted(info.getFileUpload()); return result; } @@ -4421,6 +4220,14 @@ private void checkNotClosed() throws PathIOException { } } + /** + * Check the FS is running. + * @throws IllegalStateException if closed + */ + protected void checkRunning() throws IllegalStateException { + Preconditions.checkState(!isClosed, "FileSystem is closed"); + } + /** * Get the delegation token support for this filesystem; * not null iff delegation support is enabled. @@ -4641,23 +4448,6 @@ private CopyObjectResponse copyFile(String srcKey, String dstKey, long size, return response; } - /** - * Initiate a multipart upload from the preconfigured request. - * Retry policy: none + untranslated. - * @param request request to initiate - * @return the result of the call - * @throws SdkException on failures inside the AWS SDK - * @throws IOException Other IO problems - */ - @Retries.OnceRaw - CreateMultipartUploadResponse initiateMultipartUpload( - CreateMultipartUploadRequest request) throws IOException { - LOG.debug("Initiate multipart upload to {}", request.key()); - return trackDurationOfSupplier(getDurationTrackerFactory(), - OBJECT_MULTIPART_UPLOAD_INITIATED.getSymbol(), - () -> getS3Client().createMultipartUpload(request)); - } - /** * Perform post-write actions. *

@@ -4707,12 +4497,12 @@ private void createEmptyObject(final String objectName, PutObjectOptions putOpti new byte[0], 0, 0, null); invoker.retry("PUT 0-byte object ", objectName, true, - () -> putObjectDirect( + () -> getStore().getStoreWriter().putObjectDirect( getRequestFactory().newDirectoryMarkerRequest(objectName).build(), putOptions, uploadData, getDurationTrackerFactory())); - incrementPutProgressStatistics(objectName, 0); + incrementWriteOperations(); instrumentation.directoryCreated(); } @@ -5251,8 +5041,6 @@ S3ALocatedFileStatus toLocatedFileStatus(S3AFileStatus status) /** * List any pending multipart uploads whose keys begin with prefix, using * an iterator that can handle an unlimited number of entries. - * See {@link #listMultipartUploads(String)} for a non-iterator version of - * this. * * @param prefix optional key prefix to search * @return Iterator over multipart uploads. @@ -5263,100 +5051,15 @@ S3ALocatedFileStatus toLocatedFileStatus(S3AFileStatus status) @AuditEntryPoint public RemoteIterator listUploads(@Nullable String prefix) throws IOException { - // span is picked up retained in the listing. checkNotClosed(); + // span is picked up retained in the listing. try (AuditSpan span = createSpan(MULTIPART_UPLOAD_LIST.getSymbol(), prefix, null)) { - return listUploadsUnderPrefix(createStoreContext(), prefix); + return getStore().getStoreWriter() + .listMultipartUploads(createStoreContext(), prefix, maxKeys); } } - /** - * List any pending multipart uploads whose keys begin with prefix, using - * an iterator that can handle an unlimited number of entries. - * See {@link #listMultipartUploads(String)} for a non-iterator version of - * this. - * @param storeContext store conext. - * @param prefix optional key prefix to search - * @return Iterator over multipart uploads. - * @throws IOException on failure - */ - @InterfaceAudience.Private - @Retries.RetryTranslated - public RemoteIterator listUploadsUnderPrefix( - final StoreContext storeContext, - final @Nullable String prefix) - throws IOException { - // span is picked up retained in the listing. - String p = prefix; - if (prefix != null && !prefix.isEmpty() && !prefix.endsWith("/")) { - p = prefix + "/"; - } - // duration tracking is done in iterator. - return MultipartUtils.listMultipartUploads(storeContext, getS3Client(), p, maxKeys); - } - - /** - * Listing all multipart uploads; limited to the first few hundred. - * See {@link #listUploads(String)} for an iterator-based version that does - * not limit the number of entries returned. - * Retry policy: retry, translated. - * @return a listing of multipart uploads. - * @param prefix prefix to scan for, "" for none - * @throws IOException IO failure, including any uprated SdkException - */ - @InterfaceAudience.Private - @Retries.RetryTranslated - public List listMultipartUploads(String prefix) - throws IOException { - // add a trailing / if needed. - if (prefix != null && !prefix.isEmpty() && !prefix.endsWith("/")) { - prefix = prefix + "/"; - } - String p = prefix; - return invoker.retry("listMultipartUploads", p, true, () -> { - final ListMultipartUploadsRequest request = getRequestFactory() - .newListMultipartUploadsRequestBuilder(p).build(); - return trackDuration(getInstrumentation(), MULTIPART_UPLOAD_LIST.getSymbol(), () -> - getS3Client().listMultipartUploads(request).uploads()); - }); - } - - /** - * Abort a multipart upload. - * Retry policy: none. 
- * @param destKey destination key - * @param uploadId Upload ID - * @throws IOException IO failure, including any uprated SdkException - */ - @Retries.OnceTranslated - public void abortMultipartUpload(String destKey, String uploadId) throws IOException { - LOG.debug("Aborting multipart upload {} to {}", uploadId, destKey); - trackDuration(getInstrumentation(), OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(), () -> - getS3Client().abortMultipartUpload( - getRequestFactory().newAbortMultipartUploadRequestBuilder( - destKey, - uploadId).build())); - } - - /** - * Abort a multipart upload. - * Retry policy: none. - * @param upload the listed upload to abort. - * @throws IOException IO failure, including any uprated SdkException - */ - @Retries.OnceTranslated - public void abortMultipartUpload(MultipartUpload upload) throws IOException { - String destKey = upload.key(); - String uploadId = upload.uploadId(); - if (LOG.isDebugEnabled()) { - DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - LOG.debug("Aborting multipart upload {} to {} initiated by {} on {}", - uploadId, destKey, upload.initiator(), - df.format(Date.from(upload.initiated()))); - } - abortMultipartUpload(destKey, uploadId); - } /** * Create a new instance of the committer statistics. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java index 95019807b383b..f41bf08655c48 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java @@ -24,24 +24,15 @@ import java.time.Duration; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CancellationException; -import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.exception.SdkException; -import software.amazon.awssdk.core.sync.RequestBody; -import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; -import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.HeadObjectResponse; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; -import software.amazon.awssdk.services.s3.model.UploadPartRequest; -import software.amazon.awssdk.services.s3.model.UploadPartResponse; -import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -52,12 +43,15 @@ import org.apache.hadoop.fs.s3a.impl.ChangeTracker; import org.apache.hadoop.fs.s3a.impl.ClientManager; import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteException; +import org.apache.hadoop.fs.s3a.impl.StoreWriterService; +import org.apache.hadoop.fs.s3a.impl.write.StoreWriter; import org.apache.hadoop.fs.s3a.impl.S3AFileSystemOperations; import org.apache.hadoop.fs.s3a.impl.StoreContext; import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamFactory; import 
org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; import org.apache.hadoop.fs.statistics.DurationTrackerFactory; import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import org.apache.hadoop.fs.s3a.api.IORateLimiting; import org.apache.hadoop.service.Service; /** @@ -77,6 +71,7 @@ @InterfaceStability.Unstable public interface S3AStore extends ClientManager, + IORateLimiting, IOStatisticsSource, ObjectInputStreamFactory, PathCapabilities, @@ -98,16 +93,75 @@ public interface S3AStore extends */ Duration acquireReadCapacity(int capacity); + /** + * Create a new store context. + * @return a new store context. + */ + StoreContext createStoreContext(); + StoreContext getStoreContext(); DurationTrackerFactory getDurationTrackerFactory(); + S3AInstrumentation getInstrumentation(); + S3AStatisticsContext getStatisticsContext(); RequestFactory getRequestFactory(); ClientManager clientManager(); + /** + * Get the store writer operations. + * @return an instance of StoreWriterService. + */ + default StoreWriterService getStoreWriter() { + return lookupService(StoreWriter.STORE_WRITER, StoreWriterService.class); + } + + /** + * Look up a service by name, validate its classtype and then return the cast value. + * This allows for the lookup of any registered service within the store, if the name + * and type is known. + * @param name service name + * @param serviceClass service class. + * @return the class + * @param type of service + * @throws IllegalStateException if the service is not found or the class type is wrong. + */ + S lookupService(String name, Class serviceClass); + + /** + * Decrement a gauge by a specific value. + * @param statistic The operation to decrement + * @param count the count to decrement + */ + void decrementGauge(Statistic statistic, long count); + + /** + * Increment a gauge by a specific value. + * @param statistic The operation to increment + * @param count the count to increment + */ + void incrementGauge(Statistic statistic, long count); + + /** + * Callback when an operation was retried. + * Increments the statistics of ignored errors or throttled requests, + * depending up on the exception class. + * @param ex exception. + */ + void operationRetried(Exception ex); + + /** + * Callback from {@link Invoker} when an operation is retried. + * @param text text of the operation + * @param ex exception + * @param retries number of retries + * @param idempotent is the method idempotent + */ + void operationRetried(String text, Exception ex, int retries, boolean idempotent); + /** * Increment read operations. */ @@ -148,7 +202,7 @@ public interface S3AStore extends /** * Given a possibly null duration tracker factory, return a non-null - * one for use in tracking durations -either that or the FS tracker + * one for use in tracking durations -either that or the store tracker * itself. * * @param factory factory. @@ -207,10 +261,11 @@ Map.Entry deleteObjects(DeleteObjectsRequest de * @throws SdkException problems working with S3 * @throws IllegalArgumentException if the request was rejected due to * a mistaken attempt to delete the root directory. + * @throws UncheckedIOException from invocation operations */ @Retries.RetryRaw Map.Entry> deleteObject( - DeleteObjectRequest request) throws SdkException; + DeleteObjectRequest request) throws SdkException, UncheckedIOException; /** * Performs a HEAD request on an S3 object to retrieve its metadata. 
@@ -246,76 +301,20 @@ ResponseInputStream getRangedS3Object(String key, long end) throws IOException; /** - * Upload part of a multi-partition file. - * Increments the write and put counters. - * Important: this call does not close any input stream in the body. - *

- * Retry Policy: none. - * @param durationTrackerFactory duration tracker factory for operation - * @param request the upload part request. - * @param body the request body. - * @return the result of the operation. - * @throws AwsServiceException on problems - * @throws UncheckedIOException failure to instantiate the s3 client - */ - @Retries.OnceRaw - UploadPartResponse uploadPart( - UploadPartRequest request, - RequestBody body, - DurationTrackerFactory durationTrackerFactory) - throws AwsServiceException, UncheckedIOException; - - /** - * Start a transfer-manager managed async PUT of an object, - * incrementing the put requests and put bytes - * counters. - *

- * It does not update the other counters, - * as existing code does that as progress callbacks come in. - * Byte length is calculated from the file length, or, if there is no - * file, from the content length of the header. - *

- * Because the operation is async, any stream supplied in the request - * must reference data (files, buffers) which stay valid until the upload - * completes. - * Retry policy: N/A: the transfer manager is performing the upload. - * Auditing: must be inside an audit span. - * @param putObjectRequest the request - * @param file the file to be uploaded - * @param listener the progress listener for the request - * @return the upload initiated - * @throws IOException if transfer manager creation failed. + * Delete an object after acquiring write capacity. + * This call does not create any mock parent entries. + * Retry policy: retry untranslated; delete considered idempotent. + * @param key key of entry + * @param isFile is the path a file (used for instrumentation only) + * @throws SdkException problems working with S3 + * @throws UncheckedIOException from invoker signature only -should not be raised. */ - @Retries.OnceRaw - UploadInfo putObject( - PutObjectRequest putObjectRequest, - File file, - ProgressableProgressListener listener) throws IOException; + @Retries.RetryRaw + void deleteObjectAtPath( + String key, + boolean isFile) + throws SdkException, UncheckedIOException; - /** - * Wait for an upload to complete. - * If the upload (or its result collection) failed, this is where - * the failure is raised as an AWS exception. - * Calls {@link S3AStore#incrementPutCompletedStatistics(boolean, long)} - * to update the statistics. - * @param key destination key - * @param uploadInfo upload to wait for - * @return the upload result - * @throws IOException IO failure - * @throws CancellationException if the wait() was cancelled - */ - @Retries.OnceTranslated - CompletedFileUpload waitForUploadCompletion(String key, UploadInfo uploadInfo) - throws IOException; - - /** - * Complete a multipart upload. - * @param request request - * @return the response - */ - @Retries.OnceRaw - CompleteMultipartUploadResponse completeMultipartUpload( - CompleteMultipartUploadRequest request); /** * Get the directory allocator. @@ -366,4 +365,19 @@ default boolean hasCapability(String capability) { /* =============== END ObjectInputStreamFactory =============== */ + + /* + =============== BEGIN WriteOperationHelperCallbacks =============== + */ + + /** + * Create a new instance of the write operation callbacks. 
+ * @return a callback instance + */ + WriteOperationHelper.WriteOperationHelperCallbacks createWriteOperationHelperCallbacks(); + + + /* + =============== END WriteOperationHelperCallbacks =============== + */ } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java index 364f780863a01..8a0f2f7998a54 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java @@ -26,11 +26,13 @@ import java.util.concurrent.atomic.AtomicInteger; import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.CompletedPart; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.MultipartUpload; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; @@ -43,18 +45,15 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.s3a.api.RequestFactory; import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; -import org.apache.hadoop.fs.s3a.impl.StoreContext; -import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; import org.apache.hadoop.fs.statistics.DurationTrackerFactory; import org.apache.hadoop.fs.store.audit.AuditSpan; import org.apache.hadoop.fs.store.audit.AuditSpanSource; import org.apache.hadoop.util.functional.CallableRaisingIOE; -import static org.apache.hadoop.util.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; import static org.apache.hadoop.fs.s3a.Invoker.*; import static org.apache.hadoop.fs.store.audit.AuditingFunctions.withinAuditSpan; @@ -81,7 +80,7 @@ * This API is for internal use only. * Span scoping: This helper is instantiated with span; it will be used * before operations which query/update S3 - * + *

* History *

  * - A nested class in S3AFileSystem
@@ -89,6 +88,7 @@
  * - [HADOOP-13786] A separate class, single instance in S3AFS
  * - [HDFS-13934] Split into interface and implementation
  * - [HADOOP-15711] Adds audit tracking; one instance per use.
+ * - [HADOOP-19569] Bond to S3AStore services, removing all use of S3AFS.
  * 
*/ @InterfaceAudience.Private @@ -97,11 +97,6 @@ public class WriteOperationHelper implements WriteOperations { private static final Logger LOG = LoggerFactory.getLogger(WriteOperationHelper.class); - /** - * Owning filesystem. - */ - private final S3AFileSystem owner; - /** * Invoker for operations; uses the S3A retry policy and calls int * {@link #operationRetried(String, Exception, int, boolean)} on retries. @@ -114,16 +109,6 @@ public class WriteOperationHelper implements WriteOperations { /** Bucket of the owner FS. */ private final String bucket; - /** - * statistics context. - */ - private final S3AStatisticsContext statisticsContext; - - /** - * Store Context; extracted from owner. - */ - private final StoreContext storeContext; - /** * Source of Audit spans. */ @@ -132,7 +117,7 @@ public class WriteOperationHelper implements WriteOperations { /** * Audit Span. */ - private AuditSpan auditSpan; + private final AuditSpan auditSpan; /** * Factory for AWS requests. @@ -142,34 +127,26 @@ public class WriteOperationHelper implements WriteOperations { /** * WriteOperationHelper callbacks. */ - private final WriteOperationHelperCallbacks writeOperationHelperCallbacks; + private final WriteOperationHelperCallbacks callbacks; /** * Constructor. - * @param owner owner FS creating the helper - * @param conf Configuration object - * @param statisticsContext statistics context * @param auditSpanSource source of spans * @param auditSpan span to activate - * @param writeOperationHelperCallbacks callbacks used by writeOperationHelper + * @param callbacks callbacks used by writeOperationHelper */ - protected WriteOperationHelper(S3AFileSystem owner, - Configuration conf, - S3AStatisticsContext statisticsContext, + protected WriteOperationHelper( final AuditSpanSource auditSpanSource, final AuditSpan auditSpan, - final WriteOperationHelperCallbacks writeOperationHelperCallbacks) { - this.owner = owner; - this.invoker = new Invoker(new S3ARetryPolicy(conf), + final WriteOperationHelperCallbacks callbacks) { + this.conf = requireNonNull(callbacks.getConf()); + this.invoker = new Invoker(new S3ARetryPolicy(this.conf), this::operationRetried); - this.conf = conf; - this.statisticsContext = statisticsContext; - this.storeContext = owner.createStoreContext(); - this.bucket = owner.getBucket(); - this.auditSpanSource = auditSpanSource; - this.auditSpan = checkNotNull(auditSpan); - this.requestFactory = owner.getRequestFactory(); - this.writeOperationHelperCallbacks = writeOperationHelperCallbacks; + this.bucket = requireNonNull(callbacks.getBucket()); + this.auditSpanSource = requireNonNull(auditSpanSource); + this.auditSpan = requireNonNull(auditSpan); + this.requestFactory = requireNonNull(callbacks.getRequestFactory()); + this.callbacks = callbacks; } /** @@ -183,7 +160,7 @@ void operationRetried(String text, Exception ex, int retries, boolean idempotent) { LOG.info("{}: Retried {}: {}", text, retries, ex.toString()); LOG.debug("Stack", ex); - owner.operationRetried(text, ex, retries, idempotent); + callbacks.operationRetried(text, ex, retries, idempotent); } /** @@ -280,7 +257,7 @@ public String initiateMultiPartUpload( final CreateMultipartUploadRequest.Builder initiateMPURequestBuilder = getRequestFactory().newMultipartUploadRequestBuilder( destKey, options); - return owner.initiateMultipartUpload(initiateMPURequestBuilder.build()) + return callbacks.initiateMultipartUpload(initiateMPURequestBuilder.build()) .uploadId(); }); } @@ -319,7 +296,7 @@ private CompleteMultipartUploadResponse 
finalizeMultipartUpload( () -> { final CompleteMultipartUploadRequest.Builder requestBuilder = getRequestFactory().newCompleteMultipartUploadRequestBuilder(destKey, uploadId, partETags, putOptions); - return writeOperationHelperCallbacks.completeMultipartUpload(requestBuilder.build()); + return callbacks.completeMultipartUpload(requestBuilder.build()); }); return uploadResult; } @@ -350,8 +327,8 @@ public CompleteMultipartUploadResponse completeMPUwithRetries( AtomicInteger errorCount, PutObjectOptions putOptions) throws IOException { - checkNotNull(uploadId); - checkNotNull(partETags); + requireNonNull(uploadId); + requireNonNull(partETags); LOG.debug("Completing multipart upload {} with {} parts", uploadId, partETags.size()); return finalizeMultipartUpload(destKey, @@ -382,14 +359,14 @@ public void abortMultipartUpload(String destKey, String uploadId, true, retrying, withinAuditSpan(getAuditSpan(), () -> - owner.abortMultipartUpload( + callbacks.abortMultipartUpload( destKey, uploadId))); } else { // single pass attempt. once("Aborting multipart upload ID " + uploadId, destKey, withinAuditSpan(getAuditSpan(), () -> - owner.abortMultipartUpload( + callbacks.abortMultipartUpload( destKey, uploadId))); } @@ -406,7 +383,7 @@ public void abortMultipartUpload(MultipartUpload upload) throws FileNotFoundException, IOException { invoker.retry("Aborting multipart commit", upload.key(), true, withinAuditSpan(getAuditSpan(), - () -> owner.abortMultipartUpload(upload))); + () -> callbacks.abortMultipartUpload(upload))); } @@ -440,7 +417,7 @@ public int abortMultipartUploadsUnderPath(String prefix) public List listMultipartUploads(final String prefix) throws IOException { activateAuditSpan(); - return owner.listMultipartUploads(prefix); + return callbacks.listMultipartUploads(prefix); } /** @@ -518,7 +495,7 @@ public PutObjectResponse putObject(PutObjectRequest putObjectRequest, DurationTrackerFactory durationTrackerFactory) throws IOException { return retry("Writing Object", putObjectRequest.key(), true, withinAuditSpan(getAuditSpan(), - () -> owner.putObjectDirect(putObjectRequest, putOptions, uploadData, + () -> callbacks.putObjectDirect(putObjectRequest, putOptions, uploadData, durationTrackerFactory))); } @@ -533,9 +510,7 @@ public PutObjectResponse putObject(PutObjectRequest putObjectRequest, public void revertCommit(String destKey) throws IOException { once("revert commit", destKey, withinAuditSpan(getAuditSpan(), () -> { - Path destPath = owner.keyToQualifiedPath(destKey); - owner.deleteObjectAtPath(destPath, - destKey, true); + callbacks.deleteObjectAtPath(destKey, true); })); } @@ -559,8 +534,8 @@ public CompleteMultipartUploadResponse commitUpload( List partETags, long length) throws IOException { - checkNotNull(uploadId); - checkNotNull(partETags); + requireNonNull(uploadId); + requireNonNull(partETags); LOG.debug("Completing multipart upload {} with {} parts", uploadId, partETags.size()); return finalizeMultipartUpload(destKey, @@ -588,7 +563,7 @@ public UploadPartResponse uploadPart(UploadPartRequest request, RequestBody body request.key(), true, withinAuditSpan(getAuditSpan(), - () -> writeOperationHelperCallbacks.uploadPart(request, + () -> callbacks.uploadPart(request, body, durationTrackerFactory))); } @@ -611,7 +586,7 @@ public AuditSpan createSpan(final String operation, @Override public void incrementWriteOperations() { - owner.incrementWriteOperations(); + callbacks.incrementWriteOperations(); } /** @@ -635,6 +610,28 @@ public RequestFactory getRequestFactory() { */ public 
interface WriteOperationHelperCallbacks { + /** + * PUT an object directly (i.e. not via the transfer manager). + * Byte length is calculated from the file length, or, if there is no + * file, from the content length of the header. + * + * Retry Policy: none. + * Auditing: must be inside an audit span. + * Important: this call will close any input stream in the request. + * @param putObjectRequest the request + * @param putOptions put object options + * @param uploadData data to be uploaded + * @param durationTrackerFactory factory for duration tracking + * @return the upload initiated + * @throws SdkException on problems + */ + @Retries.OnceRaw() + PutObjectResponse putObjectDirect(PutObjectRequest putObjectRequest, + PutObjectOptions putOptions, + S3ADataBlocks.BlockUploadData uploadData, + DurationTrackerFactory durationTrackerFactory) + throws SdkException; + /** * Initiates a complete multi-part upload request. * @param request Complete multi-part upload request @@ -663,6 +660,94 @@ UploadPartResponse uploadPart( RequestBody body, DurationTrackerFactory durationTrackerFactory) throws AwsServiceException, UncheckedIOException; + + /** + * Callback from {@link Invoker} when an operation is retried. + * @param text text of the operation + * @param ex exception + * @param retries number of retries + * @param idempotent is the method idempotent + */ + void operationRetried( + String text, + Exception ex, + int retries, + boolean idempotent); + + /** + * Initiate a multipart upload from the preconfigured request. + * Retry policy: none + untranslated. + * @param request request to initiate + * @return the result of the call + * @throws SdkException on failures inside the AWS SDK + * @throws IOException Other IO problems + */ + @Retries.OnceRaw + CreateMultipartUploadResponse initiateMultipartUpload( + CreateMultipartUploadRequest request) throws IOException; + + /** + * Abort a multipart upload. + * Retry policy: none. + * @param upload the listed upload to abort. + * @throws IOException IO failure, including any uprated SdkException + */ + @Retries.OnceTranslated + void abortMultipartUpload(MultipartUpload upload) throws IOException; + + @Retries.OnceTranslated + void abortMultipartUpload(String destKey, String uploadId) throws IOException; + + /** + * List in-progress multipart uploads under a path: limited to the first + * few hundred. + * @param prefix prefix for uploads to list + * @return a list of in-progress multipart uploads + * @throws IOException on problems + */ + @Retries.RetryTranslated + List listMultipartUploads(String prefix) + throws IOException; + + /** + * Delete an object after acquiring write capacity. + * This call does not create any mock parent entries. + * Retry policy: retry untranslated; delete considered idempotent. + * @param key key of entry + * @param isFile is the path a file (used for instrumentation only) + * @throws SdkException problems working with S3 + * @throws UncheckedIOException from invoker signature only -should not be raised. + */ + @Retries.RetryRaw + void deleteObjectAtPath( + String key, + boolean isFile) + throws SdkException, UncheckedIOException; + + /** + * Increment the write operation counter. + * This is somewhat inaccurate, as it appears to be invoked more + * often than needed in progress callbacks. + */ + void incrementWriteOperations(); + + /** + * Get the name of the bucket this store is bound to. + * @return a non-empty string + */ + String getBucket(); + + /** + * Accessor for the store request factory.. 
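By way of illustration only: with the callbacks exposing the configuration, bucket and request factory, a helper can be assembled (inside the s3a package, since the constructor is protected) from a callbacks implementation plus the audit-span plumbing. The store variable, span source, destination key and generic types below are assumptions, not part of the patch.

    // Hypothetical wiring sketch of the slimmed-down constructor.
    WriteOperationHelper.WriteOperationHelperCallbacks callbacks =
        store.createWriteOperationHelperCallbacks();      // assumed store instance
    AuditSpan span = spanSource.createSpan("write", destKey, null);
    WriteOperationHelper helper =
        new WriteOperationHelper(spanSource, span, callbacks);
    // Configuration, bucket and RequestFactory are all resolved through the callbacks,
    // so no S3AFileSystem reference is required.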
+ * @return request factory + */ + RequestFactory getRequestFactory(); + + /** + * Get the configuration. + * @return configuration of the store + */ + Configuration getConf(); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java index 93d2506a4f3ad..725157dbc5bba 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java @@ -171,6 +171,7 @@ int abortMultipartUploadsUnderPath(String prefix) * @return a list of in-progress multipart uploads * @throws IOException on problems */ + @Retries.RetryTranslated List listMultipartUploads(String prefix) throws IOException; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/IORateLimiting.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/IORateLimiting.java new file mode 100644 index 0000000000000..21dc8d02029af --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/IORateLimiting.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.api; + +import java.time.Duration; + +/** + * Interface for specific rate limiting of read and write operations. + */ +public interface IORateLimiting { + + /** + * Acquire write capacity for operations. + * This should be done within retry loops. + * @param capacity capacity to acquire. + * @return time spent waiting for output. + */ + Duration acquireWriteCapacity(int capacity); + + /** + * Acquire read capacity for operations. + * This should be done within retry loops. + * @param capacity capacity to acquire. + * @return time spent waiting for output. 
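A sketch of the calling convention these javadocs describe: capacity is acquired inside the retry lambda so that every attempt, including retries, is throttled. The invoker, limiting, client and request identifiers are illustrative only.

    // Illustrative only: acquire write capacity on each attempt, inside the retry loop.
    invoker.retry("Aborting multipart upload", destKey, true, () -> {
      Duration waited = limiting.acquireWriteCapacity(1);
      LOG.debug("Waited {} for write capacity", waited);
      s3Client.abortMultipartUpload(abortRequest);
    });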
+ */ + Duration acquireReadCapacity(int capacity); + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java index ba1dd400f6d7b..deb4da3a32a8a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java @@ -109,11 +109,13 @@ public PutTracker createTracker(Path path, String key, Statistic.COMMITTER_MAGIC_FILES_CREATED); if (isTrackMagicCommitsInMemoryEnabled(getStoreContext().getConfiguration())) { tracker = new InMemoryMagicCommitTracker(path, getStoreContext().getBucket(), - key, destKey, pendingsetPath, owner.getWriteOperationHelper(), + key, destKey, pendingsetPath, + owner.createWriteOperationHelper(owner.getActiveAuditSpan()), trackerStatistics); } else { tracker = new S3MagicCommitTracker(path, getStoreContext().getBucket(), - key, destKey, pendingsetPath, owner.getWriteOperationHelper(), + key, destKey, pendingsetPath, + owner.createWriteOperationHelper(owner.getActiveAuditSpan()), trackerStatistics); } LOG.debug("Created {}", tracker); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManager.java index b60551088824c..05a9c473f7b26 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManager.java @@ -33,6 +33,11 @@ */ public interface ClientManager extends Service { + /** + * Registered service name: {@value}. + */ + String CLIENT_MANAGER = "ClientManager"; + /** * Get the transfer manager, creating it and any dependencies if needed. * @return a transfer manager diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java index 350d9f09d16b7..7c85567b6386c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java @@ -112,7 +112,7 @@ public ClientManagerImpl( final S3ClientFactory unencryptedClientFactory, final S3ClientFactory.S3ClientCreationParameters clientCreationParameters, final DurationTrackerFactory durationTrackerFactory) { - super("ClientManager"); + super(CLIENT_MANAGER); this.clientFactory = requireNonNull(clientFactory); this.unencryptedClientFactory = unencryptedClientFactory; this.clientCreationParameters = requireNonNull(clientCreationParameters); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java index 8cf435f7ca603..0b7ad41fd2e2e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java @@ -313,4 +313,19 @@ private InternalConstants() { public static final String UPLOAD_PROGRESS_LOG_NAME = "org.apache.hadoop.fs.s3a.S3AFileSystem.Progress"; + + /** + * Read capacity to acquire for each page of a multipart list: {@value}. 
+ */ + public static final int MULTIPART_LIST_READ_CAPACITY = 1; + + /** + * Write capacity to abort a multipart upload: {@value}. + */ + public static final int MULTIPART_ABORT_WRITE_CAPACITY = 1; + + /** + * Write capacity to delete a single object: {@value}. + */ + public static final int OBJECT_DELETE_WRITE_CAPACITY = 1; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java index 96ab44a8597f7..ac42ef5597104 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java @@ -22,11 +22,10 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.time.Duration; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletionException; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -34,11 +33,8 @@ import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.exception.SdkException; -import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; -import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; @@ -48,14 +44,8 @@ import software.amazon.awssdk.services.s3.model.HeadObjectRequest; import software.amazon.awssdk.services.s3.model.HeadObjectResponse; import software.amazon.awssdk.services.s3.model.ObjectIdentifier; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.S3Error; -import software.amazon.awssdk.services.s3.model.UploadPartRequest; -import software.amazon.awssdk.services.s3.model.UploadPartResponse; import software.amazon.awssdk.transfer.s3.S3TransferManager; -import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload; -import software.amazon.awssdk.transfer.s3.model.FileUpload; -import software.amazon.awssdk.transfer.s3.model.UploadFileRequest; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -64,14 +54,13 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.s3a.Invoker; -import org.apache.hadoop.fs.s3a.ProgressableProgressListener; import org.apache.hadoop.fs.s3a.Retries; import org.apache.hadoop.fs.s3a.S3AInstrumentation; import org.apache.hadoop.fs.s3a.S3AStorageStatistics; import org.apache.hadoop.fs.s3a.S3AStore; import org.apache.hadoop.fs.s3a.S3AUtils; import org.apache.hadoop.fs.s3a.Statistic; -import org.apache.hadoop.fs.s3a.UploadInfo; +import org.apache.hadoop.fs.s3a.WriteOperationHelper; import org.apache.hadoop.fs.s3a.api.RequestFactory; import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A; import org.apache.hadoop.fs.s3a.impl.streams.FactoryBindingParameters; @@ -86,20 +75,17 @@ import org.apache.hadoop.fs.statistics.IOStatistics; import 
org.apache.hadoop.fs.store.audit.AuditSpanSource; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.service.Service; import org.apache.hadoop.util.DurationInfo; -import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.util.RateLimiting; import org.apache.hadoop.util.functional.Tuples; import static java.util.Objects.requireNonNull; import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR; import static org.apache.hadoop.fs.s3a.Constants.HADOOP_TMP_DIR; -import static org.apache.hadoop.fs.s3a.S3AUtils.extractException; -import static org.apache.hadoop.fs.s3a.S3AUtils.getPutRequestLength; import static org.apache.hadoop.fs.s3a.S3AUtils.isThrottleException; import static org.apache.hadoop.fs.s3a.Statistic.ACTION_HTTP_HEAD_REQUEST; import static org.apache.hadoop.fs.s3a.Statistic.IGNORED_ERRORS; -import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_PART_PUT; import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_BULK_DELETE_REQUEST; import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_DELETE_OBJECTS; import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_DELETE_REQUEST; @@ -114,11 +100,13 @@ import static org.apache.hadoop.fs.s3a.Statistic.STORE_IO_THROTTLE_RATE; import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT; +import static org.apache.hadoop.fs.s3a.impl.InternalConstants.OBJECT_DELETE_WRITE_CAPACITY; +import static org.apache.hadoop.fs.s3a.impl.write.StoreWriter.STORE_WRITER; import static org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration.factoryFromConfig; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation; -import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier; import static org.apache.hadoop.util.Preconditions.checkArgument; +import static org.apache.hadoop.util.Preconditions.checkState; import static org.apache.hadoop.util.StringUtils.toLowerCase; /** @@ -194,6 +182,17 @@ public class S3AStoreImpl */ private ObjectInputStreamFactory objectInputStreamFactory; + /** + * store writer Service. + */ + private final StoreWriterService storeWriter; + + /** + * Map of services, used for easy lookup and so allow service retrieval to + * be a stable part of the of S3AStore API. + */ + private final Map services = new HashMap<>(); + /** * Constructor to create S3A store. * Package private, as {@link S3AStoreBuilder} creates them. @@ -224,7 +223,43 @@ public class S3AStoreImpl this.invoker = requireNonNull(storeContext.getInvoker()); this.bucket = requireNonNull(storeContext.getBucket()); this.requestFactory = requireNonNull(storeContext.getRequestFactory()); - addService(clientManager); + registerChildService(CLIENT_MANAGER, clientManager); + this.storeWriter = registerChildService(STORE_WRITER, new StoreWriterService()); + } + + /** + * Add and register a service, using an ID which may be used to retrieve it later. + * @param id ID of the service + * @param service service instance. + * @return the service + * @param type of the service. + */ + public T registerChildService(final String id, final T service) { + services.put(id, service); + super.addService(service); + return service; + } + + /** + * Add a service using its name as the service ID. 
+ * This is not a robust way to do this as any extension point with multiple services + * (e.g object stream factory) will not be resolvable by a well known name. + * @param service the {@link Service} to be added + */ + @Override + protected void addService(final Service service) { + registerChildService(service.getName(), service); + } + + @SuppressWarnings("unchecked") + @Override + public S lookupService(String id, Class serviceClass) { + final Service service = services.get(id); + checkState(service != null, "No service found for ID " + id); + checkState(serviceClass.isAssignableFrom(service.getClass()), + "Service ID %s is of type %s but the desired class is %s", + id, service.getClass(), serviceClass); + return (S) service; } /** @@ -237,10 +272,11 @@ protected void serviceInit(final Configuration conf) throws Exception { // create and register the stream factory, which will // then follow the service lifecycle objectInputStreamFactory = factoryFromConfig(conf); - addService(objectInputStreamFactory); + registerChildService(OBJECT_INPUT_STREAM_FACTORY, objectInputStreamFactory); // init all child services, including the stream factory super.serviceInit(conf); + storeWriter.bind(this, clientManager, this); // pass down extra information to the stream factory. finishStreamFactoryInit(); @@ -252,6 +288,15 @@ protected void serviceStart() throws Exception { initLocalDirAllocator(); } + /** + * Check the service is running. + * @throws IllegalStateException if not in STARTED. + */ + protected void checkRunning() throws IllegalStateException { + checkState(isInState(STATE.STARTED), + "Store is in state %s", getServiceState()); + } + /** * Return the store path capabilities. * If the object stream factory is non-null, hands off the @@ -262,6 +307,7 @@ protected void serviceStart() throws Exception { */ @Override public boolean hasPathCapability(final Path path, final String capability) { + checkRunning(); switch (toLowerCase(capability)) { case StreamCapabilities.IOSTATISTICS: return true; @@ -278,6 +324,7 @@ public boolean hasPathCapability(final Path path, final String capability) { */ @Override public boolean inputStreamHasCapability(final String capability) { + checkRunning(); if (objectInputStreamFactory != null) { return objectInputStreamFactory.hasCapability(capability); } @@ -298,74 +345,91 @@ private void initLocalDirAllocator() { /** Acquire write capacity for rate limiting {@inheritDoc}. */ @Override public Duration acquireWriteCapacity(final int capacity) { + checkRunning(); return writeRateLimiter.acquire(capacity); } /** Acquire read capacity for rate limiting {@inheritDoc}. */ @Override public Duration acquireReadCapacity(final int capacity) { + checkRunning(); return readRateLimiter.acquire(capacity); - } - /** - * Create a new store context. - * @return a new store context. - */ - private StoreContext createStoreContext() { + @Override + public StoreContext createStoreContext() { + checkRunning(); return storeContextFactory.createStoreContext(); } @Override public StoreContext getStoreContext() { + checkRunning(); return storeContext; } + /** + * Get the S3 client manager. + * @return client manager. + */ + public ClientManager getClientManager() { + return clientManager; + } + /** * Get the S3 client. * @return the S3 client. * @throws UncheckedIOException on any failure to create the client. 
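Purely to illustrate the registry above: a caller holding a started store can resolve its children by their well-known IDs, and a mismatched type fails the checkState guard. The store variable is assumed.

    // Illustrative only: resolving registered child services by ID.
    ClientManager clients =
        store.lookupService(ClientManager.CLIENT_MANAGER, ClientManager.class);
    StoreWriter writer =
        store.lookupService(StoreWriter.STORE_WRITER, StoreWriter.class);
    // Requesting the wrong type raises IllegalStateException, e.g.
    // store.lookupService(StoreWriter.STORE_WRITER, ClientManager.class);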
*/ - private S3Client getS3Client() throws UncheckedIOException { + S3Client getS3Client() throws UncheckedIOException { + checkRunning(); return clientManager.getOrCreateS3ClientUnchecked(); } @Override public S3TransferManager getOrCreateTransferManager() throws IOException { + checkRunning(); return clientManager.getOrCreateTransferManager(); } @Override public S3Client getOrCreateS3Client() throws IOException { + checkRunning(); return clientManager.getOrCreateS3Client(); } @Override public S3AsyncClient getOrCreateAsyncClient() throws IOException { + checkRunning(); return clientManager.getOrCreateAsyncClient(); } @Override public S3Client getOrCreateS3ClientUnchecked() throws UncheckedIOException { + checkRunning(); return clientManager.getOrCreateS3ClientUnchecked(); } @Override public S3Client getOrCreateAsyncS3ClientUnchecked() throws UncheckedIOException { + checkRunning(); return clientManager.getOrCreateAsyncS3ClientUnchecked(); } @Override public S3Client getOrCreateUnencryptedS3Client() throws IOException { + checkRunning(); return clientManager.getOrCreateUnencryptedS3Client(); } @Override public DurationTrackerFactory getDurationTrackerFactory() { + checkRunning(); return durationTrackerFactory; } - private S3AInstrumentation getInstrumentation() { + @Override + public S3AInstrumentation getInstrumentation() { return instrumentation; } @@ -411,30 +475,17 @@ protected void incrementStatistic(Statistic statistic, long count) { statisticsContext.incrementCounter(statistic, count); } - /** - * Decrement a gauge by a specific value. - * @param statistic The operation to decrement - * @param count the count to decrement - */ - protected void decrementGauge(Statistic statistic, long count) { + @Override + public void decrementGauge(Statistic statistic, long count) { statisticsContext.decrementGauge(statistic, count); } - /** - * Increment a gauge by a specific value. - * @param statistic The operation to increment - * @param count the count to increment - */ - protected void incrementGauge(Statistic statistic, long count) { + @Override + public void incrementGauge(Statistic statistic, long count) { statisticsContext.incrementGauge(statistic, count); } - /** - * Callback when an operation was retried. - * Increments the statistics of ignored errors or throttled requests, - * depending up on the exception class. - * @param ex exception. - */ + @Override public void operationRetried(Exception ex) { if (isThrottleException(ex)) { LOG.debug("Request throttled"); @@ -446,13 +497,7 @@ public void operationRetried(Exception ex) { } } - /** - * Callback from {@link Invoker} when an operation is retried. 
- * @param text text of the operation - * @param ex exception - * @param retries number of retries - * @param idempotent is the method idempotent - */ + @Override public void operationRetried(String text, Exception ex, int retries, boolean idempotent) { operationRetried(ex); } @@ -507,6 +552,7 @@ private void incrementBytesWritten(final long bytes) { */ @Override public void incrementPutStartStatistics(long bytes) { + checkRunning(); LOG.debug("PUT start {} bytes", bytes); incrementWriteOperations(); incrementGauge(OBJECT_PUT_REQUESTS_ACTIVE, 1); @@ -599,6 +645,7 @@ public Map.Entry deleteObjects( final DeleteObjectsRequest deleteRequest) throws SdkException { + checkRunning(); DeleteObjectsResponse response; BulkDeleteRetryHandler retryHandler = new BulkDeleteRetryHandler(createStoreContext()); @@ -671,6 +718,7 @@ public HeadObjectResponse headObject(String key, Invoker changeInvoker, S3AFileSystemOperations fsHandler, String operation) throws IOException { + checkRunning(); HeadObjectResponse response = getStoreContext().getInvoker() .retryUntranslated("HEAD " + key, true, () -> { @@ -729,6 +777,7 @@ public HeadObjectResponse headObject(String key, public ResponseInputStream getRangedS3Object(String key, long start, long end) throws IOException { + checkRunning(); final GetObjectRequest request = getRequestFactory().newGetObjectRequestBuilder(key) .range(S3AUtils.formatRange(start, end)) .build(); @@ -756,8 +805,8 @@ public ResponseInputStream getRangedS3Object(String key, @Retries.RetryRaw public Map.Entry> deleteObject( final DeleteObjectRequest request) - throws SdkException { - + throws SdkException, UncheckedIOException { + checkRunning(); String key = request.key(); blockRootDelete(key); DurationInfo d = new DurationInfo(LOG, false, "deleting %s", key); @@ -777,134 +826,38 @@ public Map.Entry> deleteObject( d.close(); return Tuples.pair(d.asDuration(), Optional.of(response)); } catch (AwsServiceException ase) { + d.close(); // 404 errors get swallowed; this can be raised by // third party stores (GCS). if (!isObjectNotFound(ase)) { throw ase; } - d.close(); return Tuples.pair(d.asDuration(), Optional.empty()); } catch (IOException e) { + d.close(); // convert to unchecked. throw new UncheckedIOException(e); } } - /** - * Upload part of a multi-partition file. - * Increments the write and put counters. - * Important: this call does not close any input stream in the body. - *
<p>
- * Retry Policy: none. - * @param trackerFactory duration tracker factory for operation - * @param request the upload part request. - * @param body the request body. - * @return the result of the operation. - * @throws AwsServiceException on problems - * @throws UncheckedIOException failure to instantiate the s3 client - */ - @Override - @Retries.OnceRaw - public UploadPartResponse uploadPart( - final UploadPartRequest request, - final RequestBody body, - @Nullable final DurationTrackerFactory trackerFactory) - throws AwsServiceException, UncheckedIOException { - long len = request.contentLength(); - incrementPutStartStatistics(len); - try { - UploadPartResponse uploadPartResponse = trackDurationOfSupplier( - nonNullDurationTrackerFactory(trackerFactory), - MULTIPART_UPLOAD_PART_PUT.getSymbol(), () -> - getS3Client().uploadPart(request, body)); - incrementPutCompletedStatistics(true, len); - return uploadPartResponse; - } catch (AwsServiceException e) { - incrementPutCompletedStatistics(false, len); - throw e; - } - } - - /** - * Start a transfer-manager managed async PUT of an object, - * incrementing the put requests and put bytes - * counters. - *
<p>
- * It does not update the other counters, - * as existing code does that as progress callbacks come in. - * Byte length is calculated from the file length, or, if there is no - * file, from the content length of the header. - *
<p>
- * Because the operation is async, any stream supplied in the request - * must reference data (files, buffers) which stay valid until the upload - * completes. - * Retry policy: N/A: the transfer manager is performing the upload. - * Auditing: must be inside an audit span. - * @param putObjectRequest the request - * @param file the file to be uploaded - * @param listener the progress listener for the request - * @return the upload initiated - * @throws IOException if transfer manager creation failed. - */ - @Override - @Retries.OnceRaw - public UploadInfo putObject( - PutObjectRequest putObjectRequest, - File file, - ProgressableProgressListener listener) throws IOException { - long len = getPutRequestLength(putObjectRequest); - LOG.debug("PUT {} bytes to {} via transfer manager ", len, putObjectRequest.key()); - incrementPutStartStatistics(len); - - FileUpload upload = getOrCreateTransferManager().uploadFile( - UploadFileRequest.builder() - .putObjectRequest(putObjectRequest) - .source(file) - .addTransferListener(listener) - .build()); - - return new UploadInfo(upload, len); - } - - /** - * Wait for an upload to complete. - * If the upload (or its result collection) failed, this is where - * the failure is raised as an AWS exception. - * Calls {@link S3AStore#incrementPutCompletedStatistics(boolean, long)} - * to update the statistics. - * @param key destination key - * @param uploadInfo upload to wait for - * @return the upload result - * @throws IOException IO failure - * @throws CancellationException if the wait() was cancelled - */ - @Override - @Retries.OnceTranslated - public CompletedFileUpload waitForUploadCompletion(String key, UploadInfo uploadInfo) - throws IOException { - FileUpload upload = uploadInfo.getFileUpload(); - try { - CompletedFileUpload result = upload.completionFuture().join(); - incrementPutCompletedStatistics(true, uploadInfo.getLength()); - return result; - } catch (CompletionException e) { - LOG.info("Interrupted: aborting upload"); - incrementPutCompletedStatistics(false, uploadInfo.getLength()); - throw extractException("upload", key, e); + @Retries.RetryRaw + @Override + public void deleteObjectAtPath( + String key, + boolean isFile) + throws SdkException, UncheckedIOException { + checkRunning(); + if (isFile) { + instrumentation.fileDeleted(1); + } else { + instrumentation.directoryDeleted(); } + acquireWriteCapacity(OBJECT_DELETE_WRITE_CAPACITY); + deleteObject(getRequestFactory() + .newDeleteObjectRequestBuilder(key) + .build()); } - /** - * Complete a multipart upload. - * @param request request - * @return the response - */ - @Override - @Retries.OnceRaw - public CompleteMultipartUploadResponse completeMultipartUpload( - CompleteMultipartUploadRequest request) { - return getS3Client().completeMultipartUpload(request); - } /** * Get the directory allocator. 
@@ -931,6 +884,7 @@ public LocalDirAllocator getDirectoryAllocator() { public File createTemporaryFileForWriting(String pathStr, long size, Configuration conf) throws IOException { + checkRunning(); requireNonNull(directoryAllocator, "directory allocator not initialized"); Path path = directoryAllocator.getLocalPathForWrite(pathStr, size, conf); @@ -950,9 +904,9 @@ public File createTemporaryFileForWriting(String pathStr, */ private void finishStreamFactoryInit() throws IOException { // must be on be invoked during service initialization - Preconditions.checkState(isInState(STATE.INITED), + checkState(isInState(STATE.INITED), "Store is in wrong state: %s", getServiceState()); - Preconditions.checkState(clientManager.isInState(STATE.INITED), + checkState(clientManager.isInState(STATE.INITED), "Client Manager is in wrong state: %s", clientManager.getServiceState()); // finish initialization and pass down callbacks to self @@ -1009,4 +963,19 @@ public void incrementFactoryStatistic(Statistic statistic) { /* =============== END ObjectInputStreamFactory =============== */ + + + /* + =============== BEGIN WriteOperationHelperCallbacks =============== + These either invoke internal store operations or those of multipart IO. + */ + + @Override + public WriteOperationHelper.WriteOperationHelperCallbacks createWriteOperationHelperCallbacks() { + return new WriteOperationHelperCallbacksImpl(this); + } + + /* + =============== END WriteOperationHelperCallbacks =============== + */ } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreWriterService.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreWriterService.java new file mode 100644 index 0000000000000..a783c8c53ee91 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreWriterService.java @@ -0,0 +1,608 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.time.Instant; +import java.util.Date; +import java.util.List; +import java.util.ListIterator; +import java.util.NoSuchElementException; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletionException; +import javax.annotation.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.AbortMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.ListMultipartUploadsRequest; +import software.amazon.awssdk.services.s3.model.ListMultipartUploadsResponse; +import software.amazon.awssdk.services.s3.model.MultipartUpload; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.services.s3.model.UploadPartRequest; +import software.amazon.awssdk.services.s3.model.UploadPartResponse; +import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload; +import software.amazon.awssdk.transfer.s3.model.FileUpload; +import software.amazon.awssdk.transfer.s3.model.UploadFileRequest; + +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.s3a.Invoker; +import org.apache.hadoop.fs.s3a.ProgressableProgressListener; +import org.apache.hadoop.fs.s3a.Retries; +import org.apache.hadoop.fs.s3a.S3ADataBlocks; +import org.apache.hadoop.fs.s3a.S3AStore; +import org.apache.hadoop.fs.s3a.UploadInfo; +import org.apache.hadoop.fs.s3a.api.IORateLimiting; +import org.apache.hadoop.fs.s3a.api.RequestFactory; +import org.apache.hadoop.fs.s3a.impl.write.StoreWriter; +import org.apache.hadoop.fs.statistics.DurationTrackerFactory; +import org.apache.hadoop.fs.store.audit.AuditSpan; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.Preconditions; + +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.fs.s3a.S3AUtils.extractException; +import static org.apache.hadoop.fs.s3a.S3AUtils.getPutRequestLength; +import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_LIST; +import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_PART_PUT; +import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_ABORTED; +import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_INITIATED; +import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_LIST; +import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_PUT_REQUESTS; +import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.CONTENT_TYPE_OCTET_STREAM; +import static org.apache.hadoop.fs.s3a.impl.InternalConstants.MULTIPART_ABORT_WRITE_CAPACITY; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration; +import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier; +import static org.apache.hadoop.util.Preconditions.checkArgument; +import static org.apache.hadoop.util.Preconditions.checkState; + +/** + * Store Writing Operations. + * The service is not ready to use until + * {@link #bind(S3AStore, ClientManager, IORateLimiting)} + * is invoked and the service started. + */ +public class StoreWriterService extends AbstractService + implements StoreWriter { + + private static final Logger LOG = + LoggerFactory.getLogger(StoreWriterService.class); + + /** + * Store for some statistics invocations. + */ + private S3AStore store; + + /** + * Rate limiter (likely to be the store). + */ + private IORateLimiting limiting; + + /** + * SDK client. + */ + private ClientManager clientManager; + + /** + * Create the Service with the service name {@link #STORE_WRITER}. + */ + public StoreWriterService() { + this(STORE_WRITER); + } + + /** + * Constructor. + * @param name service name + */ + public StoreWriterService(final String name) { + super(name); + } + + /** + * Bind to dependencies. + * This MUST be called before service start + * @param aStore store + * @param manager sdk client manager + * @param rateLimiting rate limiting. + */ + public void bind( + final S3AStore aStore, + final ClientManager manager, + final IORateLimiting rateLimiting) { + this.store = requireNonNull(aStore); + this.clientManager = manager; + this.limiting = aStore; + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + requireNonNull(store); + } + + /** + * Check the service is running. + * @throws IllegalStateException if not in STARTED. + */ + public void checkRunning() throws IllegalStateException { + Preconditions.checkState(isInState(STATE.STARTED), + "Store is in state %s", getServiceState()); + } + + + /** + * Start a transfer-manager managed async PUT of an object, + * incrementing the put requests and put bytes + * counters. + *
<p>
+ * It does not update the other counters, + * as existing code does that as progress callbacks come in. + * Byte length is calculated from the file length, or, if there is no + * file, from the content length of the header. + *
<p>
+ * Because the operation is async, any stream supplied in the request + * must reference data (files, buffers) which stay valid until the upload + * completes. + * Retry policy: N/A: the transfer manager is performing the upload. + * Auditing: must be inside an audit span. + * @param putObjectRequest the request + * @param file the file to be uploaded + * @param listener the progress listener for the request + * @return the upload initiated + * @throws IOException if transfer manager creation failed. + */ + @Override + @Retries.OnceRaw + public UploadInfo putObject( + PutObjectRequest putObjectRequest, + File file, + ProgressableProgressListener listener) throws IOException { + checkRunning(); + long len = getPutRequestLength(putObjectRequest); + LOG.debug("PUT {} bytes to {} via transfer manager ", len, putObjectRequest.key()); + store.incrementPutStartStatistics(len); + + FileUpload upload = store.getOrCreateTransferManager().uploadFile( + UploadFileRequest.builder() + .putObjectRequest(putObjectRequest) + .source(file) + .addTransferListener(listener) + .build()); + + return new UploadInfo(upload, len); + } + + @Retries.OnceRaw("For PUT; post-PUT actions are RetryExceptionsSwallowed") + @Override + public PutObjectResponse putObjectDirect(PutObjectRequest putObjectRequest, + PutObjectOptions putOptions, + S3ADataBlocks.BlockUploadData uploadData, + DurationTrackerFactory trackerFactory) + throws SdkException { + checkRunning(); + long len = putObjectRequest.contentLength(); + + checkState(len >= 0, "Cannot PUT object of unknown length"); + LOG.debug("PUT {} bytes to {}", len, putObjectRequest.key()); + store.incrementPutStartStatistics(len); + final UploadContentProviders.BaseContentProvider provider = + uploadData.getContentProvider(); + try { + PutObjectResponse response = + trackDurationOfSupplier(store.nonNullDurationTrackerFactory(trackerFactory), + OBJECT_PUT_REQUESTS.getSymbol(), + () -> getS3ClientUnchecked().putObject(putObjectRequest, + RequestBody.fromContentProvider( + provider, + provider.getSize(), + CONTENT_TYPE_OCTET_STREAM))); + store.incrementPutCompletedStatistics(true, len); + return response; + } catch (SdkException e) { + store.incrementPutCompletedStatistics(false, len); + throw e; + } + } + + /** + * Wait for an upload to complete. + * If the upload (or its result collection) failed, this is where + * the failure is raised as an AWS exception. + * Calls {@link S3AStore#incrementPutCompletedStatistics(boolean, long)} + * to update the statistics. 
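A minimal lifecycle sketch for this writer service, assuming an existing store and client manager; in the patch it is a child of the store's CompositeService and is driven by the store itself, so the standalone form here is illustrative only.

    // Illustrative only: init, bind, then start; operations verify the STARTED state.
    StoreWriterService writer = new StoreWriterService();
    writer.init(conf);
    writer.bind(store, clientManager, store);  // the store also acts as the rate limiter
    writer.start();                            // serviceStart() requires bind() to have run
    // Any write operation invoked before start() fails checkRunning() with IllegalStateException.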
+ * @param key destination key + * @param uploadInfo upload to wait for + * @return the upload result + * @throws IOException IO failure + * @throws CancellationException if the wait() was cancelled + */ + @Retries.OnceTranslated + public CompletedFileUpload waitForUploadCompletion(String key, UploadInfo uploadInfo) + throws IOException { + checkRunning(); + FileUpload upload = uploadInfo.getFileUpload(); + try { + CompletedFileUpload result = upload.completionFuture().join(); + store.incrementPutCompletedStatistics(true, uploadInfo.getLength()); + return result; + } catch (CompletionException e) { + LOG.info("Interrupted: aborting upload"); + store.incrementPutCompletedStatistics(false, uploadInfo.getLength()); + throw extractException("upload", key, e); + } + } + + @Retries.OnceRaw + @Override + public CreateMultipartUploadResponse initiateMultipartUpload( + CreateMultipartUploadRequest request) { + checkRunning(); + LOG.debug("Initiate multipart upload to {}", request.key()); + return trackDurationOfSupplier(store.getDurationTrackerFactory(), + OBJECT_MULTIPART_UPLOAD_INITIATED.getSymbol(), + () -> getS3ClientUnchecked().createMultipartUpload(request)); + } + + @Retries.OnceRaw + @Override + public UploadPartResponse uploadPart( + final UploadPartRequest request, + final RequestBody body, + @Nullable final DurationTrackerFactory trackerFactory) + throws AwsServiceException, UncheckedIOException { + checkRunning(); + long len = request.contentLength(); + store.incrementPutStartStatistics(len); + try { + UploadPartResponse uploadPartResponse = trackDurationOfSupplier( + store.nonNullDurationTrackerFactory(trackerFactory), + MULTIPART_UPLOAD_PART_PUT.getSymbol(), () -> + getS3ClientUnchecked().uploadPart(request, body)); + store.incrementPutCompletedStatistics(true, len); + return uploadPartResponse; + } catch (AwsServiceException e) { + store.incrementPutCompletedStatistics(false, len); + throw e; + } + } + + @Retries.OnceRaw + @Override + public CompleteMultipartUploadResponse completeMultipartUpload( + CompleteMultipartUploadRequest request) { + return getS3ClientUnchecked().completeMultipartUpload(request); + } + + @Retries.OnceTranslated + @Override + public AbortMultipartUploadResponse abortMultipartUpload(MultipartUpload upload) { + checkRunning(); + + String destKey = upload.key(); + String uploadId = upload.uploadId(); + if (LOG.isDebugEnabled()) { + DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + LOG.debug("Aborting multipart upload {} to {} initiated by {} on {}", + uploadId, destKey, upload.initiator(), + df.format(Date.from(upload.initiated()))); + } + return abortMultipartUpload(destKey, uploadId); + } + + @Retries.OnceRaw + @Override + public AbortMultipartUploadResponse abortMultipartUpload(String destKey, String uploadId) + throws AwsServiceException { + checkRunning(); + LOG.debug("Aborting multipart upload {} to {}", uploadId, destKey); + return trackDurationOfSupplier(store.getInstrumentation(), + OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(), () -> { + limiting.acquireWriteCapacity(MULTIPART_ABORT_WRITE_CAPACITY); + return getS3ClientUnchecked().abortMultipartUpload( + store.getRequestFactory().newAbortMultipartUploadRequestBuilder( + destKey, + uploadId).build()); + }); + } + + @Retries.RetryTranslated + @Override + public void abortOutstandingMultipartUploads(long seconds, @Nullable String prefix, int maxKeys, + StoreContext context) + throws IOException { + checkArgument(seconds >= 0); + checkRunning(); + Instant purgeBefore = 
Instant.now().minusSeconds(seconds); + LOG.debug("Purging outstanding multipart uploads older than {}", + purgeBefore); + context.getInvoker().retry("Purging multipart uploads", + context.getBucket(), true, + () -> { + RemoteIterator uploadIterator = + listMultipartUploads( + context, + prefix, + maxKeys); + + while (uploadIterator.hasNext()) { + MultipartUpload upload = uploadIterator.next(); + if (upload.initiated().compareTo(purgeBefore) < 0) { + abortMultipartUpload(upload); + } + } + }); + } + + @Retries.RetryTranslated + @Override + public List listMultipartUploads(@Nullable String prefix) + throws IOException { + checkRunning(); + + // add a trailing / if needed. + String p = qualifyPrefix(prefix); + return store.createStoreContext().getInvoker().retry("listMultipartUploads", p, true, () -> { + final ListMultipartUploadsRequest request = store.getRequestFactory() + .newListMultipartUploadsRequestBuilder(p).build(); + return trackDuration(store.getInstrumentation(), MULTIPART_UPLOAD_LIST.getSymbol(), () -> { + limiting.acquireReadCapacity(InternalConstants.MULTIPART_LIST_READ_CAPACITY); + return getS3ClientUnchecked().listMultipartUploads(request).uploads(); + }); + }); + } + + @Override + @Retries.RetryTranslated + public RemoteIterator listMultipartUploads( + final StoreContext storeContext, + @Nullable String prefix, + int maxKeys) + throws IOException { + checkRunning(); + + // span is picked up retained in the listing. + return new UploadIterator(storeContext, + maxKeys, + qualifyPrefix(prefix)); + } + + /** + * Add a trailing / if needed. + * @param prefix prefix; may be null or empty + * @return the prefix to use in the listing operation. + */ + private static String qualifyPrefix(@Nullable final String prefix) { + if (prefix != null && !prefix.isEmpty() && !prefix.endsWith("/")) { + return prefix + "/"; + } else { + return prefix; + } + } + + /** + * Simple RemoteIterator wrapper for AWS `listMultipartUpload` API. + * Iterates over batches of multipart upload metadata listings. + * All requests are in the StoreContext's active span + * at the time the iterator was constructed. + */ + public final class ListingIterator implements + RemoteIterator { + + private final String prefix; + + private final RequestFactory requestFactory; + + private final int maxKeys; + + private final Invoker invoker; + + private final AuditSpan auditSpan; + + private final StoreContext storeContext; + + /** + * Most recent listing results. + */ + private ListMultipartUploadsResponse listing; + + /** + * Indicator that this is the first listing. + */ + private boolean firstListing = true; + + /** + * Count of list calls made. + */ + private int listCount = 0; + + ListingIterator(final StoreContext storeContext, + @Nullable String prefix, + int maxKeys) throws IOException { + this.storeContext = storeContext; + this.requestFactory = storeContext.getRequestFactory(); + this.maxKeys = maxKeys; + this.prefix = prefix; + this.invoker = storeContext.getInvoker(); + this.auditSpan = storeContext.getActiveAuditSpan(); + + // request the first listing. + requestNextBatch(); + } + + /** + * Iterator has data if it is either is the initial iteration, or + * the last listing obtained was incomplete. + * @throws IOException not thrown by this implementation. + */ + @Override + public boolean hasNext() throws IOException { + if (listing == null) { + // shouldn't happen, but don't trust AWS SDK + return false; + } else { + return firstListing || listing.isTruncated(); + } + } + + /** + * Get next listing. 
First call, this returns initial set (possibly + * empty) obtained from S3. Subsequent calls my block on I/O or fail. + * @return next upload listing. + * @throws IOException if S3 operation fails. + * @throws NoSuchElementException if there are no more uploads. + */ + @Override + @Retries.RetryTranslated + public ListMultipartUploadsResponse next() throws IOException { + if (firstListing) { + firstListing = false; + } else { + if (listing == null || !listing.isTruncated()) { + // nothing more to request: fail. + throw new NoSuchElementException("No more uploads under " + prefix); + } + // need to request a new set of objects. + requestNextBatch(); + } + return listing; + } + + @Override + public String toString() { + return "Upload iterator: prefix " + prefix + + "; list count " + listCount + + "; upload count " + + (listing != null ? listing.uploads().size() : "n/a") + + "; isTruncated=" + + (listing != null ? listing.isTruncated() : "n/a"); + } + + @Retries.RetryTranslated + private void requestNextBatch() throws IOException { + checkRunning(); + + try (AuditSpan span = auditSpan.activate()) { + ListMultipartUploadsRequest.Builder requestBuilder = + requestFactory.newListMultipartUploadsRequestBuilder(prefix); + if (!firstListing) { + requestBuilder.keyMarker(listing.nextKeyMarker()); + requestBuilder.uploadIdMarker(listing.nextUploadIdMarker()); + } + requestBuilder.maxUploads(maxKeys); + + ListMultipartUploadsRequest request = requestBuilder.build(); + + LOG.debug("[{}], Requesting next {} uploads prefix {}, " + + "next key {}, next upload id {}", listCount, maxKeys, prefix, + request.keyMarker(), request.uploadIdMarker()); + listCount++; + + listing = invoker.retry("listMultipartUploads", prefix, true, + trackDurationOfOperation(storeContext.getInstrumentation(), + OBJECT_MULTIPART_UPLOAD_LIST.getSymbol(), + () -> { + limiting.acquireReadCapacity(InternalConstants.MULTIPART_LIST_READ_CAPACITY); + return getS3ClientUnchecked().listMultipartUploads(requestBuilder.build()); + })); + LOG.debug("Listing found {} upload(s)", + listing.uploads().size()); + LOG.debug("New listing state: {}", this); + } + } + } + + private S3Client getS3ClientUnchecked() { + return clientManager.getOrCreateS3ClientUnchecked(); + } + + /** + * Iterator over multipart uploads. + */ + public final class UploadIterator + implements RemoteIterator { + + /** + * Iterator for issuing new upload list requests from + * where the previous one ended. + */ + private final ListingIterator lister; + + /** Iterator over the current listing. */ + private ListIterator batchIterator; + + /** + * Construct an iterator to list uploads under a path. + * @param storeContext store context + * @param maxKeys max # of keys to list per batch + * @param prefix prefix + * @throws IOException listing failure. 
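For illustration, the ListingIterator and UploadIterator compose so a caller can walk every in-progress upload under a prefix without handling pagination; this sketch assumes a started writer service and an existing store context.

    // Illustrative only: page through in-progress uploads via the RemoteIterator.
    RemoteIterator<MultipartUpload> uploads =
        writer.listMultipartUploads(storeContext, "job-output", 100);
    while (uploads.hasNext()) {
      MultipartUpload upload = uploads.next();
      LOG.info("In-progress upload {} (id {}) started at {}",
          upload.key(), upload.uploadId(), upload.initiated());
    }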
+ */ + @Retries.RetryTranslated + public UploadIterator( + final StoreContext storeContext, + int maxKeys, + @Nullable String prefix) + throws IOException { + + lister = new ListingIterator(storeContext, prefix, maxKeys); + requestNextBatch(); + } + + @Override + @Retries.RetryTranslated + public boolean hasNext() throws IOException { + return (batchIterator.hasNext() || requestNextBatch()); + } + + @Override + @Retries.RetryTranslated + public MultipartUpload next() throws IOException { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return batchIterator.next(); + } + + @Retries.RetryTranslated + private boolean requestNextBatch() throws IOException { + if (lister.hasNext()) { + // Current listing: the last upload listing we fetched. + ListMultipartUploadsResponse listing = lister.next(); + batchIterator = listing.uploads().listIterator(); + return batchIterator.hasNext(); + } + return false; + } + } + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/WriteOperationHelperCallbacksImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/WriteOperationHelperCallbacksImpl.java new file mode 100644 index 0000000000000..b9a9304ebf160 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/WriteOperationHelperCallbacksImpl.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; + +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.MultipartUpload; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.services.s3.model.UploadPartRequest; +import software.amazon.awssdk.services.s3.model.UploadPartResponse; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.Retries; +import org.apache.hadoop.fs.s3a.S3ADataBlocks; +import org.apache.hadoop.fs.s3a.WriteOperationHelper; +import org.apache.hadoop.fs.s3a.api.RequestFactory; +import org.apache.hadoop.fs.s3a.impl.write.StoreWriter; +import org.apache.hadoop.fs.statistics.DurationTrackerFactory; + +/** + * Callbacks for WriteOperationHelper. + */ +public final class WriteOperationHelperCallbacksImpl + implements WriteOperationHelper.WriteOperationHelperCallbacks { + + private final S3AStoreImpl store; + + private StoreWriter storeWriter; + + WriteOperationHelperCallbacksImpl(final S3AStoreImpl store) { + this.store = store; + this.storeWriter = store.getStoreWriter(); + } + + @Retries.OnceRaw + @Override + public PutObjectResponse putObjectDirect(PutObjectRequest putObjectRequest, + PutObjectOptions putOptions, + S3ADataBlocks.BlockUploadData uploadData, + DurationTrackerFactory durationTrackerFactory) + throws SdkException { + return storeWriter.putObjectDirect(putObjectRequest, putOptions, uploadData, + durationTrackerFactory); + } + + @Override + @Retries.OnceRaw + public CompleteMultipartUploadResponse completeMultipartUpload( + CompleteMultipartUploadRequest request) { + return storeWriter.completeMultipartUpload(request); + } + + @Override + @Retries.OnceRaw + public UploadPartResponse uploadPart( + final UploadPartRequest request, + final RequestBody body, + final DurationTrackerFactory durationTrackerFactory) + throws AwsServiceException, UncheckedIOException { + return storeWriter.uploadPart(request, body, durationTrackerFactory); + } + + @Override + public void operationRetried( + String text, + Exception ex, + int retries, + boolean idempotent) { + store.operationRetried(ex); + } + + + @Retries.OnceRaw + @Override + public CreateMultipartUploadResponse initiateMultipartUpload( + CreateMultipartUploadRequest request) throws IOException { + return storeWriter.initiateMultipartUpload(request); + } + + @Override + public void abortMultipartUpload(final MultipartUpload upload) throws IOException { + storeWriter.abortMultipartUpload(upload); + } + + @Retries.OnceTranslated + @Override + public void abortMultipartUpload(String destKey, String uploadId) throws IOException { + storeWriter.abortMultipartUpload(destKey, uploadId); + } + + @Retries.RetryTranslated + @Override + public List listMultipartUploads(final String prefix) + throws IOException { + return storeWriter.listMultipartUploads(prefix); + } + + @Retries.RetryRaw + @Override + public void 
deleteObjectAtPath( + String key, + boolean isFile) + throws SdkException, UncheckedIOException { + store.deleteObjectAtPath(key, isFile); + } + + @Override + public void incrementWriteOperations() { + store.incrementWriteOperations(); + } + + @Override + public String getBucket() { + return store.getStoreContext().getBucket(); + } + + @Override + public RequestFactory getRequestFactory() { + return store.getRequestFactory(); + } + + @Override + public Configuration getConf() { + return store.getConfig(); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java index 9b2b54c48e50e..bf3712ce2108b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java @@ -41,6 +41,11 @@ public interface ObjectInputStreamFactory extends Service, StreamCapabilities { + /** + * Registered service name: {@value}. + */ + String OBJECT_INPUT_STREAM_FACTORY = "ObjectInputStreamFactory"; + /** * Set extra initialization parameters. * This MUST ONLY be invoked between {@code init()} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/StoreWriter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/StoreWriter.java new file mode 100644 index 0000000000000..e101e07e0f39f --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/StoreWriter.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.impl.write; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.concurrent.CancellationException; +import javax.annotation.Nullable; + +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.model.AbortMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.MultipartUpload; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.services.s3.model.UploadPartRequest; +import software.amazon.awssdk.services.s3.model.UploadPartResponse; +import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.s3a.ProgressableProgressListener; +import org.apache.hadoop.fs.s3a.Retries; +import org.apache.hadoop.fs.s3a.S3ADataBlocks; +import org.apache.hadoop.fs.s3a.S3AStore; +import org.apache.hadoop.fs.s3a.UploadInfo; +import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; +import org.apache.hadoop.fs.s3a.impl.StoreContext; +import org.apache.hadoop.fs.statistics.DurationTrackerFactory; +import org.apache.hadoop.service.Service; + +/** + * Interface for store writing and multipart IO operations: + * put, create, upload part, complete, list, abort. + */ +public interface StoreWriter extends Service { + + /** + * Registered service name: {@value}. + */ + String STORE_WRITER = "StoreWriter"; + + /** + * Start a transfer-manager managed async PUT of an object, + * incrementing the put requests and put bytes + * counters. + *
<p>
+ * It does not update the other counters, + * as existing code does that as progress callbacks come in. + * Byte length is calculated from the file length, or, if there is no + * file, from the content length of the header. + *
<p>
+ * Because the operation is async, any stream supplied in the request + * must reference data (files, buffers) which stay valid until the upload + * completes. + * Retry policy: N/A: the transfer manager is performing the upload. + * Auditing: must be inside an audit span. + * @param putObjectRequest the request + * @param file the file to be uploaded + * @param listener the progress listener for the request + * @return the upload initiated + * @throws IOException if transfer manager creation failed. + */ + @Retries.OnceRaw + UploadInfo putObject( + PutObjectRequest putObjectRequest, + File file, + ProgressableProgressListener listener) throws IOException; + + /** + * PUT an object directly (i.e. not via the transfer manager). + * Byte length is calculated from the file length, or, if there is no + * file, from the content length of the header. + *
<p>
+ * Retry Policy: none. + * Auditing: must be inside an audit span. + * Important: this call will close any input stream in the request. + * @param putObjectRequest the request + * @param putOptions put object options + * @param uploadData data to be uploaded + * @param durationTrackerFactory factory for duration tracking + * @return the upload initiated + * @throws SdkException on problems + */ + @VisibleForTesting + @Retries.OnceRaw("For PUT; post-PUT actions are RetryExceptionsSwallowed") + PutObjectResponse putObjectDirect(PutObjectRequest putObjectRequest, + PutObjectOptions putOptions, + S3ADataBlocks.BlockUploadData uploadData, + DurationTrackerFactory durationTrackerFactory) + throws SdkException; + + /** + * Wait for an upload to complete. + * If the upload (or its result collection) failed, this is where + * the failure is raised as an AWS exception. + * Calls {@link S3AStore#incrementPutCompletedStatistics(boolean, long)} + * to update the statistics. + * @param key destination key + * @param uploadInfo upload to wait for + * @return the upload result + * @throws IOException IO failure + * @throws CancellationException if the wait() was cancelled + */ + @Retries.OnceTranslated + CompletedFileUpload waitForUploadCompletion(String key, UploadInfo uploadInfo) + throws IOException; + + /** + * Initiate an MPU. + * @param request request. + * @return the result of the operation. + * @throws AwsServiceException on problems + * @throws UncheckedIOException failure to instantiate the s3 client + */ + @Retries.OnceRaw + CreateMultipartUploadResponse initiateMultipartUpload( + CreateMultipartUploadRequest request) + throws AwsServiceException, UncheckedIOException; + + /** + * Upload part of a multi-partition file. + * Increments the write and put counters. + * Important: this call does not close any input stream in the body. + *
<p>
+ * Retry Policy: none. + * @param trackerFactory duration tracker factory for operation + * @param request the upload part request. + * @param body the request body. + * @return the result of the operation. + * @throws AwsServiceException on problems + * @throws UncheckedIOException failure to instantiate the s3 client + */ + @Retries.OnceRaw + UploadPartResponse uploadPart( + UploadPartRequest request, + RequestBody body, + @Nullable DurationTrackerFactory trackerFactory) + throws AwsServiceException, UncheckedIOException; + + /** + * Complete a multipart upload. + * @param request request + * @return the response + * @throws AwsServiceException on problems + * @throws UncheckedIOException failure to instantiate the s3 client + */ + @Retries.OnceRaw + CompleteMultipartUploadResponse completeMultipartUpload( + CompleteMultipartUploadRequest request) + throws AwsServiceException, UncheckedIOException; + + /** + * Abort a multipart upload. + * @param upload upload + * @return the response from the request + * @throws AwsServiceException on problems + * @throws UncheckedIOException failure to instantiate the s3 client + */ + @Retries.OnceRaw + AbortMultipartUploadResponse abortMultipartUpload(MultipartUpload upload) + throws AwsServiceException, UncheckedIOException; + + /** + * Abort a multipart upload. + * @param destKey key of destination path. + * @param uploadId upload operation ID + * @return the response + * @throws UncheckedIOException failure to instantiate the s3 client + */ + @Retries.OnceRaw + AbortMultipartUploadResponse abortMultipartUpload(String destKey, String uploadId) + throws AwsServiceException, UncheckedIOException; + + /** + * List and abort all multipart uploads older than a specified age. + * @param seconds age of multiparts to abort. + * @param prefix prefix to scan for, "" for none + * @param maxKeys maximum number of keys to list and abort + * @param context store context to use + * @throws IOException IO failure, including any uprated SdkException + */ + @Retries.RetryTranslated + void abortOutstandingMultipartUploads(long seconds, @Nullable String prefix, int maxKeys, + StoreContext context) + throws IOException; + + /** + * Listing all multipart uploads; limited to the first few hundred. + * Retry policy: retry, translated. + * @param prefix prefix to scan for, "" for none + * @return a listing of multipart uploads. + * @throws IOException IO failure, including any uprated SdkException + */ + @Retries.RetryTranslated + List listMultipartUploads(@Nullable String prefix) + throws IOException; + + + /** + * List multipart uploads under a path. + * @param storeContext store context. + * @param prefix prefix, may be null or empty. + * @param maxKeys maximum number of keys. + * @return an iterator. + * @throws IOException failure to initiate the listing operation. + */ + @Retries.RetryTranslated + RemoteIterator listMultipartUploads( + StoreContext storeContext, + @Nullable String prefix, + int maxKeys) + throws IOException; +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/package-info.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/package-info.java index e0738424fbace..808cd4b20c97f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/package-info.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/package-info.java @@ -19,4 +19,7 @@ /** * Classes related to writing objects. 
*/ -package org.apache.hadoop.fs.s3a.impl.write; \ No newline at end of file +@InterfaceAudience.Private +package org.apache.hadoop.fs.s3a.impl.write; + +import org.apache.hadoop.classification.InterfaceAudience; \ No newline at end of file diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java index e1b995de68df4..5160ae976227f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java @@ -718,7 +718,7 @@ private void processUploads(PrintStream out) throws IOException { fs.createSpan(MULTIPART_UPLOAD_ABORTED, prefix, null); final WriteOperationHelper writeOperationHelper - = fs.getWriteOperationHelper(); + = fs.createWriteOperationHelper(fs.getActiveAuditSpan()); int count = 0; while (uploads.hasNext()) { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/StoreStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/StoreStatistics.java new file mode 100644 index 0000000000000..cb034fa8f83fe --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/StoreStatistics.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.statistics; + +/** + * Interface for statistics of the S3AStore service. 
+ */ +public interface StoreStatistics extends S3AStatisticInterface { +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java index 96470b70d1489..0108f69930b9c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java @@ -336,7 +336,8 @@ public void testSizeOfEncryptedObjectFromHeaderWithV1Compatibility() throws Exce false); putObjectRequestBuilder.contentLength(Long.parseLong(String.valueOf(SMALL_FILE_SIZE))); putObjectRequestBuilder.metadata(metadata); - fs.putObjectDirect(putObjectRequestBuilder.build(), + fs.getStore().getStoreWriter().putObjectDirect( + putObjectRequestBuilder.build(), PutObjectOptions.defaultOptions(), new S3ADataBlocks.BlockUploadData(new byte[SMALL_FILE_SIZE], null), null); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java index 7dbba293a6b70..a2949b988d5da 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java @@ -114,7 +114,7 @@ public void testPutObjectDirect() throws Throwable { -1, false); putObjectRequestBuilder.contentLength(-1L); LambdaTestUtils.intercept(IllegalStateException.class, - () -> fs.putObjectDirect( + () -> fs.getStore().getStoreWriter().putObjectDirect( putObjectRequestBuilder.build(), PutObjectOptions.defaultOptions(), new S3ADataBlocks.BlockUploadData("PUT".getBytes(), null), diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java index f938494eef0b5..a4147becfe103 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java @@ -181,20 +181,18 @@ public Path qualify(final Path path) { public void initialize(URI name, Configuration originalConf) throws IOException { conf = originalConf; - writeHelper = new WriteOperationHelper(this, - conf, - new EmptyS3AStatisticsContext(), + writeHelper = new WriteOperationHelper( noopAuditor(conf), AuditTestSupport.NOOP_SPAN, - new MinimalWriteOperationHelperCallbacks(this::getS3Client)); + new MinimalWriteOperationHelperCallbacks("bucket", this::getS3Client, REQUEST_FACTORY) + ); } @Override public void close() { } - @Override - public WriteOperationHelper getWriteOperationHelper() { + public WriteOperationHelper createWriteOperationHelperWithinActiveSpan() { return writeHelper; } @@ -335,18 +333,6 @@ public void incrementReadOperations() { public void incrementWriteOperations() { } - @Override - public void incrementPutStartStatistics(long bytes) { - } - - @Override - public void incrementPutCompletedStatistics(boolean success, long bytes) { - } - - @Override - public void incrementPutProgressStatistics(String key, long bytes) { - } - @Override @SuppressWarnings("deprecation") public long getDefaultBlockSize() { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java index 64eb846661608..f0194c227185e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java @@ -67,17 +67,16 @@ private MultipartTestUtils() { } * Clean up all provided uploads. * @param keySet set of uploads to abort */ - static void cleanupParts(S3AFileSystem fs, Set keySet) { + public static void cleanupParts(S3AFileSystem fs, Set keySet) { boolean anyFailure = false; for (IdKey ik : keySet) { try (AuditSpan span = fs.createSpan("multipart", ik.key, null)) { LOG.debug("aborting upload id {}", ik.getUploadId()); - fs.abortMultipartUpload(ik.getKey(), ik.getUploadId()); + fs.getStore().getStoreWriter().abortMultipartUpload(ik.getKey(), ik.getUploadId()); } catch (Exception e) { - LOG.error(String.format("Failure aborting upload %s, continuing.", - ik.getKey()), e); + LOG.error("Failure aborting upload {}, continuing.", ik.getKey(), e); anyFailure = true; } } @@ -87,7 +86,7 @@ static void cleanupParts(S3AFileSystem fs, Set keySet) { public static IdKey createPartUpload(S3AFileSystem fs, String key, int len, int partNo) throws IOException { try (AuditSpan span = fs.createSpan("multipart", key, null)) { - WriteOperationHelper writeHelper = fs.getWriteOperationHelper(); + WriteOperationHelper writeHelper = fs.createWriteOperationHelper(span); byte[] data = dataset(len, 'a', 'z'); InputStream in = new ByteArrayInputStream(data); String uploadId = writeHelper.initiateMultiPartUpload(key, PutObjectOptions.defaultOptions()); @@ -144,8 +143,7 @@ public static List listMultipartUploads(S3AFileSystem fs, String prefix) throws IOException { try (AuditSpan span = fs.createSpan("multipart", prefix, null)) { - return fs - .listMultipartUploads(prefix).stream() + return fs.getStore().getStoreWriter().listMultipartUploads(prefix).stream() .map(upload -> String.format("Upload to %s with ID %s; initiated %s", upload.key(), upload.uploadId(), @@ -191,7 +189,7 @@ public static class IdKey { private String key; private String uploadId; - IdKey(String key, String uploadId) { + public IdKey(String key, String uploadId) { this.key = key; this.uploadId = uploadId; } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java index 703da8574c70a..beb05633cdb66 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java @@ -24,7 +24,6 @@ import org.apache.hadoop.fs.s3a.audit.AuditTestSupport; import org.apache.hadoop.fs.s3a.commit.PutTracker; import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; -import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext; import org.apache.hadoop.fs.s3a.test.MinimalWriteOperationHelperCallbacks; import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.util.Progressable; @@ -98,12 +97,14 @@ public void testWriteOperationHelperPartLimits() throws Throwable { when(s3a.getRequestFactory()) .thenReturn(MockS3AFileSystem.REQUEST_FACTORY); final Configuration conf = new Configuration(); - WriteOperationHelper woh = new WriteOperationHelper(s3a, - conf, - new EmptyS3AStatisticsContext(), + WriteOperationHelper woh = new WriteOperationHelper( 
noopAuditor(conf), AuditTestSupport.NOOP_SPAN, - new MinimalWriteOperationHelperCallbacks(null)); // raises NPE if S3 client used + new MinimalWriteOperationHelperCallbacks("bucket", + null, + MockS3AFileSystem.REQUEST_FACTORY) + ); + // raises NPE if S3 client used // first one works String key = "destKey"; woh.newUploadPartRequestBuilder(key, diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/ITestAuditManager.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/ITestAuditManager.java index 56eede93f416e..4f1e8bce55cdb 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/ITestAuditManager.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/ITestAuditManager.java @@ -93,7 +93,7 @@ public void testInvokeOutOfSpanRejected() throws Throwable { // this will be out of span final WriteOperationHelper writer - = fs.getWriteOperationHelper(); + = fs.createWriteOperationHelper(fs.getActiveAuditSpan()); // which can be verified Assertions.assertThat(writer.getAuditSpan()) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java index 45d7469f7970a..bb25bf197bb46 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3AStore; import org.apache.hadoop.fs.s3a.commit.CommitUtils; import org.apache.hadoop.fs.s3a.commit.files.PendingSet; import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; @@ -210,7 +211,10 @@ public void test_100_renameHugeFile() { public void test_800_DeleteHugeFiles() throws IOException { if (getFileSystem() != null) { try { - getFileSystem().abortOutstandingMultipartUploads(0); + final S3AStore store = getFileSystem().getStore(); + store.getStoreWriter() + .abortOutstandingMultipartUploads(0, "", 100, + store.createStoreContext()); } catch (IOException e) { LOG.info("Exception while purging old uploads", e); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java index 5661a4969f3f5..49b6be843ffcb 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java @@ -516,10 +516,16 @@ public void testTaskInitializeFailure(int pNumThreads, "Should fail during init", () -> committer.commitTask(tac)); - assertEquals(1, results.getUploads().size(), - "Should have initialized one file upload"); - assertEquals(new HashSet<>(results.getUploads()), - getAbortedIds(results.getAborts()), "Should abort the upload"); + assertThat(results.getUploads()) + .describedAs("Should have initialized one file upload") + .hasSize(1); + assertThat(results.getAborts()) + .describedAs("abort count") + .hasSize(1); + assertThat(results.getUploads()) + .describedAs("upload list must match aborted list") + 
.containsExactlyElementsOf(getAbortedIds(results.getAborts())); + assertPathDoesNotExist(fs, "Should remove the attempt path", attemptPath); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMultipartUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3AStoreWriterService.java similarity index 92% rename from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMultipartUtils.java rename to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3AStoreWriterService.java index 1fcc41a3bde28..c16cc266a5328 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMultipartUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3AStoreWriterService.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.fs.s3a; +package org.apache.hadoop.fs.s3a.impl; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; @@ -25,6 +25,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; +import org.apache.hadoop.fs.s3a.Constants; +import org.apache.hadoop.fs.s3a.MultipartTestUtils; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3ATestUtils; import org.apache.hadoop.fs.store.audit.AuditSpan; import java.io.IOException; @@ -34,9 +39,9 @@ import static org.apache.hadoop.util.functional.RemoteIterators.foreach; /** - * Tests for {@link MultipartUtils}. + * Tests for StoreWriter. */ -public class ITestS3AMultipartUtils extends AbstractS3ATestBase { +public class ITestS3AStoreWriterService extends AbstractS3ATestBase { private static final int UPLOAD_LEN = 1024; private static final String PART_FILENAME_BASE = "pending-part"; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestStoreClose.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestStoreClose.java new file mode 100644 index 0000000000000..4b628c900bb89 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestStoreClose.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.util.concurrent.RejectedExecutionException; + +import org.assertj.core.api.Assumptions; +import org.junit.Test; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSDataOutputStreamBuilder; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; +import org.apache.hadoop.fs.s3a.S3AFileSystem; + +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_MULTIPART; +import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Test for store closure. + */ +public class ITestStoreClose extends AbstractS3ATestBase { + + + /** + * Open a file in forced multipart, then close the fs. + */ + @Test + public void testStreamWriteClosed() throws Throwable { + + final S3AFileSystem fs = getFileSystem(); + final Path path = methodPath(); + Assumptions.assumeThat(fs.hasPathCapability(path, STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED)) + .describedAs("Path capability %s is required", STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED) + .isTrue(); + final FSDataOutputStreamBuilder builder = fs.createFile(path); + final FSDataOutputStream out = builder.build(); + out.write('a'); + out.flush(); + + fs.close(); + intercept(IllegalStateException.class, out::close); + } + + /** + * Open a file in forced multipart, then close the fs. + */ + @Test + public void testMultipartUploadClosed() throws Throwable { + + final S3AFileSystem fs = getFileSystem(); + final Path path = methodPath(); + Assumptions.assumeThat(fs.hasPathCapability(path, STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED)) + .describedAs("Path capability %s is required", STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED) + .isTrue(); + final FSDataOutputStreamBuilder builder = fs.createFile(path); + builder.must(FS_S3A_CREATE_MULTIPART, true); + final FSDataOutputStream out = builder.build(); + out.write('a'); + out.flush(); + + fs.close(); + intercept(RejectedExecutionException.class, out::close); + } + +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestTreewalkProblems.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestTreewalkProblems.java index afc4a62a8e87e..a98d8ba8f76b4 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestTreewalkProblems.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestTreewalkProblems.java @@ -27,18 +27,15 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import software.amazon.awssdk.services.s3.model.MultipartUpload; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FsShell; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; -import org.apache.hadoop.fs.store.audit.AuditSpan; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.LocatedFileStatusFetcher; import org.apache.hadoop.mapreduce.JobID; @@ -66,7 +63,6 @@ import static org.apache.hadoop.util.ToolRunner.run; import static org.apache.hadoop.util.functional.RemoteIterators.foreach; import static 
org.apache.hadoop.util.functional.RemoteIterators.remoteIteratorFromArray; -import static org.apache.hadoop.util.functional.RemoteIterators.toList; /** * Test behavior of treewalking when there are pending @@ -454,31 +450,4 @@ private void shell(int expected, final Path base, final String... command) throw .isEqualTo(expected); } - /** - * Assert the upload count under a dir is the expected value. - * Failure message will include the list of entries. - * @param dir dir - * @param expected expected count - * @throws IOException listing problem - */ - private void assertUploadCount(final Path dir, final int expected) throws IOException { - Assertions.assertThat(toList(listUploads(dir))) - .describedAs("uploads under %s", dir) - .hasSize(expected); - } - - /** - * List uploads; use the same APIs that the directory operations use, - * so implicitly validating them. - * @param dir directory to list - * @return full list of entries - * @throws IOException listing problem - */ - private RemoteIterator listUploads(Path dir) throws IOException { - final S3AFileSystem fs = getFileSystem(); - try (AuditSpan ignored = span()) { - final StoreContext sc = fs.createStoreContext(); - return fs.listUploadsUnderPrefix(sc, sc.pathToKey(dir)); - } - } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestUploadPurgeOnDirectoryOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestUploadPurgeOnDirectoryOperations.java index 301f348981462..3a8b21db35717 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestUploadPurgeOnDirectoryOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestUploadPurgeOnDirectoryOperations.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.store.audit.AuditSpan; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasPathCapabilities; +import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_MAX_PAGING_KEYS; import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS; import static org.apache.hadoop.fs.s3a.MultipartTestUtils.clearAnyUploads; import static org.apache.hadoop.fs.s3a.MultipartTestUtils.createMagicFile; @@ -125,7 +126,7 @@ public void testRenameWithPendingUpload() throws Throwable { * @throws IOException listing problem */ private void assertUploadCount(final Path dir, final int expected) throws IOException { - Assertions.assertThat(toList(listUploads(dir))) + Assertions.assertThat(toList(listUploads(dir, getFileSystem()))) .describedAs("uploads under %s", dir) .hasSize(expected); } @@ -134,14 +135,17 @@ private void assertUploadCount(final Path dir, final int expected) throws IOExce * List uploads; use the same APIs that the directory operations use, * so implicitly validating them. 
* @param dir directory to list + * @param fs * @return full list of entries * @throws IOException listing problem */ - private RemoteIterator listUploads(Path dir) throws IOException { - final S3AFileSystem fs = getFileSystem(); + private RemoteIterator listUploads(Path dir, S3AFileSystem fs) + throws IOException { try (AuditSpan ignored = span()) { final StoreContext sc = fs.createStoreContext(); - return fs.listUploadsUnderPrefix(sc, sc.pathToKey(dir)); + final String prefix = sc.pathToKey(dir); + return fs.getStore().getStoreWriter() + .listMultipartUploads(sc, prefix, DEFAULT_MAX_PAGING_KEYS); } } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java index dcbf61574fb9d..7fb9629b8367a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java @@ -239,7 +239,7 @@ public void testMultiPagesListingPerformanceAndCorrectness() final AuditSpan span = fs.getAuditSpanSource() .createSpan(OBJECT_PUT_REQUESTS.getSymbol(), dir.toString(), null); final WriteOperationHelper writeOperationHelper - = fs.getWriteOperationHelper(); + = fs.createWriteOperationHelper(span); final RequestFactory requestFactory = writeOperationHelper.getRequestFactory(); List> futures = diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/MinimalWriteOperationHelperCallbacks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/MinimalWriteOperationHelperCallbacks.java index c90ac823ad8e0..f47d6922f8ccf 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/MinimalWriteOperationHelperCallbacks.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/MinimalWriteOperationHelperCallbacks.java @@ -18,18 +18,31 @@ package org.apache.hadoop.fs.s3a.test; +import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Collections; +import java.util.List; import java.util.function.Supplier; import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.MultipartUpload; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.services.s3.model.UploadPartRequest; import software.amazon.awssdk.services.s3.model.UploadPartResponse; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.S3ADataBlocks; import org.apache.hadoop.fs.s3a.WriteOperationHelper; +import org.apache.hadoop.fs.s3a.api.RequestFactory; +import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; import org.apache.hadoop.fs.statistics.DurationTrackerFactory; /** @@ -46,13 +59,22 @@ public class MinimalWriteOperationHelperCallbacks */ private final Supplier 
s3clientSupplier; + private final String bucket; + + private final RequestFactory requestFactory; + /** * Constructor. + * @param bucket bucket name * @param s3clientSupplier supplier of the S3 client. */ public MinimalWriteOperationHelperCallbacks( - final Supplier s3clientSupplier) { + final String bucket, + final Supplier s3clientSupplier, + final RequestFactory requestFactory) { this.s3clientSupplier = s3clientSupplier; + this.requestFactory = requestFactory; + this.bucket = bucket; } @Override @@ -69,5 +91,71 @@ public UploadPartResponse uploadPart(final UploadPartRequest request, return s3clientSupplier.get().uploadPart(request, body); } + @Override + public PutObjectResponse putObjectDirect(final PutObjectRequest putObjectRequest, + final PutObjectOptions putOptions, + final S3ADataBlocks.BlockUploadData uploadData, + final DurationTrackerFactory durationTrackerFactory) throws SdkException { + return null; + } + + @Override + public void operationRetried(final String text, + final Exception ex, + final int retries, + final boolean idempotent) { + + } + + @Override + public CreateMultipartUploadResponse initiateMultipartUpload( + final CreateMultipartUploadRequest request) + throws IOException { + return s3clientSupplier.get().createMultipartUpload(request); + } + + @Override + public void abortMultipartUpload(final MultipartUpload upload) throws IOException { + abortMultipartUpload(upload.key(), upload.uploadId()); + } + + @Override + public void abortMultipartUpload(final String destKey, final String uploadId) throws IOException { + s3clientSupplier.get().abortMultipartUpload( + getRequestFactory().newAbortMultipartUploadRequestBuilder(destKey, uploadId) + .build()); + } + + @Override + public List listMultipartUploads(final String prefix) throws IOException { + return Collections.emptyList(); + } + + @Override + public void deleteObjectAtPath(final String key, final boolean isFile) + throws SdkException, UncheckedIOException { + s3clientSupplier.get() + .deleteObject(getRequestFactory().newDeleteObjectRequestBuilder(key).build()); + } + + @Override + public void incrementWriteOperations() { + + } + + @Override + public String getBucket() { + return bucket; + } + + @Override + public RequestFactory getRequestFactory() { + return requestFactory; + } + + @Override + public Configuration getConf() { + return new Configuration(); + } }
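
Reviewer addendum, not part of the patch: a minimal caller-side sketch of the new StoreWriter service, assuming the accessors introduced in this change (S3AFileSystem.getStore(), S3AStore.getStoreWriter(), S3AStore.createStoreContext()) and the listMultipartUploads / abortOutstandingMultipartUploads signatures declared above. The sketch class, method name, page size of 100 and the printf logging are illustrative only.

import java.io.IOException;
import java.util.List;

import software.amazon.awssdk.services.s3.model.MultipartUpload;

import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AStore;
import org.apache.hadoop.fs.s3a.impl.write.StoreWriter;
import org.apache.hadoop.fs.store.audit.AuditSpan;

/**
 * Illustrative only: list pending multipart uploads under a prefix,
 * then abort every upload older than a given age.
 */
public final class StoreWriterUsageSketch {

  private StoreWriterUsageSketch() {
  }

  static void purgePendingUploads(S3AFileSystem fs, String prefix, long ageSeconds)
      throws IOException {
    final S3AStore store = fs.getStore();
    final StoreWriter writer = store.getStoreWriter();
    // write operations must be invoked inside an audit span
    try (AuditSpan span = fs.createSpan("multipart", prefix, null)) {
      final List<MultipartUpload> uploads = writer.listMultipartUploads(prefix);
      for (MultipartUpload upload : uploads) {
        System.out.printf("Pending upload to %s with ID %s%n",
            upload.key(), upload.uploadId());
      }
      // abort everything older than ageSeconds; 100 is an illustrative page size
      writer.abortOutstandingMultipartUploads(ageSeconds, prefix, 100,
          store.createStoreContext());
    }
  }
}

As in the updated MultipartTestUtils and ITestS3AHugeMagicCommits, the calls are wrapped in an audit span and go through the store rather than through S3AFileSystem convenience methods, which this patch removes from the write path.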
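
A second sketch, assuming the behaviour exercised by ITestStoreClose above: once the filesystem is closed, a buffered multipart write can no longer queue its part upload and the stream close is rejected instead of hanging. The wrapper class, method name and catch-and-ignore handling are hypothetical; the builder calls and the FS_S3A_CREATE_MULTIPART option are those used in the test.

import java.util.concurrent.RejectedExecutionException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_MULTIPART;

/**
 * Illustrative only: a multipart write whose filesystem is closed before
 * the output stream is.
 */
public final class CloseAfterShutdownSketch {

  private CloseAfterShutdownSketch() {
  }

  static void writeThenCloseFilesystemFirst(S3AFileSystem fs, Path path) throws Exception {
    final FSDataOutputStreamBuilder builder = fs.createFile(path);
    builder.must(FS_S3A_CREATE_MULTIPART, true);   // force a multipart upload
    final FSDataOutputStream out = builder.build();
    out.write('a');
    out.flush();

    fs.close();          // shuts down the store and its upload executors
    try {
      out.close();       // the queued part upload can no longer be submitted
    } catch (RejectedExecutionException expected) {
      // surfaced because the executor service has been shut down
    }
  }
}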