diff --git a/.gitignore b/.gitignore index 9353f3b..a708afc 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ out/* project/* .gradle *.iml +*.ipr todo.txt src/docker/.docker diff --git a/build.gradle b/build.gradle index 551daf1..abb0d66 100644 --- a/build.gradle +++ b/build.gradle @@ -1,7 +1,7 @@ apply plugin: 'scala' apply plugin: 'idea' -version = '0.9.3.0' +version = '0.9.4.0' jar.archiveName = "kafka-mesos-${version}.jar" diff --git a/kafka-mesos.properties b/kafka-mesos.properties index 45ebfcb..9adf7c5 100644 --- a/kafka-mesos.properties +++ b/kafka-mesos.properties @@ -3,11 +3,13 @@ debug=true user=vagrant -storage=file:kafka-mesos.json +storage=zk:mesos-kafka-scheduler master=master:5050 -zk=master:2181 - -api=http://192.168.3.1:7000 +zk=master:2181/chroot +#for testing on the vagrant master via ./kafka-mesos.sh scheduler +#you will eventually want to run this on a scheduler i.e marathon +#change the IP to what is service discoverable & routable for your setup +api=http://192.168.3.5:7000 diff --git a/src/scala/ly/stealth/mesos/kafka/BrokerServer.scala b/src/scala/ly/stealth/mesos/kafka/BrokerServer.scala index 34a31cc..e1d757e 100644 --- a/src/scala/ly/stealth/mesos/kafka/BrokerServer.scala +++ b/src/scala/ly/stealth/mesos/kafka/BrokerServer.scala @@ -81,27 +81,46 @@ object BrokerServer { val serverClass = loader.loadClass("kafka.server.KafkaServerStartable") val configClass = loader.loadClass("kafka.server.KafkaConfig") - val props: Properties = this.props(options, "server.properties") - val config: Object = configClass.getConstructor(classOf[Properties]).newInstance(props).asInstanceOf[Object] + val config: Object = newKafkaConfig(this.props(options, "server.properties")) val server: Object = serverClass.getConstructor(configClass).newInstance(config).asInstanceOf[Object] server } def startReporters(options: util.Map[String, String]): Object = { - val configClass = loader.loadClass("kafka.server.KafkaConfig") - - val props: Properties = this.props(options, "server.properties") - val config: Object = configClass.getConstructor(classOf[Properties]).newInstance(props).asInstanceOf[Object] - val metricsReporter = loader.loadClass("kafka.metrics.KafkaMetricsReporter$").getField("MODULE$").get(null) val metricsReporterClass = metricsReporter.getClass - val verifiableProps = config.getClass.getMethod("props").invoke(config) + + val props = this.props(options, "server.properties") + val verifiablePropsClass: Class[_] = loader.loadClass("kafka.utils.VerifiableProperties") + val verifiableProps: Object = verifiablePropsClass.getConstructor(classOf[Properties]).newInstance(props).asInstanceOf[Object] + metricsReporterClass.getMethod("startReporters", verifiableProps.getClass).invoke(metricsReporter, verifiableProps) } + + private def newKafkaConfig(props: Properties): Object = { + val configClass = loader.loadClass("kafka.server.KafkaConfig") + var config: Object = null + // in kafka <= 0.8.x constructor is KafkaConfig(java.util.Properties) + try { config = configClass.getConstructor(classOf[Properties]).newInstance(props).asInstanceOf[Object] } + catch { case e: NoSuchMethodException => } + + if (config == null) { + // in kafka 0.9.0.0 constructor is KafkaConfig(java.util.Map[_,_]) + val map: util.Map[_,_] = props.toMap.asInstanceOf[Map[_,_]] + try { config = configClass.getConstructor(classOf[util.Map[String, String]]).newInstance(map).asInstanceOf[Object] } + catch { case e: NoSuchMethodError => } + } + + if (config == null) throw new IllegalStateException("Can't create KafkaConfig. Unsupported kafka distro?") + config + } + def configureLog4j(options: util.Map[String, String]): Unit = { + System.setProperty("kafka.logs.dir", "" + new File(Distro.dir, "log")) val props: Properties = this.props(options, "log4j.properties") + val configurator: Class[_] = loader.loadClass("org.apache.log4j.PropertyConfigurator") configurator.getMethod("configure", classOf[Properties]).invoke(null, props) } @@ -141,13 +160,31 @@ object BrokerServer { // This is required, because current jar have classes incompatible with classes from kafka distro. class Loader(urls: Array[URL]) extends URLClassLoader(urls) { val snappyHackedClasses = Array[String]("org.xerial.snappy.SnappyNativeAPI", "org.xerial.snappy.SnappyNative", "org.xerial.snappy.SnappyErrorCode") + var snappyHackEnabled = false + checkSnappyVersion + + def checkSnappyVersion { + var jarName: String = null + for (url <- urls) { + val fileName = new File(url.getFile).getName + if (fileName.matches("snappy.*jar")) jarName = fileName + } + + if (jarName == null) return + val hIdx = jarName.lastIndexOf("-") + val extIdx = jarName.lastIndexOf(".jar") + if (hIdx == -1 || extIdx == -1) return + + val version = new Util.Version(jarName.substring(hIdx + 1, extIdx)) + snappyHackEnabled = version.compareTo(new Util.Version(1,1,0)) <= 0 + } override protected def loadClass(name: String, resolve: Boolean): Class[_] = { getClassLoadingLock(name) synchronized { // Handle Snappy class loading hack: // Snappy injects 3 classes and native lib to root ClassLoader // See - org.xerial.snappy.SnappyLoader.injectSnappyNativeLoader - if (snappyHackedClasses.contains(name)) + if (snappyHackEnabled && snappyHackedClasses.contains(name)) return super.loadClass(name, true) // Check class is loaded diff --git a/src/scala/ly/stealth/mesos/kafka/HttpServer.scala b/src/scala/ly/stealth/mesos/kafka/HttpServer.scala index 68ff2f6..c5d567f 100644 --- a/src/scala/ly/stealth/mesos/kafka/HttpServer.scala +++ b/src/scala/ly/stealth/mesos/kafka/HttpServer.scala @@ -34,6 +34,7 @@ import scala.util.parsing.json.JSONObject object HttpServer { var jar: File = null var kafkaDist: File = null + var kafkaVersion: Util.Version = null val logger = Logger.getLogger(HttpServer.getClass) var server: Server = null @@ -90,6 +91,13 @@ object HttpServer { if (jar == null) throw new IllegalStateException(jarMask + " not found in current dir") if (kafkaDist == null) throw new IllegalStateException(kafkaMask + " not found in in current dir") + + // extract version + val distName: String = kafkaDist.getName + val tgzIdx = distName.lastIndexOf(".tgz") + val hIdx = distName.lastIndexOf("-") + if (tgzIdx == -1 || hIdx == -1) throw new IllegalStateException("Can't extract version number from " + distName) + kafkaVersion = new Util.Version(distName.substring(hIdx + 1, tgzIdx)) } private class Servlet extends HttpServlet { diff --git a/src/scala/ly/stealth/mesos/kafka/Scheduler.scala b/src/scala/ly/stealth/mesos/kafka/Scheduler.scala index d77140c..ea9bde0 100644 --- a/src/scala/ly/stealth/mesos/kafka/Scheduler.scala +++ b/src/scala/ly/stealth/mesos/kafka/Scheduler.scala @@ -24,7 +24,7 @@ import java.util import com.google.protobuf.ByteString import java.util.{Collections, Date} import scala.collection.JavaConversions._ -import ly.stealth.mesos.kafka.Util.{Period, Str} +import ly.stealth.mesos.kafka.Util.{Version, Period, Str} object Scheduler extends org.apache.mesos.Scheduler { private val logger: Logger = Logger.getLogger(this.getClass) @@ -70,6 +70,9 @@ object Scheduler extends org.apache.mesos.Scheduler { "host.name" -> offer.getHostname ) + if (HttpServer.kafkaVersion.compareTo(new Version("0.9")) >= 0) + defaults += ("listeners" -> s"PLAINTEXT://:${reservation.port}") + if (reservation.volume != null) defaults += ("log.dirs" -> "data/kafka-logs") diff --git a/src/scala/ly/stealth/mesos/kafka/Util.scala b/src/scala/ly/stealth/mesos/kafka/Util.scala index d1753dc..41fc9fa 100644 --- a/src/scala/ly/stealth/mesos/kafka/Util.scala +++ b/src/scala/ly/stealth/mesos/kafka/Util.scala @@ -240,6 +240,38 @@ object Util { override def toString: String = if (start == end) "" + start else start + ".." + end } + class Version(s: String) extends Comparable[Version] { + private var parts: Array[Int] = null + + def this(args: Int*) { + this(args.mkString(".")) + } + + parse + private def parse { + parts = if (s != "") s.split("\\.", -1).map(Integer.parseInt) else new Array[Int](0) + } + + def asList: List[Int] = parts.toList + + def compareTo(v: Version): Int = { + for (i <- 0 until Math.min(parts.length, v.parts.length)) { + val diff = parts(i) - v.parts(i) + if (diff != 0) return diff + } + + parts.length - v.parts.length + } + + override def hashCode(): Int = toString.hashCode + + override def equals(obj: scala.Any): Boolean = { + obj.isInstanceOf[Version] && toString == "" + obj + } + + override def toString: String = parts.mkString(".") + } + class BindAddress(s: String) { private var _source: String = null private var _value: String = null diff --git a/src/test/ly/stealth/mesos/kafka/MesosTestCase.scala b/src/test/ly/stealth/mesos/kafka/MesosTestCase.scala index e871fdf..a8b5628 100644 --- a/src/test/ly/stealth/mesos/kafka/MesosTestCase.scala +++ b/src/test/ly/stealth/mesos/kafka/MesosTestCase.scala @@ -82,7 +82,7 @@ class MesosTestCase { } HttpServer.jar = createTempFile("executor.jar", "executor") - HttpServer.kafkaDist = createTempFile("kafka.tgz", "kafka") + HttpServer.kafkaDist = createTempFile("kafka-0.9.3.0.tgz", "kafka") } @After diff --git a/src/test/ly/stealth/mesos/kafka/UtilTest.scala b/src/test/ly/stealth/mesos/kafka/UtilTest.scala index 7b1fc73..b40af5e 100644 --- a/src/test/ly/stealth/mesos/kafka/UtilTest.scala +++ b/src/test/ly/stealth/mesos/kafka/UtilTest.scala @@ -19,7 +19,7 @@ package ly.stealth.mesos.kafka import org.junit.Test import org.junit.Assert._ -import ly.stealth.mesos.kafka.Util.{BindAddress, Period, Range} +import ly.stealth.mesos.kafka.Util.{BindAddress, Period, Range, Version} import java.io.{ByteArrayOutputStream, ByteArrayInputStream} import java.util @@ -260,6 +260,33 @@ class UtilTest { assertEquals("0", "" + new Range("0..0")) } + @Test + def Version_init { + assertEquals(List(), new Version().asList) + assertEquals(List(1,0), new Version(1,0).asList) + assertEquals(List(1,2,3,4), new Version("1.2.3.4").asList) + + try { new Version(" "); fail() } + catch { case e: IllegalArgumentException => } + + try { new Version("."); fail() } + catch { case e: IllegalArgumentException => } + + try { new Version("a"); fail() } + catch { case e: IllegalArgumentException => } + } + + @Test + def Version_compareTo { + assertEquals(0, new Version().compareTo(new Version())) + assertEquals(0, new Version(0).compareTo(new Version(0))) + + assertTrue(new Version(0).compareTo(new Version(1)) < 0) + assertTrue(new Version(0).compareTo(new Version(0, 0)) < 0) + + assertTrue(new Version(0, 9, 0, 0).compareTo(new Version(0, 8, 2, 0)) > 0) + } + // BindAddress @Test def BindAddress_init {