
[CELEBORN-1894] Allow skipping already read chunks during unreplicated shuffle read retried #3132

Closed
wants to merge 14 commits

Conversation

saurabhd336 (Contributor)

What changes were proposed in this pull request?

Whenever a WorkerPartitionReader is recreated (due to Celeborn worker restarts or any other chunk fetch failure), the entire shuffle partition file is re-read from the beginning, with already-read chunks discarded in CelebornInputStream based on the batchIdSet metadata it maintains.

This can be improved (only for cases where shuffle data is unreplicated) by skipping already-read chunk ids, since they would be discarded anyway. This improves overall shuffle read performance (reducer total time, network usage, etc.).
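
To make the idea concrete, here is a minimal hedged sketch (the class and method names are hypothetical stand-ins, not the actual Celeborn classes): the reader records which chunk indices it has already returned, and a reader recreated after a fetch failure receives that set so it can skip those chunks instead of re-fetching data that would be discarded anyway.

import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the checkpointing idea; names are stand-ins.
public class ChunkSkipSketch {
  // Chunk indices already handed to the caller by a previous reader.
  private final Set<Integer> returnedChunkIds;
  private int chunkIndex = 0;

  public ChunkSkipSketch(Set<Integer> checkpointedChunkIds) {
    this.returnedChunkIds = new HashSet<>(checkpointedChunkIds);
  }

  // Record a chunk once it has been returned, so a later reader can skip it.
  public void markReturned(int index) {
    returnedChunkIds.add(index);
  }

  // A recreated reader starts from the beginning but skips checkpointed
  // chunks, avoiding re-reads that would be discarded via batchId dedup.
  public boolean shouldSkip(int index) {
    return returnedChunkIds.contains(index);
  }
}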

Why are the changes needed?

To allow skipping already-read shuffle chunks during read retries.

Does this PR introduce any user-facing change?

No

How was this patch tested?

UTs added

@zaynt4606 (Contributor) left a comment

Please run dev/reformat for the [Style check],
and UPDATE=1 build/mvn clean test -pl common -am -Dtest=none -DwildcardSuites=org.apache.celeborn.ConfigurationSuite for the configuration change.

@s0nskar (Contributor) left a comment

@saurabhd336 there seem to be some issues with the tests.

shutdownMiniCluster()
}

test(s"test MiniCluster with connection resets, ensure no duplicate reads") {
Contributor

Can we add a negative case to show that there will be duplicated reads if the feature is disabled? Also, a case where replication is enabled.

Contributor Author

Added.

val WORKER_PARTITION_READER_CHECKPOINT_ENABLE: ConfigEntry[Boolean] =
buildConf("celeborn.worker.partition.reader.checkpointEnabled")
.categories("client")
.version("0.5.0")
Contributor

Suggested change
.version("0.5.0")
.version("0.6.0")

Contributor Author

Ack

@SteNicholas (Member)

@saurabhd336, please create a new JIRA ticket for this pull request.

@saurabhd336 (Contributor Author)

@zaynt4606 @s0nskar @SteNicholas Thanks for helping review this. I've fixed the tests and lint issues.
@SteNicholas could you please help me with creating a Celeborn JIRA ticket for this change?

@SteNicholas (Member)

@saurabhd336, you could refer to #1053 for creating a new JIRA ticket.

@s0nskar changed the title from "Allow skipping already read chunks during unreplicated shuffle read retried" to "[CELEBORN-1894] Allow skipping already read chunks during unreplicated shuffle read retried" on Mar 5, 2025
@s0nskar (Contributor)

s0nskar commented Mar 5, 2025

@saurabhd336 had some trouble with the JIRA account, so created one –  CELEBORN-1894

+ " likely by a previous reader for the same partition.",
chunkIndex);
chunkIndex++;
returnedChunks++;
Contributor

Why doesn't toFetch need to decrease when chunkIdsAlreadyReturned.contains(chunkIndex), given that toFetch is fetchMaxReqsInFlight - inFlight + 1?

Contributor Author

The way I read toFetch, it seems to ensure that no more than fetchMaxReqsInFlight requests are submitted at once, while still fetching as many chunks as possible in one go.

My thought process is that if we're skipping certain chunks, we could instead fetch other chunks in the list. WDYT?

Contributor Author

Actually, come to think of it, incrementing toFetch here would be wrong and cause an infinite wait. I added a comment explaining it.

Contributor

Got it.
returnedChunks is increased, and the set of chunks actually fetched does not change.
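
For readers following the thread, a hedged sketch of the bookkeeping under discussion; the variable names follow the diff excerpts above, while the surrounding class and the sendFetchRequest helper are simplified stand-ins rather than the actual WorkerPartitionReader code.

import java.util.HashSet;
import java.util.Set;

// Simplified stand-in for the fetch loop discussed above; not the real code.
public class FetchLoopSketch {
  private final Set<Integer> chunkIdsAlreadyReturned = new HashSet<>();
  private int chunkIndex = 0;
  private int endChunkIndex = 0;
  private int returnedChunks = 0;
  private int inFlight = 0;
  private int fetchMaxReqsInFlight = 4;

  void fetchChunks() {
    int toFetch =
        Math.min(fetchMaxReqsInFlight - inFlight + 1, endChunkIndex + 1 - chunkIndex);
    while (toFetch > 0 && chunkIndex <= endChunkIndex) {
      if (chunkIdsAlreadyReturned.contains(chunkIndex)) {
        // Already returned by a previous reader: count it as returned and
        // advance without issuing a fetch. toFetch is deliberately not
        // increased here, since expecting extra in-flight responses that
        // never arrive could make the reader wait forever.
        chunkIndex++;
        returnedChunks++;
      } else {
        sendFetchRequest(chunkIndex); // stand-in for the real chunk fetch RPC
        chunkIndex++;
        toFetch--;
      }
    }
  }

  private void sendFetchRequest(int index) {
    // placeholder: the real reader issues an async fetch and tracks inFlight
  }
}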

@@ -982,6 +982,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def clientFetchTimeoutMs: Long = get(CLIENT_FETCH_TIMEOUT)
def clientFetchBufferSize: Int = get(CLIENT_FETCH_BUFFER_SIZE).toInt
def clientFetchMaxReqsInFlight: Int = get(CLIENT_FETCH_MAX_REQS_IN_FLIGHT)
def isWorkerPartitionReaderCheckpointEnabled: Boolean =
get(WORKER_PARTITION_READER_CHECKPOINT_ENABLE)
Contributor

What about disabling this conf when replication is enabled?
def isWorkerPartitionReaderCheckpointEnabled: Boolean = if (clientPushReplicateEnabled) { false } else { get(WORKER_PARTITION_READER_CHECKPOINT_ENABLE) }

Contributor Author

Makes sense. Ack

Contributor Author

@zaynt4606 It is possible to fall back to reading from the same server even when replication is enabled, e.g. when celeborn.client.adaptive.optimizeSkewedPartitionRead.enabled is set to true and partitions are being split. I'd let CelebornInputStream itself decide whether or not to restore the checkpoint when creating a reader.

results.forEach(ReferenceCounted::release);
results.forEach(
chunk -> {
chunk.getRight().release(); //
Contributor

There is an empty comment (//) left here.

Contributor Author

Fixed.

Exception lastException = null;
PartitionReader reader = null;
Member

Could you move this definition to line 437? The catch block does not use this variable.

Contributor Author

Ack

boolean hasNext();

ByteBuf next() throws IOException, InterruptedException;

void close();

PartitionLocation getLocation();

default T getPartitionReaderCheckpointMetadata() {
Member

Why does this method have a default implementation?

Contributor Author

I initially thought of implementing it only for the WorkerPartitionReader, but I can add a no-op impl for the other types of readers and take up their implementations in a subsequent PR.
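
As an illustration of the no-op approach mentioned here, a hedged sketch of what such a default could look like; the interface is trimmed down and the names are stand-ins, only the default-method shape is the point.

import java.util.Optional;

// Trimmed-down sketch; only the default method shape is illustrative.
interface PartitionReaderCheckpointMetadataSketch {}

interface PartitionReaderSketch {
  // Readers that do not support checkpointing inherit this no-op default
  // and simply report that no checkpoint is available.
  default Optional<PartitionReaderCheckpointMetadataSketch> getPartitionReaderCheckpointMetadata() {
    return Optional.empty();
  }
}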

@@ -4691,6 +4698,15 @@ object CelebornConf extends Logging {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("64k")

val WORKER_PARTITION_READER_CHECKPOINT_ENABLE: ConfigEntry[Boolean] =
buildConf("celeborn.worker.partition.reader.checkpointEnabled")
Member

Suggested change
buildConf("celeborn.worker.partition.reader.checkpointEnabled")
buildConf("celeborn.worker.partition.reader.checkpoint.enabled")

Contributor Author

Ack

@@ -4691,6 +4698,15 @@ object CelebornConf extends Logging {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("64k")

val WORKER_PARTITION_READER_CHECKPOINT_ENABLE: ConfigEntry[Boolean] =
Member

Suggested change
val WORKER_PARTITION_READER_CHECKPOINT_ENABLE: ConfigEntry[Boolean] =
val WORKER_PARTITION_READER_CHECKPOINT_ENABLED: ConfigEntry[Boolean] =

Contributor Author

Ack

import java.util.Set;

/** Checkpoint metadata for a partition reader on the worker side. */
public class WorkerPartitionReaderCheckpointMetadata implements PartitionReaderCheckpointMetadata {
Contributor

This class can be used in worker partition readers, the DFS partition reader, and local partition readers. IMO, there is no need to extract an empty interface for it.

Contributor Author

I initially thought of implementing it only for the WorkerPartitionReader, but I can add a no-op impl for the other types of readers and take up their implementations in a subsequent PR.

if (clientPushReplicateEnabled) {
false
} else
get(WORKER_PARTITION_READER_CHECKPOINT_ENABLE)
Contributor

Save it to a val to avoid repeatedly parsing this config

@saurabhd336 requested a review from zaynt4606 on March 10, 2025, 06:21
@saurabhd336 (Contributor Author)

@zaynt4606 @FMX @SteNicholas @s0nskar Review comments have been addressed. PTAL!

@FMX (Contributor) left a comment

Looks like this optimization is generic for all partition readers.


@Override
public Optional<PartitionReaderCheckpointMetadata> getPartitionReaderCheckpointMetadata() {
// TODO implement similar to {@link WorkerPartitionReader}
Contributor

That implementation is helpful for all partition readers. Why should we leave the implementation as a to-do here?

Contributor Author

I had planned to implement it in a follow-up PR, after testing this for the worker partition reader. Anyway, I've added the implementation for DfsPartitionReader. For LocalPartitionReader, I think the complexity of the implementation outweighs the potential benefits (since for both the worker and DFS readers it's a network call to fetch the chunks, while for the local reader it's just a local file buffer read).

I can take up the impl for the local reader in a follow-up PR. WDYT?

@@ -4691,6 +4694,15 @@ object CelebornConf extends Logging {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("64k")

val PARTITION_READER_CHECKPOINT_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.partition.reader.checkpoint.enabled")
Contributor

Maybe rename the configuration to celeborn.client.partition.reader.checkpoint.enabled, since it is only for client use.

Contributor Author

Ack

@@ -176,7 +186,8 @@ public ByteBuf next() throws IOException, InterruptedException {
throw e;
}
returnedChunks++;
return chunk;
chunkIdsAlreadyReturned.add(chunk.getLeft());
Contributor

Only checkpoint when isCheckpointEnabled

Contributor Author

With the new way of checkpointing, this is now handled

int toFetch = Math.min(fetchMaxReqsInFlight - inFlight + 1, endChunkIndex + 1 - chunkIndex);

while (toFetch > 0 && chunkIndex <= endChunkIndex) {
if (chunkIdsAlreadyReturned.contains(chunkIndex)) {
Contributor

isCheckpointEnabled && chunkIdsAlreadyReturned.contains(chunkIndex)

Contributor Author

Now handled

@saurabhd336 requested a review from RexXiong on March 13, 2025, 17:20
@saurabhd336 (Contributor Author)

@FMX I noticed the comment regarding checkpointing only after a chunk is fully read. While I'm not sure in what cases a returned chunk can be not fully read before calling next(), I have nonetheless addressed the comment and now checkpoint the last returned chunk id on each next() call. PTAL!

@FMX (Contributor)

FMX commented Mar 14, 2025

Hi, after some discussion, we found that if a chunk is returned and not fully read, the Spark task will fail, so this won't be a problem here.

@saurabhd336 force-pushed the skipReadChunks branch 2 times, most recently from 162d4f1 to 16483dc on March 14, 2025, 03:00
@saurabhd336 (Contributor Author)

Yes, that's what I had noticed too. I've anyway changed the PR so that the last returned chunk is now checkpointed in the subsequent next() call, ensuring we only checkpoint once the last returned chunk has been fully read. PTAL!

@RexXiong (Contributor) left a comment

LGTM, only a nit

@Override
public ByteBuf next() throws IOException, InterruptedException {
checkException();
checkpoint();
Contributor

nit: IMO checkpoint() can be placed before checkException().

Contributor Author

Yes, makes sense. Ack.
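
A hedged sketch of the resulting next() shape after this change; it is heavily simplified (an int stands in for the real ByteBuf chunk, and the queue handling and fetch callbacks are omitted), but it shows checkpoint() running before checkException() and recording the chunk returned by the previous call, which by then has been fully consumed.

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

// Heavily simplified sketch of the next()/checkpoint() interaction; the real
// reader deals with ByteBuf chunks, an in-flight queue, and fetch callbacks.
public class NextWithCheckpointSketch {
  private final Set<Integer> chunkIdsAlreadyReturned = new HashSet<>();
  private Integer lastReturnedChunkId = null;

  public int next() throws IOException {
    // Checkpoint first: the chunk returned by the previous next() call has
    // been fully consumed by the caller by the time we get here.
    checkpoint();
    checkException();
    int chunkId = fetchNextChunkId(); // stand-in for the real chunk fetch
    lastReturnedChunkId = chunkId;
    return chunkId;
  }

  private void checkpoint() {
    if (lastReturnedChunkId != null) {
      chunkIdsAlreadyReturned.add(lastReturnedChunkId);
    }
  }

  private void checkException() throws IOException {
    // stand-in: the real reader rethrows any async fetch failure here
  }

  private int fetchNextChunkId() {
    return chunkIdsAlreadyReturned.size(); // placeholder progression
  }
}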

@saurabhd336 (Contributor Author)

@zaynt4606 @FMX @SteNicholas @s0nskar PTAL!

@RexXiong closed this in 7571e10 on Mar 18, 2025
@RexXiong (Contributor)

Thanks. Merged to main (v0.6.0).
