Skip to content

Commit

Permalink
Disabling parallelProtection when its not needed (pushParallelism of …
Browse files Browse the repository at this point in the history
…1) (#14923)
  • Loading branch information
swaminathanmanish authored Jan 27, 2025
1 parent 9e561a2 commit 3baeb92
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.pinot.common.utils.http.HttpClientConfig;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.StringUtil;
Expand Down Expand Up @@ -1264,6 +1265,14 @@ public static List<NameValuePair> makeTableParam(String tableName) {
return tableParams;
}

public static NameValuePair makeParallelProtectionParam(PushJobSpec jobSpec) {
String enableParallelProtection = jobSpec.getPushParallelism() > 1 ? "true" : "false";
NameValuePair parallelProtectionParam =
new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION,
enableParallelProtection);
return parallelProtectionParam;
}

@Override
public void close()
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, Pino
List<Header> headers = AuthProviderUtils.toRequestHeaders(authProvider);
List<NameValuePair> parameters = FileUploadDownloadClient.makeTableParam(tableName);
PushJobSpec pushJobSpec = spec.getPushJobSpec();
parameters.add(FileUploadDownloadClient.makeParallelProtectionParam(pushJobSpec));
if (pushJobSpec != null && pushJobSpec.isBatchSegmentUpload()) {
// segments are uploaded in batch when batch mode is enabled.
sendSegmentsUriAndMetadata(spec, fileSystem, segmentUriToTarPathMap, headers, parameters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@
import java.util.Map;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.hc.core5.http.NameValuePair;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand All @@ -53,6 +57,21 @@ public void tearDown() throws IOException {
FileUtils.deleteDirectory(_tempDir);
}

@Test
public void testSegmentParallelProtectionUploadParam() {
SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
PushJobSpec pushJobSpec = new PushJobSpec();
NameValuePair nameValuePair = FileUploadDownloadClient.makeParallelProtectionParam(pushJobSpec);
Assert.assertEquals(nameValuePair.getName(),
FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION);
Assert.assertEquals(nameValuePair.getValue(), "false");
pushJobSpec.setPushParallelism(2);
nameValuePair = FileUploadDownloadClient.makeParallelProtectionParam(pushJobSpec);
Assert.assertEquals(nameValuePair.getName(),
FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION);
Assert.assertEquals(nameValuePair.getValue(), "true");
}

@Test
public void testGetSegmentUriToTarPathMap()
throws IOException {
Expand Down

0 comments on commit 3baeb92

Please sign in to comment.