
Commit f03e1b9

HADOOP-19569. stream close
Continuing to pull out multipart IO so that there are no back references from it to S3AStore. The final change is to define a store statistics class which it and other things can use to update stats. Yes, those stats will get back into the FS, but we don't want that recursive complexity of S3AFS utility classes. Once an inner class has been pulled out of S3AStoreImpl, it SHOULD have a restricted interface of operations it can call back on the store.
1 parent 87f8140 commit f03e1b9
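
The design note above can be sketched in code. In the sketch below, IORateLimiting is an interface this patch already imports into MultipartIOService; StoreStatisticsCallbacks is a hypothetical name standing in for the store statistics class the message says is still to be defined, so treat the snippet as an assumption about the intended shape rather than code from this commit.

    import org.apache.hadoop.fs.s3a.api.IORateLimiting;

    // Hypothetical restricted callback interface: only the statistics
    // operations an extracted helper needs, nothing else from the store.
    interface StoreStatisticsCallbacks {
      void incrementPutStartStatistics(long bytes);
      void incrementPutCompletedStatistics(boolean success, long bytes);
    }

    // Illustrative extracted helper: it depends on narrow interfaces rather
    // than on S3AStore, so there is no back reference to the store implementation.
    class ExtractedHelperSketch {
      private final IORateLimiting limiting;
      private final StoreStatisticsCallbacks statistics;

      ExtractedHelperSketch(IORateLimiting limiting, StoreStatisticsCallbacks statistics) {
        this.limiting = limiting;
        this.statistics = statistics;
      }

      void beforePut(long bytes) {
        limiting.acquireWriteCapacity(1);              // rate limiting via the narrow interface
        statistics.incrementPutStartStatistics(bytes); // stats via the narrow interface
      }
    }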

10 files changed: +277 −204 lines

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 3 additions & 34 deletions

@@ -1422,7 +1422,8 @@ private void initMultipartUploads(Configuration conf) throws IOException {
         getStore().getMultipartIO().abortOutstandingMultipartUploads(
             purgeDuration.getSeconds(),
             "",
-            maxKeys);
+            maxKeys,
+            createStoreContext());
       } catch (AccessDeniedException e) {
         instrumentation.errorIgnored();
         LOG.debug("Failed to purge multipart uploads against {}," +
@@ -3208,38 +3209,6 @@ public UploadInfo putObject(PutObjectRequest putObjectRequest, File file,
     return getStore().putObject(putObjectRequest, file, listener);
   }
 
-  /**
-   * At the start of a put/multipart upload operation, update the
-   * relevant counters.
-   *
-   * @param bytes bytes in the request.
-   */
-  protected void incrementPutStartStatistics(long bytes) {
-    getStore().incrementPutStartStatistics(bytes);
-  }
-
-  /**
-   * At the end of a put/multipart upload operation, update the
-   * relevant counters and gauges.
-   *
-   * @param success did the operation succeed?
-   * @param bytes bytes in the request.
-   */
-  protected void incrementPutCompletedStatistics(boolean success, long bytes) {
-    getStore().incrementPutCompletedStatistics(success, bytes);
-  }
-
-  /**
-   * Callback for use in progress callbacks from put/multipart upload events.
-   * Increments those statistics which are expected to be updated during
-   * the ongoing upload operation.
-   * @param key key to file that is being written (for logging)
-   * @param bytes bytes successfully uploaded.
-   */
-  protected void incrementPutProgressStatistics(String key, long bytes) {
-    getStore().incrementPutProgressStatistics(key, bytes);
-  }
-
   /**
    * Delete a list of keys on a s3-backend.
    * Retry policy: retry untranslated; delete considered idempotent.
@@ -4532,7 +4501,7 @@ private void createEmptyObject(final String objectName, PutObjectOptions putOptions)
             putOptions,
             uploadData,
             getDurationTrackerFactory()));
-    incrementPutProgressStatistics(objectName, 0);
+    incrementWriteOperations();
     instrumentation.directoryCreated();
   }
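
For context, here is a sketch of the updated purge call on the filesystem side, with the intent of each argument called out; it assumes the surrounding try/catch in initMultipartUploads() is unchanged apart from the extra argument shown in the hunk above.

    try {
      // The FS builds the StoreContext and hands it down, so the multipart
      // service no longer has to call back into S3AStore to create one.
      getStore().getMultipartIO().abortOutstandingMultipartUploads(
          purgeDuration.getSeconds(),  // abort uploads older than this age
          "",                          // empty prefix: scan the whole bucket
          maxKeys,                     // page size for the upload listing
          createStoreContext());       // invoker/bucket/audit context from the FS
    } catch (AccessDeniedException e) {
      // Purging is best effort; a restricted bucket just logs and moves on.
      instrumentation.errorIgnored();
    }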

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java

Lines changed: 1 addition & 1 deletion

@@ -208,7 +208,7 @@ default MultipartIOService getMultipartIO() {
 
   /**
    * Given a possibly null duration tracker factory, return a non-null
-   * one for use in tracking durations -either that or the FS tracker
+   * one for use in tracking durations -either that or the store tracker
    * itself.
    *
    * @param factory factory.
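
The corrected javadoc describes a small fallback helper. Below is a minimal sketch of that pattern; the method name and default-method form are assumptions (the hunk shows only the comment), and getDurationTrackerFactory() mirrors the accessor used in the S3AFileSystem hunk above.

    // Sketch only: prefer the caller's duration tracker factory, otherwise
    // fall back to the store's own.
    default DurationTrackerFactory nonNullDurationTrackerFactory(
        @Nullable DurationTrackerFactory factory) {
      return factory != null ? factory : getDurationTrackerFactory();
    }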

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultipartIO.java

Lines changed: 6 additions & 3 deletions

@@ -109,13 +109,16 @@ AbortMultipartUploadResponse abortMultipartUpload(String destKey, String uploadId)
       throws AwsServiceException;
 
   /**
-   * Listing and abort all multipart uploads.
+   * List and abort all multipart uploads older than a specified age.
+   * @param seconds age of multiparts to abort.
    * @param prefix prefix to scan for, "" for none
-   * @return a listing of multipart uploads.
+   * @param maxKeys maximum number of keys to list and abort
+   * @param context store context to use
    * @throws IOException IO failure, including any uprated SdkException
    */
   @Retries.RetryTranslated
-  void abortOutstandingMultipartUploads(long seconds, @Nullable String prefix, int maxKeys)
+  void abortOutstandingMultipartUploads(long seconds, @Nullable String prefix, int maxKeys,
+      StoreContext context)
       throws IOException;
 
   /**
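
A usage sketch for the widened interface method; the age, prefix, and page size values are illustrative, and the StoreContext is whatever the caller chooses to pass (S3AFileSystem passes createStoreContext() in the first file of this commit).

    // Abort multipart uploads under "tmp/" that are more than a day old.
    store.getMultipartIO().abortOutstandingMultipartUploads(
        86_400,         // seconds: age threshold for aborting
        "tmp/",         // prefix to scan; "" means no prefix
        1000,           // maximum number of keys to list and abort
        storeContext);  // supplied by the caller instead of created inside the service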

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultipartIOService.java

Lines changed: 59 additions & 17 deletions

@@ -49,12 +49,14 @@
 import org.apache.hadoop.fs.s3a.Invoker;
 import org.apache.hadoop.fs.s3a.Retries;
 import org.apache.hadoop.fs.s3a.S3AStore;
+import org.apache.hadoop.fs.s3a.api.IORateLimiting;
 import org.apache.hadoop.fs.s3a.api.RequestFactory;
 import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
 import org.apache.hadoop.fs.store.audit.AuditSpan;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.Preconditions;
 
+import static java.util.Objects.requireNonNull;
 import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_LIST;
 import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_PART_PUT;
 import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_ABORTED;
@@ -63,25 +65,69 @@
 import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
 import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
 import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
 
+/**
+ * All Multipart IO Operations.
+ * The service is not ready to use until {@link #bind(S3AStore, ClientManager, IORateLimiting)}
+ * is invoked and the service started.
+ */
 public class MultipartIOService extends AbstractService
     implements MultipartIO {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(MultipartIOService.class);
 
+  /**
+   * Store for some statistics invocations.
+   */
   private S3AStore store;
 
+  /**
+   * Rate limiter (likely to be the store).
+   */
+  private IORateLimiting limiting;
+
+  /**
+   * SDK client.
+   */
   private ClientManager clientManager;
 
+  /**
+   * Create the Service with the service name {@link #MULTIPART_IO}.
+   */
   public MultipartIOService() {
-    super(MULTIPART_IO);
+    this(MULTIPART_IO);
   }
 
+  /**
+   * Constructor.
+   * @param name service name
+   */
+  public MultipartIOService(final String name) {
+    super(name);
+  }
 
-  public void bind(final S3AStore store, ClientManager clientManager) {
-    this.store = store;
+  /**
+   * Bind to dependencies.
+   * This MUST be called before service start.
+   * @param store store
+   * @param clientManager sdk client manager
+   * @param limiting rate limiting.
+   */
+  public void bind(final S3AStore store,
+      ClientManager clientManager,
+      IORateLimiting limiting) {
+    this.store = requireNonNull(store);
     this.clientManager = clientManager;
+    this.limiting = limiting;
+  }
+
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+    requireNonNull(store);
   }
 
   /**
@@ -104,7 +150,6 @@ public CreateMultipartUploadResponse initiateMultipartUpload(
         () -> getS3ClientUnchecked().createMultipartUpload(request));
   }
 
-
   @Retries.OnceRaw
   @Override
   public UploadPartResponse uploadPart(
@@ -158,26 +203,24 @@ public AbortMultipartUploadResponse abortMultipartUpload(String destKey, String uploadId)
     checkRunning();
     LOG.debug("Aborting multipart upload {} to {}", uploadId, destKey);
     return trackDurationOfSupplier(store.getInstrumentation(), OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(), () -> {
-      store.acquireWriteCapacity(InternalConstants.MULTIPART_ABORT_WRITE_CAPACITY);
+      limiting.acquireWriteCapacity(InternalConstants.MULTIPART_ABORT_WRITE_CAPACITY);
       return getS3ClientUnchecked().abortMultipartUpload(
           store.getRequestFactory().newAbortMultipartUploadRequestBuilder(
               destKey,
               uploadId).build());
     });
   }
 
-
   @Retries.RetryTranslated
   @Override
-  public void abortOutstandingMultipartUploads(long seconds, @Nullable String prefix, int maxKeys)
+  public void abortOutstandingMultipartUploads(long seconds, @Nullable String prefix, int maxKeys,
+      StoreContext context)
       throws IOException {
-    Preconditions.checkArgument(seconds >= 0);
+    checkArgument(seconds >= 0);
     checkRunning();
-    Instant purgeBefore =
-        Instant.now().minusSeconds(seconds);
+    Instant purgeBefore = Instant.now().minusSeconds(seconds);
     LOG.debug("Purging outstanding multipart uploads older than {}",
         purgeBefore);
-    final StoreContext context = store.createStoreContext();
     context.getInvoker().retry("Purging multipart uploads",
         context.getBucket(), true,
         () -> {
@@ -208,7 +251,7 @@ public List<MultipartUpload> listMultipartUploads(@Nullable String prefix)
       final ListMultipartUploadsRequest request = store.getRequestFactory()
          .newListMultipartUploadsRequestBuilder(p).build();
       return trackDuration(store.getInstrumentation(), MULTIPART_UPLOAD_LIST.getSymbol(), () -> {
-        store.acquireReadCapacity(InternalConstants.MULTIPART_LIST_READ_CAPACITY);
+        limiting.acquireReadCapacity(InternalConstants.MULTIPART_LIST_READ_CAPACITY);
        return getS3ClientUnchecked().listMultipartUploads(request).uploads();
      });
    });
@@ -258,7 +301,6 @@ public class ListingIterator implements
 
     private final int maxKeys;
 
-
     private final Invoker invoker;
 
     private final AuditSpan auditSpan;
@@ -345,8 +387,8 @@ private void requestNextBatch() throws IOException {
       checkRunning();
 
       try (AuditSpan span = auditSpan.activate()) {
-        ListMultipartUploadsRequest.Builder requestBuilder = requestFactory
-            .newListMultipartUploadsRequestBuilder(prefix);
+        ListMultipartUploadsRequest.Builder requestBuilder =
+            requestFactory.newListMultipartUploadsRequestBuilder(prefix);
         if (!firstListing) {
           requestBuilder.keyMarker(listing.nextKeyMarker());
           requestBuilder.uploadIdMarker(listing.nextUploadIdMarker());
@@ -364,7 +406,7 @@ private void requestNextBatch() throws IOException {
           trackDurationOfOperation(storeContext.getInstrumentation(),
               OBJECT_MULTIPART_UPLOAD_LIST.getSymbol(),
               () -> {
-                store.acquireReadCapacity(InternalConstants.MULTIPART_LIST_READ_CAPACITY);
+                limiting.acquireReadCapacity(InternalConstants.MULTIPART_LIST_READ_CAPACITY);
                 return getS3ClientUnchecked().listMultipartUploads(requestBuilder.build());
               }));
       LOG.debug("Listing found {} upload(s)",
@@ -375,7 +417,7 @@ private void requestNextBatch() throws IOException {
     }
 
     private S3Client getS3ClientUnchecked() {
-      return store.getOrCreateS3ClientUnchecked();
+      return clientManager.getOrCreateS3ClientUnchecked();
     }
 
     /**
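
A minimal lifecycle sketch for the reworked service, assuming the owning store wires it up and that S3AStore itself supplies the IORateLimiting implementation (the "likely to be the store" comment above suggests it does); variable names are illustrative.

    MultipartIOService multipartIO = new MultipartIOService();
    // bind() must run before start; serviceStart() now fails fast if it did not.
    multipartIO.bind(store, clientManager, store);  // the store doubles as the rate limiter
    multipartIO.init(conf);                         // standard Hadoop service lifecycle
    multipartIO.start();

If bind() is skipped, serviceStart() throws from requireNonNull(store), surfacing the wiring error at startup rather than at the first S3 call.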
