Skip to content

Commit

Permalink
Modify the metrics field, replace instance with ticketId (#4148)
Browse files Browse the repository at this point in the history
* replace instance with ticketId
  • Loading branch information
guoshupei authored Feb 12, 2023
1 parent d59de6b commit 4bec6ad
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ public interface TaskConstant {

// Job-metrics key whose value is the map holding one info record per engine conn.
String ENTRANCEJOB_ENGINECONN_MAP = "engineconnMap";
// Info-map key for the engine conn's service instance.
String ENGINE_INSTANCE = "engineInstance";
// Info-map key for the engine conn's ticket id; its value is used to key the engine conn's record in the job metrics.
String TICKET_ID = "ticketId";
// Info-map key for the id of the task as known to the engine conn.
String ENGINE_CONN_TASK_ID = "engineConnTaskId";
// Info-map key for the submit time, stored as epoch milliseconds rendered as a string.
String ENGINE_CONN_SUBMIT_TIME = "engineConnSubmitTime";

String PARAMS_DATA_SOURCE = "dataSources";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import org.apache.linkis.engineconn.core.executor.ExecutorManager
import org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor
import org.apache.linkis.engineconn.executor.listener.ExecutorListenerBusContext
import org.apache.linkis.engineconn.executor.listener.event.EngineConnSyncEvent
import org.apache.linkis.engineconn.launch.EngineConnServer
import org.apache.linkis.governance.common.entity.ExecutionNodeStatus
import org.apache.linkis.governance.common.exception.engineconn.{
EngineConnExecutorErrorCode,
Expand Down Expand Up @@ -436,6 +437,15 @@ class TaskExecutionServiceImpl
}
val extraInfoMap = new util.HashMap[String, Object]()
extraInfoMap.put(TaskConstant.ENGINE_INSTANCE, Sender.getThisInstance)
extraInfoMap.put(
TaskConstant.TICKET_ID,
EngineConnServer.getEngineCreationContext.getTicketId
)
extraInfoMap.put(TaskConstant.ENGINE_CONN_TASK_ID, task.getTaskId)
extraInfoMap.put(
TaskConstant.ENGINE_CONN_SUBMIT_TIME,
System.currentTimeMillis.toString
)
// todo add other info
var respRunningInfo: ResponseTaskRunningInfo = null
if (null != resourceResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ abstract class EntranceServer extends Logging {
t.setProgress(EntranceJob.JOB_COMPLETED_PROGRESS.toString)
val infoMap = new util.HashMap[String, AnyRef]
infoMap.put(TaskConstant.ENGINE_INSTANCE, "NULL")
infoMap.put(TaskConstant.TICKET_ID, "")
infoMap.put("message", "Task interception failed and cannot be retried")
JobHistoryHelper.updateJobRequestMetrics(jobRequest, null, infoMap)
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,20 @@ class DefaultEntranceExecutor(id: Long)
orchestratorFuture.operate[ProgressProcessor](DefaultProgressOperation.PROGRESS_NAME)
progressProcessor.doOnObtain(progressInfoEvent => {
if (null != entranceJob) {
// Make sure the database is updated: do it first, before notifying the progress listeners
try {
JobHistoryHelper.updateJobRequestMetrics(
entranceJob.getJobRequest,
progressInfoEvent.resourceMap,
progressInfoEvent.infoMap
)
} catch {
case e: Exception =>
logger.error("update job metrics error", e)
}
entranceJob.getProgressListener.foreach(
_.onProgressUpdate(entranceJob, progressInfoEvent.progress, entranceJob.getProgressInfo)
)
JobHistoryHelper.updateJobRequestMetrics(
entranceJob.getJobRequest,
progressInfoEvent.resourceMap,
progressInfoEvent.infoMap
)
}
})
progressProcessor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,15 +190,15 @@ object JobHistoryHelper extends Logging {
metricsMap.put(TaskConstant.ENTRANCEJOB_ENGINECONN_MAP, engineInstanceMap)
}
val infoMap = ecInfo
if (null != infoMap && infoMap.containsKey(TaskConstant.ENGINE_INSTANCE)) {
val instance = infoMap.get(TaskConstant.ENGINE_INSTANCE).asInstanceOf[String]
if (null != infoMap && infoMap.containsKey(TaskConstant.TICKET_ID)) {
val ticketId = infoMap.get(TaskConstant.TICKET_ID).asInstanceOf[String]
val engineExtraInfoMap = engineInstanceMap
.getOrDefault(instance, new util.HashMap[String, AnyRef])
.getOrDefault(ticketId, new util.HashMap[String, AnyRef])
.asInstanceOf[util.HashMap[String, AnyRef]]
engineExtraInfoMap.putAll(infoMap)
engineInstanceMap.put(instance, engineExtraInfoMap)
engineInstanceMap.put(ticketId, engineExtraInfoMap)
} else {
logger.warn("Ec info map must contains ECInstance")
logger.warn("Ec info map must contains ticketID")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.linkis.orchestrator.computation.execute.{
CodeExecTaskExecutorManager
}
import org.apache.linkis.orchestrator.ecm.conf.ECMPluginConf
import org.apache.linkis.orchestrator.ecm.service.impl.ComputationEngineConnExecutor
import org.apache.linkis.orchestrator.exception.{
OrchestratorErrorCodeSummary,
OrchestratorErrorException,
Expand Down Expand Up @@ -120,6 +121,18 @@ class CodeLogicalUnitExecTask(parents: Array[ExecTask], children: Array[ExecTask
TaskConstant.ENGINE_INSTANCE,
codeExecutor.getEngineConnExecutor.getServiceInstance.getInstance
)
infoMap.put(
TaskConstant.TICKET_ID,
// Ensure that the job metrics have at least one EC record.
// When the EC is reused, the same EC may end up with two records: one keyed by its instance and the other keyed by its ticketId.
if (codeExecutor.getEngineConnExecutor.isReuse()) {
codeExecutor.getEngineConnExecutor.getServiceInstance.getInstance
} else {
codeExecutor.getEngineConnExecutor.getTicketId
}
)
infoMap.put(TaskConstant.ENGINE_CONN_TASK_ID, engineConnExecId)
infoMap.put(TaskConstant.ENGINE_CONN_SUBMIT_TIME, System.currentTimeMillis.toString)
val event = TaskRunningInfoEvent(
this,
0f,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ trait EngineConnExecutor extends Closeable {

/**
 * Sets whether this executor is backed by a reused engine conn.
 *
 * @param reuse
 *   the new value of the reuse flag
 * @return
 *   an EngineConnExecutor. NOTE(review): presumably this instance, to allow chaining — confirm
 *   against the implementations.
 */
def setReuse(reuse: Boolean): EngineConnExecutor

/**
 * Returns the ticket id associated with this executor's engine conn.
 *
 * @return
 *   the ticket id as a string
 */
def getTicketId: String

override def equals(other: Any): Boolean = other match {
case that: EngineConnExecutor =>
(that canEqual this) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class ComputationEngineConnExecutor(engineNode: EngineNode) extends AbstractEngi

/** Looks up the Sender bound to this executor's service instance. */
private def getEngineConnSender: Sender = {
  val targetInstance = getServiceInstance
  Sender.getSender(targetInstance)
}

/** Returns the ticket id carried by the engine node this executor wraps. */
override def getTicketId: String = {
  engineNode.getTicketId
}

override def close(): Unit = {
logger.info("Start to release engineConn {}", getServiceInstance)
val requestManagerUnlock =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,15 @@ object TaskConversions extends Logging {
.get(TaskConstant.ENTRANCEJOB_ENGINECONN_MAP)
.asInstanceOf[util.Map[String, Object]]
if (null != engineMap && !engineMap.isEmpty) {
taskVO.setEngineInstance(engineMap.map(_._1).toList.mkString(","))
// The engineInstance values in the metrics may be repeated, so they need to be de-duplicated
val engineInstances =
engineMap.asScala
.map(_._2.asInstanceOf[util.Map[String, Object]])
.map(_.get(TaskConstant.ENGINE_INSTANCE))
.toList
.distinct
.mkString(",")
taskVO.setEngineInstance(engineInstances)
}
} else if (TaskStatus.Failed.toString.equals(job.getStatus)) {
taskVO.setCanRetry(true)
Expand Down

0 comments on commit 4bec6ad

Please sign in to comment.