Skip to content

Commit

Permalink
[#291] feat(client): Introduce PrefetchableClientReadHandler to suppo…
Browse files Browse the repository at this point in the history
…rt async read (#2365)

### What changes were proposed in this pull request?

1. Introduce PrefetchableClientReadHandler to support async read; it is disabled by default.
2. Apply it to the memory, local-file, and HDFS read handlers

### Why are the changes needed?

Recently I found some important Spark jobs are slow due to the large number of shuffle read operations. If we support async read, the jobs' performance will be improved.

So this PR is the follow-up to #291, filed almost 3 years ago!

### Does this PR introduce _any_ user-facing change?

Yes. Some configs are introduced
1. `rss.client.read.prefetch.enabled`
2. `rss.client.read.prefetch.capacity`
3. `rss.client.read.prefetch.timeoutSec`

### How was this patch tested?

1. Unit tests

---------

Co-authored-by: Junfan Zhang <[email protected]>
  • Loading branch information
zuston and Junfan Zhang authored Feb 8, 2025
1 parent 47e5d17 commit 81e00be
Show file tree
Hide file tree
Showing 11 changed files with 346 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -330,4 +330,22 @@ public class RssClientConf {
.intType()
.defaultValue(-1)
.withDescription("the event loop threads of netty impl for grpc");

public static final ConfigOption<Boolean> RSS_CLIENT_PREFETCH_ENABLED =
ConfigOptions.key("rss.client.read.prefetch.enabled")
.booleanType()
.defaultValue(false)
.withDescription("Read prefetch switch that will be disabled by default");

public static final ConfigOption<Integer> RSS_CLIENT_PREFETCH_CAPACITY =
ConfigOptions.key("rss.client.read.prefetch.capacity")
.intType()
.defaultValue(4)
.withDescription("Read prefetch capacity");

public static final ConfigOption<Integer> READ_CLIENT_PREFETCH_TIMEOUT_SEC =
ConfigOptions.key("rss.client.read.prefetch.timeoutSec")
.intType()
.defaultValue(120)
.withDescription("Read prefetch timeout seconds");
}
4 changes: 4 additions & 0 deletions docs/client_guide/client_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ The important configuration of client is listed as following. These configuratio
| <client_type>.rss.client.blockIdManagerClass | - | The block id manager class of server for this application, the implementation of this interface to manage the shuffle block ids |
| <client_type>.rss.client.reportExcludeProperties | - | The value of exclude properties specify a list of client configuration properties that should not be reported to the coordinator by the DelegationRssShuffleManager. |
| <client_type>.rss.client.reportIncludeProperties | - | The value of include properties specify a list of client configuration properties that should be exclusively reported to the coordinator by the DelegationRssShuffleManager. |
| <client_type>.rss.client.read.prefetch.enabled | false | Read prefetch switch that will be disabled by default |
| <client_type>.rss.client.read.prefetch.capacity | 4 | Read prefetch capacity |
| <client_type>.rss.client.read.prefetch.timeoutSec | 120 | Read prefetch timeout seconds |


Notice:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ private ClientReadHandler getMemoryClientReadHandler(
shuffleServerClient,
expectTaskIds,
request.getRetryMax(),
request.getRetryIntervalMax());
request.getRetryIntervalMax(),
request.getPrefetchOption());
return memoryClientReadHandler;
}

Expand All @@ -159,7 +160,8 @@ private ClientReadHandler getLocalfileClientReaderHandler(
request.getDistributionType(),
request.getExpectTaskIds(),
request.getRetryMax(),
request.getRetryIntervalMax());
request.getRetryIntervalMax(),
request.getPrefetchOption());
}

private ClientReadHandler getHadoopClientReadHandler(
Expand All @@ -179,7 +181,8 @@ private ClientReadHandler getHadoopClientReadHandler(
request.getDistributionType(),
request.getExpectTaskIds(),
ssi.getId(),
request.isOffHeapEnabled());
request.isOffHeapEnabled(),
request.getPrefetchOption());
}

public ShuffleDeleteHandler createShuffleDeleteHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.uniffle.storage.handler.impl;

import java.util.List;
import java.util.Optional;

import com.google.common.collect.Lists;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
Expand All @@ -30,7 +31,7 @@
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.segment.SegmentSplitterFactory;

