Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-19256. steve's pr of conditional writes #7362

Draft
wants to merge 16 commits into
base: trunk
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ org.objenesis:objenesis:2.6
org.xerial.snappy:snappy-java:1.1.10.4
org.yaml:snakeyaml:2.0
org.wildfly.openssl:wildfly-openssl:2.1.4.Final
software.amazon.awssdk:bundle:2.25.53
software.amazon.awssdk:bundle:jar:2.27.14


--------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,4 +710,115 @@ private OpenFileOptions() {
public static final String FS_OPTION_OPENFILE_EC_POLICY =
FS_OPTION_OPENFILE + "ec.policy";
}

/**
* The standard {@code createFile()} options.
* <p>
* If an option is not supported during file creation and it is considered
* part of a commit protocol, then, when supplied in a must() option,
* it MUST be rejected.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface CreateFileOptionKeys {

/**
* {code createFile()} option to write a file in the close() operation iff
* there is nothing at the destination.
* this is the equivalent of {@code create(path, overwrite=true}
* <i>except that the existence check is postponed to the end of the write</i>.
* <p>
* Value {@value}.
* <p>
* This can be set in the builder.
* <p>
* <ol>
* <li>It is for object stores stores which only upload/manifest files
* at the end of the stream write.</li>
* <li>Streams which support it SHALL not manifest any object to
* the destination path until close()</li>
* <li>It MUST be declared as a stream capability in streams for which
* this overwrite is enabled.</li>
* <li>It MUST be exported as a path capability for all stores where
* the feature is available <i>and</i> enabled</li>
* <li>If passed to a filesystem as a {@code must()} parameter where
* the option value is {@code true}, and it is supported/enabled,
* the FS SHALL omit all overwrite checks in {@code create},
* including for the existence of an object or a directory underneath.
* Instead, during {close()} the object will only be manifest
* at the target path if there is no object at the destination.
* </li>
* <li>The existence check and object creation SHALL be atomic.</li>
* <li>If passed to a filesystem as a {@code must()} parameter where
* the option value is {@code true}, and the FS does not recognise
* the feature, or it is recognized but disabled on this FS instance,
* the filesystem SHALL reject the request.
* </li>
* <li>If passed to a filesystem as a {@code opt()} parameter where
* the option value is {@code true}, the filesystem MAY ignore
* the request, or it MAY enable the feature.
* Any filesystem which does not support the feature, including
* from older releases, SHALL ignore it.
* </ol>

*/
String FS_OPTION_CREATE_CONDITIONAL_OVERWRITE = "fs.option.create.conditional.overwrite";

/**
* Overwrite a file only if there is an Etag match. This option takes a string,
*
* Value {@value}.
* <p>
* This is similar to {@link #FS_OPTION_CREATE_CONDITIONAL_OVERWRITE}.
* <ol>
* <li>If supported and enabled, it SHALL be declared as a capability of the filesystem</li>
* <li>If supported and enabled, it SHALL be declared as a capability of the stream</li>
* <li>The string passed as the value SHALL be the etag value as returned by
* {@code EtagSource.getEtag()}</li>
* <li>This value MUST NOT be empty</li>
* <li>If passed to a filesystem which supports it, then when the file is created,
* the store SHALL check for the existence of a file/object at the destination
* path.
* </li>
* <li>If there is no object there, the operation SHALL be rejected by raising
* either a {@code org.apache.hadoop.fs.FileAlreadyExistsException}
* exception, or a{@code java.nio.file.FileAlreadyExistsException}
* </li>
* <li>If there is an object there, its Etag SHALL be compared to the
* value passed here.<li>
* <li>If there is no match, the operation SHALL be rejected by raising
* either a {@code org.apache.hadoop.fs.FileAlreadyExistsException}
* exception, or a{@code java.nio.file.FileAlreadyExistsException}
* </li> *
* <li>If the etag does match, the file SHALL be created.</li>
* <li>The check and create SHALL be atomic</li>
* <li>The check and create MAY be at the end of the write, in {@code close()},
* or it MAY be in the {@code create()} operation. That is: some stores
* MAY perform the check early</li>
* <li>If supported and enabled, stores MAY check for the existence of subdirectories;
* this behavior is implementation-specific.</li>
* <li></li>
* <li></li>
* </ol>
*/
String FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG =
"fs.option.create.conditional.overwrite.etag";

