HADOOP-19256. filesystem options for overwrite, etag and small multipart files

createFile() options are passed down all the way to object creation
* fs.option.create.conditional.overwrite: sets the header.
  MUST be in close(). After all, create(overwrite=false) is eager.
* fs.option.create.conditional.overwrite.etag requests etag writes.
  MAY be in create(); may be in close().
* fs.option.create.content.type for mime type.
* fs.option.create.in.close to ask for create in close, as fs capability.
  (maybe make this an fs path capability only?)
* fs.s3a.create.multipart : allows tests to create small multipart files
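
A minimal sketch of the two conditional-write modes in the list above, using an in-memory map in place of an object store. The class and method names (ConditionalPutSketch, putIfAbsent, putIfEtagMatches) are hypothetical and not part of this commit; a real store would raise an exception where this sketch returns false.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory model; not the S3A implementation. */
final class ConditionalPutSketch {

  /** Object key mapped to the etag of the object currently stored there. */
  private final Map<String, String> etags = new ConcurrentHashMap<>();

  /** Conditional create: succeeds only if nothing exists at the key. */
  boolean putIfAbsent(String key, String newEtag) {
    // putIfAbsent is atomic: the existence check and the write are one step.
    return etags.putIfAbsent(key, newEtag) == null;
  }

  /** Conditional overwrite: succeeds only if the stored etag matches. */
  boolean putIfEtagMatches(String key, String expectedEtag, String newEtag) {
    if (expectedEtag == null || expectedEtag.isEmpty()) {
      throw new IllegalArgumentException("etag must not be empty");
    }
    // replace(key, old, new) is an atomic compare-and-set; it returns false
    // if the key is absent or currently holds a different etag.
    return etags.replace(key, expectedEtag, newEtag);
  }

  public static void main(String[] args) {
    ConditionalPutSketch store = new ConditionalPutSketch();
    System.out.println(store.putIfAbsent("data/part-0000", "etag-1"));
    System.out.println(store.putIfAbsent("data/part-0000", "etag-2"));
    System.out.println(store.putIfEtagMatches("data/part-0000", "etag-1", "etag-3"));
    System.out.println(store.putIfEtagMatches("data/part-0000", "etag-1", "etag-4"));
  }
}
```

Both operations fold the check and the write into a single map call, which is the atomicity the options ask of the store.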

Javadocs of the fs.option.create try to define semantics; will need
strict fs specification soon.
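
The javadocs in this commit distinguish must() from opt(): an unsupported must() option is rejected, an unsupported opt() option is ignored. A rough sketch of that rule follows, assuming a hypothetical resolve() helper and a plain set of supported keys. The exception type is an assumption for illustration, not taken from this commit.

```java
import java.util.Collections;
import java.util.Set;

/** Hypothetical sketch of must()/opt() handling; not Hadoop code. */
final class OptionHandlingSketch {

  /** Option key defined in this commit. */
  static final String CONDITIONAL_OVERWRITE =
      "fs.option.create.conditional.overwrite";

  /**
   * Decide whether a requested feature is active for this create call.
   * @param key option key
   * @param mandatory true if set through must(), false if through opt()
   * @param supported keys this (hypothetical) filesystem supports and enables
   * @return true if the feature is to be used
   */
  static boolean resolve(String key, boolean mandatory, Set<String> supported) {
    if (supported.contains(key)) {
      return true;
    }
    if (mandatory) {
      // must(): the filesystem rejects what it cannot honour.
      throw new IllegalArgumentException("Unsupported mandatory option: " + key);
    }
    // opt(): silently ignored, as an older release would.
    return false;
  }

  public static void main(String[] args) {
    Set<String> none = Collections.emptySet();
    System.out.println(resolve(CONDITIONAL_OVERWRITE, false, none));
    System.out.println(
        resolve(CONDITIONAL_OVERWRITE, true, Collections.singleton(CONDITIONAL_OVERWRITE)));
  }
}
```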

Bool params come down in an enumset of flags; I'm going to do that in
more code as it is more flexible over time than many booleans.
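
To illustrate why an EnumSet ages better than a row of booleans, here is a small standalone sketch. Only the CreateMultipart name appears in this diff; the other flag names and both helper methods are assumptions made for the example.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical sketch; only CreateMultipart is named in this commit's diff. */
final class WriteFlagsSketch {

  /** Stand-in for the WriteObjectFlags enum. */
  enum WriteObjectFlags {
    ConditionalOverwrite,      // assumed name
    ConditionalOverwriteEtag,  // assumed name
    CreateMultipart            // used in the S3ABlockOutputStream change
  }

  /** Hypothetical helper: collect individual booleans into one flag set. */
  static EnumSet<WriteObjectFlags> fromBooleans(boolean conditional, boolean multipart) {
    EnumSet<WriteObjectFlags> flags = EnumSet.noneOf(WriteObjectFlags.class);
    if (conditional) {
      flags.add(WriteObjectFlags.ConditionalOverwrite);
    }
    if (multipart) {
      flags.add(WriteObjectFlags.CreateMultipart);
    }
    return flags;
  }

  /** The callee keeps a single parameter however many flags are added later. */
  static String describe(Set<WriteObjectFlags> flags) {
    return flags.contains(WriteObjectFlags.CreateMultipart) ? "multipart" : "single-put";
  }

  public static void main(String[] args) {
    System.out.println(describe(fromBooleans(false, true)));
    System.out.println(describe(EnumSet.noneOf(WriteObjectFlags.class)));
  }
}
```

Adding a new flag means adding an enum constant; no method signature between createFile() and the request factory has to change.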

- etag values are passed down but not wired up/tested
- content type flag exists but is ignored

New WriteObjectFlags enum is in new package o.a.h.fs.s3a.impl.write,
to match the streams package in another ongoing PR.
Goal: as we maintain things, all code related to writing can
go in here.

Change-Id: I301abd7397accbd278d05f42f858223ba1349fc8
steveloughran committed Feb 10, 2025
1 parent d04aa2e commit e39ecb3
Showing 15 changed files with 290 additions and 98 deletions.
@@ -723,34 +723,87 @@ private OpenFileOptions() {
public interface CreateFileOptionKeys {

/**
* {code createFile()} option to write a file iff there is nothing at the destination.
* This may happen during create() or in the close.
* {@code createFile()} option to write a file in the close() operation iff
* there is nothing at the destination.
* This is the equivalent of {@code create(path, overwrite=true)}
* <i>except that the existence check is postponed to the end of the write</i>.
* <p>
* Explicitly set {@link #FS_OPTION_CREATE_IN_CLOSE} if you want to force the end of file
* creation.
*
* Value {@value}.
* <p>
* This can be set in the builder.
* <p>
* It should be exported as a path capability for all stores where
* the feature is available *and* enabled.
* <ol>
* <li>It is for object stores which only upload/manifest files
* at the end of the stream write.</li>
* <li>Streams which support it SHALL NOT manifest any object to
* the destination path until close()</li>
* <li>It MUST be declared as a stream capability in streams for which
* this overwrite is enabled.</li>
* <li>It MUST be exported as a path capability for all stores where
* the feature is available <i>and</i> enabled</li>
* <li>If passed to a filesystem as a {@code must()} parameter where
* the option value is {@code true}, and it is supported/enabled,
* the FS SHALL omit all overwrite checks in {@code create},
* including for the existence of an object or a directory underneath.
* Instead, during {@code close()} the object will only be manifest
* at the target path if there is no object at the destination.
* </li>
* <li>The existence check and object creation SHALL be atomic.</li>
* <li>If passed to a filesystem as a {@code must()} parameter where
* the option value is {@code true}, and the FS does not recognise
* the feature, or it is recognized but disabled on this FS instance,
* the filesystem SHALL reject the request.
* </li>
* <li>If passed to a filesystem as an {@code opt()} parameter where
* the option value is {@code true}, the filesystem MAY ignore
* the request, or it MAY enable the feature.
* Any filesystem which does not support the feature, including
* those from older releases, SHALL ignore it.
* </li>
* </ol>
*/
String FS_OPTION_CREATE_CONDITIONAL_OVERWRITE = "fs.option.create.conditional.overwrite";

/**
* Overwrite a file only if there is an Etag match. This option takes a string.
* Overwrite a file only if there is an Etag match. This option takes a string,
*
* Value {@value}.
* <p>
* This is similar to {@link #FS_OPTION_CREATE_CONDITIONAL_OVERWRITE}.
* <ol>
* <li>If supported and enabled, it SHALL be declared as a capability of the filesystem</li>
* <li>If supported and enabled, it SHALL be declared as a capability of the stream</li>
* <li>The string passed as the value SHALL be the etag value as returned by
* {@code EtagSource.getEtag()}</li>
* <li>This value MUST NOT be empty</li>
* <li>If passed to a filesystem which supports it, then when the file is created,
* the store SHALL check for the existence of a file/object at the destination
* path.
* </li>
* <li>If there is no object there, the operation SHALL be rejected by raising
* either a {@code org.apache.hadoop.fs.FileAlreadyExistsException}
* exception, or a {@code java.nio.file.FileAlreadyExistsException}
* </li>
* <li>If there is an object there, its Etag SHALL be compared to the
* value passed here.</li>
* <li>If there is no match, the operation SHALL be rejected by raising
* either a {@code org.apache.hadoop.fs.FileAlreadyExistsException}
* exception, or a {@code java.nio.file.FileAlreadyExistsException}
* </li>
* <li>If the etag does match, the file SHALL be created.</li>
* <li>The check and create SHALL be atomic</li>
* <li>The check and create MAY be at the end of the write, in {@code close()},
* or it MAY be in the {@code create()} operation. That is: some stores
* MAY perform the check early</li>
* <li>If supported and enabled, stores MAY check for the existence of subdirectories;
* this behavior is implementation-specific.</li>
* </ol>
*/
String FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG =
"fs.option.create.conditional.overwrite.etag";

/**
* String to define the content filetype.
* Value {@value}.
*/
String FS_OPTION_CREATE_CONTENT_TYPE = "fs.option.create.content.type";

/**
* A flag which requires the filesystem to create files/objects in close(),
* rather than create/createFile.
@@ -760,5 +813,12 @@ public interface CreateFileOptionKeys {
* Value {@value}.
*/
String FS_OPTION_CREATE_IN_CLOSE = "fs.option.create.in.close";

/**
* String to define the content filetype.
* Value {@value}.
*/
String FS_OPTION_CREATE_CONTENT_TYPE = "fs.option.create.content.type";

}
}
@@ -26,6 +26,7 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -52,6 +53,7 @@
import org.apache.hadoop.fs.s3a.impl.ProgressListener;
import org.apache.hadoop.fs.s3a.impl.ProgressListenerEvent;
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags;
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
@@ -226,6 +228,11 @@ class S3ABlockOutputStream extends OutputStream implements
/** Is multipart upload enabled? */
private final boolean isMultipartUploadEnabled;

/**
* Object write option flags.
*/
private final EnumSet<WriteObjectFlags> writeObjectFlags;

/**
* An S3A output stream which uploads partitions in a separate pool of
* threads; different {@link S3ADataBlocks.BlockFactory}
@@ -251,6 +258,7 @@ class S3ABlockOutputStream extends OutputStream implements
this.iostatistics = statistics.getIOStatistics();
this.writeOperationHelper = builder.writeOperations;
this.putTracker = builder.putTracker;
this.writeObjectFlags = builder.putOptions.getWriteObjectFlags();
this.executorService = MoreExecutors.listeningDecorator(
builder.executorService);
this.multiPartUpload = null;
@@ -268,9 +276,19 @@ class S3ABlockOutputStream extends OutputStream implements
? builder.blockSize
: -1;

// if required to be multipart by the committer put tracker or
// write flags (i.e. createFile() options), initiate multipart uploads.
// this will fail fast if the store doesn't support multipart uploads
if (putTracker.initialize()) {
LOG.debug("Put tracker requests multipart upload");
initMultipartUpload();
} else if (writeObjectFlags.contains(WriteObjectFlags.CreateMultipart)) {
// this is not merged with the branch above, simply to avoid confusion
// as to what to do if both are set, and so to guarantee
// that the put tracker initialization always takes priority
// over any file flag.
LOG.debug("Multipart initiated from createFile() options");
initMultipartUpload();
}
this.isCSEEnabled = builder.isCSEEnabled;
this.threadIOStatisticsAggregator = builder.ioStatisticsAggregator;
@@ -2177,7 +2177,10 @@ private FSDataOutputStream innerCreateFile(
// put options are derived from the option builder.
boolean conditionalCreate = options.isConditionalOverwrite();
final PutObjectOptions putOptions =
new PutObjectOptions(null, options.getHeaders(), conditionalCreate, null);
new PutObjectOptions(null,
options.getHeaders(),
options.writeObjectFlags(),
options.etag());


validateOutputStreamConfiguration(path, getConf());
@@ -3252,8 +3255,7 @@ private DeleteObjectsResponse deleteObjects(DeleteObjectsRequest deleteRequest)
public PutObjectRequest.Builder newPutObjectRequestBuilder(String key,
long length,
boolean isDirectoryMarker) {
return requestFactory.newPutObjectRequestBuilder(key, null, length, isDirectoryMarker,
PutObjectOptions.defaultOptions());
return requestFactory.newPutObjectRequestBuilder(key, PutObjectOptions.defaultOptions(), length, isDirectoryMarker);
}

/**
@@ -245,7 +245,7 @@ public PutObjectRequest createPutObjectRequest(String destKey,
activateAuditSpan();

return getRequestFactory()
.newPutObjectRequestBuilder(destKey, options, length, false, options)
.newPutObjectRequestBuilder(destKey, options, length, false)
.build();
}

@@ -118,14 +118,12 @@ CopyObjectRequest.Builder newCopyObjectRequestBuilder(String srcKey,
* @param options options for the request
* @param length length of object to be uploaded
* @param isDirectoryMarker true if object to be uploaded is a directory marker
* @param putOptions
* @return the request builder
*/
PutObjectRequest.Builder newPutObjectRequestBuilder(String key,
PutObjectOptions options,
long length,
boolean isDirectoryMarker,
PutObjectOptions putOptions);
boolean isDirectoryMarker);

/**
* Create a {@link PutObjectRequest} request for creating
@@ -19,6 +19,7 @@
package org.apache.hadoop.fs.s3a.commit.magic;

import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -33,13 +34,15 @@
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags;
import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.util.Preconditions;

import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
import static org.apache.hadoop.fs.s3a.impl.PutObjectOptions.defaultOptions;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;

/**
@@ -79,7 +82,10 @@ public boolean aboutToComplete(String uploadId,
PutObjectRequest originalDestPut = getWriter().createPutObjectRequest(
getOriginalDestKey(),
0,
new PutObjectOptions(null, headers, false, null));
new PutObjectOptions(null,
headers,
EnumSet.noneOf(WriteObjectFlags.class),
""));
upload(originalDestPut, EMPTY);

// build the commit summary
@@ -103,7 +109,8 @@ public boolean aboutToComplete(String uploadId,
getPath(), getPendingPartKey(), commitData);
PutObjectRequest put = getWriter().createPutObjectRequest(
getPendingPartKey(),
bytes.length, null);
bytes.length,
defaultOptions());
upload(put, bytes);
return false;
}
@@ -117,7 +124,7 @@ public boolean aboutToComplete(String uploadId,
@Retries.RetryTranslated
private void upload(PutObjectRequest request, byte[] bytes) throws IOException {
trackDurationOfInvocation(getTrackerStatistics(), COMMITTER_MAGIC_MARKER_PUT.getSymbol(),
() -> getWriter().putObject(request, PutObjectOptions.defaultOptions(),
() -> getWriter().putObject(request, defaultOptions(),
new S3ADataBlocks.BlockUploadData(bytes, null), null));
}
}