public abstract class DataSkippableReadHandler extends AbstractClientReadHandler {
public abstract class DataSkippableReadHandler extends PrefetchableClientReadHandler {
private static final Logger LOG = LoggerFactory.getLogger(DataSkippableReadHandler.class);

protected List<ShuffleDataSegment> shuffleDataSegments = Lists.newArrayList();
Expand All @@ -50,7 +51,9 @@ public DataSkippableReadHandler(
Roaring64NavigableMap expectBlockIds,
Roaring64NavigableMap processBlockIds,
ShuffleDataDistributionType distributionType,
Roaring64NavigableMap expectTaskIds) {
Roaring64NavigableMap expectTaskIds,
Optional<PrefetchOption> prefetchOption) {
super(prefetchOption);
this.appId = appId;
this.shuffleId = shuffleId;
this.partitionId = partitionId;
Expand All @@ -66,7 +69,7 @@ public DataSkippableReadHandler(
protected abstract ShuffleDataResult readShuffleData(ShuffleDataSegment segment);

@Override
public ShuffleDataResult readShuffleData() {
public ShuffleDataResult doReadShuffleData() {
if (shuffleDataSegments.isEmpty()) {
ShuffleIndexResult shuffleIndexResult = readShuffleIndex();
if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.FileNotFoundException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;
Expand Down Expand Up @@ -55,6 +56,7 @@ public class HadoopClientReadHandler extends AbstractClientReadHandler {
private ShuffleDataDistributionType distributionType;
private Roaring64NavigableMap expectTaskIds;
private boolean offHeapEnable = false;
private Optional<PrefetchableClientReadHandler.PrefetchOption> prefetchOption;

public HadoopClientReadHandler(
String appId,
Expand All @@ -71,7 +73,8 @@ public HadoopClientReadHandler(
ShuffleDataDistributionType distributionType,
Roaring64NavigableMap expectTaskIds,
String shuffleServerId,
boolean offHeapEnable) {
boolean offHeapEnable,
Optional<PrefetchableClientReadHandler.PrefetchOption> prefetchOption) {
this.appId = appId;
this.shuffleId = shuffleId;
this.partitionId = partitionId;
Expand All @@ -87,6 +90,7 @@ public HadoopClientReadHandler(
this.expectTaskIds = expectTaskIds;
this.shuffleServerId = shuffleServerId;
this.offHeapEnable = offHeapEnable;
this.prefetchOption = prefetchOption;
}

// Only for test
Expand Down Expand Up @@ -117,7 +121,8 @@ public HadoopClientReadHandler(
ShuffleDataDistributionType.NORMAL,
Roaring64NavigableMap.bitmapOf(),
null,
false);
false,
Optional.empty());
}

protected void init(String fullShufflePath) {
Expand Down Expand Up @@ -174,7 +179,8 @@ protected void init(String fullShufflePath) {
hadoopConf,
distributionType,
expectTaskIds,
offHeapEnable);
offHeapEnable,
prefetchOption);
readHandlers.add(handler);
} catch (Exception e) {
LOG.warn("Can't create ShuffleReaderHandler for " + filePrefix, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -57,7 +58,8 @@ public HadoopShuffleReadHandler(
Configuration conf,
ShuffleDataDistributionType distributionType,
Roaring64NavigableMap expectTaskIds,
boolean offHeapEnabled)
boolean offHeapEnabled,
Optional<PrefetchOption> prefetchOption)
throws Exception {
super(
appId,
Expand All @@ -67,7 +69,8 @@ public HadoopShuffleReadHandler(
expectBlockIds,
processBlockIds,
distributionType,
expectTaskIds);
expectTaskIds,
prefetchOption);
this.filePrefix = filePrefix;
this.indexReader =
createHadoopReader(ShuffleStorageUtils.generateIndexFileName(filePrefix), conf);
Expand Down Expand Up @@ -98,7 +101,8 @@ public HadoopShuffleReadHandler(
conf,
ShuffleDataDistributionType.NORMAL,
Roaring64NavigableMap.bitmapOf(),
false);
false,
Optional.empty());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.uniffle.storage.handler.impl;

import java.util.Optional;

import com.google.common.annotations.VisibleForTesting;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
Expand Down Expand Up @@ -55,7 +57,8 @@ public LocalFileClientReadHandler(
ShuffleDataDistributionType distributionType,
Roaring64NavigableMap expectTaskIds,
int retryMax,
long retryIntervalMax) {
long retryIntervalMax,
Optional<PrefetchOption> prefetchOption) {
super(
appId,
shuffleId,
Expand All @@ -64,7 +67,8 @@ public LocalFileClientReadHandler(
expectBlockIds,
processBlockIds,
distributionType,
expectTaskIds);
expectTaskIds,
prefetchOption);
this.shuffleServerClient = shuffleServerClient;
this.partitionNumPerRange = partitionNumPerRange;
this.partitionNum = partitionNum;
Expand Down Expand Up @@ -98,7 +102,8 @@ public LocalFileClientReadHandler(
ShuffleDataDistributionType.NORMAL,
Roaring64NavigableMap.bitmapOf(),
1,
0);
0,
Optional.empty());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.uniffle.storage.handler.impl;

import java.util.List;
import java.util.Optional;

import com.google.common.annotations.VisibleForTesting;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
Expand All @@ -32,7 +33,7 @@
import org.apache.uniffle.common.exception.RssFetchFailedException;
import org.apache.uniffle.common.util.Constants;

public class MemoryClientReadHandler extends AbstractClientReadHandler {
public class MemoryClientReadHandler extends PrefetchableClientReadHandler {

private static final Logger LOG = LoggerFactory.getLogger(MemoryClientReadHandler.class);
private long lastBlockId = Constants.INVALID_BLOCK_ID;
Expand All @@ -49,7 +50,9 @@ public MemoryClientReadHandler(
ShuffleServerClient shuffleServerClient,
Roaring64NavigableMap expectTaskIds,
int retryMax,
long retryIntervalMax) {
long retryIntervalMax,
Optional<PrefetchableClientReadHandler.PrefetchOption> prefetchOption) {
super(prefetchOption);
this.appId = appId;
this.shuffleId = shuffleId;
this.partitionId = partitionId;
Expand All @@ -68,11 +71,20 @@ public MemoryClientReadHandler(
int readBufferSize,
ShuffleServerClient shuffleServerClient,
Roaring64NavigableMap expectTaskIds) {
this(appId, shuffleId, partitionId, readBufferSize, shuffleServerClient, expectTaskIds, 1, 0);
this(
appId,
shuffleId,
partitionId,
readBufferSize,
shuffleServerClient,
expectTaskIds,
1,
0,
Optional.empty());
}

@Override
public ShuffleDataResult readShuffleData() {
public ShuffleDataResult doReadShuffleData() {
ShuffleDataResult result = null;

RssGetInMemoryShuffleDataRequest request =
Expand Down
Loading

0 comments on commit 81e00be

Please sign in to comment.