[CELEBORN-1469] Support writing shuffle data to OSS(S3 only) #2579
Conversation
Hi @zhaohehuhu, there are some tools that can check the PR's code locally.
@zhaohehuhu, please add a comment with the test result.
Done. Thanks.
Got it. Thanks.
Thanks for your effort. I'll complete the review within the next week.
Thanks for your contribution. Based on this PR, I think the Celeborn worker should support using HDFS and S3 concurrently. That will need some changes.
common/pom.xml
Outdated
@@ -187,6 +187,16 @@
      <artifactId>hadoop-client-runtime</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
These two dependencies should be put into LICENSE-binary and NOTICE-binary.
Got it. Thanks
common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
@@ -160,6 +160,10 @@ public boolean HDFSOnly() {
    return StorageInfo.HDFSOnly(availableStorageTypes);
  }

  public static boolean OSSOnly(int availableStorageTypes) {
So I think this can be changed to S3Only
Got it. Thanks
@@ -1106,6 +1110,49 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
  def partitionSplitMinimumSize: Long = get(WORKER_PARTITION_SPLIT_MIN_SIZE)
  def partitionSplitMaximumSize: Long = get(WORKER_PARTITION_SPLIT_MAX_SIZE)

  def s3AccessKey: String = get(S3_ACCESS_KEY).map {
get(S3_ACCESS_KEY) will never return an empty string, so this check is redundant.
Using getOrElse("") could be enough.
Got it. Thanks
  }
}.getOrElse("")
def s3SecretKey: String = get(S3_SECRET_KEY).map {
ditto
Got it. Thanks
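To make the suggestion concrete, here is a minimal self-contained sketch (the tiny `get` helper and key names are illustrative stand-ins for CelebornConf's internals, not the actual implementation): since an optional config lookup never yields `Some("")`, `getOrElse("")` alone is enough.

```scala
object ConfSketch {
  // Illustrative stand-in for CelebornConf's optional config lookup;
  // it returns None for unset keys and never Some("").
  private val settings = Map("celeborn.storage.s3.access.key" -> "AKIA...")
  private def get(key: String): Option[String] = settings.get(key)

  // The simplified accessors: no .map over the value, just a fallback.
  def s3AccessKey: String = get("celeborn.storage.s3.access.key").getOrElse("")
  def s3SecretKey: String = get("celeborn.storage.s3.secret.key").getOrElse("")

  def main(args: Array[String]): Unit =
    println(s"access=$s3AccessKey secret='$s3SecretKey'") // secret falls back to ""
}
```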
    keepBuffer: Boolean) extends FlushTask(buffer, notifier, keepBuffer) {
  override def flush(): Unit = {
    if (StorageManager.hadoopFs.exists(path)) {
      val conf = StorageManager.hadoopFs.getConf
Is there any difference for append between S3 and HDFS? What are the benefits of this logic?
Yup. The main reason is that S3 doesn't support append mode yet.
@zhaohehuhu Is there another approach to integrating a new FileSystem like S3 that cannot support append? This looks inefficient: it copies the old data from S3 to the worker and writes it to S3 again, which can amplify the writes extremely.
Correct. The current implementation is just a workaround for the limitation that S3 doesn't support append mode. I'm figuring out a better solution to avoid copy-and-write. @maobaolong
@zhaohehuhu That's great, I look forward to learning more about the new solution.
Thanks @maobaolong
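For readers following the thread, here is a minimal sketch of the copy-and-rewrite workaround under discussion (illustrative only, not the PR's actual FlushTask): because s3a offers no append, flushing into an existing object means reading the old bytes back and rewriting the whole object through Hadoop's FileSystem API.

```scala
import java.io.ByteArrayOutputStream
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils

object S3AppendSketch {
  // Emulate append on a FileSystem that lacks it (e.g. s3a):
  // read the existing object, then overwrite it with old bytes + new chunk.
  def appendTo(fs: FileSystem, path: Path, chunk: Array[Byte]): Unit = {
    val old = new ByteArrayOutputStream()
    if (fs.exists(path)) {
      val in = fs.open(path)
      try IOUtils.copyBytes(in, old, 4096, false)
      finally in.close()
    }
    val out = fs.create(path, true) // overwrite with the concatenated bytes
    try { out.write(old.toByteArray); out.write(chunk) }
    finally out.close()
  }
}
```

Every flush re-downloads and re-uploads the whole object, which is exactly the write amplification @maobaolong points out; an alternative direction (not in this PR) would be S3 multipart upload, or buffering a complete file locally before a single upload.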
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -56,7 +56,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
  // mount point -> file writer
  val workingDirWriters =
    JavaUtils.newConcurrentHashMap[File, ConcurrentHashMap[String, PartitionDataWriter]]()
  val hdfsWriters = JavaUtils.newConcurrentHashMap[String, PartitionDataWriter]()
  val dfsWriters = JavaUtils.newConcurrentHashMap[String, PartitionDataWriter]()
Do not merge them. Split into two writers maps.
      throw new IOException("Empty working directory configuration!")
    }

    DeviceInfo.getDeviceAndDiskInfos(workingDirInfos, conf)
  }
  val mountPoints = new util.HashSet[String](diskInfos.keySet())
  val hdfsDiskInfo =
  val dfsDiskInfo =
Just add a new S3 diskinfo.
  val (hdfsFlusher, _totalHdfsFlusherThread) =
    if (hasHDFSStorage) {
      logInfo(s"Initialize HDFS support with path $hdfsDir")
  val (dfsFlusher, _totalDfsFlusherThread) =
ditto
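As a rough illustration of the reviewer's direction (simplified stand-in types, not Celeborn's real classes): keep per-backend writer maps and flushers separate instead of merging HDFS and S3 into single dfs-prefixed fields, so a worker can serve both concurrently.

```scala
import java.util.concurrent.ConcurrentHashMap

object StorageManagerSketch {
  // Simplified stand-ins for PartitionDataWriter and Flusher.
  final case class Writer(id: String)
  final case class Flusher(dir: String)

  // One writers map per backend rather than one merged "dfsWriters".
  val hdfsWriters = new ConcurrentHashMap[String, Writer]()
  val s3Writers = new ConcurrentHashMap[String, Writer]()

  // Likewise, one flusher per configured backend.
  def initFlushers(hasHdfs: Boolean, hasS3: Boolean): (Option[Flusher], Option[Flusher]) = {
    val hdfsFlusher = if (hasHdfs) Some(Flusher("hdfs://namenode/celeborn")) else None
    val s3Flusher = if (hasS3) Some(Flusher("s3a://bucket/celeborn")) else None
    (hdfsFlusher, s3Flusher)
  }

  def main(args: Array[String]): Unit =
    println(initFlushers(hasHdfs = true, hasS3 = true)) // both backends active
}
```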
@zhaohehuhu Thanks for your effort. The review will be done within this week.
The public cloud vendors' client jars are crazily large, especially AWS (v1 300MiB+, v2 500MiB+); we should not ship them by default.
You can fix the style issues and license issues using the following commands:
./dev/reformat
build/mvn org.apache.rat:apache-rat-plugin:check -Pgoogle-mirror,spark-3.3
Got it.
OK. Thanks |
Force-pushed from d40e3d5 to b24d3f2.
Thanks for your effort. But this PR still needs further changes.
pom.xml
Outdated
@@ -71,6 +71,7 @@

    <!-- use hadoop-3 as default -->
    <hadoop.version>3.3.6</hadoop.version>
    <aws.version>1.12.367</aws.version>
This version can be moved to the hadoop-aws profile.
OK. Done
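For illustration, the move amounts to something like the following opt-in profile (a sketch, not the exact PR change; the dependency list shown is an assumption based on the artifacts discussed in this thread):

```xml
<!-- Sketch: scope the heavy AWS bits to an opt-in profile so the
     default build does not ship them; activate with -Phadoop-aws. -->
<profile>
  <id>hadoop-aws</id>
  <properties>
    <aws.version>1.12.367</aws.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-aws</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-java-sdk-bundle</artifactId>
      <version>${aws.version}</version>
    </dependency>
  </dependencies>
</profile>
```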
build/make-distribution.sh
Outdated
BUILD_COMMAND=("$SBT" clean package)
BUILD_COMMAND=("$SBT" clean package "$PROFILE")
These changes can be deleted.
./build/make-distribution.sh --sbt-enabled -Pspark-3.3,hadoop-aws may not work as expected because of the command above.
@@ -85,6 +87,13 @@ public DfsPartitionReader(

    this.metricsCallback = metricsCallback;
    this.location = location;
    FileSystem hadoopFs = null;
A DFS partition reader reads only one partition location, so the hadoopFs can be cached as a variable in this class. That would eliminate the unnecessary condition blocks.
OK. Done
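The suggestion, sketched in Scala for brevity (the real DfsPartitionReader is Java, and the constructor arguments here are simplified assumptions): resolve the FileSystem once from the single location the reader serves, then use the cached field everywhere.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// hdfsFs/s3Fs stand in for whatever handles StorageManager exposes.
class DfsPartitionReaderSketch(
    locationPath: String,
    hdfsFs: FileSystem,
    s3Fs: FileSystem) {

  // One reader serves exactly one partition location, so branch once here
  // instead of in every read method.
  private val hadoopFs: FileSystem =
    if (locationPath.startsWith("s3a://")) s3Fs else hdfsFs

  def readIndex(): Int = {
    val in = hadoopFs.open(new Path(locationPath))
    try in.read() // placeholder read; real code parses the index
    finally in.close()
  }
}
```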
// create a DataStreamer that is a thread.
// If we reuse HDFS output stream, we will exhaust the memory soon.
// If we reuse DFS output stream, we will exhaust the memory soon.
I wonder if the Hadoop AWS module has some optimization around this point.
      this.flusherBufferSize = localFlusherBufferSize;
      channel = FileChannelUtils.createWritableFileChannel(this.diskFileInfo.getFilePath());
    } else {
      this.flusherBufferSize = hdfsFlusherBufferSize;
      // We open the stream and close immediately because HDFS output stream will
      FileSystem hadoopFs = null;
Keep this hadoopFs as a class variable. There is no need for so many conditionals to get the corresponding hadoopFs.
OK. Done
...er/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -673,11 +690,17 @@ class FileSorter {
        indexFile.delete();
      }
    } else {
      if (StorageManager.hadoopFs().exists(fileInfo.getHdfsSortedPath())) {
        StorageManager.hadoopFs().delete(fileInfo.getHdfsSortedPath(), false);
      FileSystem hadoopFs = null;
ditto
OK. Done
      if (StorageManager.hadoopFs()
          .exists(diskFileInfo.getHdfsPeerWriterSuccessPath())) {
        StorageManager.hadoopFs().delete(diskFileInfo.getHdfsPath(), false);
      if (diskFileInfo.isDFS()) {
ditto.
OK. Done
@@ -307,6 +308,7 @@ org.slf4j:jcl-over-slf4j
org.webjars:swagger-ui
org.xerial.snappy:snappy-java
org.yaml:snakeyaml
com.amazonaws:aws-java-sdk-bundle
Add the dependency in alphabetical order.
Got it.
LGTM, except for a nit.
        hdfsStream.write(indexBuffer.array());
        hdfsStream.close();
      } else if (diskFileInfo.isDFS()) {
        FSDataOutputStream dfsStream = hadoopFs.append(diskFileInfo.getDfsIndexPath());
Is this safe for S3? As far as I know, S3 doesn't support append.
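One way to address the concern, sketched under the assumption that the sorter has the complete index buffer in memory before writing (so no true append is required for S3):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

object IndexWriteSketch {
  // HDFS supports real append; S3 does not, but a full overwrite-create
  // works when the entire index is available up front.
  def writeIndex(fs: FileSystem, indexPath: Path, indexBytes: Array[Byte], isS3: Boolean): Unit = {
    val out =
      if (isS3) fs.create(indexPath, true) // whole-object write
      else fs.append(indexPath)            // append to the existing HDFS file
    try out.write(indexBytes)
    finally out.close()
  }
}
```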
Merged into main (v0.6.0).
Closes apache#2579 from zhaohehuhu/dev-0619. Authored-by: zhaohehuhu <[email protected]> Signed-off-by: mingji <[email protected]>
What changes were proposed in this pull request?
as title
Why are the changes needed?
Currently, Celeborn doesn't support sinking shuffle data directly to Amazon S3, which can be a limitation when moving on-premises servers to AWS and using S3 as the sink for shuffled data.
Does this PR introduce any user-facing change?
No
How was this patch tested?