diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 000000000..59aac7e10 Binary files /dev/null and b/.DS_Store differ diff --git a/api/src/main/java/io/minio/MinioAsyncClient.java b/api/src/main/java/io/minio/MinioAsyncClient.java index f72e52622..8a68fbd7c 100644 --- a/api/src/main/java/io/minio/MinioAsyncClient.java +++ b/api/src/main/java/io/minio/MinioAsyncClient.java @@ -84,6 +84,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; import java.util.regex.Matcher; import okhttp3.HttpUrl; import okhttp3.MultipartBody; @@ -149,7 +151,8 @@ private MinioAsyncClient( String region, Provider provider, OkHttpClient httpClient, - boolean closeHttpClient) { + boolean closeHttpClient, + ExecutorService executorService) { super( baseUrl, awsS3Prefix, @@ -159,7 +162,8 @@ private MinioAsyncClient( region, provider, httpClient, - closeHttpClient); + closeHttpClient, + executorService); } protected MinioAsyncClient(MinioAsyncClient client) { @@ -462,7 +466,7 @@ public CompletableFuture copyObject(CopyObjectArgs args) args.validateSse(this.baseUrl); return CompletableFuture.supplyAsync( - () -> args.source().offset() != null && args.source().length() != null) + () -> args.source().offset() != null && args.source().length() != null, executorService) .thenCompose( condition -> { if (condition) { @@ -677,7 +681,7 @@ public CompletableFuture composeObject(ComposeObjectArgs ar Multimap headers = newMultimap(args.extraHeaders()); headers.putAll(args.genHeaders()); return headers; - }) + }, executorService) .thenCompose( headers -> { try { @@ -715,7 +719,7 @@ public CompletableFuture composeObject(ComposeObjectArgs ar CompletableFuture.supplyAsync( () -> { return new Part[partCount[0]]; - }); + }, executorService); for (ComposeSource src : sources) { long size = 0; try { @@ -3491,7 +3495,7 @@ public CompletableFuture uploadSnowballObjects( } } return baos; - }) + }, executorService) .thenCompose( baos -> { Multimap headers = newMultimap(args.extraHeaders()); @@ -3721,6 +3725,7 @@ public static final class Builder { private Provider provider; private OkHttpClient httpClient; private boolean closeHttpClient; + private ExecutorService executorService = ForkJoinPool.commonPool(); private void setAwsInfo(String host, boolean https) { this.awsS3Prefix = null; @@ -3834,6 +3839,11 @@ public Builder httpClient(OkHttpClient httpClient, boolean close) { return this; } + public Builder executorService(ExecutorService executorService) { + this.executorService = executorService; + return this; + } + public MinioAsyncClient build() { HttpUtils.validateNotNull(this.baseUrl, "endpoint"); @@ -3861,7 +3871,8 @@ public MinioAsyncClient build() { region, provider, httpClient, - closeHttpClient); + closeHttpClient, + executorService()); } } } diff --git a/api/src/main/java/io/minio/S3Base.java b/api/src/main/java/io/minio/S3Base.java index 324637ccb..52224e0a8 100644 --- a/api/src/main/java/io/minio/S3Base.java +++ b/api/src/main/java/io/minio/S3Base.java @@ -87,6 +87,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -147,6 +148,7 @@ public abstract class S3Base implements AutoCloseable { protected Provider provider; protected OkHttpClient httpClient; protected boolean closeHttpClient; + protected final ExecutorService executorService; /** @deprecated This method is no longer supported. */ @Deprecated @@ -171,6 +173,20 @@ protected S3Base( false); } + protected S3Base( + HttpUrl baseUrl, + String awsS3Prefix, + String awsDomainSuffix, + boolean awsDualstack, + boolean useVirtualStyle, + String region, + Provider provider, + OkHttpClient httpClient, + boolean closeHttpClient) { + this(baseUrl, awsS3Prefix, awsDomainSuffix, awsDualstack, useVirtualStyle, region, provider, httpClient, closeHttpClient); + this.executorService = ForkJoinPool.commonPool(); + } + protected S3Base( HttpUrl baseUrl, String awsS3Prefix, @@ -180,7 +196,8 @@ protected S3Base( String region, Provider provider, OkHttpClient httpClient, - boolean closeHttpClient) { + boolean closeHttpClient, + ExecutorService executorService) { this.baseUrl = baseUrl; this.awsS3Prefix = awsS3Prefix; this.awsDomainSuffix = awsDomainSuffix; @@ -190,6 +207,7 @@ protected S3Base( this.provider = provider; this.httpClient = httpClient; this.closeHttpClient = closeHttpClient; + this.executorService = executorService; } /** @deprecated This method is no longer supported. */ @@ -231,6 +249,7 @@ protected S3Base(S3Base client) { this.provider = client.provider; this.httpClient = client.httpClient; this.closeHttpClient = client.closeHttpClient; + this.executorService = client.executorService; } /** Check whether argument is valid or not. */ @@ -1204,7 +1223,8 @@ protected CompletableFuture calculatePartCountAsync(List long[] objectSize = {0}; int index = 0; - CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> 0); + CompletableFuture completableFuture = + CompletableFuture.supplyAsync(() -> 0, executorService); for (ComposeSource src : sources) { index++; final int i = index; @@ -2947,7 +2967,8 @@ private CompletableFuture putMultipartObjectAsync( throw new CompletionException(throwable); } return response; - }); + }, + executorService); } /** @@ -2993,7 +3014,8 @@ protected CompletableFuture putObjectAsync( } catch (NoSuchAlgorithmException | IOException e) { throw new CompletionException(e); } - }) + }, + executorService) .thenCompose( partSource -> { try {