HADOOP-19559. Make AAL the default input stream. [DRAFT] #7776

Open. Wants to merge 4 commits into trunk.
Changes from all commits
hadoop-project/pom.xml (2 changes: 1 addition & 1 deletion)
@@ -207,7 +207,7 @@
<aws-java-sdk.version>1.12.720</aws-java-sdk.version>
<aws-java-sdk-v2.version>2.29.52</aws-java-sdk-v2.version>
<amazon-s3-encryption-client-java.version>3.1.1</amazon-s3-encryption-client-java.version>
<amazon-s3-analyticsaccelerator-s3.version>1.0.0</amazon-s3-analyticsaccelerator-s3.version>
<amazon-s3-analyticsaccelerator-s3.version>1.2.0</amazon-s3-analyticsaccelerator-s3.version>
<aws.eventstream.version>1.0.1</aws.eventstream.version>
<hsqldb.version>2.7.1</hsqldb.version>
<frontend-maven-plugin.version>1.11.2</frontend-maven-plugin.version>

@@ -1920,7 +1920,10 @@ private FSDataInputStream executeOpen(
.withCallbacks(createInputStreamCallbacks(auditSpan))
.withContext(readContext.build())
.withObjectAttributes(createObjectAttributes(path, fileStatus))
.withStreamStatistics(inputStreamStats);
.withStreamStatistics(inputStreamStats)
.withAuditSpan(auditSpan)
.withEncryptionSecrets(getEncryptionSecrets());

return new FSDataInputStream(getStore().readObject(parameters));
}

@@ -21,6 +21,7 @@
import java.util.List;

import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
@@ -50,6 +51,8 @@
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_LIST_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_PUT_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_EXISTS_PROBE;
import static software.amazon.awssdk.core.interceptor.SdkExecutionAttribute.OPERATION_NAME;
import static software.amazon.s3.analyticsaccelerator.request.Constants.SPAN_ID;

/**
* Extract information from a request.
@@ -193,6 +196,18 @@ private RequestInfo writing(final String verb,
|| request instanceof CreateSessionRequest;
}

/**
* Returns true if the spanId and operation name have been set in the execution attributes
* by a dependency such as AAL. This allows auditing of requests made outside S3A's
* request factory.
*
* @param executionAttributes request execution attributes
* @return true if the request is audited outside of the current span
*/
public static boolean isRequestAuditedOutsideOfCurrentSpan(ExecutionAttributes executionAttributes) {
return executionAttributes.getAttribute(SPAN_ID) != null
&& executionAttributes.getAttribute(OPERATION_NAME) != null;
}
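
A minimal sketch of the new predicate in use, for instance from a test. The span and operation values are placeholders, and the no-arg ExecutionAttributes constructor with putAttribute() is assumed here purely for illustration; SPAN_ID and OPERATION_NAME are the attribute constants imported above.

// Illustrative sketch only: exercising isRequestAuditedOutsideOfCurrentSpan().
ExecutionAttributes attributes = new ExecutionAttributes();
boolean audited = isRequestAuditedOutsideOfCurrentSpan(attributes);   // false: nothing set

attributes.putAttribute(SPAN_ID, "span-0001");
attributes.putAttribute(OPERATION_NAME, "op_open");
audited = isRequestAuditedOutsideOfCurrentSpan(attributes);           // true: both attributes set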

/**
* Predicate which returns true if the request is part of the
* multipart upload API -and which therefore must be rejected

@@ -61,6 +61,7 @@
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestMultipartIO;
import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestNotAlwaysInSpan;
import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestAuditedOutsideOfCurrentSpan;
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.OUTSIDE_SPAN;
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED;
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED_DEFAULT;
@@ -69,6 +70,8 @@
import static org.apache.hadoop.fs.s3a.commit.CommitUtils.extractJobID;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.HEADER_REFERRER;
import static org.apache.hadoop.fs.s3a.statistics.impl.StatisticsFromAwsSdkImpl.mapErrorStatusCodeToStatisticName;
import static software.amazon.s3.analyticsaccelerator.request.Constants.OPERATION_NAME;
import static software.amazon.s3.analyticsaccelerator.request.Constants.SPAN_ID;

/**
* The LoggingAuditor logs operations at DEBUG (in SDK Request) and
@@ -85,7 +88,6 @@ public class LoggingAuditor
private static final Logger LOG =
LoggerFactory.getLogger(LoggingAuditor.class);


/**
* Some basic analysis for the logs.
*/
@@ -267,8 +269,9 @@ HttpReferrerAuditHeader getReferrer(AuditSpanS3A span) {
*/
private class LoggingAuditSpan extends AbstractAuditSpanImpl {

private final HttpReferrerAuditHeader referrer;
private HttpReferrerAuditHeader referrer;

private final HttpReferrerAuditHeader.Builder headerBuilder;

/**
* Attach Range of data for GetObject Request.
* @param request the sdk request to be modified
@@ -300,7 +303,7 @@ private LoggingAuditSpan(
final String path2) {
super(spanId, operationName);

this.referrer = HttpReferrerAuditHeader.builder()
this.headerBuilder = HttpReferrerAuditHeader.builder()
.withContextId(getAuditorId())
.withSpanId(spanId)
.withOperationName(operationName)
@@ -312,8 +315,9 @@ private LoggingAuditSpan(
currentThreadID())
.withAttribute(PARAM_TIMESTAMP, Long.toString(getTimestamp()))
.withEvaluated(context.getEvaluatedEntries())
.withFilter(filters)
.build();
.withFilter(filters);

this.referrer = this.headerBuilder.build();

this.description = referrer.buildHttpReferrer();
}
@@ -384,12 +388,33 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context,
SdkHttpRequest httpRequest = context.httpRequest();
SdkRequest sdkRequest = context.request();

// If spanId and operationName are set in the execution attributes, use those values instead
// of the ones in the current span. This is needed when requests are issued by dependencies such
// as the analytics accelerator library (AAL), where they cannot be attached to the correct span.
// In that case AAL attaches the current spanId and operationName via execution attributes when it
// creates the request; these are then used to update the values in the logger and referrer header.
// Without this override, the operation name and corresponding span would be whichever is active
// on the thread executing the request.
boolean isRequestAuditedOutsideCurrentSpan = isRequestAuditedOutsideOfCurrentSpan(executionAttributes);

String spanId = isRequestAuditedOutsideCurrentSpan ?
executionAttributes.getAttribute(SPAN_ID) : getSpanId();

String operationName = isRequestAuditedOutsideCurrentSpan ?
executionAttributes.getAttribute(OPERATION_NAME) : getOperationName();

if (isRequestAuditedOutsideCurrentSpan) {
this.headerBuilder.withSpanId(spanId);
this.headerBuilder.withOperationName(operationName);
this.referrer = this.headerBuilder.build();
}

// attach range for GetObject requests
attachRangeFromRequest(httpRequest, executionAttributes);

// for delete op, attach the number of files to delete
attachDeleteKeySizeAttribute(sdkRequest);


// build the referrer header
final String header = referrer.buildHttpReferrer();
// update the outer class's field.
@@ -400,11 +425,12 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context,
.appendHeader(HEADER_REFERRER, header)
.build();
}

if (LOG.isDebugEnabled()) {
LOG.debug("[{}] {} Executing {} with {}; {}",
currentThreadID(),
getSpanId(),
getOperationName(),
spanId,
operationName,
analyzer.analyze(context.request()),
header);
}
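
For illustration, a sketch of how a request issued outside the S3A request factory can carry its owning span so the interceptor above adopts it: the SDK's AwsRequestOverrideConfiguration lets the caller attach SPAN_ID and OPERATION_NAME as execution attributes, which modifyHttpRequest() then reads back. Bucket, key and span values are placeholders, and the exact call sites inside AAL may differ.

// Sketch: tag a GET request with the span that owns it, so the auditor logs and
// builds the referrer header for that span rather than whichever span happens to
// be active on the executing thread. All literal values are placeholders.
GetObjectRequest request = GetObjectRequest.builder()
    .bucket("example-bucket")
    .key("data/part-00000.parquet")
    .range("bytes=0-1048575")
    .overrideConfiguration(AwsRequestOverrideConfiguration.builder()
        .putExecutionAttribute(SPAN_ID, "span-0001")
        .putExecutionAttribute(OPERATION_NAME, "op_open")
        .build())
    .build();
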
@@ -533,10 +559,12 @@ public void beforeExecution(Context.BeforeExecution context,
+ analyzer.analyze(context.request());
final String unaudited = getSpanId() + " "
+ UNAUDITED_OPERATION + " " + error;
// If the request was attached to a span in modifyHttpRequest(), as is the case for requests
// made by AAL, treat it as an audited request.
if (isRequestNotAlwaysInSpan(context.request())) {
// can get by auditing during a copy, so don't overreact
// can get by auditing during a copy, so don't overreact.
LOG.debug(unaudited);
} else {
} else if (!isRequestAuditedOutsideOfCurrentSpan(executionAttributes)) {
final RuntimeException ex = new AuditFailureException(unaudited);
LOG.debug(unaudited, ex);
if (isRejectOutOfSpan()) {
@@ -547,5 +575,4 @@ public void beforeExecution(Context.BeforeExecution context,
super.beforeExecution(context, executionAttributes);
}
}

}

@@ -22,9 +22,26 @@
import java.io.EOFException;
import java.io.IOException;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.VectoredReadUtils;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
import software.amazon.s3.analyticsaccelerator.common.ObjectRange;


import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecretOperations;
import software.amazon.s3.analyticsaccelerator.request.EncryptionSecrets;
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
import software.amazon.s3.analyticsaccelerator.request.StreamAuditContext;
import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
import software.amazon.s3.analyticsaccelerator.util.S3URI;
@@ -38,6 +55,9 @@
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;

import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED;


/**
* Analytics stream creates a stream using aws-analytics-accelerator-s3. This stream supports
* parquet specific optimisations such as parquet-aware prefetching. For more details, see
@@ -128,6 +148,48 @@ public int read(byte[] buf, int off, int len) throws IOException {
return bytesRead;
}

/**
* {@inheritDoc}
* Pass to {@link #readVectored(List, IntFunction, Consumer)}
* with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser.
* @param ranges the byte ranges to read.
* @param allocate the function to allocate ByteBuffer.
* @throws IOException IOE if any.
*/
@Override
public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED);
}

/**
* {@inheritDoc}
* Maps each {@link FileRange} to an AAL {@link ObjectRange} and delegates the
* vectored read to the underlying analytics input stream.
* @param ranges the byte ranges to read.
* @param allocate the function to allocate ByteBuffer.
* @param release the function to release a ByteBuffer.
* @throws IOException IOE if any.
*/
@Override
public void readVectored(final List<? extends FileRange> ranges,
final IntFunction<ByteBuffer> allocate,
final Consumer<ByteBuffer> release) throws IOException {
LOG.debug("AAL: Starting vectored read on path {} for ranges {} ", getPathStr(), ranges);
throwIfClosed();

List<ObjectRange> objectRanges = new ArrayList<>();

for (FileRange range : ranges) {
CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
ObjectRange objectRange = new ObjectRange(result, range.getOffset(), range.getLength());
objectRanges.add(objectRange);
range.setData(result);
}

// AAL does not do any range coalescing, so input and combined ranges are the same.
this.getS3AStreamStatistics().readVectoredOperationStarted(ranges.size(), ranges.size());
inputStream.readVectored(objectRanges, allocate, release);
}
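
As a usage sketch: a client drives this path through the standard vectored read API, and the futures attached to each range are completed as AAL fills them. The filesystem instance, path and range values below are illustrative.

// Sketch: vectored read against an S3A file backed by AnalyticsStream.
List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(FileRange.createFileRange(0, 4096));          // e.g. footer metadata
fileRanges.add(FileRange.createFileRange(1 << 20, 65536));   // e.g. a column chunk
try (FSDataInputStream in = fs.open(new Path("s3a://example-bucket/data/part-00000.parquet"))) {
  in.readVectored(fileRanges, ByteBuffer::allocate);
  for (FileRange range : fileRanges) {
    ByteBuffer buffer = range.getData().join();   // completed once the range has been read
    // consume buffer.remaining() bytes...
  }
}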

@Override
public boolean seekToNewSource(long l) throws IOException {
@@ -152,6 +214,8 @@ protected boolean isClosed() {
@Override
protected void abortInFinalizer() {
try {
// stream was leaked: update statistic
getS3AStreamStatistics().streamLeaked();
close();
} catch (IOException ignored) {

@@ -205,6 +269,17 @@ private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters pa
.etag(parameters.getObjectAttributes().getETag()).build());
}

openStreamInformationBuilder.streamAuditContext(StreamAuditContext.builder()
.operationName(parameters.getAuditSpan().getOperationName())
.spanId(parameters.getAuditSpan().getSpanId())
.build());

if (parameters.getEncryptionSecrets().getEncryptionMethod() == S3AEncryptionMethods.SSE_C) {
EncryptionSecretOperations.getSSECustomerKey(parameters.getEncryptionSecrets())
.ifPresent(base64customerKey -> openStreamInformationBuilder.encryptionSecrets(
EncryptionSecrets.builder().sseCustomerKey(Optional.of(base64customerKey)).build()));
}

return openStreamInformationBuilder.build();
}
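
A hedged configuration sketch: the encryptionSecrets branch above is only exercised when the filesystem is configured for SSE-C, for example with the standard S3A encryption options shown below. The key value and bucket URI are placeholders.

// Sketch: SSE-C setup under which getEncryptionSecrets() reports SSE_C and the
// base64 customer key is passed through to AAL's OpenStreamInformation.
Configuration conf = new Configuration();
conf.set("fs.s3a.encryption.algorithm", "SSE-C");
conf.set("fs.s3a.encryption.key", "<base64-encoded-256-bit-key>");
FileSystem fs = FileSystem.newInstance(URI.create("s3a://example-bucket/"), conf);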

@@ -96,8 +96,7 @@ public StreamFactoryRequirements factoryRequirements() {
vectorContext.setMinSeekForVectoredReads(0);

return new StreamFactoryRequirements(0,
0, vectorContext,
StreamFactoryRequirements.Requirements.ExpectUnauditedGetRequests);
0, vectorContext);
}

@Override

@@ -23,7 +23,9 @@
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.store.audit.AuditSpan;

import static java.util.Objects.requireNonNull;

@@ -69,6 +71,34 @@ public final class ObjectReadParameters {
*/
private LocalDirAllocator directoryAllocator;

/**
* Span for which this stream is being created.
*/
private AuditSpan auditSpan;

/**
* Encryption secrets for this stream.
*/
private EncryptionSecrets encryptionSecrets;

/**
* Getter.
* @return Encryption secrets.
*/
public EncryptionSecrets getEncryptionSecrets() {
return encryptionSecrets;
}

/**
* Set encryption secrets.
* @param value new value
* @return the builder
*/
public ObjectReadParameters withEncryptionSecrets(final EncryptionSecrets value) {
encryptionSecrets = value;
return this;
}

/**
* @return Read operation context.
*/
@@ -172,6 +202,24 @@ public ObjectReadParameters withDirectoryAllocator(final LocalDirAllocator value
return this;
}

/**
* Getter.
* @return Audit span.
*/
public AuditSpan getAuditSpan() {
return auditSpan;
}

/**
* Set audit span.
* @param value new value
* @return the builder
*/
public ObjectReadParameters withAuditSpan(final AuditSpan value) {
auditSpan = value;
return this;
}

/**
* Validate that all attributes are as expected.
* Mock tests can skip this if required.
@@ -185,6 +233,9 @@ public ObjectReadParameters validate() {
requireNonNull(directoryAllocator, "directoryAllocator");
requireNonNull(objectAttributes, "objectAttributes");
requireNonNull(streamStatistics, "streamStatistics");
requireNonNull(auditSpan, "auditSpan");
requireNonNull(encryptionSecrets, "encryptionSecrets");

return this;
}
}

@@ -77,7 +77,7 @@ public final class StreamIntegration {
/**
* What is the default type?
*/
public static final InputStreamType DEFAULT_STREAM_TYPE = InputStreamType.Classic;
public static final InputStreamType DEFAULT_STREAM_TYPE = InputStreamType.Analytics;
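
With Analytics now the default, a deployment can still pin the stream type explicitly. A sketch, assuming the fs.s3a.input.stream.type option used by the S3A stream factory integration (option name and value strings as documented for that feature); the bucket URI is a placeholder.

// Sketch: select the input stream type rather than relying on the new default.
// "classic" restores the original S3AInputStream; "analytics" selects AAL explicitly.
Configuration conf = new Configuration();
conf.set("fs.s3a.input.stream.type", "classic");
FileSystem fs = FileSystem.newInstance(URI.create("s3a://example-bucket/"), conf);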

/**
* Configuration deprecation log for warning about use of the