Skip to content

Commit

Permalink
[CELEBORN-1337] Remove unused fields from HeartbeatFromApplicationResponse
Browse files Browse the repository at this point in the history

As discussed in #2398, this PR removes unused fields from HeartbeatFromApplicationResponse without adding a WorkerId type.

Closes #2529 from ErikFang/remove-unused-fields-HeartbeatFromApplicationResponse.

Authored-by: Erik.fang <[email protected]>
Signed-off-by: Shuang <[email protected]>
  • Loading branch information
ErikFang authored and RexXiong committed Jun 13, 2024
1 parent 8e2fe74 commit 5323c1d
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class ApplicationHeartbeater(
val (tmpTotalWritten, tmpTotalFileCount) = shuffleMetrics()
logInfo("Send app heartbeat with " +
s"written: ${Utils.bytesToString(tmpTotalWritten)}, file count: $tmpTotalFileCount")
// UserResourceConsumption and DiskInfo are eliminated from WorkerInfo
// during serialization of HeartbeatFromApplication
val appHeartbeat =
HeartbeatFromApplication(
appId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1529,6 +1529,8 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
} else {
Set.empty[WorkerInfo]
}
// UserResourceConsumption and DiskInfo are eliminated from WorkerInfo
// during serialization of RequestSlots
val req =
RequestSlots(
appUniqueId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,10 @@ object ControlMessages extends Logging {
workersToRemove: util.List[WorkerInfo],
requestId: String): PbWorkerExclude = PbWorkerExclude.newBuilder()
.addAllWorkersToAdd(workersToAdd.asScala.map { workerInfo =>
PbSerDeUtils.toPbWorkerInfo(workerInfo, true)
PbSerDeUtils.toPbWorkerInfo(workerInfo, true, false)
}.toList.asJava)
.addAllWorkersToRemove(workersToRemove.asScala.map { workerInfo =>
PbSerDeUtils.toPbWorkerInfo(workerInfo, true)
PbSerDeUtils.toPbWorkerInfo(workerInfo, true, false)
}.toList.asJava)
.setRequestId(requestId)
.build()
Expand Down Expand Up @@ -407,7 +407,7 @@ object ControlMessages extends Logging {
PbRemoveWorkersUnavailableInfo.newBuilder()
.setRequestId(requestId)
.addAllWorkerInfo(unavailable.asScala.map { workerInfo =>
PbSerDeUtils.toPbWorkerInfo(workerInfo, true)
PbSerDeUtils.toPbWorkerInfo(workerInfo, true, false)
}.toList.asJava)
.build()
}
Expand All @@ -421,7 +421,7 @@ object ControlMessages extends Logging {
.setRequestId(requestId)
.setWorkerEventType(WorkerEventType.valueOf(eventType))
.addAllWorkers(workers.asScala.map { workerInfo =>
PbSerDeUtils.toPbWorkerInfo(workerInfo, true)
PbSerDeUtils.toPbWorkerInfo(workerInfo, true, false)
}.toList.asJava)
.build()
}
Expand Down Expand Up @@ -603,7 +603,8 @@ object ControlMessages extends Logging {
.setRequestId(requestId)
.setAvailableStorageTypes(availableStorageTypes)
.setUserIdentifier(PbSerDeUtils.toPbUserIdentifier(userIdentifier))
.addAllExcludedWorkerSet(excludedWorkerSet.map(PbSerDeUtils.toPbWorkerInfo(_, true)).asJava)
.addAllExcludedWorkerSet(excludedWorkerSet.map(
PbSerDeUtils.toPbWorkerInfo(_, true, true)).asJava)
.setPacked(packed)
.build().toByteArray
new TransportMessage(MessageType.REQUEST_SLOTS, payload)
Expand Down Expand Up @@ -737,7 +738,7 @@ object ControlMessages extends Logging {
.setTotalWritten(totalWritten)
.setFileCount(fileCount)
.addAllNeedCheckedWorkerList(needCheckedWorkerList.asScala.map(
PbSerDeUtils.toPbWorkerInfo(_, true)).toList.asJava)
PbSerDeUtils.toPbWorkerInfo(_, true, true)).toList.asJava)
.setShouldResponse(shouldResponse)
.build().toByteArray
new TransportMessage(MessageType.HEARTBEAT_FROM_APPLICATION, payload)
Expand All @@ -750,11 +751,11 @@ object ControlMessages extends Logging {
val payload = PbHeartbeatFromApplicationResponse.newBuilder()
.setStatus(statusCode.getValue)
.addAllExcludedWorkers(
excludedWorkers.asScala.map(PbSerDeUtils.toPbWorkerInfo(_, true)).toList.asJava)
excludedWorkers.asScala.map(PbSerDeUtils.toPbWorkerInfo(_, true, true)).toList.asJava)
.addAllUnknownWorkers(
unknownWorkers.asScala.map(PbSerDeUtils.toPbWorkerInfo(_, true)).toList.asJava)
unknownWorkers.asScala.map(PbSerDeUtils.toPbWorkerInfo(_, true, true)).toList.asJava)
.addAllShuttingWorkers(
shuttingWorkers.asScala.map(PbSerDeUtils.toPbWorkerInfo(_, true)).toList.asJava)
shuttingWorkers.asScala.map(PbSerDeUtils.toPbWorkerInfo(_, true, true)).toList.asJava)
.build().toByteArray
new TransportMessage(MessageType.HEARTBEAT_FROM_APPLICATION_RESPONSE, payload)