/**
* A flag which requires the filesystem to create files/objects in close(),
* rather than create/createFile.
* <p>
* Object stores with this behavior should also export it as a path capability.
*
* Value {@value}.
*/
String FS_OPTION_CREATE_IN_CLOSE = "fs.option.create.in.close";

/**
* String to define the content filetype.
* Value {@value}.
*/
String FS_OPTION_CREATE_CONTENT_TYPE = "fs.option.create.content.type";

}
}
2 changes: 1 addition & 1 deletion hadoop-project/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@
<make-maven-plugin.version>1.0-beta-1</make-maven-plugin.version>
<surefire.fork.timeout>900</surefire.fork.timeout>
<aws-java-sdk.version>1.12.720</aws-java-sdk.version>
<aws-java-sdk-v2.version>2.25.53</aws-java-sdk-v2.version>
<amazon-s3-encryption-client-java.version>3.1.1</amazon-s3-encryption-client-java.version>
<aws-java-sdk-v2.version>2.27.14</aws-java-sdk-v2.version>
<aws.eventstream.version>1.0.1</aws.eventstream.version>
<hsqldb.version>2.7.1</hsqldb.version>
<frontend-maven-plugin.version>1.11.2</frontend-maven-plugin.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1521,6 +1521,23 @@ private Constants() {
*/
public static final String FS_S3A_PERFORMANCE_FLAGS =
"fs.s3a.performance.flags";


/**
* Is the create overwrite feature enabled or not?
* A configuration option and a path status probe.
* Value {@value}.
*/
public static final String FS_S3A_CREATE_OVERWRITE_SUPPORTED = "fs.s3a.create.overwrite.supported";

/**
* Create a multipart file, always: {@value}.
* <p>
* This is inefficient and will not work on a store which doesn't support that feature,
* so is primarily for testing.
*/
public static final String FS_S3A_CREATE_MULTIPART = "fs.s3a.create.multipart";

/**
* Prefix for adding a header to the object when created.
* The actual value must have a "." suffix and then the actual header.
Expand Down Expand Up @@ -1780,4 +1797,12 @@ private Constants() {
* Value: {@value}.
*/
public static final String S3A_IO_RATE_LIMIT = "fs.s3a.io.rate.limit";

/**
* Value for the {@code If-None-Match} HTTP header in S3 requests.
* Value: {@value}.
* More information: <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html">
* AWS S3 PutObject API Documentation</a>
*/
public static final String IF_NONE_MATCH_STAR = "*";
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -49,9 +50,11 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.ClosedIOException;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.s3a.impl.ProgressListener;
import org.apache.hadoop.fs.s3a.impl.ProgressListenerEvent;
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags;
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -79,6 +82,7 @@
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
import static org.apache.hadoop.fs.s3a.Statistic.*;
import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.CONTENT_TYPE_OCTET_STREAM;
import static org.apache.hadoop.fs.s3a.impl.ProgressListenerEvent.*;
import static org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext.EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS;
Expand Down Expand Up @@ -148,6 +152,7 @@ class S3ABlockOutputStream extends OutputStream implements
* the blocks themselves are closed: 15 seconds.
*/
private static final Duration TIME_TO_AWAIT_CANCEL_COMPLETION = Duration.ofSeconds(15);
public static final String IF_NONE_MATCH_HEADER = "If-None-Match";

/** Object being uploaded. */
private final String key;
Expand Down Expand Up @@ -224,6 +229,11 @@ class S3ABlockOutputStream extends OutputStream implements
/** Is multipart upload enabled? */
private final boolean isMultipartUploadEnabled;

/**
* Object write option flags.
*/
private final EnumSet<WriteObjectFlags> writeObjectFlags;

/**
* An S3A output stream which uploads partitions in a separate pool of
* threads; different {@link S3ADataBlocks.BlockFactory}
Expand All @@ -249,6 +259,7 @@ class S3ABlockOutputStream extends OutputStream implements
this.iostatistics = statistics.getIOStatistics();
this.writeOperationHelper = builder.writeOperations;
this.putTracker = builder.putTracker;
this.writeObjectFlags = builder.putOptions.getWriteObjectFlags();
this.executorService = MoreExecutors.listeningDecorator(
builder.executorService);
this.multiPartUpload = null;
Expand All @@ -266,9 +277,19 @@ class S3ABlockOutputStream extends OutputStream implements
? builder.blockSize
: -1;

// if required to be multipart by the committer put tracker or
// write flags (i.e createFile() options, initiate multipart uploads.
// this will fail fast if the store doesn't support multipart uploads
if (putTracker.initialize()) {
LOG.debug("Put tracker requests multipart upload");
initMultipartUpload();
} else if (writeObjectFlags.contains(WriteObjectFlags.CreateMultipart)) {
// this not merged simply to avoid confusion
// to what to do it both are set, so as to guarantee
// the put tracker initialization always takes priority
// over any file flag.
LOG.debug("Multipart initiated from createFile() options");
initMultipartUpload();
}
this.isCSEEnabled = builder.isCSEEnabled;
this.threadIOStatisticsAggregator = builder.ioStatisticsAggregator;
Expand Down Expand Up @@ -692,28 +713,43 @@ private long putObject() throws IOException {
final S3ADataBlocks.DataBlock block = getActiveBlock();
final long size = block.dataSize();
final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
final PutObjectRequest putObjectRequest =
PutObjectRequest putObjectRequest =
writeOperationHelper.createPutObjectRequest(
key,
uploadData.getSize(),
builder.putOptions);
clearActiveBlock();

/*
PutObjectRequest.Builder maybeModifiedPutIfAbsentRequest = putObjectRequest.toBuilder();
Map<String, String> optionHeaders = builder.putOptions.getHeaders();
if (builder.isConditionalPutEnabled) {
maybeModifiedPutIfAbsentRequest.overrideConfiguration(
override -> override.putHeader(IF_NONE_MATCH, optionHeaders.get(IF_NONE_MATCH)));
}
final PutObjectRequest finalizedRequest = maybeModifiedPutIfAbsentRequest.build();
*/

BlockUploadProgress progressCallback =
new BlockUploadProgress(block, progressListener, now());
statistics.blockUploadQueued(size);
try {
progressCallback.progressChanged(PUT_STARTED_EVENT);
// the putObject call automatically closes the upload data
writeOperationHelper.putObject(putObjectRequest,
writeOperationHelper.putObject(finalizedRequest,
builder.putOptions,
uploadData,
statistics);
progressCallback.progressChanged(REQUEST_BYTE_TRANSFER_EVENT);
progressCallback.progressChanged(PUT_COMPLETED_EVENT);
} catch (InterruptedIOException ioe){
} catch (InterruptedIOException ioe) {
progressCallback.progressChanged(PUT_INTERRUPTED_EVENT);
throw ioe;
} catch (RemoteFileChangedException ex) {
// this is a put if none/if match failure
// TODO, progress callback
progressCallback.progressChanged(CONDITIONAL_PUT_FAILED_EVENT);
throw new FileAlreadyExistsException(ex);
} catch (IOException ioe){
progressCallback.progressChanged(PUT_FAILED_EVENT);
throw ioe;
Expand Down Expand Up @@ -772,7 +808,8 @@ BlockOutputStreamStatistics getStatistics() {
@SuppressWarnings("deprecation")
@Override
public boolean hasCapability(String capability) {
switch (capability.toLowerCase(Locale.ENGLISH)) {
final String cap = capability.toLowerCase(Locale.ENGLISH);
switch (cap) {

// does the output stream have delayed visibility
case CommitConstants.STREAM_CAPABILITY_MAGIC_OUTPUT:
Expand All @@ -797,6 +834,12 @@ public boolean hasCapability(String capability) {
return true;

default:
// scan flags for the capability
for (WriteObjectFlags flag : writeObjectFlags) {
if (flag.hasKey(cap)) {
return true;
}
}
return false;
}
}
Expand Down Expand Up @@ -1399,6 +1442,11 @@ public static final class BlockOutputStreamBuilder {
*/
private boolean isMultipartUploadEnabled;

/**
* Is conditional create enabled.
*/
private boolean isConditionalPutEnabled;

private BlockOutputStreamBuilder() {
}

Expand Down Expand Up @@ -1560,5 +1608,11 @@ public BlockOutputStreamBuilder withMultipartEnabled(
isMultipartUploadEnabled = value;
return this;
}

public BlockOutputStreamBuilder withConditionalPutEnabled(
final boolean value){
isConditionalPutEnabled = value;
return this;
}
}
}
Loading