diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index a66c55359393f..bace0f274bfec 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -207,7 +207,7 @@ 1.12.720 2.29.52 3.1.1 - 1.0.0 + 1.2.0 1.0.1 2.7.1 1.11.2 diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 304ba032b416a..949099f52f750 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -1920,7 +1920,10 @@ private FSDataInputStream executeOpen( .withCallbacks(createInputStreamCallbacks(auditSpan)) .withContext(readContext.build()) .withObjectAttributes(createObjectAttributes(path, fileStatus)) - .withStreamStatistics(inputStreamStats); + .withStreamStatistics(inputStreamStats) + .withAuditSpan(auditSpan) + .withEncryptionSecrets(getEncryptionSecrets()); + return new FSDataInputStream(getStore().readObject(parameters)); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java index e91710a0af3a0..780875d93c55b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java @@ -21,6 +21,7 @@ import java.util.List; import software.amazon.awssdk.core.SdkRequest; +import software.amazon.awssdk.core.interceptor.ExecutionAttributes; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; @@ -50,6 +51,8 @@ import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_LIST_REQUEST; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_PUT_REQUEST; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_EXISTS_PROBE; +import static software.amazon.awssdk.core.interceptor.SdkExecutionAttribute.OPERATION_NAME; +import static software.amazon.s3.analyticsaccelerator.request.Constants.SPAN_ID; /** * Extract information from a request. @@ -193,6 +196,18 @@ private RequestInfo writing(final String verb, || request instanceof CreateSessionRequest; } + /** + * If spanId and operation name are set by dependencies such as AAL, then this returns true. Allows for auditing + * of requests which are made outside S3A's requestFactory. 
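The check described here boils down to two AWS SDK execution attributes that the dependency sets when it builds its own request. A minimal sketch of that contract, assuming both SPAN_ID and OPERATION_NAME are ExecutionAttribute&lt;String&gt; keys and with an illustrative helper class and span id (not part of the patch):

```java
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;

import static software.amazon.awssdk.core.interceptor.SdkExecutionAttribute.OPERATION_NAME;
import static software.amazon.s3.analyticsaccelerator.request.Constants.SPAN_ID;

/** Illustrative only: shows the attribute contract the new predicate checks. */
public final class SpanAttributeSketch {

  public static void main(String[] args) {
    // attributes as a dependency such as AAL would populate them at request-creation time
    ExecutionAttributes tagged = new ExecutionAttributes()
        .putAttribute(SPAN_ID, "3c0d2a36-span-0001")
        .putAttribute(OPERATION_NAME, "op_open");

    // attributes of a request built through S3A's own request factory
    ExecutionAttributes untagged = new ExecutionAttributes();

    System.out.println(auditedOutsideCurrentSpan(tagged));   // true
    System.out.println(auditedOutsideCurrentSpan(untagged)); // false
  }

  private static boolean auditedOutsideCurrentSpan(ExecutionAttributes attributes) {
    return attributes.getAttribute(SPAN_ID) != null
        && attributes.getAttribute(OPERATION_NAME) != null;
  }
}
```

When both attributes are present, the auditor treats the request as already attached to a span and uses those values instead of whatever span is active on the executing thread.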
+ * + * @param executionAttributes request execution attributes + * @return true if request is audited outside of current span + */ + public static boolean isRequestAuditedOutsideOfCurrentSpan(ExecutionAttributes executionAttributes) { + return executionAttributes.getAttribute(SPAN_ID) != null + && executionAttributes.getAttribute(OPERATION_NAME) != null; + } + /** * Predicate which returns true if the request is part of the * multipart upload API -and which therefore must be rejected diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java index 840ce5ffd3084..e1d28bea5f31a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java @@ -61,6 +61,7 @@ import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED; import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestMultipartIO; import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestNotAlwaysInSpan; +import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestAuditedOutsideOfCurrentSpan; import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.OUTSIDE_SPAN; import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED; import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED_DEFAULT; @@ -69,6 +70,8 @@ import static org.apache.hadoop.fs.s3a.commit.CommitUtils.extractJobID; import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.HEADER_REFERRER; import static org.apache.hadoop.fs.s3a.statistics.impl.StatisticsFromAwsSdkImpl.mapErrorStatusCodeToStatisticName; +import static software.amazon.s3.analyticsaccelerator.request.Constants.OPERATION_NAME; +import static software.amazon.s3.analyticsaccelerator.request.Constants.SPAN_ID; /** * The LoggingAuditor logs operations at DEBUG (in SDK Request) and @@ -85,7 +88,6 @@ public class LoggingAuditor private static final Logger LOG = LoggerFactory.getLogger(LoggingAuditor.class); - /** * Some basic analysis for the logs. */ @@ -267,8 +269,9 @@ HttpReferrerAuditHeader getReferrer(AuditSpanS3A span) { */ private class LoggingAuditSpan extends AbstractAuditSpanImpl { - private final HttpReferrerAuditHeader referrer; + private HttpReferrerAuditHeader referrer; + private final HttpReferrerAuditHeader.Builder headerBuilder; /** * Attach Range of data for GetObject Request. 
* @param request the sdk request to be modified @@ -300,7 +303,7 @@ private LoggingAuditSpan( final String path2) { super(spanId, operationName); - this.referrer = HttpReferrerAuditHeader.builder() + this.headerBuilder = HttpReferrerAuditHeader.builder() .withContextId(getAuditorId()) .withSpanId(spanId) .withOperationName(operationName) @@ -312,8 +315,9 @@ private LoggingAuditSpan( currentThreadID()) .withAttribute(PARAM_TIMESTAMP, Long.toString(getTimestamp())) .withEvaluated(context.getEvaluatedEntries()) - .withFilter(filters) - .build(); + .withFilter(filters); + + this.referrer = this.headerBuilder.build(); this.description = referrer.buildHttpReferrer(); } @@ -384,12 +388,33 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context, SdkHttpRequest httpRequest = context.httpRequest(); SdkRequest sdkRequest = context.request(); + // If spanId and operationName are set in execution attributes, then use these values, + // instead of the ones in the current span. This is useful when requests are happening in dependencies such as + // the analytics accelerator library (AAL), where they cannot be attached to the correct span. In which case, AAL + // will attach the current spanId and operationName via execution attributes during it's request creation. These + // can then used to update the values in the logger and referrer header. Without this overwriting, the operation + // name and corresponding span will be whichever is active on the thread the request is getting executed on. + boolean isRequestAuditedOutsideCurrentSpan = isRequestAuditedOutsideOfCurrentSpan(executionAttributes); + + String spanId = isRequestAuditedOutsideCurrentSpan ? + executionAttributes.getAttribute(SPAN_ID) : getSpanId(); + + String operationName = isRequestAuditedOutsideCurrentSpan ? + executionAttributes.getAttribute(OPERATION_NAME) : getOperationName(); + + if (isRequestAuditedOutsideCurrentSpan) { + this.headerBuilder.withSpanId(spanId); + this.headerBuilder.withOperationName(operationName); + this.referrer = this.headerBuilder.build(); + } + // attach range for GetObject requests attachRangeFromRequest(httpRequest, executionAttributes); // for delete op, attach the number of files to delete attachDeleteKeySizeAttribute(sdkRequest); + // build the referrer header final String header = referrer.buildHttpReferrer(); // update the outer class's field. @@ -400,11 +425,12 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context, .appendHeader(HEADER_REFERRER, header) .build(); } + if (LOG.isDebugEnabled()) { LOG.debug("[{}] {} Executing {} with {}; {}", currentThreadID(), - getSpanId(), - getOperationName(), + spanId, + operationName, analyzer.analyze(context.request()), header); } @@ -533,10 +559,12 @@ public void beforeExecution(Context.BeforeExecution context, + analyzer.analyze(context.request()); final String unaudited = getSpanId() + " " + UNAUDITED_OPERATION + " " + error; + // If request is attached to a span in the modifyHttpRequest, as is the case for requests made by AAL, treat it + // as an audited request. if (isRequestNotAlwaysInSpan(context.request())) { - // can get by auditing during a copy, so don't overreact + // can get by auditing during a copy, so don't overreact. 
LOG.debug(unaudited); - } else { + } else if (!isRequestAuditedOutsideOfCurrentSpan(executionAttributes)) { final RuntimeException ex = new AuditFailureException(unaudited); LOG.debug(unaudited, ex); if (isRejectOutOfSpan()) { @@ -547,5 +575,4 @@ public void beforeExecution(Context.BeforeExecution context, super.beforeExecution(context, executionAttributes); } } - } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java index 6b910c6538070..f1bfda2efc110 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java @@ -22,9 +22,26 @@ import java.io.EOFException; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.IntFunction; + +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.VectoredReadUtils; import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory; import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream; +import software.amazon.s3.analyticsaccelerator.common.ObjectRange; + + +import org.apache.hadoop.fs.s3a.S3AEncryptionMethods; +import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecretOperations; +import software.amazon.s3.analyticsaccelerator.request.EncryptionSecrets; import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; +import software.amazon.s3.analyticsaccelerator.request.StreamAuditContext; import software.amazon.s3.analyticsaccelerator.util.InputPolicy; import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; @@ -38,6 +55,9 @@ import org.apache.hadoop.fs.s3a.S3AInputPolicy; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; +import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED; + + /** * Analytics stream creates a stream using aws-analytics-accelerator-s3. This stream supports * parquet specific optimisations such as parquet-aware prefetching. For more details, see @@ -128,6 +148,48 @@ public int read(byte[] buf, int off, int len) throws IOException { return bytesRead; } + /** + * {@inheritDoc} + * Pass to {@link #readVectored(List, IntFunction, Consumer)} + * with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser. + * @param ranges the byte ranges to read. + * @param allocate the function to allocate ByteBuffer. + * @throws IOException IOE if any. + */ + @Override + public void readVectored(List ranges, + IntFunction allocate) throws IOException { + readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED); + } + + /** + * {@inheritDoc} + * Pass to {@link #readVectored(List, IntFunction, Consumer)} + * with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser. + * @param ranges the byte ranges to read. + * @param allocate the function to allocate ByteBuffer. + * @throws IOException IOE if any. 
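The implementation that follows mirrors each FileRange with an AAL ObjectRange and wires the ObjectRange's CompletableFuture back into the FileRange, so callers keep using the standard vectored-read API unchanged. A caller-side sketch under that assumption; the bucket, path and range sizes are illustrative only:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.functional.FutureIO;

public final class VectoredReadSketch {
  public static void main(String[] args) throws Exception {
    Path path = new Path("s3a://example-bucket/data/file.parquet"); // illustrative
    try (FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
         FSDataInputStream in = fs.open(path)) {
      // two disjoint ranges; AAL performs no extra coalescing on them
      List<FileRange> ranges = Arrays.asList(
          FileRange.createFileRange(0, 4096),
          FileRange.createFileRange(1_048_576, 8192));
      in.readVectored(ranges, ByteBuffer::allocate);
      // each range's future completes once its buffer has been filled
      for (FileRange range : ranges) {
        ByteBuffer data = FutureIO.awaitFuture(range.getData());
        System.out.println(range.getOffset() + " -> " + data.remaining() + " bytes");
      }
    }
  }
}
```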
+ */ + @Override + public void readVectored(final List ranges, + final IntFunction allocate, + final Consumer release) throws IOException { + LOG.debug("AAL: Starting vectored read on path {} for ranges {} ", getPathStr(), ranges); + throwIfClosed(); + + List objectRanges = new ArrayList<>(); + + for (FileRange range : ranges) { + CompletableFuture result = new CompletableFuture<>(); + ObjectRange objectRange = new ObjectRange(result, range.getOffset(), range.getLength()); + objectRanges.add(objectRange); + range.setData(result); + } + + // AAL does not do any range coalescing, so input and combined ranges are the same. + this.getS3AStreamStatistics().readVectoredOperationStarted(ranges.size(), ranges.size()); + inputStream.readVectored(objectRanges, allocate, release); + } @Override public boolean seekToNewSource(long l) throws IOException { @@ -152,6 +214,8 @@ protected boolean isClosed() { @Override protected void abortInFinalizer() { try { + // stream was leaked: update statistic + getS3AStreamStatistics().streamLeaked(); close(); } catch (IOException ignored) { @@ -205,6 +269,17 @@ private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters pa .etag(parameters.getObjectAttributes().getETag()).build()); } + openStreamInformationBuilder.streamAuditContext(StreamAuditContext.builder() + .operationName(parameters.getAuditSpan().getOperationName()) + .spanId(parameters.getAuditSpan().getSpanId()) + .build()); + + if(parameters.getEncryptionSecrets().getEncryptionMethod() == S3AEncryptionMethods.SSE_C) { + EncryptionSecretOperations.getSSECustomerKey(parameters.getEncryptionSecrets()) + .ifPresent(base64customerKey -> openStreamInformationBuilder.encryptionSecrets( + EncryptionSecrets.builder().sseCustomerKey(Optional.of(base64customerKey)).build())); + } + return openStreamInformationBuilder.build(); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java index c67c08be7b986..d8377fed94766 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java @@ -96,8 +96,7 @@ public StreamFactoryRequirements factoryRequirements() { vectorContext.setMinSeekForVectoredReads(0); return new StreamFactoryRequirements(0, - 0, vectorContext, - StreamFactoryRequirements.Requirements.ExpectUnauditedGetRequests); + 0, vectorContext); } @Override diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectReadParameters.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectReadParameters.java index e784dadcb651a..fb1d98f889a16 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectReadParameters.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectReadParameters.java @@ -23,7 +23,9 @@ import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; +import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import org.apache.hadoop.fs.store.audit.AuditSpan; import static java.util.Objects.requireNonNull; @@ -69,6 +71,34 @@ public final class 
ObjectReadParameters { */ private LocalDirAllocator directoryAllocator; + /** + * Span for which this stream is being created. + */ + private AuditSpan auditSpan; + + /** + * Encryption secrets for this stream + */ + private EncryptionSecrets encryptionSecrets; + + /** + * Getter. + * @return Encryption secrets. + */ + public EncryptionSecrets getEncryptionSecrets() { + return encryptionSecrets; + } + + /** + * Set encryption secrets. + * @param value new value + * @return the builder + */ + public ObjectReadParameters withEncryptionSecrets(final EncryptionSecrets value) { + encryptionSecrets = value; + return this; + } + /** * @return Read operation context. */ @@ -172,6 +202,24 @@ public ObjectReadParameters withDirectoryAllocator(final LocalDirAllocator value return this; } + /** + * Getter. + * @return Audit span. + */ + public AuditSpan getAuditSpan() { + return auditSpan; + } + + /** + * Set audit span. + * @param value new value + * @return the builder + */ + public ObjectReadParameters withAuditSpan(final AuditSpan value) { + auditSpan = value; + return this; + } + /** * Validate that all attributes are as expected. * Mock tests can skip this if required. @@ -185,6 +233,9 @@ public ObjectReadParameters validate() { requireNonNull(directoryAllocator, "directoryAllocator"); requireNonNull(objectAttributes, "objectAttributes"); requireNonNull(streamStatistics, "streamStatistics"); + requireNonNull(auditSpan, "auditSpan"); + requireNonNull(encryptionSecrets, "encryptionSecrets"); + return this; } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java index bb35a0580a20e..07159343d339b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java @@ -77,7 +77,7 @@ public final class StreamIntegration { /** * What is the default type? 
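The answer changes in the hunk just below: the default input stream type becomes Analytics. Deployments that prefer the previous behaviour can still pin the type explicitly; a minimal configuration sketch, assuming the fs.s3a.input.stream.type option used by the stream factory integration (check the Constants of your Hadoop build):

```java
import org.apache.hadoop.conf.Configuration;

public final class StreamTypeSelectionSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // keep the pre-existing behaviour by selecting the classic stream...
    conf.set("fs.s3a.input.stream.type", "classic");
    // ...or opt into the analytics (AAL) stream explicitly rather than relying on the default
    // conf.set("fs.s3a.input.stream.type", "analytics");
    System.out.println("input stream type: " + conf.get("fs.s3a.input.stream.type"));
  }
}
```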
*/ - public static final InputStreamType DEFAULT_STREAM_TYPE = InputStreamType.Classic; + public static final InputStreamType DEFAULT_STREAM_TYPE = InputStreamType.Analytics; /** * Configuration deprecation log for warning about use of the diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java index 41a107c36f07a..595663de70e70 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java @@ -18,13 +18,22 @@ package org.apache.hadoop.fs.contract.s3a; +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.Test; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileRange; import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; import org.apache.hadoop.fs.contract.AbstractFSContract; -import org.junit.jupiter.api.BeforeEach; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.StreamStatisticNames; import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator; import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipForAnyEncryptionExceptSSES3; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; /** * S3A contract tests for vectored reads with the Analytics stream. @@ -63,7 +72,6 @@ protected Configuration createConfiguration() { // This issue is tracked in: // https://github.com/awslabs/analytics-accelerator-s3/issues/218 skipForAnyEncryptionExceptSSES3(conf); - conf.set("fs.contract.vector-io-early-eof-check", "false"); return conf; } @@ -71,4 +79,23 @@ protected Configuration createConfiguration() { protected AbstractFSContract createContract(Configuration conf) { return new S3AContract(conf); } + + @Test + public void testReadVectoredWithAALStatsCollection() throws Exception { + + List fileRanges = createSampleNonOverlappingRanges(); + try (FSDataInputStream in = openVectorFile()){ + in.readVectored(fileRanges, getAllocate()); + + IOStatistics st = in.getIOStatistics(); + + // Statistics such as GET requests will be added after IoStats support. 
+ verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED, 1); + + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS, + 1); + } + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java index ef47acb459587..5663a84d68a5a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java @@ -97,7 +97,7 @@ protected AbstractFSContract createContract(Configuration conf) { public void setup() throws Exception { super.setup(); skipIfAnalyticsAcceleratorEnabled(getContract().getConf(), - "Analytics Accelerator does not support vectored reads"); + "AAL with readVectored() is tested in ITestS3AContractAnalyticsStreamVectoredRead"); } /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java index dff171bbdd8eb..6b44c5d5f70c2 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java @@ -42,6 +42,7 @@ import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE; +import static org.apache.hadoop.fs.audit.AuditStatisticNames.AUDIT_REQUEST_EXECUTION; import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX; import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; @@ -109,6 +110,12 @@ public void testConnectorFrameWorkIntegration() throws Throwable { verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); fs.close(); verifyStatisticCounterValue(fs.getIOStatistics(), ANALYTICS_STREAM_FACTORY_CLOSED, 1); + + // Expect 4 audited requests. One HEAD, and 3 GETs. The 3 GETs are because the read policy is WHOLE_FILE, + // in which case, AAL will start prefetching till EoF on file open in 8MB chunks. The file read here + // s3://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz, has a size of ~21MB, resulting in 3 GETS: + // [5-8388612, 8388613-16777220, 16777221-21511173]. 
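A quick worked check of that expectation, with the object size taken from the comment above and assuming AAL's 8 MiB sequential prefetch block size:

```java
public final class AuditCountCheck {
  public static void main(String[] args) {
    long fileSize = 21_511_173L;            // ~21 MB object read by the test
    long chunkSize = 8L * 1024 * 1024;      // 8 MiB prefetch chunks issued by AAL
    long gets = (fileSize + chunkSize - 1) / chunkSize; // = 3 ranged GET requests
    System.out.println(gets + 1);           // + 1 HEAD on open = 4 audited requests
  }
}
```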
+ verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4); } @Test diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java index 9f7f93e5bf9b7..5faad7d3adced 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java @@ -42,10 +42,10 @@ import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY; import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeStoreAwsHosted; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator; import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeSkipRootTests; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** @@ -99,8 +99,6 @@ protected Configuration createConfiguration() { @Override public void setup() throws Exception { super.setup(); - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support SSE-C"); assumeEnabled(); // although not a root dir test, this confuses paths enough it shouldn't be run in // parallel with other jobs @@ -331,6 +329,65 @@ public void testChecksumRequiresReadAccess() throws Throwable { () -> fsKeyB.getFileChecksum(path)); } + + /** + * Tests the creation and reading of a file using a different encryption key + * when Analytics Accelerator is enabled. + * + * @throws Exception if any error occurs during the test execution + */ + @Test + public void testCreateFileAndReadWithDifferentEncryptionKeyWithAnalyticsAcceleratorEnabled() throws Exception { + enableAnalyticsAccelerator(getConfiguration()); + testCreateFileAndReadWithDifferentEncryptionKey(); + } + + /** + * Tests the creation and movement of a file using a different SSE-C key + * when Analytics Accelerator is enabled. + * + * @throws Exception if any error occurs during the test execution + */ + @Test + public void testCreateFileThenMoveWithDifferentSSECKeyWithAnalyticsAcceleratorEnabled() throws Exception { + enableAnalyticsAccelerator(getConfiguration()); + testCreateFileThenMoveWithDifferentSSECKey(); + } + + /** + * Tests create and file rename operation when Analytics Accelerator is enabled. + * + * @throws Exception if any error occurs during the test execution + */ + @Test + public void testCreateFileAndRenameFileWithAnalyticsAcceleratorEnabled() throws Exception { + enableAnalyticsAccelerator(getConfiguration()); + testRenameFile(); + } + + /** + * Tests the creation and listing of encrypted files when Analytics Accelerator is enabled. + * + * @throws Exception if any error occurs during the test execution + */ + @Test + public void testCreateFileAndListStatusEncryptedFileWithAnalyticsAcceleratorEnabled() throws Exception { + enableAnalyticsAccelerator(getConfiguration()); + testListStatusEncryptedFile(); + } + + /** + * Tests the creation and deletion of an encrypted object using a different key + * when Analytics Accelerator is enabled.t. 
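These SSE-C tests can now run with the Analytics stream because the customer key is forwarded to AAL via EncryptionSecrets in the AnalyticsStream hunk earlier. For reference, a sketch of the client-side SSE-C configuration they exercise, assuming the current fs.s3a.encryption.* option names; the key value is illustrative only:

```java
import org.apache.hadoop.conf.Configuration;

public final class SseCConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // SSE-C: the client supplies the key; S3 stores only its MD5 for validation
    conf.set("fs.s3a.encryption.algorithm", "SSE-C");
    // base64-encoded 256-bit key; illustrative placeholder, not a real secret
    conf.set("fs.s3a.encryption.key", "<base64-encoded-256-bit-key>");
    System.out.println(conf.get("fs.s3a.encryption.algorithm"));
  }
}
```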
+ * + * @throws Exception if any error occurs during the test execution + */ + @Test + public void testCreateFileAndDeleteEncryptedObjectWithDifferentKeyWithAnalyticsAcceleratorEnabled() throws Exception { + enableAnalyticsAccelerator(getConfiguration()); + testDeleteEncryptedObjectWithDifferentKey(); + } + private S3AFileSystem createNewFileSystemWithSSECKey(String sseCKey) throws IOException { Configuration conf = this.createConfiguration(); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java index 31fe270a8b9e3..0aca12ee337e3 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java @@ -37,7 +37,6 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_LEAKS; import static org.apache.hadoop.test.GenericTestUtils.LogCapturer.captureLogs; @@ -90,10 +89,6 @@ public void setup() throws Exception { @Test public void testFinalizer() throws Throwable { Path path = methodPath(); - // Analytics accelerator currently does not support stream leak detection. This work is tracked - // in https://issues.apache.org/jira/browse/HADOOP-19451 - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support leak detection"); final S3AFileSystem fs = getFileSystem(); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java index 401db0969c657..012a9fd635cb7 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java @@ -32,7 +32,6 @@ import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM; import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled; /** @@ -57,8 +56,6 @@ public class ITestS3AHugeFilesSSECDiskBlocks public void setup() throws Exception { try { super.setup(); - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support SSE-C"); } catch (AccessDeniedException | AWSUnsupportedFeatureException e) { skip("Bucket does not allow " + S3AEncryptionMethods.SSE_C + " encryption method"); }
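Taken together, the patch makes the Analytics stream usable as the default while keeping auditing, SSE-C and vectored reads working. For completeness, a sketch of how an application hints the read policy so AAL can choose between parquet-aware and whole-file prefetching; the path is illustrative and the openfile option constants come from org.apache.hadoop.fs.Options.OpenFileOptions:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.functional.FutureIO;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;

public final class ReadPolicySketch {
  public static void main(String[] args) throws Exception {
    Path path = new Path("s3a://example-bucket/tables/part-00000.parquet"); // illustrative
    try (FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
         FSDataInputStream in = FutureIO.awaitFuture(
             fs.openFile(path)
                 .opt(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_PARQUET)
                 .build())) {
      byte[] probe = new byte[8];
      in.readFully(0, probe); // trivial read to show the stream is usable
      System.out.println("opened with parquet read policy");
    }
  }
}
```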