Skip to content

Commit

Permalink
[linkis-orchestrator-ecm-plugin] Modification of scala file floating …
Browse files Browse the repository at this point in the history
…red (#3166)

* [linkis-orchestrator-ecm-plugin] Modification of scala file floating red
  • Loading branch information
binbinCheng authored Sep 7, 2022
1 parent c01d9a7 commit 739adf4
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import java.util
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration

/**
Expand All @@ -64,7 +64,7 @@ class ComputationEngineConnManager extends AbstractEngineConnManager with Loggin
override def applyMark(markReq: MarkReq): Mark = {
if (null == markReq) return null
val mark = MARK_CACHE_LOCKER.synchronized {
val markCache = getMarkCache().keys
val markCache = getMarkCache().asScala.keys
val maybeMark = markCache.find(_.getMarkReq.equals(markReq))
maybeMark.orNull
}
Expand Down Expand Up @@ -120,7 +120,7 @@ class ComputationEngineConnManager extends AbstractEngineConnManager with Loggin
new ComputationEngineConnExecutor(engineNode)
}
if (null != engineNode.getLabels) {
engineConnExecutor.setLabels(engineNode.getLabels.toList.toArray)
engineConnExecutor.setLabels(engineNode.getLabels.asScala.toList.toArray)
}
return engineConnExecutor
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.linkis.orchestrator.ecm.service.EngineConnExecutor

import java.util

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

/**
*/
Expand Down Expand Up @@ -146,12 +146,13 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging
val instances = getInstances(mark)
if (null != instances) {
val executors = Utils.tryAndWarn {
instances.map(getEngineConnExecutorCache().get(_)).filter(null != _).sortBy { executor =>
if (null == executor.getRunningTaskCount) {
0
} else {
executor.getRunningTaskCount
}
instances.asScala.map(getEngineConnExecutorCache().get(_)).filter(null != _).sortBy {
executor =>
if (null == executor.getRunningTaskCount) {
0
} else {
executor.getRunningTaskCount
}
}
}

Expand Down Expand Up @@ -206,9 +207,9 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging

protected def getMarksByInstance(serviceInstance: ServiceInstance): Array[Mark] =
MARK_CACHE_LOCKER.synchronized {
getMarkCache()
getMarkCache().asScala
.filter { keyValue =>
keyValue._2.exists(serviceInstance.equals(_))
keyValue._2.asScala.exists(serviceInstance.equals(_))
}
.keys
.toArray
Expand Down Expand Up @@ -240,7 +241,7 @@ abstract class AbstractEngineConnManager extends EngineConnManager with Logging
override def releaseMark(mark: Mark): Unit = {
if (null != mark && getMarkCache().containsKey(mark)) {
logger.debug(s"Start to release mark ${mark.getMarkId()}")
val executors = getMarkCache().get(mark).map(getEngineConnExecutorCache().get(_))
val executors = getMarkCache().get(mark).asScala.map(getEngineConnExecutorCache().get(_))
Utils.tryAndError(executors.foreach { executor =>
getEngineConnExecutorCache().remove(executor.getServiceInstance)
executor.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.linkis.server.BDPJettyServerHelper
import java.util
import java.util.Random

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

class LoadBalanceLabelEngineConnManager extends ComputationEngineConnManager with Logging {
Expand Down Expand Up @@ -65,31 +65,26 @@ class LoadBalanceLabelEngineConnManager extends ComputationEngineConnManager wit
}

/**
* 申请获取一个Mark
* 1. 如果没有对应的Mark就生成新的 2. 一个MarkRequest对应多个Mark,一个Mark对应一个Engine 3.
* 如果存在bindEngineLabel则需要在jobStart的时候随机选择一个,并缓存给后续jobGroup使用,在jobEnd的时候删除缓存 4. 将Mark进行返回
* Apply for a mark
* 1. If there is no corresponding mark, a new one will be generated. 2. A markrequest
* corresponds to multiple marks, and a mark corresponds to an engine. 3 If there is a
* bindenginelabel, you need to randomly select one at jobstart and cache it for subsequent
* jobgroups. Delete the cache at jobend. 4. Return mark
*
* @param markReq
* @return
*/
override def applyMark(markReq: MarkReq): Mark = {
if (null == markReq) return null

val markNum: Int = getMarkNumByMarReq(markReq)
/*val markReqCache = MARK_REQ_CACHE_LOCKER.synchronized {
getMarkReqAndMarkCache().keys
}
var mayBeMarkReq = markReqCache.find(_.equals(markReq)).orNull*/
var count = 0
if (getMarkReqAndMarkCache().containsKey(markReq)) {
count = getMarkReqAndMarkCache().get(markReq).size()
}
// toto check if count >= markNum, will not add more mark
while (count < markNum) {
createMark(markReq)
count = getMarkReqAndMarkCache().get(markReq).size()
}
// markReq is in cache, and mark list is ready
val markList = getMarkReqAndMarkCache().get(markReq)
var chooseMark: Mark = null
if (markReq.getLabels.containsKey(LabelKeyConstant.BIND_ENGINE_KEY)) {
Expand All @@ -98,13 +93,10 @@ class LoadBalanceLabelEngineConnManager extends ComputationEngineConnManager wit
markReq.getLabels.get(LabelKeyConstant.BIND_ENGINE_KEY)
)
if (bindEngineLabel.getIsJobGroupHead) {
// getRandom mark
chooseMark = markList.get(new util.Random().nextInt(markList.length))
// chooseMark.asInstanceOf[LoadBalanceMark].setTaskMarkReq(markReq)
chooseMark = markList.get(new util.Random().nextInt(markList.asScala.length))
getIdToMarkCache().put(bindEngineLabel.getJobGroupId, chooseMark)
} else if (getIdToMarkCache().containsKey(bindEngineLabel.getJobGroupId)) {
chooseMark = getIdToMarkCache().get(bindEngineLabel.getJobGroupId)
// chooseMark.asInstanceOf[LoadBalanceMark].setTaskMarkReq(markReq)
val insList = getMarkCache().get(chooseMark)
if (null == insList || insList.size() != 1) {
val msg =
Expand All @@ -131,16 +123,11 @@ class LoadBalanceLabelEngineConnManager extends ComputationEngineConnManager wit
}
}
} else {
// treat as isHead and isEnd
chooseMark = markList.get(new Random().nextInt(count))
}
chooseMark
}

/**
* 1. 创建一个新的Mark 2. 生成新的Mark会存在请求引擎的过程,如果请求到了则存入Map中:Mark为Key,EngineConnExecutor为Value 3.
* 生成的Mark数量等于LoadBalance的并发量
*/
override def createMark(markReq: MarkReq): Mark = {
val mark = new LoadBalanceMark(nextMarkId(), markReq)
addMark(mark, new util.ArrayList[ServiceInstance]())
Expand All @@ -152,7 +139,7 @@ class LoadBalanceLabelEngineConnManager extends ComputationEngineConnManager wit
if (null != req) MARK_REQ_CACHE_LOCKER.synchronized {
val markList = getMarkReqAndMarkCache().get(req)
if (null != markList) {
val mayBeMark = markList.find(_.getMarkId().equals(mark.getMarkId())).orNull
val mayBeMark = markList.asScala.find(_.getMarkId().equals(mark.getMarkId())).orNull
if (null == mayBeMark) {
markList.add(mark)
} else {
Expand Down Expand Up @@ -231,20 +218,22 @@ class LoadBalanceLabelEngineConnManager extends ComputationEngineConnManager wit
if (null == marks || marks.isEmpty) {
getMarkReqAndMarkCache().remove(mark.getMarkReq)
} else {
val newMarks = marks.filter(!_.getMarkId().equals(mark.getMarkId()))
val newMarks = marks.asScala.filter(!_.getMarkId().equals(mark.getMarkId()))
if (null == newMarks || newMarks.isEmpty) {
getMarkReqAndMarkCache().remove(mark.getMarkReq)
} else {
getMarkReqAndMarkCache().put(mark.getMarkReq, newMarks)
getMarkReqAndMarkCache().put(mark.getMarkReq, newMarks.asJava)
}
// getMarkReqAndMarkCache().put(mark.getMarkReq))
}
}
}

protected def getAllInstances(): Array[String] = MARK_CACHE_LOCKER.synchronized {
val instances = new ArrayBuffer[String]
getMarkCache().values().foreach(_.foreach(s => instances.add(s.getInstance)))
getMarkCache()
.values()
.asScala
.foreach(_.asScala.foreach(s => instances.asJava.add(s.getInstance)))
instances.toArray
}

Expand Down

0 comments on commit 739adf4

Please sign in to comment.