Skip to content

[DL-715][DL-716][DL-718] update libraries #473

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/org/thp/cortex/Module.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import play.api.libs.concurrent.AkkaGuiceSupport
import play.api.{Configuration, Environment, Logger, Mode}

import java.lang.reflect.Modifier
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

class Module(environment: Environment, configuration: Configuration) extends AbstractModule with ScalaModule with AkkaGuiceSupport {

Expand Down
2 changes: 1 addition & 1 deletion app/org/thp/cortex/controllers/AssetCtrl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ trait AssetCtrl {
}

@Singleton
class AssetCtrlProd @Inject() (errorHandler: HttpErrorHandler, meta: AssetsMetadata) extends Assets(errorHandler, meta) with AssetCtrl {
class AssetCtrlProd @Inject() (errorHandler: HttpErrorHandler, meta: AssetsMetadata, env: Environment) extends Assets(errorHandler, meta, env) with AssetCtrl {
def get(file: String): Action[AnyContent] = at("/www", file)
}

Expand Down
18 changes: 9 additions & 9 deletions app/org/thp/cortex/controllers/StatusCtrl.scala
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
package org.thp.cortex.controllers

import scala.concurrent.{ExecutionContext, Future}
import com.sksamuel.elastic4s.ElasticDsl
import org.elastic4play.controllers.Authenticated
import org.elastic4play.services.AuthSrv
import org.elastic4play.services.auth.MultiAuthSrv
import org.elasticsearch.client.Node
import org.thp.cortex.models.{Roles, Worker, WorkerType}
import org.thp.cortex.services.WorkerSrv
import play.api.Configuration
import play.api.http.Status
import play.api.libs.json.Json.toJsFieldJsValueWrapper
import play.api.libs.json.{JsBoolean, JsNull, JsString, Json}
import play.api.libs.json.{JsBoolean, JsString, Json}
import play.api.mvc.{AbstractController, Action, AnyContent, ControllerComponents}
import com.sksamuel.elastic4s.ElasticDsl
import org.elastic4play.controllers.Authenticated

import javax.inject.{Inject, Singleton}
import org.elasticsearch.client.Node
import org.thp.cortex.models.{Roles, Worker, WorkerType}
import org.elastic4play.services.AuthSrv
import org.elastic4play.services.auth.MultiAuthSrv
import org.thp.cortex.services.WorkerSrv
import scala.concurrent.ExecutionContext

@Singleton
class StatusCtrl @Inject() (
Expand Down
4 changes: 1 addition & 3 deletions app/org/thp/cortex/models/BaseConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ import scala.concurrent.duration.Duration
import play.api.Configuration
import play.api.libs.json._

import org.elastic4play.utils.Collection.distinctBy

case class BaseConfig(name: String, workerNames: Seq[String], items: Seq[ConfigurationDefinitionItem], config: Option[WorkerConfig]) {
def +(other: BaseConfig) = BaseConfig(name, workerNames ++ other.workerNames, distinctBy(items ++ other.items)(_.name), config.orElse(other.config))
def +(other: BaseConfig): BaseConfig = BaseConfig(name, workerNames ++ other.workerNames, (items ++ other.items).distinctBy(_.name), config.orElse(other.config))
}

object BaseConfig {
Expand Down
6 changes: 2 additions & 4 deletions app/org/thp/cortex/models/Job.scala
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
package org.thp.cortex.models

import scala.util.Try

import play.api.libs.json.{JsObject, JsString, Json}
import play.api.libs.json.{Format, JsObject, JsString, Json}

import javax.inject.{Inject, Singleton}
import org.thp.cortex.models.JsonFormat.workerTypeFormat

import org.elastic4play.models.JsonFormat.enumFormat
import org.elastic4play.models.{AttributeDef, EntityDef, HiveEnumeration, ModelDef, AttributeFormat => F, AttributeOption => O}

object JobStatus extends Enumeration with HiveEnumeration {
type Type = Value
val Waiting, InProgress, Success, Failure, Deleted = Value
implicit val reads = enumFormat(this)
implicit val reads: Format[Value] = enumFormat(this)
}

trait JobAttributes {
Expand Down
7 changes: 2 additions & 5 deletions app/org/thp/cortex/models/Organization.scala
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
package org.thp.cortex.models

import javax.inject.{Inject, Provider, Singleton}

import scala.concurrent.{ExecutionContext, Future}

import play.api.Logger
import play.api.libs.json.{JsNumber, JsObject, JsString, Json}

import play.api.libs.json.{Format, JsNumber, JsObject, JsString, Json}
import org.elastic4play.models.JsonFormat.enumFormat
import org.elastic4play.models.{AttributeDef, BaseEntity, EntityDef, HiveEnumeration, ModelDef, AttributeFormat => F, AttributeOption => O}
import org.elastic4play.services.FindSrv

object OrganizationStatus extends Enumeration with HiveEnumeration {
type Type = Value
val Active, Locked = Value
implicit val reads = enumFormat(this)
implicit val reads: Format[Value] = enumFormat(this)
}

trait OrganizationAttributes { _: AttributeDef =>
Expand Down
6 changes: 2 additions & 4 deletions app/org/thp/cortex/models/User.scala
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
package org.thp.cortex.models

import scala.concurrent.Future

import play.api.libs.json.{JsArray, JsBoolean, JsObject, JsString}

import play.api.libs.json.{Format, JsArray, JsBoolean, JsObject, JsString}
import org.elastic4play.models.JsonFormat.enumFormat
import org.elastic4play.models.{AttributeDef, BaseEntity, EntityDef, HiveEnumeration, ModelDef, AttributeFormat => F, AttributeOption => O}
import org.elastic4play.services.{User => EUser}

object UserStatus extends Enumeration with HiveEnumeration {
type Type = Value
val Ok, Locked = Value
implicit val reads = enumFormat(this)
implicit val reads: Format[Value] = enumFormat(this)
}

trait UserAttributes { _: AttributeDef =>
Expand Down
4 changes: 2 additions & 2 deletions app/org/thp/cortex/models/Worker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import org.elastic4play.models.JsonFormat.enumFormat
import org.elastic4play.models.{AttributeDef, BaseEntity, ChildModelDef, EntityDef, HiveEnumeration, AttributeFormat => F, AttributeOption => O}
import org.elastic4play.utils.Hasher
import org.thp.cortex.models.JsonFormat.workerTypeFormat
import play.api.libs.json.{JsObject, JsString, Json}
import play.api.libs.json.{Format, JsObject, JsString, Json}

import scala.concurrent.Future
import scala.util.Try
Expand All @@ -17,7 +17,7 @@ object RateUnit extends Enumeration with HiveEnumeration {
val Hour = Value(60 * 60)
val Day = Value(60 * 60 * 24)
val Month = Value(60 * 60 * 24 * 30)
implicit val reads = enumFormat(this)
implicit val reads: Format[Value] = enumFormat(this)
}

object WorkerType extends Enumeration with HiveEnumeration {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
package org.thp.cortex.services

import javax.inject.{Inject, Singleton}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

import play.api.Logger

import akka.actor.{Actor, ActorRef}
import org.thp.cortex.models.JobStatus

import org.elastic4play.models.BaseEntity
import org.elastic4play.models.{BaseEntity, BaseModelDef}
import org.elastic4play.services._

object AuditActor {
Expand All @@ -24,10 +20,10 @@ class AuditActor @Inject() (eventSrv: EventSrv, implicit val ec: ExecutionContex

import AuditActor._

object EntityExtractor {
def unapply(e: BaseEntity) = Some((e.model, e.id, e.routing))
private object EntityExtractor {
def unapply(e: BaseEntity): Option[(BaseModelDef, String, String)] = Some((e.model, e.id, e.routing))
}
var registration = Map.empty[String, Seq[ActorRef]]
private var registration = Map.empty[String, Seq[ActorRef]]
private[AuditActor] lazy val logger = Logger(getClass)

override def preStart(): Unit = {
Expand All @@ -42,17 +38,17 @@ class AuditActor @Inject() (eventSrv: EventSrv, implicit val ec: ExecutionContex

override def receive: Receive = {
case Register(jobId, timeout) =>
logger.info(s"Register new listener for job $jobId ($sender)")
val newActorList = registration.getOrElse(jobId, Nil) :+ sender
logger.info(s"Register new listener for job $jobId (${sender()})")
val newActorList = registration.getOrElse(jobId, Nil) :+ sender()
registration += (jobId -> newActorList)
context.system.scheduler.scheduleOnce(timeout, self, Unregister(jobId, sender))
context.system.scheduler.scheduleOnce(timeout, self, Unregister(jobId, sender()))

case Unregister(jobId, actorRef) =>
logger.info(s"Unregister listener for job $jobId ($actorRef)")
val newActorList = registration.getOrElse(jobId, Nil).filterNot(_ == actorRef)
registration += (jobId -> newActorList)

case AuditOperation(EntityExtractor(model, id, routing), action, details, authContext, date) =>
case AuditOperation(EntityExtractor(model, id, _), action, details, _, _) =>
if (model.modelName == "job" && action == AuditableAction.Update) {
logger.info(s"Job $id has be updated (${details \ "status"})")
val status = (details \ "status").asOpt[JobStatus.Type].getOrElse(JobStatus.InProgress)
Expand Down
121 changes: 31 additions & 90 deletions app/org/thp/cortex/services/DockerJobRunnerSrv.scala
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
package org.thp.cortex.services

import akka.actor.ActorSystem
import com.spotify.docker.client.DockerClient.LogsParam
import com.spotify.docker.client.messages.HostConfig.Bind
import com.spotify.docker.client.messages.{ContainerConfig, HostConfig}
import com.spotify.docker.client.{DefaultDockerClient, DockerClient}
import org.thp.cortex.util.docker.{DockerClient => DockerJavaClient}
import play.api.libs.json.Json
import play.api.{Configuration, Logger}

import java.nio.charset.StandardCharsets
import java.nio.file._
import java.util.concurrent.TimeUnit
import javax.inject.{Inject, Singleton}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

@Singleton
class DockerJobRunnerSrv(
client: DockerClient,
config: Configuration,
javaClient: DockerJavaClient,
autoUpdate: Boolean,
jobBaseDirectory: Path,
dockerJobBaseDirectory: Path,
Expand All @@ -28,17 +25,7 @@ class DockerJobRunnerSrv(
@Inject()
def this(config: Configuration, system: ActorSystem) =
this(
new DefaultDockerClient.Builder()
.apiVersion(config.getOptional[String]("docker.version").orNull)
.connectionPoolSize(config.getOptional[Int]("docker.connectionPoolSize").getOrElse(100))
.connectTimeoutMillis(config.getOptional[Long]("docker.connectTimeoutMillis").getOrElse(5000))
//.dockerCertificates()
.readTimeoutMillis(config.getOptional[Long]("docker.readTimeoutMillis").getOrElse(30000))
//.registryAuthSupplier()
.uri(config.getOptional[String]("docker.uri").getOrElse("unix:///var/run/docker.sock"))
.useProxy(config.getOptional[Boolean]("docker.useProxy").getOrElse(false))
.build(),
config,
new DockerJavaClient(config),
config.getOptional[Boolean]("docker.autoUpdate").getOrElse(true),
Paths.get(config.get[String]("job.directory")),
Paths.get(config.get[String]("job.dockerDirectory")),
Expand All @@ -50,89 +37,43 @@ class DockerJobRunnerSrv(
lazy val isAvailable: Boolean =
Try {
logger.debug(s"Retrieve docker information ...")
logger.info(s"Docker is available:\n${client.info()}")
logger.info(s"Docker is available:\n${javaClient.info}")
true
}.recover {
case error =>
logger.info(s"Docker is not available", error)
false
}.get

def run(jobDirectory: Path, dockerImage: String, timeout: Option[FiniteDuration])(implicit
ec: ExecutionContext
): Try[Unit] = {
import scala.collection.JavaConverters._
if (autoUpdate) Try(client.pull(dockerImage))
// ContainerConfig.builder().addVolume()
val hostConfigBuilder = HostConfig.builder()
config.getOptional[Seq[String]]("docker.container.capAdd").map(_.asJava).foreach(hostConfigBuilder.capAdd)
config.getOptional[Seq[String]]("docker.container.capDrop").map(_.asJava).foreach(hostConfigBuilder.capDrop)
config.getOptional[String]("docker.container.cgroupParent").foreach(hostConfigBuilder.cgroupParent)
config.getOptional[Long]("docker.container.cpuPeriod").foreach(hostConfigBuilder.cpuPeriod(_))
config.getOptional[Long]("docker.container.cpuQuota").foreach(hostConfigBuilder.cpuQuota(_))
config.getOptional[Seq[String]]("docker.container.dns").map(_.asJava).foreach(hostConfigBuilder.dns)
config.getOptional[Seq[String]]("docker.container.dnsSearch").map(_.asJava).foreach(hostConfigBuilder.dnsSearch)
config.getOptional[Seq[String]]("docker.container.extraHosts").map(_.asJava).foreach(hostConfigBuilder.extraHosts)
config.getOptional[Long]("docker.container.kernelMemory").foreach(hostConfigBuilder.kernelMemory(_))
config.getOptional[Long]("docker.container.memoryReservation").foreach(hostConfigBuilder.memoryReservation(_))
config.getOptional[Long]("docker.container.memory").foreach(hostConfigBuilder.memory(_))
config.getOptional[Long]("docker.container.memorySwap").foreach(hostConfigBuilder.memorySwap(_))
config.getOptional[Int]("docker.container.memorySwappiness").foreach(hostConfigBuilder.memorySwappiness(_))
config.getOptional[String]("docker.container.networkMode").foreach(hostConfigBuilder.networkMode)
config.getOptional[Boolean]("docker.container.privileged").foreach(hostConfigBuilder.privileged(_))
hostConfigBuilder.appendBinds(
Bind
.from(dockerJobBaseDirectory.resolve(jobBaseDirectory.relativize(jobDirectory)).toAbsolutePath.toString)
.to("/job")
.readOnly(false)
.build()
)
val cacertsFile = jobDirectory.resolve("input").resolve("cacerts")
val containerConfigBuilder = ContainerConfig
.builder()
.hostConfig(hostConfigBuilder.build())
.image(dockerImage)
.cmd("/job")
private def generateErrorOutput(containerId: String, f: Path) = {
logger.warn(s"the runner didn't generate any output file $f")
for {
output <- javaClient.getLogs(containerId)
report = Json.obj("success" -> false, "errorMessage" -> output)
_ <- Try(Files.write(f, report.toString.getBytes(StandardCharsets.UTF_8)))
} yield report
}

val containerConfig =
if (Files.exists(cacertsFile)) containerConfigBuilder.env(s"REQUESTS_CA_BUNDLE=/job/input/cacerts").build()
else containerConfigBuilder.build()
val containerCreation = client.createContainer(containerConfig)
// Option(containerCreation.warnings()).flatMap(_.asScala).foreach(logger.warn)
def run(jobDirectory: Path, dockerImage: String, timeout: Option[FiniteDuration])(implicit executionContext: ExecutionContext): Try[Unit] = {
val to = timeout.getOrElse(FiniteDuration(5000, TimeUnit.SECONDS))

logger.debug(s"Container configuration: $containerConfig")
logger.info(
s"Execute container ${containerCreation.id()}\n" +
s" timeout: ${timeout.fold("none")(_.toString)}\n" +
s" image : $dockerImage\n" +
s" volume : ${jobDirectory.toAbsolutePath}:/job" +
Option(containerConfig.env()).fold("")(_.asScala.map("\n env : " + _).mkString)
)

val timeoutSched = timeout.map(to =>
system.scheduler.scheduleOnce(to) {
logger.info("Timeout reached, stopping the container")
client.removeContainer(containerCreation.id(), DockerClient.RemoveContainerParam.forceKill())
}
)
val execution = Try {
client.startContainer(containerCreation.id())
client.waitContainer(containerCreation.id())
()
}
timeoutSched.foreach(_.cancel())
val outputFile = jobDirectory.resolve("output").resolve("output.json")
if (!Files.exists(outputFile) || Files.size(outputFile) == 0) {
logger.warn(s"The worker didn't generate output file.")
val output = Try(client.logs(containerCreation.id(), LogsParam.stdout(), LogsParam.stderr()).readFully())
.fold(e => s"Container logs can't be read (${e.getMessage})", identity)
val message = execution.fold(e => s"Docker creation error: ${e.getMessage}\n$output", _ => output)
if (autoUpdate) Try(javaClient.pullImage(dockerImage))

val report = Json.obj("success" -> false, "errorMessage" -> message)
Files.write(outputFile, report.toString.getBytes(StandardCharsets.UTF_8))
}
client.removeContainer(containerCreation.id(), DockerClient.RemoveContainerParam.forceKill())
execution
for {
containerId <- javaClient.prepare(dockerImage, jobDirectory, jobBaseDirectory, dockerJobBaseDirectory, to)
timeoutScheduled = timeout.map(to =>
system.scheduler.scheduleOnce(to) {
logger.info("Timeout reached, stopping the container")
javaClient.clean(containerId)
}
)
_ <- javaClient.execute(containerId)
_ = timeoutScheduled.foreach(_.cancel())
outputFile <- Try(jobDirectory.resolve("output").resolve("output.json"))
isError = Files.notExists(outputFile) || Files.size(outputFile) == 0 || Files.isDirectory(outputFile)
_ = if (isError) generateErrorOutput(containerId, outputFile).toOption else None
_ <- javaClient.clean(containerId)
} yield ()
}

}
2 changes: 1 addition & 1 deletion app/org/thp/cortex/services/JobSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import org.thp.cortex.models._

import org.elastic4play._
import org.elastic4play.controllers._
import org.elastic4play.services._
import org.elastic4play.services.{UserSrv => _, _}
import org.elastic4play.utils.Hasher

@Singleton
Expand Down
5 changes: 2 additions & 3 deletions app/org/thp/cortex/services/ProcessJobRunnerSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import play.api.libs.json.Json
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths}
import javax.inject.{Inject, Singleton}
import scala.collection.mutable
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
import scala.sys.process.{Process, ProcessLogger, _}
Expand All @@ -24,7 +23,7 @@ class ProcessJobRunnerSrv @Inject() (implicit val system: ActorSystem) {
def checkCortexUtilsVersion(pythonVersion: String): Option[(Int, Int, Int)] =
Try {
(s"pip$pythonVersion" :: "show" :: "cortexutils" :: Nil)
.lineStream
.lazyLines
.collectFirst {
case pythonPackageVersionRegex(major, minor, patch) => (major.toInt, minor.toInt, patch.toInt)
}
Expand All @@ -34,7 +33,7 @@ class ProcessJobRunnerSrv @Inject() (implicit val system: ActorSystem) {
ec: ExecutionContext
): Try[Unit] = {
val baseDirectory = Paths.get(command).getParent.getParent
val output = mutable.StringBuilder.newBuilder
val output = new StringBuilder()
logger.info(s"Execute $command in $baseDirectory, timeout is ${timeout.fold("none")(_.toString)}")
val cacertsFile = jobDirectory.resolve("input").resolve("cacerts")
val env = if (Files.exists(cacertsFile)) Seq("REQUESTS_CA_BUNDLE" -> cacertsFile.toString) else Nil
Expand Down
Loading