Expand All @@ -775,7 +776,7 @@ object ControlMessages extends Logging {
case ReportWorkerUnavailable(failed, requestId) =>
val payload = PbReportWorkerUnavailable.newBuilder()
.addAllUnavailable(failed.asScala.map { workerInfo =>
PbSerDeUtils.toPbWorkerInfo(workerInfo, true)
PbSerDeUtils.toPbWorkerInfo(workerInfo, true, false)
}
.toList.asJava)
.setRequestId(requestId).build().toByteArray
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,22 +240,25 @@ object PbSerDeUtils {

def toPbWorkerInfo(
workerInfo: WorkerInfo,
eliminateUserResourceConsumption: Boolean): PbWorkerInfo = {
val diskInfos = workerInfo.diskInfos.values
val pbDiskInfos = new util.ArrayList[PbDiskInfo]()
diskInfos.asScala.foreach(diskInfo => pbDiskInfos.add(PbSerDeUtils.toPbDiskInfo(diskInfo)))
eliminateUserResourceConsumption: Boolean,
eliminateDiskInfo: Boolean): PbWorkerInfo = {
val builder = PbWorkerInfo.newBuilder
.setHost(workerInfo.host)
.setRpcPort(workerInfo.rpcPort)
.setFetchPort(workerInfo.fetchPort)
.setPushPort(workerInfo.pushPort)
.setReplicatePort(workerInfo.replicatePort)
.setInternalPort(workerInfo.internalPort)
.addAllDisks(pbDiskInfos)
if (!eliminateUserResourceConsumption) {
builder.putAllUserResourceConsumption(
PbSerDeUtils.toPbUserResourceConsumption(workerInfo.userResourceConsumption))
}
if (!eliminateDiskInfo) {
val diskInfos = workerInfo.diskInfos.values
val pbDiskInfos = new util.ArrayList[PbDiskInfo]()
diskInfos.asScala.foreach(diskInfo => pbDiskInfos.add(PbSerDeUtils.toPbDiskInfo(diskInfo)))
builder.addAllDisks(pbDiskInfos)
}
builder.build
}

Expand Down Expand Up @@ -427,12 +430,12 @@ object PbSerDeUtils {
.setEstimatedPartitionSize(estimatedPartitionSize)
.addAllRegisteredShuffle(registeredShuffle)
.addAllHostnameSet(hostnameSet)
.addAllExcludedWorkers(excludedWorkers.asScala.map(toPbWorkerInfo(_, true)).asJava)
.addAllExcludedWorkers(excludedWorkers.asScala.map(toPbWorkerInfo(_, true, false)).asJava)
.addAllManuallyExcludedWorkers(manuallyExcludedWorkers.asScala
.map(toPbWorkerInfo(_, true)).asJava)
.addAllWorkerLostEvents(workerLostEvent.asScala.map(toPbWorkerInfo(_, true)).asJava)
.map(toPbWorkerInfo(_, true, false)).asJava)
.addAllWorkerLostEvents(workerLostEvent.asScala.map(toPbWorkerInfo(_, true, false)).asJava)
.putAllAppHeartbeatTime(appHeartbeatTime)
.addAllWorkers(workers.asScala.map(toPbWorkerInfo(_, true)).asJava)
.addAllWorkers(workers.asScala.map(toPbWorkerInfo(_, true, false)).asJava)
.setPartitionTotalWritten(partitionTotalWritten)
.setPartitionTotalFileCount(partitionTotalFileCount)
// appDiskUsageMetricSnapshots can have null values,
Expand All @@ -442,7 +445,7 @@ object PbSerDeUtils {
.putAllLostWorkers(lostWorkers.asScala.map {
case (worker: WorkerInfo, time: java.lang.Long) => (worker.toUniqueId(), time)
}.asJava)
.addAllShutdownWorkers(shutdownWorkers.asScala.map(toPbWorkerInfo(_, true)).asJava)
.addAllShutdownWorkers(shutdownWorkers.asScala.map(toPbWorkerInfo(_, true, false)).asJava)
.putAllWorkerEventInfos(workerEventInfos.asScala.map {
case (worker, workerEventInfo) =>
(worker.toUniqueId(), PbSerDeUtils.toPbWorkerEventInfo(workerEventInfo))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,8 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
}

test("fromAndToPbWorkerInfo") {
val pbWorkerInfo = PbSerDeUtils.toPbWorkerInfo(workerInfo1, false)
val pbWorkerInfoWithEmptyResource = PbSerDeUtils.toPbWorkerInfo(workerInfo1, true)
val pbWorkerInfo = PbSerDeUtils.toPbWorkerInfo(workerInfo1, false, false)
val pbWorkerInfoWithEmptyResource = PbSerDeUtils.toPbWorkerInfo(workerInfo1, true, false)
val restoredWorkerInfo = PbSerDeUtils.fromPbWorkerInfo(pbWorkerInfo)
val restoredWorkerInfoWithEmptyResource =
PbSerDeUtils.fromPbWorkerInfo(pbWorkerInfoWithEmptyResource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,8 @@ private[celeborn] class Master(
// unknown workers will retain in needCheckedWorkerList
needCheckedWorkerList.removeAll(statusSystem.workers)
if (shouldResponse) {
// UserResourceConsumption and DiskInfo are eliminated from WorkerInfo
// during serialization of HeartbeatFromApplicationResponse
context.reply(HeartbeatFromApplicationResponse(
StatusCode.SUCCESS,
new util.ArrayList(
Expand Down

0 comments on commit 5323c1d

Please sign in to comment.