diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..f169b82 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,10 @@ +language: scala +jdk: + - oraclejdk8 +scala: + - 2.11.11 + - 2.12.2 +script: + - sbt ++$TRAVIS_SCALA_VERSION coverage test coverageReport +after_success: + - bash <(curl -s https://codecov.io/bash) diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..e06d208 --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ +Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. 
+ + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. 
+ + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + diff --git a/build.sbt b/build.sbt new file mode 100644 index 0000000..4d27111 --- /dev/null +++ b/build.sbt @@ -0,0 +1,66 @@ +organization := "com.workday" + +name := "prometheus-akka" + +scalaVersion := "2.11.11" + +crossScalaVersions := Seq("2.11.11", "2.12.2") + +val akkaVersion = "2.4.18" +val aspectjweaverVersion = "1.8.10" +val prometheusVersion = "0.0.22" + +checksums in update := Nil + +resolvers += Resolver.sonatypeRepo("releases") + +libraryDependencies ++= Seq( + "org.slf4j" % "slf4j-api" % "1.7.22", + "com.typesafe.akka" %% "akka-actor" % akkaVersion, + "com.typesafe.akka" %% "akka-slf4j" % akkaVersion, + "io.kamon" %% "kamon-core" % "0.6.6", + "io.prometheus" % "simpleclient" % prometheusVersion, + "io.prometheus" % "simpleclient_common" % prometheusVersion, + "com.typesafe" % "config" % "1.3.1", + "org.aspectj" % "aspectjweaver" % aspectjweaverVersion, + "com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test", + "org.scalatest" %% "scalatest" % "3.0.1" % "test", + "ch.qos.logback" % "logback-classic" % "1.2.3" % "test" +) + +enablePlugins(JavaAgent) +javaAgents += "org.aspectj" % "aspectjweaver" % aspectjweaverVersion % "test" + +testOptions in Test += Tests.Argument("-oD") + +parallelExecution in Test := false +logBuffered := false + +publishMavenStyle := true + +publishTo := { + val nexus = "https://oss.sonatype.org/" + if (isSnapshot.value) + Some("snapshots" at 
nexus + "content/repositories/snapshots")
+  else
+    Some("releases" at nexus + "service/local/staging/deploy/maven2")
+}
+
+publishArtifact in Test := false
+
+pomIncludeRepository := { _ => false }
+
+homepage := Some(url("https://github.com/Workday/prometheus-akka"))
+
+licenses := Seq("The Apache Software License, Version 2.0" -> url("http://www.apache.org/licenses/LICENSE-2.0.txt"))
+
+releasePublishArtifactsAction := PgpKeys.publishSigned.value
+
+// Extra POM metadata: source-control coordinates of the project.
+pomExtra := (
+  <scm>
+    <url>git@github.com:Workday/prometheus-akka.git</url>
+    <connection>scm:git:git@github.com:Workday/prometheus-akka.git</connection>
+  </scm>
+)
diff --git a/src/main/resources/META-INF/aop.xml b/src/main/resources/META-INF/aop.xml
new file mode 100644
index 0000000..d21a02d
--- /dev/null
+++ b/src/main/resources/META-INF/aop.xml
@@ -0,0 +1,27 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf
new file mode 100644
index 0000000..db8dc0c
--- /dev/null
+++ b/src/main/resources/reference.conf
@@ -0,0 +1,30 @@
+# ==================================== #
+# Akka-Monitor Reference Configuration #
+# ==================================== #
+
+workday.akka.monitor {
+  metric.filters {
+    akka-actor {
+      includes = []
+      excludes = [ "*/system/**", "*/user/IO-**" ]
+    }
+
+    akka-router {
+      includes = []
+      excludes = []
+    }
+
+    akka-dispatcher {
+      includes = ["**"]
+      excludes = []
+    }
+
+    akka-actor-groups {
+      //include empty actor-group to demonstrate the config
+      empty {
+        includes = []
+        excludes = ["**"]
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/src/main/scala/akka/monitor/instrumentation/ActorInstrumentation.scala b/src/main/scala/akka/monitor/instrumentation/ActorInstrumentation.scala
new file mode 100644
index 0000000..d92aa6d
--- /dev/null
+++ b/src/main/scala/akka/monitor/instrumentation/ActorInstrumentation.scala
@@ -0,0 +1,180 @@
+/*
+ * 
=========================================================================================
+ * Copyright © 2017 Workday, Inc.
+ * Copyright © 2013-2017 the kamon project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+package akka.monitor.instrumentation
+
+import java.util.concurrent.locks.ReentrantLock
+
+import scala.collection.immutable
+import scala.util.Properties
+
+import org.aspectj.lang.ProceedingJoinPoint
+import org.aspectj.lang.annotation.{ After, Around, Aspect, Before, DeclareMixin, Pointcut }
+
+import akka.actor.{ ActorCell, ActorRef, ActorSystem, Cell, InternalActorRef, UnstartedCell }
+import akka.dispatch.Envelope
+import akka.dispatch.sysmsg.SystemMessage
+import akka.routing.RoutedActorCell
+
+/**
+ * AspectJ aspect that attaches an [[ActorMonitor]] to `akka.actor.ActorCell` and
+ * `akka.actor.UnstartedCell` instances and forwards their events to it: construction,
+ * `sendMessage`, `invoke`, `stop` and `handleInvokeFailure`.
+ *
+ * The pointcut and advice annotation strings refer to the methods and parameters below by
+ * name, so those names must be kept in sync with the annotation text.
+ */
+@Aspect
+class ActorCellInstrumentation {
+
+  /**
+   * Returns the monitor stored on `cell`.
+   *
+   * The cell is cast to [[ActorInstrumentationAware]]; the cast relies on the mixin declared
+   * by [[MetricsIntoActorCellsMixin]] having been woven into the cell's class.
+   */
+  def actorInstrumentation(cell: Cell): ActorMonitor =
+    cell.asInstanceOf[ActorInstrumentationAware].actorInstrumentation
+
+  /** Matches execution of the `ActorCell` constructor, binding the cell and its first, second and last arguments. */
+  @Pointcut("execution(akka.actor.ActorCell.new(..)) && this(cell) && args(system, ref, *, *, parent)")
+  def actorCellCreation(cell: Cell, system: ActorSystem, ref: ActorRef, parent: InternalActorRef): Unit = {}
+
+  /** Matches execution of the `UnstartedCell` constructor, binding the cell and its first, second and last arguments. */
+  @Pointcut("execution(akka.actor.UnstartedCell.new(..)) && this(cell) && args(system, ref, *, parent)")
+  def repointableActorRefCreation(cell: Cell, system: ActorSystem, ref: ActorRef, parent: InternalActorRef): Unit = {}
+
+  /** After an `ActorCell` is constructed, creates its monitor with `actorCellCreation = true` and stores it on the cell. */
+  @After("actorCellCreation(cell, system, ref, parent)")
+  def afterCreation(cell: Cell, system: ActorSystem, ref: ActorRef, parent: ActorRef): Unit = {
+    cell.asInstanceOf[ActorInstrumentationAware].setActorInstrumentation(
+      ActorMonitor.createActorMonitor(cell, system, ref, parent, actorCellCreation = true))
+  }
+
+  /** After an `UnstartedCell` is constructed, creates its monitor with `actorCellCreation = false` and stores it on the cell. */
+  @After("repointableActorRefCreation(cell, system, ref, parent)")
+  def afterRepointableActorRefCreation(cell: Cell, system: ActorSystem, ref: ActorRef, parent: ActorRef): Unit = {
+    cell.asInstanceOf[ActorInstrumentationAware].setActorInstrumentation(
+      ActorMonitor.createActorMonitor(cell, system, ref, parent, actorCellCreation = false))
+  }
+
+  /** Matches execution of `ActorCell.invoke`, binding the cell and the envelope being processed. */
+  @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && this(cell) && args(envelope)")
+  def invokingActorBehaviourAtActorCell(cell: ActorCell, envelope: Envelope) = {}
+
+  /**
+   * Hands the invocation to the cell's monitor together with the context previously stored on
+   * the envelope; the monitor is responsible for calling `pjp.proceed()`.
+   */
+  @Around("invokingActorBehaviourAtActorCell(cell, envelope)")
+  def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = {
+    actorInstrumentation(cell).processMessage(pjp, envelope.asInstanceOf[InstrumentedEnvelope].envelopeContext())
+  }
+
+  /** Matches execution of `ActorCell.sendMessage`, binding the cell and the envelope. */
+  @Pointcut("execution(* akka.actor.ActorCell.sendMessage(*)) && this(cell) && args(envelope)")
+  def sendMessageInActorCell(cell: Cell, envelope: Envelope): Unit = {}
+
+  /** Matches execution of `UnstartedCell.sendMessage`, binding the cell and the envelope. */
+  @Pointcut("execution(* akka.actor.UnstartedCell.sendMessage(*)) && this(cell) && args(envelope)")
+  def sendMessageInUnstartedActorCell(cell: Cell, envelope: Envelope): Unit = {}
+
+  /** Stamps the envelope with a fresh context. NOTE: despite the name, this advice runs *before* `sendMessage`. */
+  @Before("sendMessageInActorCell(cell, envelope)")
+  def afterSendMessageInActorCell(cell: Cell, envelope: Envelope): Unit = {
+    setEnvelopeContext(cell, envelope)
+  }
+
+  /** Stamps the envelope with a fresh context. NOTE: despite the name, this advice runs *before* `sendMessage`. */
+  @Before("sendMessageInUnstartedActorCell(cell, envelope)")
+  def afterSendMessageInUnstartedActorCell(cell: Cell, envelope: Envelope): Unit = {
+    setEnvelopeContext(cell, envelope)
+  }
+
+  /** Asks the cell's monitor for a new envelope context and stores it on the envelope. */
+  private def setEnvelopeContext(cell: Cell, envelope: Envelope): Unit = {
+    envelope.asInstanceOf[InstrumentedEnvelope].setEnvelopeContext(
+      actorInstrumentation(cell).captureEnvelopeContext())
+  }
+
+  /** Matches execution of `UnstartedCell.replaceWith`, binding the unstarted cell and its replacement. */
+  @Pointcut("execution(* akka.actor.UnstartedCell.replaceWith(*)) && this(unStartedCell) && args(cell)")
+  def replaceWithInRepointableActorRef(unStartedCell: UnstartedCell, cell: Cell): Unit = {}
+
+  /**
+   * Replaces the body of `UnstartedCell.replaceWith`: `pjp.proceed()` is never called.
+   *
+   * While holding the unstarted cell's lock, every queued element is polled and forwarded to the
+   * replacement `cell`; afterwards (even if forwarding throws) the unstarted cell's `self`
+   * reference is swapped to `cell`. The queue and lock are private to `UnstartedCell` and are
+   * read through the reflective fields held by the companion object.
+   */
+  @Around("replaceWithInRepointableActorRef(unStartedCell, cell)")
+  def aroundReplaceWithInRepointableActorRef(pjp: ProceedingJoinPoint, unStartedCell: UnstartedCell, cell: Cell): Unit = {
+    import ActorCellInstrumentation._
+    // TODO: Find a way to do this without resorting to reflection and, even better, without copy/pasting the Akka Code!
+    val queue = unstartedCellQueueField.get(unStartedCell).asInstanceOf[java.util.LinkedList[_]]
+    val lock = unstartedCellLockField.get(unStartedCell).asInstanceOf[ReentrantLock]
+
+    // Runs `body` with the unstarted cell's lock held, always releasing it afterwards.
+    def locked[T](body: ⇒ T): T = {
+      lock.lock()
+      try body finally lock.unlock()
+    }
+
+    locked {
+      try {
+        while (!queue.isEmpty) {
+          queue.poll() match {
+            case s: SystemMessage ⇒ cell.sendSystemMessage(s) // TODO: check how system messages should be handled here
+            // A single arm covers every envelope, instrumented or not: both were forwarded identically.
+            case e: Envelope ⇒ cell.sendMessage(e)
+          }
+        }
+      } finally {
+        unStartedCell.self.swapCell(cell)
+      }
+    }
+  }
+
+  /** Matches execution of `ActorCell.stop()`, binding the cell. */
+  @Pointcut("execution(* akka.actor.ActorCell.stop()) && this(cell)")
+  def actorStop(cell: ActorCell): Unit = {}
+
+  /** After `stop()`, lets the actor monitor (and, for routed cells, the router monitor) clean up. */
+  @After("actorStop(cell)")
+  def afterStop(cell: ActorCell): Unit = {
+    actorInstrumentation(cell).cleanup()
+
+    // The Stop can't be captured from the RoutedActorCell so we need to put this piece of cleanup here.
+    cell match {
+      case _: RoutedActorCell ⇒
+        // The router mixin is woven at load time, so the compiler cannot see it: a cast is required.
+        cell.asInstanceOf[RouterInstrumentationAware].routerInstrumentation.cleanup()
+      case _ ⇒ // not a routed cell: nothing else to clean up
+    }
+  }
+
+  /** Matches execution of `ActorCell.handleInvokeFailure`, binding the cell and both arguments. */
+  @Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell) && args(childrenNotToSuspend, failure)")
+  def actorInvokeFailure(cell: ActorCell, childrenNotToSuspend: immutable.Iterable[ActorRef], failure: Throwable): Unit = {}
+
+  /** Reports the failure to the cell's monitor before Akka handles it. */
+  @Before("actorInvokeFailure(cell, childrenNotToSuspend, failure)")
+  def beforeInvokeFailure(cell: ActorCell, childrenNotToSuspend: immutable.Iterable[ActorRef], failure: Throwable): Unit = {
+    actorInstrumentation(cell).processFailure(failure)
+  }
+}
+
+/**
+ * Reflective handles to the private `queue` and `lock` fields of `UnstartedCell`, resolved once
+ * when this object is initialised.
+ *
+ * The queue field name is chosen from the major.minor Scala version reported by
+ * `Properties.versionNumberString`; any version other than 2.11 or 2.12 fails with an
+ * `IllegalStateException`.
+ */
+object ActorCellInstrumentation {
+  private val (unstartedCellQueueField, unstartedCellLockField) = {
+    val unstartedCellClass = classOf[UnstartedCell]
+    val queueFieldName = Properties.versionNumberString.split("\\.").take(2).mkString(".") match {
+      case "2.11" ⇒ "akka$actor$UnstartedCell$$queue"
+      case "2.12" ⇒ "queue"
+      case v ⇒ throw new IllegalStateException(s"Incompatible Scala version: $v")
+    }
+
+    val queueField = unstartedCellClass.getDeclaredField(queueFieldName)
+    queueField.setAccessible(true)
+
+    val lockField = unstartedCellClass.getDeclaredField("lock")
+    lockField.setAccessible(true)
+
+    (queueField, lockField)
+  }
+
+}
+
+/** Accessor pair mixed into cells so an [[ActorMonitor]] can be stored on and read from them. */
+trait ActorInstrumentationAware {
+  def actorInstrumentation: ActorMonitor
+  def setActorInstrumentation(ai: ActorMonitor): Unit
+}
+
+object ActorInstrumentationAware {
+  /** Creates a holder backed by a plain mutable field; the monitor is `null` until it is set. */
+  def apply(): ActorInstrumentationAware = new ActorInstrumentationAware {
+    private var _ai: ActorMonitor = _
+
+    def setActorInstrumentation(ai: ActorMonitor): Unit = _ai = ai
+    def actorInstrumentation: ActorMonitor = _ai
+  }
+}
+
+/** Aspect declaring the [[ActorInstrumentationAware]] mixin on `ActorCell` and `UnstartedCell`. */
+@Aspect
+class MetricsIntoActorCellsMixin {
+
+  @DeclareMixin("akka.actor.ActorCell")
+  def mixinActorCellMetricsToActorCell: ActorInstrumentationAware = ActorInstrumentationAware()
+
+  @DeclareMixin("akka.actor.UnstartedCell")
+  def mixinActorCellMetricsToUnstartedActorCell: ActorInstrumentationAware = ActorInstrumentationAware()
+
+}
diff --git a/src/main/scala/akka/monitor/instrumentation/ActorMonitor.scala b/src/main/scala/akka/monitor/instrumentation/ActorMonitor.scala
new file mode 100644
index 0000000..75d6a25
--- /dev/null
+++ b/src/main/scala/akka/monitor/instrumentation/ActorMonitor.scala
@@ -0,0 +1,183 @@
+/*
+ * =========================================================================================
+ * Copyright © 2017 Workday, Inc.
+ * Copyright © 2013-2017 the kamon project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License. 
+ * =========================================================================================
+ */
+package akka.monitor.instrumentation
+
+import org.aspectj.lang.ProceedingJoinPoint
+
+import com.workday.prometheus.akka.{ ActorMetrics, ActorGroupMetrics, RouterMetrics }
+
+import akka.actor.{ ActorRef, ActorSystem, Cell }
+import akka.monitor.instrumentation.ActorMonitors.{ TrackedActor, TrackedRoutee }
+import kamon.metric.Entity
+import kamon.util.RelativeNanoTimestamp
+
+/**
+ * Callbacks invoked by the actor-cell instrumentation for a single cell.
+ *
+ *  - `captureEnvelopeContext` is called when a message is sent and returns the context that
+ *    travels with the envelope;
+ *  - `processMessage` wraps the processing of one message and must call `pjp.proceed()`;
+ *  - `processFailure` is called when message processing failed;
+ *  - `cleanup` is called when the cell stops.
+ */
+trait ActorMonitor {
+  def captureEnvelopeContext(): EnvelopeContext
+  def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef
+  def processFailure(failure: Throwable): Unit
+  def cleanup(): Unit
+}
+
+/** Factory selecting the [[ActorMonitor]] implementation for a cell from its [[CellInfo]]. */
+object ActorMonitor {
+
+  /**
+   * Builds the monitor for `cell`.
+   *
+   * Routers only propagate the envelope context; tracked routees get a [[TrackedRoutee]];
+   * everything else is decided by [[createRegularActorMonitor]].
+   */
+  def createActorMonitor(cell: Cell, system: ActorSystem, ref: ActorRef, parent: ActorRef, actorCellCreation: Boolean): ActorMonitor = {
+    val cellInfo = CellInfo.cellInfoFor(cell, system, ref, parent, actorCellCreation)
+
+    if (cellInfo.isRouter) ActorMonitors.ContextPropagationOnly
+    else if (cellInfo.isRoutee && cellInfo.isTracked) createRouteeMonitor(cellInfo)
+    else createRegularActorMonitor(cellInfo)
+  }
+
+  /**
+   * Returns a [[TrackedActor]] when the cell is tracked individually or belongs to at least one
+   * tracking group; otherwise the shared context-propagation-only monitor. Per-actor metrics are
+   * only looked up when the cell itself is tracked.
+   */
+  def createRegularActorMonitor(cellInfo: CellInfo): ActorMonitor = {
+    // `nonEmpty` is constant-time on a List, unlike `length > 0`.
+    if (cellInfo.isTracked || cellInfo.trackingGroups.nonEmpty) {
+      val actorMetrics = if (cellInfo.isTracked) Some(ActorMetrics.metricsFor(cellInfo.entity)) else None
+      new TrackedActor(cellInfo.entity, actorMetrics, cellInfo.trackingGroups, cellInfo.actorCellCreation)
+    } else {
+      ActorMonitors.ContextPropagationOnly
+    }
+  }
+
+  /** Returns a [[TrackedRoutee]] recording into the router metrics registered for the cell's entity. */
+  def createRouteeMonitor(cellInfo: CellInfo): ActorMonitor = {
+    val routerMetrics = RouterMetrics.metricsFor(cellInfo.entity)
+    new TrackedRoutee(cellInfo.entity, routerMetrics, cellInfo.trackingGroups, cellInfo.actorCellCreation)
+  }
+}
+
+/** The [[ActorMonitor]] implementations. */
+object ActorMonitors {
+
+  /** Monitor that records nothing: it only timestamps envelopes and lets processing proceed. */
+  val ContextPropagationOnly: ActorMonitor = new ActorMonitor {
+    def captureEnvelopeContext(): EnvelopeContext =
+      EnvelopeContext(RelativeNanoTimestamp.now)
+
+    def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = {
+      pjp.proceed()
+    }
+
+    def processFailure(failure: Throwable): Unit = {}
+    def cleanup(): Unit = {}
+  }
+
+  /**
+   * Monitor for a regular actor. Records into `actorMetrics` when present and, through the
+   * parent class, into every group listed in `trackingGroups`.
+   */
+  class TrackedActor(val entity: Entity, actorMetrics: Option[ActorMetrics],
+    trackingGroups: List[String], actorCellCreation: Boolean)
+    extends GroupMetricsTrackingActor(entity, trackingGroups, actorCellCreation) {
+
+    /** Counts the message and the mailbox growth, then delegates to the group-level bookkeeping. */
+    override def captureEnvelopeContext(): EnvelopeContext = {
+      actorMetrics.foreach { am =>
+        am.mailboxSize.inc()
+        am.messages.inc()
+      }
+      super.captureEnvelopeContext()
+    }
+
+    /**
+     * Runs the message processing and records processing time and time-in-mailbox. Recording
+     * happens in `finally`, so it also runs when `pjp.proceed()` throws.
+     */
+    def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = {
+      val timestampBeforeProcessing = RelativeNanoTimestamp.now
+
+      try {
+        pjp.proceed()
+      } finally {
+        val timestampAfterProcessing = RelativeNanoTimestamp.now
+        val timeInMailbox = timestampBeforeProcessing - envelopeContext.nanoTime
+        val processingTime = timestampAfterProcessing - timestampBeforeProcessing
+
+        actorMetrics.foreach { am =>
+          am.processingTime.inc(processingTime.nanos)
+          am.timeInMailbox.inc(timeInMailbox.nanos)
+          am.mailboxSize.dec()
+        }
+        recordProcessMetrics(processingTime, timeInMailbox)
+      }
+    }
+
+    /** Counts the error on the actor metrics (when present) and on the tracking groups. */
+    override def processFailure(failure: Throwable): Unit = {
+      actorMetrics.foreach { am =>
+        am.errors.inc()
+      }
+      super.processFailure(failure)
+    }
+  }
+
+  /** Monitor for a routee: records into the owning router's metrics and the tracking groups. */
+  class TrackedRoutee(val entity: Entity, routerMetrics: RouterMetrics,
+    trackingGroups: List[String], actorCellCreation: Boolean)
+    extends GroupMetricsTrackingActor(entity, trackingGroups, actorCellCreation) {
+
+    /**
+     * Runs the message processing and records processing time and time-in-mailbox. Recording
+     * happens in `finally`, so it also runs when `pjp.proceed()` throws.
+     */
+    def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = {
+      val timestampBeforeProcessing = RelativeNanoTimestamp.now
+
+      try {
+        pjp.proceed()
+      } finally {
+        val timestampAfterProcessing = RelativeNanoTimestamp.now
+        val timeInMailbox = timestampBeforeProcessing - envelopeContext.nanoTime
+        val processingTime = timestampAfterProcessing - timestampBeforeProcessing
+
+        routerMetrics.processingTime.inc(processingTime.nanos)
+        routerMetrics.timeInMailbox.inc(timeInMailbox.nanos)
+        recordProcessMetrics(processingTime, timeInMailbox)
+      }
+    }
+
+    /** Counts the error on the router metrics and on the tracking groups. */
+    override def processFailure(failure: Throwable): Unit = {
+      routerMetrics.errors.inc()
+      super.processFailure(failure)
+    }
+  }
+
+  /**
+   * Base monitor maintaining the per-group metrics for every group in `trackingGroups`.
+   *
+   * When `actorCellCreation` is true the group actor count is incremented on construction and
+   * decremented again in `cleanup()`; when it is false the count is left untouched.
+   */
+  abstract class GroupMetricsTrackingActor(entity: Entity,
+    trackingGroups: List[String], actorCellCreation: Boolean) extends ActorMonitor {
+    if (actorCellCreation) {
+      trackingGroups.foreach { group =>
+        ActorGroupMetrics.actorCount.labels(group).inc()
+      }
+    }
+
+    /** Counts the message and the mailbox growth per group and returns a context stamped with the current time. */
+    def captureEnvelopeContext(): EnvelopeContext = {
+      trackingGroups.foreach { group =>
+        ActorGroupMetrics.mailboxSize.labels(group).inc()
+        ActorGroupMetrics.messages.labels(group).inc()
+      }
+      EnvelopeContext(RelativeNanoTimestamp.now)
+    }
+
+    /** Adds both durations (in nanoseconds) to each group and decrements the group mailbox size. */
+    protected def recordProcessMetrics(processingTime: RelativeNanoTimestamp, timeInMailbox: RelativeNanoTimestamp): Unit = {
+      trackingGroups.foreach { group =>
+        ActorGroupMetrics.processingTime.labels(group).inc(processingTime.nanos)
+        ActorGroupMetrics.timeInMailbox.labels(group).inc(timeInMailbox.nanos)
+        ActorGroupMetrics.mailboxSize.labels(group).dec()
+      }
+    }
+
+    /** Counts one error per tracking group. */
+    def processFailure(failure: Throwable): Unit = {
+      trackingGroups.foreach { group =>
+        ActorGroupMetrics.errors.labels(group).inc()
+      }
+    }
+
+    /** Undoes the actor-count increment made at construction, if there was one. */
+    def cleanup(): Unit = {
+      if (actorCellCreation) {
+        trackingGroups.foreach { group =>
+          ActorGroupMetrics.actorCount.labels(group).dec()
+        }
+      }
+    }
+
+  }
+}
diff --git a/src/main/scala/akka/monitor/instrumentation/CellInfo.scala b/src/main/scala/akka/monitor/instrumentation/CellInfo.scala
new file mode 100644
index 0000000..bd9fe80
--- /dev/null
+++ b/src/main/scala/akka/monitor/instrumentation/CellInfo.scala
@@ -0,0 +1,49 @@
+/*
+ * =========================================================================================
+ * Copyright © 2017 Workday, Inc. 
+ * Copyright © 2013-2017 the kamon project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+package akka.monitor.instrumentation
+
+import com.workday.prometheus.akka.{ ActorMetrics, MetricsConfig, RouterMetrics }
+
+import akka.actor.{ ActorRef, ActorSystem, Cell }
+import akka.routing.{ NoRouter, RoutedActorRef }
+import kamon.metric.Entity
+
+/**
+ * Facts about a cell that decide how it is monitored: the metrics entity it reports under,
+ * whether it is a router or a routee, whether it is tracked individually, the actor groups it
+ * is tracked under, and the `actorCellCreation` flag passed in by the instrumentation.
+ */
+case class CellInfo(entity: Entity, isRouter: Boolean, isRoutee: Boolean, isTracked: Boolean,
+  trackingGroups: List[String], actorCellCreation: Boolean)
+
+object CellInfo {
+
+  /** Metric name of an actor: the system name, a slash, then the slash-joined path elements of `ref`. */
+  def cellName(system: ActorSystem, ref: ActorRef): String =
+    system.name + "/" + ref.path.elements.mkString("/")
+
+  /**
+   * Derives the [[CellInfo]] of `cell`.
+   *
+   * A routee (a cell whose parent is a `RoutedActorRef`) reports under its parent's name and,
+   * like a router, under the router category. The root, `user` and `system` supervisors are
+   * never tracked individually.
+   */
+  def cellInfoFor(cell: Cell, system: ActorSystem, ref: ActorRef, parent: ActorRef, actorCellCreation: Boolean): CellInfo = {
+    val path = ref.path.elements.mkString("/")
+    val rootSupervisor = path match {
+      case "" | "user" | "system" => true
+      case _ => false
+    }
+    val router = cell.props.deploy.routerConfig != NoRouter
+    val routee = parent match {
+      case _: RoutedActorRef => true
+      case _ => false
+    }
+
+    val owner = if (routee) parent else ref
+    val name = cellName(system, owner)
+    val category = if (!router && !routee) MetricsConfig.Actor else MetricsConfig.Router
+    val entity = Entity(name, category)
+
+    // The filter is only consulted for cells that are not root supervisors.
+    val tracked = !rootSupervisor && MetricsConfig.shouldTrack(category, name)
+    val groups =
+      if (routee && rootSupervisor) List()
+      else MetricsConfig.actorShouldBeTrackedUnderGroups(name)
+
+    CellInfo(entity, router, routee, tracked, groups, actorCellCreation)
+  }
+}
diff --git a/src/main/scala/akka/monitor/instrumentation/DispatcherInstrumentation.scala b/src/main/scala/akka/monitor/instrumentation/DispatcherInstrumentation.scala
new file mode 100644
index 0000000..c2668bb
--- /dev/null
+++ b/src/main/scala/akka/monitor/instrumentation/DispatcherInstrumentation.scala
@@ -0,0 +1,210 @@
+/*
+ * =========================================================================================
+ * Copyright © 2017 Workday, Inc.
+ * Copyright © 2013-2017 the kamon project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License. 
+ * =========================================================================================
+ */
+
+package akka.monitor.instrumentation
+
+import java.lang.reflect.Method
+import java.util.concurrent.{ ExecutorService, ThreadPoolExecutor }
+
+import scala.concurrent.forkjoin.ForkJoinPool
+
+import org.aspectj.lang.ProceedingJoinPoint
+import org.aspectj.lang.annotation.{ After, AfterReturning, Around, Aspect, Before, DeclareMixin, Pointcut }
+import org.slf4j.LoggerFactory
+
+import com.workday.prometheus.akka.{ ForkJoinPoolMetrics, MetricsConfig, ThreadPoolMetrics }
+
+import akka.actor.{ ActorContext, ActorSystem, ActorSystemImpl, Props }
+import akka.dispatch.{ Dispatcher, Dispatchers, ExecutorServiceDelegate, MessageDispatcher }
+import akka.monitor.instrumentation.LookupDataAware.LookupData
+
+/**
+ * AspectJ instrumentation that registers the executors behind Akka dispatchers with
+ * [[ForkJoinPoolMetrics]] / [[ThreadPoolMetrics]] and marks them removed on shutdown.
+ *
+ * Advice methods are bound by AspectJ through the method and parameter names used in the
+ * pointcut strings, so those names must not be changed.
+ */
+@Aspect
+class DispatcherInstrumentation {
+
+  val logger = LoggerFactory.getLogger(classOf[DispatcherInstrumentation])
+
+  @Pointcut("execution(* akka.actor.ActorSystemImpl.start(..)) && this(system)")
+  def actorSystemInitialization(system: ActorSystemImpl): Unit = {}
+
+  /** Runs before `ActorSystemImpl.start`: records the system on its `Dispatchers` and registers the default dispatcher. */
+  @Before("actorSystemInitialization(system)")
+  def afterActorSystemInitialization(system: ActorSystemImpl): Unit = {
+    system.dispatchers.asInstanceOf[ActorSystemAware].actorSystem = system
+
+    // The default dispatcher for the actor system is looked up in the ActorSystemImpl's initialization code, when the
+    // ActorSystem is not yet fully constructed, so the lookup instrumentation below sees no actor system for it. To
+    // work around that we manually select and register the default dispatcher here. All other dispatchers will be
+    // registered by the instrumentation below.
+    system.dispatcher match {
+      case dispatcher: MessageDispatcher ⇒
+        extractExecutor(dispatcher) match {
+          case Some(executor) ⇒
+            registerDispatcher(Dispatchers.DefaultDispatcherId, executor, Some(system))
+          case None ⇒
+            // Previously a MatchError here aborted the actor system start-up; skipping is the safer outcome.
+            logger.warn(s"Unable to extract the executor of default dispatcher ${dispatcher.getClass.getName}, it will not be monitored")
+        }
+      case other ⇒
+        logger.warn(s"Default dispatcher ${other.getClass.getName} is not a MessageDispatcher, it will not be monitored")
+    }
+  }
+
+  /**
+   * Reflectively reads the executor backing a `Dispatcher`.
+   *
+   * @return the underlying executor, or `None` when the dispatcher is not a `Dispatcher`
+   *         (or the reflective call yields something that is not an `ExecutorService`)
+   */
+  private def extractExecutor(dispatcher: MessageDispatcher): Option[ExecutorService] = dispatcher match {
+    case x: Dispatcher ⇒
+      // Yes, reflection sucks, but this piece of code is only executed once on ActorSystem's startup.
+      // executorService is protected
+      val executorServiceMethod: Method = classOf[Dispatcher].getDeclaredMethod("executorService")
+      executorServiceMethod.setAccessible(true)
+
+      // The delegate case must come first: ExecutorServiceDelegate is itself an ExecutorService.
+      executorServiceMethod.invoke(x) match {
+        case delegate: ExecutorServiceDelegate ⇒ Some(delegate.executor)
+        case executor: ExecutorService ⇒ Some(executor)
+        case _ ⇒ None
+      }
+    case _ ⇒ None
+  }
+
+  /**
+   * Adds the executor to the matching metrics collector when the dispatcher filter accepts it.
+   *
+   * NOTE(review): only the default dispatcher is registered with a `system` (and therefore a
+   * `<system>_` prefix); dispatchers registered from `afterCreateExecutorService` use the bare
+   * name. Confirm whether that asymmetry is intended before relying on the names.
+   */
+  private def registerDispatcher(dispatcherName: String, executorService: ExecutorService,
+    system: Option[ActorSystem] = None): Unit = {
+    val prefixedName = system match {
+      case Some(s) => s"${s.name}_${dispatcherName}"
+      case None => dispatcherName
+    }
+    if (MetricsConfig.shouldTrack(MetricsConfig.Dispatcher, prefixedName)) {
+      executorService match {
+        case fjp: ForkJoinPool ⇒ ForkJoinPoolMetrics.add(prefixedName, fjp)
+        case tpe: ThreadPoolExecutor ⇒ ThreadPoolMetrics.add(prefixedName, tpe)
+        case other ⇒ logger.warn(s"Unhandled Dispatcher Execution Service ${other.getClass.getName}")
+      }
+    }
+  }
+
+  @Pointcut("execution(* akka.dispatch.Dispatchers.lookup(..)) && this(dispatchers) && args(dispatcherName)")
+  def dispatchersLookup(dispatchers: ActorSystemAware, dispatcherName: String): Unit = {}
+
+  /** Makes the dispatcher name and actor system available to everything created during the lookup. */
+  @Around("dispatchersLookup(dispatchers, dispatcherName)")
+  def aroundDispatchersLookup(pjp: ProceedingJoinPoint, dispatchers: ActorSystemAware, dispatcherName: String): Any =
+    LookupDataAware.withLookupData(LookupData(dispatcherName, dispatchers.actorSystem)) {
+      pjp.proceed()
+    }
+
+  @Pointcut("initialization(akka.dispatch.ExecutorServiceFactory.new(..)) && target(factory)")
+  def executorServiceFactoryInitialization(factory: LookupDataAware): Unit = {}
+
+  /** Captures the lookup data that is current while the factory is being constructed (may be null). */
+  @After("executorServiceFactoryInitialization(factory)")
+  def afterExecutorServiceFactoryInitialization(factory: LookupDataAware): Unit =
+    factory.lookupData = LookupDataAware.currentLookupData
+
+  @Pointcut("execution(* akka.dispatch.ExecutorServiceFactory+.createExecutorService()) && this(factory) && !cflow(execution(* akka.dispatch.Dispatcher.shutdown()))")
+  def createExecutorService(factory: LookupDataAware): Unit = {}
+
+  @AfterReturning(pointcut = "createExecutorService(factory)", returning = "executorService")
+  def afterCreateExecutorService(factory: LookupDataAware, executorService: ExecutorService): Unit = {
+    // lookupData itself is null when the factory was instantiated outside of a withLookupData scope.
+    // lookupData.actorSystem will be null only during the first lookup of the default dispatcher during the
+    // ActorSystemImpl's initialization.
+    Option(factory.lookupData)
+      .filter(_.actorSystem != null)
+      .foreach(data ⇒ registerDispatcher(data.dispatcherName, executorService))
+  }
+
+  @Pointcut("initialization(akka.dispatch.Dispatcher.LazyExecutorServiceDelegate.new(..)) && this(lazyExecutor)")
+  def lazyExecutorInitialization(lazyExecutor: LookupDataAware): Unit = {}
+
+  @After("lazyExecutorInitialization(lazyExecutor)")
+  def afterLazyExecutorInitialization(lazyExecutor: LookupDataAware): Unit =
+    lazyExecutor.lookupData = LookupDataAware.currentLookupData
+
+  @Pointcut("execution(* akka.dispatch.Dispatcher.LazyExecutorServiceDelegate.copy()) && this(lazyExecutor)")
+  def lazyExecutorCopy(lazyExecutor: LookupDataAware): Unit = {}
+
+  /** Re-establishes the delegate's own lookup data while it is copied, so the copy captures it too. */
+  @Around("lazyExecutorCopy(lazyExecutor)")
+  def aroundLazyExecutorCopy(pjp: ProceedingJoinPoint, lazyExecutor: LookupDataAware): Any =
+    LookupDataAware.withLookupData(lazyExecutor.lookupData) {
+      pjp.proceed()
+    }
+
+  @Pointcut("execution(* akka.dispatch.Dispatcher.LazyExecutorServiceDelegate.shutdown()) && this(lazyExecutor)")
+  def lazyExecutorShutdown(lazyExecutor: LookupDataAware): Unit = {}
+
+  /** Marks the dispatcher as removed in the collector that matches its executor type. */
+  @After("lazyExecutorShutdown(lazyExecutor)")
+  def afterLazyExecutorShutdown(lazyExecutor: LookupDataAware): Unit =
+    // Guard against delegates that captured no lookup data (created outside a withLookupData scope).
+    Option(lazyExecutor.lookupData).filter(_.actorSystem != null).foreach { data ⇒
+      lazyExecutor match {
+        case delegate: ExecutorServiceDelegate ⇒
+          delegate.executor match {
+            case _: ForkJoinPool ⇒ ForkJoinPoolMetrics.remove(data.dispatcherName)
+            case _: ThreadPoolExecutor ⇒ ThreadPoolMetrics.remove(data.dispatcherName)
+            case _ ⇒ // nothing to remove.
+          }
+        case _ ⇒ // not an executor delegate, nothing to remove.
+      }
+    }
+
+  @Pointcut("execution(* akka.routing.BalancingPool.newRoutee(..)) && args(props, context)")
+  def createNewRouteeOnBalancingPool(props: Props, context: ActorContext): Unit = {}
+
+  /** Supplies a synthetic `BalancingPool-<path>` dispatcher name while a balancing-pool routee is created. */
+  @Around("createNewRouteeOnBalancingPool(props, context)")
+  def aroundCreateNewRouteeOnBalancingPool(pjp: ProceedingJoinPoint, props: Props, context: ActorContext): Any = {
+    val deployPath = context.self.path.elements.drop(1).mkString("/", "/", "")
+    val dispatcherId = s"BalancingPool-$deployPath"
+
+    LookupDataAware.withLookupData(LookupData(dispatcherId, context.system)) {
+      pjp.proceed()
+    }
+  }
+}
+
+/** Declares the mixins that let the instrumentation attach state to Akka's dispatcher classes. */
+@Aspect
+class DispatcherMetricCollectionInfoIntoDispatcherMixin {
+
+  @DeclareMixin("akka.dispatch.Dispatchers")
+  def mixinActorSystemAwareToDispatchers: ActorSystemAware = ActorSystemAware()
+
+  @DeclareMixin("akka.dispatch.Dispatcher.LazyExecutorServiceDelegate")
+  def mixinLookupDataAwareToExecutors: LookupDataAware = LookupDataAware()
+
+  @DeclareMixin("akka.dispatch.ExecutorServiceFactory+")
+  def mixinActorSystemAwareToDispatcher: LookupDataAware = LookupDataAware()
+}
+
+/** Mixin state: the actor system a `Dispatchers` instance belongs to (null until set). */
+trait ActorSystemAware {
+  @volatile var actorSystem: ActorSystem = _
+}
+
+object ActorSystemAware {
+  def apply(): ActorSystemAware = new ActorSystemAware {}
+}
+
+/** Mixin state: the lookup data captured when the instrumented object was created (may be null). */
+trait LookupDataAware {
+  @volatile var lookupData: LookupData = _
+}
+
+object LookupDataAware {
+  case class LookupData(dispatcherName: String, actorSystem: ActorSystem)
+
+  private val _currentDispatcherLookupData = new ThreadLocal[LookupData]
+
+  def apply(): LookupDataAware = new LookupDataAware {}
+
+  /** Lookup data bound to the current thread, or null when no `withLookupData` scope is active. */
+  def currentLookupData: LookupData = _currentDispatcherLookupData.get()
+
+  /**
+   * Runs `thunk` with `lookupData` bound to the current thread.
+   *
+   * The previous binding is restored afterwards, also when `thunk` throws, so a failed lookup
+   * cannot leave stale data on the thread and a nested scope does not wipe its enclosing one.
+   */
+  def withLookupData[T](lookupData: LookupData)(thunk: ⇒ T): T = {
+    val previous = _currentDispatcherLookupData.get()
+    _currentDispatcherLookupData.set(lookupData)
+    try thunk
+    finally {
+      if (previous == null) _currentDispatcherLookupData.remove()
+      else _currentDispatcherLookupData.set(previous)
+    }
+  }
+}
+
+object AkkaDispatcherMetrics {
+  val Category = "akka-dispatcher"
+}
diff --git a/src/main/scala/akka/monitor/instrumentation/EnvelopeInstrumentation.scala b/src/main/scala/akka/monitor/instrumentation/EnvelopeInstrumentation.scala
new file mode 100644
index 0000000..592a0e1
--- /dev/null
+++ b/src/main/scala/akka/monitor/instrumentation/EnvelopeInstrumentation.scala
@@ -0,0 +1,47 @@
+/*
+ * =========================================================================================
+ * Copyright © 2017 Workday, Inc.
+ * Copyright © 2013-2017 the kamon project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+package akka.monitor.instrumentation
+
+import org.aspectj.lang.annotation.{ DeclareMixin, Aspect }
+import kamon.util.RelativeNanoTimestamp
+
+/** Data attached to an envelope by the instrumentation: a single relative nano timestamp. */
+case class EnvelopeContext(nanoTime: RelativeNanoTimestamp)
+
+object EnvelopeContext {
+  /** Context carrying `RelativeNanoTimestamp.zero`. */
+  val Empty = EnvelopeContext(RelativeNanoTimestamp.zero)
+}
+
+/** Accessor pair for the [[EnvelopeContext]] carried by an instrumented envelope. */
+trait InstrumentedEnvelope extends Serializable {
+  def envelopeContext(): EnvelopeContext
+  def setEnvelopeContext(envelopeContext: EnvelopeContext): Unit
+}
+
+object InstrumentedEnvelope {
+  /** Creates a holder whose context starts out as null (not [[EnvelopeContext.Empty]]) until it is set. */
+  def apply(): InstrumentedEnvelope = new InstrumentedEnvelope {
+    // The var's generated getter is what implements the trait's envelopeContext().
+    var envelopeContext: EnvelopeContext = _
+
+    override def setEnvelopeContext(envelopeContext: EnvelopeContext): Unit =
+      this.envelopeContext = envelopeContext
+  }
+}
+
+/** Declares the AspectJ mixin that adds [[InstrumentedEnvelope]] to `akka.dispatch.Envelope`. */
+@Aspect
+class EnvelopeContextIntoEnvelopeMixin {
+
+  @DeclareMixin("akka.dispatch.Envelope")
+  def mixinInstrumentationToEnvelope: InstrumentedEnvelope = InstrumentedEnvelope()
+}
diff --git a/src/main/scala/akka/monitor/instrumentation/RouterInstrumentation.scala b/src/main/scala/akka/monitor/instrumentation/RouterInstrumentation.scala
new file mode 100644
index 0000000..15da69d
--- /dev/null
+++ b/src/main/scala/akka/monitor/instrumentation/RouterInstrumentation.scala
@@ -0,0 +1,70 @@
+/*
+ * =========================================================================================
+ * Copyright © 2017 Workday, Inc.
+ * Copyright © 2013-2017 the kamon project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied.
See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+package akka.monitor.instrumentation
+
+import org.aspectj.lang.ProceedingJoinPoint
+import org.aspectj.lang.annotation.{ After, Around, Aspect, DeclareMixin, Pointcut }
+
+import akka.actor.{ ActorRef, ActorSystem, Cell, Props }
+import akka.dispatch.{ Envelope, MessageDispatcher }
+import akka.routing.RoutedActorCell
+
+/**
+ * AspectJ instrumentation for `RoutedActorCell`: attaches a [[RouterMonitor]] when the cell is
+ * constructed and routes every `sendMessage` call through it.
+ *
+ * The pointcut strings refer to the advice methods and their parameters by name, so those
+ * names must stay in sync with the annotations.
+ */
+@Aspect
+class RoutedActorCellInstrumentation {
+
+  /** Reads the monitor stored on the cell through the [[RouterInstrumentationAware]] mixin. */
+  def routerInstrumentation(cell: Cell): RouterMonitor =
+    cell.asInstanceOf[RouterInstrumentationAware].routerInstrumentation
+
+  @Pointcut("execution(akka.routing.RoutedActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, routeeProps, supervisor)")
+  def routedActorCellCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, routeeProps: Props, supervisor: ActorRef): Unit = {}
+
+  /** After the constructor ran: creates the monitor for the cell and stores it on the mixin. */
+  @After("routedActorCellCreation(cell, system, ref, props, dispatcher, routeeProps, supervisor)")
+  def afterRoutedActorCellCreation(cell: RoutedActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, routeeProps: Props, supervisor: ActorRef): Unit = {
+    cell.asInstanceOf[RouterInstrumentationAware].setRouterInstrumentation(
+      RouterMonitor.createRouterInstrumentation(cell))
+  }
+
+  @Pointcut("execution(* akka.routing.RoutedActorCell.sendMessage(*)) && this(cell) && args(envelope)")
+  def sendMessageInRouterActorCell(cell: RoutedActorCell, envelope: Envelope) = {}
+
+  /** Delegates the intercepted `sendMessage` to the cell's monitor, which decides how to proceed. */
+  @Around("sendMessageInRouterActorCell(cell, envelope)")
+  def aroundSendMessageInRouterActorCell(pjp: ProceedingJoinPoint, cell: RoutedActorCell, envelope: Envelope): Any = {
+    routerInstrumentation(cell).processMessage(pjp)
+  }
+}
+
+/** Mixin contract for objects that carry a [[RouterMonitor]]. */
+trait RouterInstrumentationAware {
+  def routerInstrumentation: RouterMonitor
+  def setRouterInstrumentation(ai: RouterMonitor): Unit
+}
+
+object RouterInstrumentationAware {
+  /** Creates a holder whose monitor is null until `setRouterInstrumentation` is called. */
+  def apply(): RouterInstrumentationAware = new RouterInstrumentationAware {
+    // Plain (non-volatile) field, written once from the constructor advice above.
+    private var _ri: RouterMonitor = _
+
+    override def setRouterInstrumentation(ai: RouterMonitor): Unit = _ri = ai
+    override def routerInstrumentation: RouterMonitor = _ri
+  }
+}
+
+/** Declares the AspectJ mixin that adds [[RouterInstrumentationAware]] to `akka.routing.RoutedActorCell`. */
+@Aspect
+class MetricsIntoRouterCellsMixin {
+
+  @DeclareMixin("akka.routing.RoutedActorCell")
+  def mixinActorCellMetricsToRoutedActorCell: RouterInstrumentationAware = RouterInstrumentationAware()
+
+}
diff --git a/src/main/scala/akka/monitor/instrumentation/RouterMonitor.scala b/src/main/scala/akka/monitor/instrumentation/RouterMonitor.scala
new file mode 100644
index 0000000..e795896
--- /dev/null
+++ b/src/main/scala/akka/monitor/instrumentation/RouterMonitor.scala
@@ -0,0 +1,75 @@
+/*
+ * =========================================================================================
+ * Copyright © 2017 Workday, Inc.
+ * Copyright © 2013-2017 the kamon project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+package akka.monitor.instrumentation
+
+import org.aspectj.lang.ProceedingJoinPoint
+
+import com.workday.prometheus.akka.RouterMetrics
+
+import akka.actor.Cell
+import kamon.metric.Entity
+import kamon.util.RelativeNanoTimestamp
+
+/** Hooks invoked by the router instrumentation for a single routed actor cell. */
+trait RouterMonitor {
+  /** Wraps the intercepted call; implementations are expected to call `pjp.proceed()`. */
+  def processMessage(pjp: ProceedingJoinPoint): AnyRef
+  def processFailure(failure: Throwable): Unit
+  def cleanup(): Unit
+
+  def routeeAdded(): Unit
+  def routeeRemoved(): Unit
+}
+
+object RouterMonitor {
+
+  /**
+   * Picks the monitor for a router cell: a metrics-recording one when the cell is tracked,
+   * the no-op one otherwise. Router metrics are only looked up for tracked cells.
+   */
+  def createRouterInstrumentation(cell: Cell): RouterMonitor = {
+    val info = CellInfo.cellInfoFor(cell, cell.system, cell.self, cell.parent, false)
+
+    if (!info.isTracked) NoOpRouterMonitor
+    else new MetricsOnlyRouterMonitor(info.entity, RouterMetrics.metricsFor(info.entity))
+  }
+}
+
+/** Monitor for untracked routers: proceeds with the call and records nothing. */
+object NoOpRouterMonitor extends RouterMonitor {
+  def processMessage(pjp: ProceedingJoinPoint): AnyRef = pjp.proceed()
+  def processFailure(failure: Throwable): Unit = {}
+  def cleanup(): Unit = {}
+  def routeeAdded(): Unit = {}
+  def routeeRemoved(): Unit = {}
+}
+
+/** Monitor for tracked routers: adds the time spent in the intercepted call to `routingTime`. */
+class MetricsOnlyRouterMonitor(entity: Entity, routerMetrics: RouterMetrics) extends RouterMonitor {
+
+  def processMessage(pjp: ProceedingJoinPoint): AnyRef = {
+    val startedAt = RelativeNanoTimestamp.now
+
+    // The elapsed time is recorded whether or not the intercepted call throws.
+    try pjp.proceed()
+    finally {
+      val elapsed = RelativeNanoTimestamp.now - startedAt
+      routerMetrics.routingTime.inc(elapsed.nanos)
+    }
+  }
+
+  def processFailure(failure: Throwable): Unit = {}
+  def cleanup(): Unit = {}
+  def routeeAdded(): Unit = {}
+  def routeeRemoved(): Unit = {}
+}
diff --git a/src/main/scala/com/workday/prometheus/akka/ActorGroupMetrics.scala b/src/main/scala/com/workday/prometheus/akka/ActorGroupMetrics.scala
new file mode 100644
index 0000000..de7c003
--- /dev/null
+++
b/src/main/scala/com/workday/prometheus/akka/ActorGroupMetrics.scala
@@ -0,0 +1,28 @@
+/*
+ * =========================================================================================
+ * Copyright © 2017 Workday, Inc.
+ * Copyright © 2013-2017 the kamon project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+package com.workday.prometheus.akka
+
+import io.prometheus.client._
+
+/**
+ * Prometheus collectors for actor groups. Each collector is registered once, when this object
+ * is initialised, and distinguishes groups through the `groupName` label.
+ */
+object ActorGroupMetrics {
+  val mailboxSize = Gauge.build()
+    .name("akka_actor_group_mailboxes_size")
+    .help("Akka Actor Group mailboxes size")
+    .labelNames("groupName")
+    .register()
+
+  val processingTime = Counter.build()
+    .name("akka_actor_group_processing_time")
+    .help("Akka Actor Group processing time (Nanos)")
+    .labelNames("groupName")
+    .register()
+
+  val timeInMailbox = Counter.build()
+    .name("akka_actor_group_time_in_mailboxes")
+    .help("Akka Actor Group time in mailboxes (Nanos)")
+    .labelNames("groupName")
+    .register()
+
+  val messages = Counter.build()
+    .name("akka_actor_group_messages_count")
+    .help("Akka Actor Group messages")
+    .labelNames("groupName")
+    .register()
+
+  val actorCount = Gauge.build()
+    .name("akka_actor_group_actor_count")
+    .help("Akka Actor Group actor count")
+    .labelNames("groupName")
+    .register()
+
+  val errors = Counter.build()
+    .name("akka_actor_group_error_count")
+    .help("Akka Actor Group errors")
+    .labelNames("groupName")
+    .register()
+}
diff --git
a/src/main/scala/com/workday/prometheus/akka/ActorMetrics.scala b/src/main/scala/com/workday/prometheus/akka/ActorMetrics.scala new file mode 100644 index 0000000..7531843 --- /dev/null +++ b/src/main/scala/com/workday/prometheus/akka/ActorMetrics.scala @@ -0,0 +1,37 @@ +/* + * ========================================================================================= + * Copyright © 2017 Workday, Inc. + * Copyright © 2013-2017 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
+ * =========================================================================================
+ */
+package com.workday.prometheus.akka
+
+import scala.collection.JavaConverters._
+
+import io.prometheus.client._
+import kamon.metric.Entity
+
+/** Registry of per-entity [[ActorMetrics]], created on first use. */
+object ActorMetrics {
+  private val map = new java.util.concurrent.ConcurrentHashMap[Entity, ActorMetrics]()
+
+  // Explicit Function instance: Scala 2.11 has no SAM conversion for Java functional interfaces.
+  private val newMetrics = new java.util.function.Function[Entity, ActorMetrics] {
+    override def apply(e: Entity): ActorMetrics = new ActorMetrics(e)
+  }
+
+  /**
+   * Returns the metrics for `e`, creating them if absent.
+   *
+   * Creation goes through `computeIfAbsent` so that concurrent first calls for the same entity
+   * construct (and therefore register) the collectors only once; constructing a second
+   * instance would try to register the same metric names again.
+   */
+  def metricsFor(e: Entity): ActorMetrics = map.computeIfAbsent(e, newMetrics)
+
+  /** Whether metrics have already been created for `e`. */
+  def hasMetricsFor(e: Entity): Boolean = map.containsKey(e)
+}
+
+/**
+ * Prometheus collectors for one actor. The sanitised actor name is embedded in each metric
+ * name, and every collector is registered as soon as the instance is constructed.
+ */
+class ActorMetrics(entity: Entity) {
+  val actorName = metricFriendlyActorName(entity.name)
+  val mailboxSize = Gauge.build().name(s"akka_actor_mailbox_size_$actorName").help("Akka Actor mailbox size").register()
+  val processingTime = Counter.build().name(s"akka_actor_processing_time_$actorName").help("Akka Actor processing time (Nanos)").register()
+  val timeInMailbox = Counter.build().name(s"akka_actor_time_in_mailbox_$actorName").help("Akka Actor time in mailbox (Nanos)").register()
+  val messages = Counter.build().name(s"akka_actor_messages_count_$actorName").help("Akka Actor messages").register()
+  val errors = Counter.build().name(s"akka_actor_error_count_$actorName").help("Akka Actor errors").register()
+}
diff --git a/src/main/scala/com/workday/prometheus/akka/ForkJoinPoolMetrics.scala b/src/main/scala/com/workday/prometheus/akka/ForkJoinPoolMetrics.scala
new file mode 100644
index 0000000..041aeba
--- /dev/null
+++ b/src/main/scala/com/workday/prometheus/akka/ForkJoinPoolMetrics.scala
@@ -0,0 +1,90 @@
+/*
+ * =========================================================================================
+ * Copyright © 2017 Workday, Inc.
+ * Copyright © 2013-2017 the kamon project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+package com.workday.prometheus.akka
+
+import java.util.Collections
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters.{ mapAsScalaConcurrentMapConverter, seqAsJavaListConverter }
+import scala.concurrent.forkjoin.ForkJoinPool
+
+import io.prometheus.client.Collector
+import io.prometheus.client.Collector.MetricFamilySamples
+import io.prometheus.client.GaugeMetricFamily
+
+/**
+ * Collector exposing `ForkJoinPool` statistics for every registered dispatcher, labelled by
+ * `dispatcherName`. The object registers itself when it is first initialised.
+ */
+object ForkJoinPoolMetrics extends Collector {
+  // None marks a dispatcher that was removed; it keeps being reported with all values at 0.
+  val map = new ConcurrentHashMap[String, Option[ForkJoinPool]].asScala
+  this.register()
+
+  /** Builds a fresh snapshot of all gauges from the pools currently in `map`. */
+  override def collect(): java.util.List[MetricFamilySamples] = {
+    val labelNames = List("dispatcherName").asJava
+    def family(name: String, help: String): GaugeMetricFamily = new GaugeMetricFamily(name, help, labelNames)
+
+    // NOTE(review): the metric name keeps its original spelling ("parellelism") because renaming
+    // it changes the exported series; only the help text is corrected here.
+    val parallelismGauge = family("akka_dispatcher_forkjoinpool_parellelism",
+      "Akka ForkJoinPool Dispatcher Parallelism")
+    val poolSizeGauge = family("akka_dispatcher_forkjoinpool_pool_size",
+      "Akka ForkJoinPool Dispatcher Pool Size")
+    val activeThreadCountGauge = family("akka_dispatcher_forkjoinpool_active_thread_count",
+      "Akka ForkJoinPool Dispatcher Active Thread Count")
+    val runningThreadCountGauge = family("akka_dispatcher_forkjoinpool_running_thread_count",
+      "Akka ForkJoinPool Dispatcher Running Thread Count")
+    val queuedTaskCountGauge = family("akka_dispatcher_forkjoinpool_queued_task_count",
+      "Akka ForkJoinPool Dispatcher Queued Task Count")
+    val queuedSubmissionCountGauge = family("akka_dispatcher_forkjoinpool_queued_submission_count",
+      "Akka ForkJoinPool Dispatcher Queued Submission Count")
+    val stealCountGauge = family("akka_dispatcher_forkjoinpool_steal_count",
+      "Akka ForkJoinPool Dispatcher Steal Count")
+
+    map.foreach { case (dispatcherName, fjpOption) =>
+      val labelValues = List(dispatcherName).asJava
+      // One sample per gauge: the live value for a registered pool, 0 for a removed one.
+      def sample(gauge: GaugeMetricFamily)(read: ForkJoinPool => Double): Unit =
+        gauge.addMetric(labelValues, fjpOption.fold(0d)(read))
+
+      sample(parallelismGauge)(_.getParallelism)
+      sample(poolSizeGauge)(_.getPoolSize)
+      sample(activeThreadCountGauge)(_.getActiveThreadCount)
+      sample(runningThreadCountGauge)(_.getRunningThreadCount)
+      sample(queuedSubmissionCountGauge)(_.getQueuedSubmissionCount)
+      sample(queuedTaskCountGauge)(_.getQueuedTaskCount)
+      sample(stealCountGauge)(_.getStealCount)
+    }
+
+    val jul = new java.util.ArrayList[MetricFamilySamples]
+    jul.add(parallelismGauge)
+    jul.add(poolSizeGauge)
+    jul.add(activeThreadCountGauge)
+    jul.add(runningThreadCountGauge)
+    jul.add(queuedSubmissionCountGauge)
+    jul.add(queuedTaskCountGauge)
+    jul.add(stealCountGauge)
+    Collections.unmodifiableList(jul)
+  }
+
+  /** Starts reporting `fjp` under `dispatcherName`, replacing any earlier entry for that name. */
+  def add(dispatcherName: String, fjp: ForkJoinPool): Unit = {
+    map.put(dispatcherName, Some(fjp))
+  }
+
+  /** Drops the pool reference; the name stays in `map` and is reported with zero values. */
+  def remove(dispatcherName: String): Unit = {
+    map.put(dispatcherName, None)
+  }
+}
diff
--git a/src/main/scala/com/workday/prometheus/akka/MetricsConfig.scala b/src/main/scala/com/workday/prometheus/akka/MetricsConfig.scala new file mode 100644 index 0000000..c35c67c --- /dev/null +++ b/src/main/scala/com/workday/prometheus/akka/MetricsConfig.scala @@ -0,0 +1,79 @@ +/* + * ========================================================================================= + * Copyright © 2017 Workday, Inc. + * Copyright © 2013-2017 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
+ * =========================================================================================
+ */
+package com.workday.prometheus.akka
+
+import com.typesafe.config.{ Config, ConfigFactory, ConfigParseOptions, ConfigResolveOptions }
+
+import kamon.metric.EntityFilter
+import kamon.util.{ GlobPathFilter, RegexPathFilter }
+
+/**
+ * Include/exclude filters for metric entities, read once from the
+ * `workday.akka.monitor.metric.filters` configuration section when this object is initialised.
+ */
+object MetricsConfig {
+  val Dispatcher = "akka-dispatcher"
+  val Router = "akka-router"
+  val Actor = "akka-actor"
+  val ActorGroups = "akka-actor-groups"
+
+  // Unresolved substitutions are tolerated so that unrelated parts of the application's
+  // configuration cannot make this load fail.
+  private val defaultConfig = ConfigFactory.load(this.getClass.getClassLoader, ConfigParseOptions.defaults(), ConfigResolveOptions.defaults().setAllowUnresolved(true))
+  // Throws if the section is absent. NOTE(review): presumably a bundled reference.conf provides it; verify.
+  private val metricFiltersConfig = defaultConfig.getConfig("workday.akka.monitor.metric.filters")
+
+  implicit class Syntax(val config: Config) extends AnyVal {
+    /** Distinct first path segments of all entries, i.e. the text of each key up to its first '.'. */
+    def firstLevelKeys: Set[String] = {
+      import scala.collection.JavaConverters._
+
+      config.entrySet().asScala.map(entry ⇒ entry.getKey.takeWhile(_ != '.')).toSet
+    }
+  }
+
+  private val filters = createFilters(metricFiltersConfig, metricFiltersConfig.firstLevelKeys.filterNot(_ == ActorGroups))
+  private val groupFilters: Map[String, EntityFilter] =
+    if (metricFiltersConfig.hasPath(ActorGroups)) {
+      val cfg = metricFiltersConfig.getConfig(ActorGroups)
+      createFilters(cfg, cfg.firstLevelKeys)
+    } else {
+      Map.empty[String, EntityFilter]
+    }
+
+  /**
+   * Builds one [[EntityFilter]] per category from `<category>.includes`, `<category>.excludes`
+   * and the optional `<category>.asRegex` switch (glob patterns unless `asRegex` is true).
+   *
+   * A missing `includes` or `excludes` list is treated as empty instead of throwing, so a
+   * partially specified category cannot break initialisation of this object.
+   */
+  private def createFilters(cfg: Config, categories: Set[String]): Map[String, EntityFilter] = {
+    import scala.collection.JavaConverters._
+
+    def patterns(path: String, asRegex: Boolean) =
+      if (cfg.hasPath(path))
+        cfg.getStringList(path).asScala.map(pattern ⇒
+          if (asRegex) RegexPathFilter(pattern) else new GlobPathFilter(pattern)).toList
+      else Nil
+
+    categories.map { category: String ⇒
+      val asRegex = cfg.hasPath(s"$category.asRegex") && cfg.getBoolean(s"$category.asRegex")
+      val includes = patterns(s"$category.includes", asRegex)
+      val excludes = patterns(s"$category.excludes", asRegex)
+
+      (category, EntityFilter(includes, excludes))
+    }.toMap
+  }
+
+  /** True when `category` has a filter and that filter accepts `entityName`; false for unknown categories. */
+  def shouldTrack(category: String, entityName: String): Boolean =
+    filters.get(category).exists(_.accept(entityName))
+
+  /** Names of all configured actor groups whose filter accepts `entityName`. */
+  def actorShouldBeTrackedUnderGroups(entityName: String): List[String] =
+    groupFilters.collect { case (groupName, filter) if filter.accept(entityName) => groupName }.toList
+
+  /** Names of all configured actor groups. */
+  def groupNames: Set[String] = groupFilters.keys.toSet
+}
diff --git a/src/main/scala/com/workday/prometheus/akka/RouterMetrics.scala b/src/main/scala/com/workday/prometheus/akka/RouterMetrics.scala
new file mode 100644
index 0000000..2a51d5e
--- /dev/null
+++ b/src/main/scala/com/workday/prometheus/akka/RouterMetrics.scala
@@ -0,0 +1,36 @@
+/*
+ * =========================================================================================
+ * Copyright © 2017 Workday, Inc.
+ * Copyright © 2013-2017 the kamon project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+package com.workday.prometheus.akka
+
+import scala.collection.JavaConverters._
+
+import io.prometheus.client.Counter
+import kamon.metric.Entity
+
+/** Registry of per-entity [[RouterMetrics]], created on first use. */
+object RouterMetrics {
+  private val map = new java.util.concurrent.ConcurrentHashMap[Entity, RouterMetrics]()
+
+  // Explicit Function instance: Scala 2.11 has no SAM conversion for Java functional interfaces.
+  private val newMetrics = new java.util.function.Function[Entity, RouterMetrics] {
+    override def apply(e: Entity): RouterMetrics = new RouterMetrics(e)
+  }
+
+  /**
+   * Returns the metrics for `e`, creating them if absent.
+   *
+   * Creation goes through `computeIfAbsent` so that concurrent first calls for the same entity
+   * construct (and therefore register) the collectors only once; constructing a second
+   * instance would try to register the same metric names again.
+   */
+  def metricsFor(e: Entity): RouterMetrics = map.computeIfAbsent(e, newMetrics)
+
+  /** Whether metrics have already been created for `e`. */
+  def hasMetricsFor(e: Entity): Boolean = map.containsKey(e)
+}
+
+/**
+ * Prometheus counters for one router. The sanitised router name is embedded in each metric
+ * name, and every counter is registered as soon as the instance is constructed.
+ */
+class RouterMetrics(entity: Entity) {
+  val actorName = metricFriendlyActorName(entity.name)
+  val routingTime = Counter.build().name(s"akka_router_routing_time_$actorName").help("Akka Router routing time (Nanos)").register()
+  val processingTime = Counter.build().name(s"akka_router_processing_time_$actorName").help("Akka Router processing time (Nanos)").register()
+  val timeInMailbox = Counter.build().name(s"akka_router_time_in_mailbox_$actorName").help("Akka Router time in mailbox (Nanos)").register()
+  val errors = Counter.build().name(s"akka_router_error_count_$actorName").help("Akka Router errors").register()
+}
diff --git a/src/main/scala/com/workday/prometheus/akka/ThreadPoolMetrics.scala b/src/main/scala/com/workday/prometheus/akka/ThreadPoolMetrics.scala
new file mode 100644
index 0000000..7b2e354
--- /dev/null
+++ b/src/main/scala/com/workday/prometheus/akka/ThreadPoolMetrics.scala
@@ -0,0 +1,88 @@
+/*
+ * =========================================================================================
+ * Copyright © 2017 Workday, Inc.
+ * Copyright © 2013-2017 the kamon project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+package com.workday.prometheus.akka
+
+import java.util.Collections
+import java.util.concurrent.{ ConcurrentHashMap, ThreadPoolExecutor }
+
+import scala.collection.JavaConverters.{ mapAsScalaConcurrentMapConverter, seqAsJavaListConverter }
+
+import io.prometheus.client.Collector
+import io.prometheus.client.Collector.MetricFamilySamples
+import io.prometheus.client.GaugeMetricFamily
+
+/**
+ * Collector that reports gauges for every thread-pool executor recorded in `map`, labelled
+ * with the dispatcher name. The object registers itself when it is initialised.
+ */
+object ThreadPoolMetrics extends Collector {
+  // Dispatcher name -> executor; None marks a dispatcher that was removed (see `remove`).
+  val map = new ConcurrentHashMap[String, Option[ThreadPoolExecutor]].asScala
+  this.register()
+
+  /**
+   * Builds a fresh set of gauge families and adds one sample per known dispatcher to each.
+   * A dispatcher whose entry is None contributes 0 to every gauge.
+   *
+   * @return an unmodifiable list holding the seven gauge families, in declaration order
+   */
+  override def collect(): java.util.List[MetricFamilySamples] = {
+    val labelNames = List("dispatcherName").asJava
+
+    // Pairs a gauge family with the function that reads its value from an executor.
+    def family(name: String, help: String)(read: ThreadPoolExecutor => Double): (GaugeMetricFamily, ThreadPoolExecutor => Double) =
+      (new GaugeMetricFamily(name, help, labelNames), read)
+
+    val families = List(
+      family("akka_dispatcher_threadpoolexecutor_active_thread_count",
+        "Akka ThreadPool Dispatcher Active Thread Count")(_.getActiveCount.toDouble),
+      family("akka_dispatcher_threadpoolexecutor_core_pool_size",
+        "Akka ThreadPool Dispatcher Core Pool Size")(_.getCorePoolSize.toDouble),
+      family("akka_dispatcher_threadpoolexecutor_current_pool_size",
+        "Akka ThreadPool Dispatcher Current Pool Size")(_.getPoolSize.toDouble),
+      family("akka_dispatcher_threadpoolexecutor_largest_pool_size",
+        "Akka ThreadPool Dispatcher Largest Pool Size")(_.getLargestPoolSize.toDouble),
+      family("akka_dispatcher_threadpoolexecutor_max_pool_size",
+        "Akka ThreadPool Dispatcher Max Pool Size")(_.getMaximumPoolSize.toDouble),
+      family("akka_dispatcher_threadpoolexecutor_completed_task_count",
+        "Akka ThreadPoolExecutor Dispatcher Completed Task Count")(_.getCompletedTaskCount.toDouble),
+      family("akka_dispatcher_threadpoolexecutor_total_task_count",
+        "Akka ThreadPoolExecutor Dispatcher Total Task Count")(_.getTaskCount.toDouble)
+    )
+
+    map.foreach { case (dispatcherName, executor) =>
+      val labelValues = List(dispatcherName).asJava
+      families.foreach { case (gauge, read) =>
+        gauge.addMetric(labelValues, executor.fold(0.0)(read))
+      }
+    }
+
+    val samples = new java.util.ArrayList[MetricFamilySamples]
+    families.foreach { case (gauge, _) => samples.add(gauge) }
+    Collections.unmodifiableList(samples)
+  }
+
+  /** Records `tpe` as the executor behind `dispatcherName`, replacing any previous entry. */
+  def add(dispatcherName: String, tpe: ThreadPoolExecutor): Unit = {
+    map(dispatcherName) = Some(tpe)
+  }
+
+  /**
+   * Drops the executor for `dispatcherName` but keeps the name in `map` with no executor, so
+   * later collections still emit samples for it (all zero).
+   */
+  def remove(dispatcherName: String): Unit = {
+    map(dispatcherName) = None
+  }
+}
diff --git a/src/main/scala/com/workday/prometheus/akka/package.scala b/src/main/scala/com/workday/prometheus/akka/package.scala
new file mode 100644
index 0000000..51302b6
--- /dev/null
+++ b/src/main/scala/com/workday/prometheus/akka/package.scala
@@ -0,0 +1,32 @@
+/*
+ * =========================================================================================
+ * Copyright © 2017 Workday, Inc.
+ * Copyright © 2013-2017 the kamon project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+package com.workday.prometheus
+
+import scala.annotation.tailrec
+
+import io.prometheus.client.Collector
+
+package object akka {
+  /**
+   * Turns an actor path into a string that can be embedded in a metric name: leading slashes
+   * are dropped, the rest is lower-cased, remaining slashes become underscores and the result
+   * is passed through `Collector.sanitizeMetricName`.
+   *
+   * Lower-casing uses `Locale.ROOT` so the produced name does not depend on the JVM's default
+   * locale (the no-argument `toLowerCase` maps 'I' differently under e.g. a Turkish locale).
+   *
+   * @param actorPath the actor path, with or without leading slashes
+   * @return the sanitized, lower-case name
+   */
+  def metricFriendlyActorName(actorPath: String): String = {
+    val lowered = trimLeadingSlashes(actorPath).toLowerCase(java.util.Locale.ROOT)
+    Collector.sanitizeMetricName(lowered.replace("/", "_"))
+  }
+
+  // Strips every leading '/' from `s`; returns `s` unchanged when it does not start with one.
+  @tailrec
+  private def trimLeadingSlashes(s: String): String = {
+    if (s.startsWith("/")) trimLeadingSlashes(s.substring(1)) else s
+  }
+}
diff --git a/src/test/resources/application.conf b/src/test/resources/application.conf
new file mode 100644
index 0000000..54b4761
--- /dev/null
+++ b/src/test/resources/application.conf
@@ -0,0 +1,68 @@
+akka {
+  loglevel = INFO
+  loggers = [ "akka.event.slf4j.Slf4jLogger" ]
+  logger-startup-timeout = 30s
+}
+
+workday.akka.monitor {
+  metric.filters {
+    akka-actor {
+      includes = [ "**/user/tracked-**", "*/user/measuring-**", "*/user/stop-**" ]
+      excludes = [ "*/system/**", "*/user/IO-**", "**/user/tracked-explicitly-excluded-**" ]
+    }
+
+    akka-router {
+      includes = [ "**/user/tracked-**", "*/user/measuring-**", "*/user/stop-**" ]
+      excludes = [ "**/user/tracked-explicitly-excluded-**" ]
+    }
+
+    akka-dispatcher {
+      includes = [ "**" ]
+      excludes = [ "**explicitly-excluded**" ]
+    }
+
+    akka-actor-groups {
+      all {
+        includes = [ "**" ]
+        excludes = [ "*/system/**", "*/user/IO-**" ]
+      }
+      tracked {
+        includes = [ "**/user/tracked-**" ]
+        excludes = [ "*/system/**", "*/user/IO-**", "**/user/tracked-explicitly-excluded-**" ]
+      }
+      exclusive {
+        includes = [ "**/MyActor**" ]
+        excludes = []
+      }
+    }
+  }
+}
+
+explicitly-excluded {
+  type = "Dispatcher"
+  executor = "fork-join-executor"
+}
+
+tracked-fjp {
+  type = "Dispatcher"
+  executor = "fork-join-executor"
+
+  fork-join-executor {
+    parallelism-min = 8
+    parallelism-factor = 100.0
+    parallelism-max = 22
+  }
+}
+
+tracked-tpe {
+  type = "Dispatcher"
+  executor = "thread-pool-executor"
+
+  
thread-pool-executor { + core-pool-size-min = 7 + core-pool-size-factor = 100.0 + max-pool-size-factor = 100.0 + max-pool-size-max = 21 + core-pool-size-max = 21 + } +} \ No newline at end of file diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml new file mode 100644 index 0000000..c336bbf --- /dev/null +++ b/src/test/resources/logback.xml @@ -0,0 +1,12 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + \ No newline at end of file diff --git a/src/test/scala/akka/monitor/instrumentation/EnvelopeSpec.scala b/src/test/scala/akka/monitor/instrumentation/EnvelopeSpec.scala new file mode 100644 index 0000000..f5445a2 --- /dev/null +++ b/src/test/scala/akka/monitor/instrumentation/EnvelopeSpec.scala @@ -0,0 +1,61 @@ +/* + * ========================================================================================= + * Copyright © 2017 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
+ * =========================================================================================
+ */
+
+package akka.monitor.instrumentation
+
+import com.workday.prometheus.akka.TestKitBaseSpec
+import akka.actor.{Actor, ExtendedActorSystem, Props}
+import akka.dispatch.Envelope
+import kamon.util.RelativeNanoTimestamp
+
+/**
+ * Checks that an `Envelope` carries the `InstrumentedEnvelope` mixin and that its envelope
+ * context is still present after a Java-serialization round trip.
+ */
+class EnvelopeSpec extends TestKitBaseSpec("envelope-spec") {
+
+  "EnvelopeInstrumentation" should {
+    "mixin EnvelopeContext" in {
+      val recipient = system.actorOf(Props[NoReply])
+      // Held as a plain Object; presumably so the type patterns below are accepted for the
+      // final Envelope class — TODO confirm.
+      val envelope = Envelope("msg", recipient, system).asInstanceOf[Object]
+
+      // Step 1: the mixin must be present, and receives a context stamped "now".
+      envelope match {
+        case instrumented: Envelope with InstrumentedEnvelope =>
+          instrumented.setEnvelopeContext(EnvelopeContext(RelativeNanoTimestamp.now))
+        case _ =>
+          fail("InstrumentedEnvelope is not mixed in")
+      }
+
+      // Step 2: write the envelope out, read it back, and check the context came along.
+      envelope match {
+        case _: Serializable =>
+          import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+
+          val buffer = new ByteArrayOutputStream
+          val out = new ObjectOutputStream(buffer)
+          out.writeObject(envelope)
+          out.close()
+
+          // Deserialization runs with the test's actor system set as JavaSerializer.currentSystem.
+          akka.serialization.JavaSerializer.currentSystem.withValue(system.asInstanceOf[ExtendedActorSystem]) {
+            val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))
+            val restored = in.readObject()
+            in.close()
+
+            restored match {
+              case instrumented: Envelope with InstrumentedEnvelope =>
+                instrumented.envelopeContext() should not be null
+              case _ =>
+                fail("InstrumentedEnvelope is not mixed in")
+            }
+          }
+        case _ =>
+          fail("envelope is not serializable")
+      }
+    }
+  }
+}
+
+/** Actor that accepts every message and does nothing with it. */
+class NoReply extends Actor {
+  override def receive = {
+    case _ =>
+  }
+}
diff --git a/src/test/scala/com/workday/prometheus/akka/ActorGroupMetricsSpec.scala b/src/test/scala/com/workday/prometheus/akka/ActorGroupMetricsSpec.scala
new file mode 100644
index 0000000..5184080
--- /dev/null
+++ b/src/test/scala/com/workday/prometheus/akka/ActorGroupMetricsSpec.scala
@@ -0,0 +1,132 @@
+/*
+ * =========================================================================================
+ * Copyright © 2017 Workday, Inc.
+ * Copyright © 2013-2017 the kamon project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+package com.workday.prometheus.akka
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+
+import org.scalatest.{BeforeAndAfterEach, Matchers, WordSpecLike}
+import org.scalatest.concurrent.Eventually
+
+import akka.actor._
+import akka.routing.RoundRobinPool
+import akka.testkit.{TestKit, TestProbe}
+import io.prometheus.client.Collector
+
+/**
+ * Exercises the actor-group metrics against the include/exclude filters of the test
+ * configuration, for plain actors and for the routees of pool routers.
+ */
+class ActorGroupMetricsSpec extends TestKitBaseSpec("ActorGroupMetricsSpec") with BeforeAndAfterEach with Eventually {
+
+  val CountMetricName = "akka_actor_group_actor_count"
+
+  // Every test starts from cleared group metrics.
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    clearGroupMetrics
+  }
+
+  "the actor group metrics" should {
+    "respect the configured include and exclude filters" in {
+      val tracked = createTestActor("tracked-actor")
+      createTestActor("non-tracked-actor")
+      createTestActor("tracked-explicitly-excluded-actor")
+
+      findGroupRecorder("tracked") should not be empty
+      findGroupRecorder("exclusive") shouldBe empty
+
+      val recorded = findGroupRecorder("tracked")
+      recorded.getOrElse(CountMetricName, -1) shouldEqual 1.0
+      recorded.getOrElse("akka_actor_group_messages_count", -1) shouldEqual 1.0
+      recorded.getOrElse("akka_actor_group_mailboxes_size", -1) shouldEqual 0.0
+
+      // Stopping is asynchronous, so poll until the count drops.
+      system.stop(tracked)
+      eventually(timeout(5.seconds)) {
+        findGroupRecorder("tracked").getOrElse(CountMetricName, -1) shouldEqual 0.0
+      }
+
+      createTestActor("tracked-actor2")
+      createTestActor("tracked-actor3")
+      findGroupRecorder("tracked").getOrElse(CountMetricName, -1) shouldEqual 2.0
+    }
+
+    "respect the configured include and exclude filters for routee actors" in {
+      val tracked = createTestPoolRouter("tracked-router")
+      createTestPoolRouter("non-tracked-router")
+      createTestPoolRouter("tracked-explicitly-excluded-router")
+
+      findGroupRecorder("tracked") should not be empty
+      findGroupRecorder("exclusive") shouldBe empty
+
+      val recorded = findGroupRecorder("tracked")
+      recorded.getOrElse(CountMetricName, -1) shouldEqual 5.0
+      recorded.getOrElse("akka_actor_group_messages_count", -1) shouldEqual 1.0
+      recorded.getOrElse("akka_actor_group_mailboxes_size", -1) shouldEqual 0.0
+
+      system.stop(tracked)
+      eventually(timeout(5.seconds)) {
+        findGroupRecorder("tracked").getOrElse(CountMetricName, -1) shouldEqual 0.0
+      }
+
+      createTestPoolRouter("tracked-router2")
+      createTestPoolRouter("tracked-router3")
+      findGroupRecorder("tracked").getOrElse(CountMetricName, -1) shouldEqual 10.0
+    }
+  }
+
+  /**
+   * Collects every group metric and returns, as sample name -> value, the samples whose label
+   * values contain `groupName`. The result is empty when no sample mentions the group.
+   */
+  def findGroupRecorder(groupName: String): Map[String, Double] = {
+    val families: List[Collector.MetricFamilySamples] = List(
+      ActorGroupMetrics.errors.collect(),
+      ActorGroupMetrics.actorCount.collect(),
+      ActorGroupMetrics.mailboxSize.collect(),
+      ActorGroupMetrics.messages.collect(),
+      ActorGroupMetrics.processingTime.collect(),
+      ActorGroupMetrics.timeInMailbox.collect()
+    ).flatMap(_.asScala)
+
+    families.flatMap { family =>
+      family.samples.asScala.collect {
+        case sample if sample.labelValues.contains(groupName) => (sample.name, sample.value)
+      }
+    }.toMap
+  }
+
+  /** Clears every group metric. */
+  def clearGroupMetrics: Unit = {
+    ActorGroupMetrics.errors.clear()
+    ActorGroupMetrics.actorCount.clear()
+    ActorGroupMetrics.mailboxSize.clear()
+    ActorGroupMetrics.messages.clear()
+    ActorGroupMetrics.processingTime.clear()
+    ActorGroupMetrics.timeInMailbox.clear()
+  }
+
+  /** Starts a test actor called `name` and returns it once it has answered a Ping. */
+  def createTestActor(name: String): ActorRef = {
+    val created = system.actorOf(Props[ActorMetricsTestActor], name)
+    val probe = TestProbe()
+
+    // Ensure that the actor has been created before returning.
+    created.tell(ActorMetricsTestActor.Ping, probe.ref)
+    probe.expectMsg(ActorMetricsTestActor.Pong)
+
+    created
+  }
+
+  /** Starts a round-robin pool of five routees called `routerName`; returns it after one Ping/Pong. */
+  def createTestPoolRouter(routerName: String): ActorRef = {
+    val created = system.actorOf(RoundRobinPool(5).props(Props[RouterMetricsTestActor]), routerName)
+    val probe = TestProbe()
+
+    // Ensure that the router has been created before returning.
+    created.tell(RouterMetricsTestActor.Ping, probe.ref)
+    probe.expectMsg(RouterMetricsTestActor.Pong)
+
+    created
+  }
+
+}
diff --git a/src/test/scala/com/workday/prometheus/akka/ActorMetricsSpec.scala b/src/test/scala/com/workday/prometheus/akka/ActorMetricsSpec.scala
new file mode 100644
index 0000000..9b7ad1c
--- /dev/null
+++ b/src/test/scala/com/workday/prometheus/akka/ActorMetricsSpec.scala
@@ -0,0 +1,62 @@
+/*
+ * =========================================================================================
+ * Copyright © 2017 Workday, Inc.
+ * Copyright © 2013-2017 the kamon project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied.
See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+package com.workday.prometheus.akka
+
+import akka.actor._
+import akka.monitor.instrumentation.CellInfo
+import akka.testkit.TestProbe
+import kamon.metric.Entity
+
+/**
+ * Verifies that per-actor metrics exist only for actors admitted by the configured filters.
+ */
+class ActorMetricsSpec extends TestKitBaseSpec("ActorMetricsSpec") {
+
+  import ActorMetricsTestActor._
+
+  "the actor metrics" should {
+    "respect the configured include and exclude filters" in {
+      val tracked = createTestActor("tracked-actor")
+      val nonTracked = createTestActor("non-tracked-actor")
+      val excluded = createTestActor("tracked-explicitly-excluded-actor")
+
+      actorMetricsRecorderOf(tracked) should not be empty
+      actorMetricsRecorderOf(nonTracked) shouldBe empty
+      actorMetricsRecorderOf(excluded) shouldBe empty
+
+      actorMetricsRecorderOf(tracked).map(_.actorName) shouldEqual Some("actormetricsspec_user_tracked_actor")
+    }
+  }
+
+  /** Looks up the metrics recorded for `ref`; None when nothing was recorded for it. */
+  def actorMetricsRecorderOf(ref: ActorRef): Option[ActorMetrics] = {
+    val entity = Entity(CellInfo.cellName(system, ref), MetricsConfig.Actor)
+    // Check first: metricsFor is only called for an entity that already has metrics.
+    Some(entity)
+      .filter(e => ActorMetrics.hasMetricsFor(e))
+      .map(e => ActorMetrics.metricsFor(e))
+  }
+
+  /** Starts a test actor called `name` and returns it once it has answered a Ping. */
+  def createTestActor(name: String): ActorRef = {
+    val created = system.actorOf(Props[ActorMetricsTestActor], name)
+    val probe = TestProbe()
+
+    // Ensure that the actor has been created before returning.
+    created.tell(Ping, probe.ref)
+    probe.expectMsg(Pong)
+
+    created
+  }
+}
diff --git a/src/test/scala/com/workday/prometheus/akka/ActorMetricsTestActor.scala b/src/test/scala/com/workday/prometheus/akka/ActorMetricsTestActor.scala
new file mode 100644
index 0000000..bfc0d23
--- /dev/null
+++ b/src/test/scala/com/workday/prometheus/akka/ActorMetricsTestActor.scala
@@ -0,0 +1,50 @@
+/*
+ * =========================================================================================
+ * Copyright © 2017 Workday, Inc.
+ * Copyright © 2013-2017 the kamon project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+package com.workday.prometheus.akka
+
+import akka.actor._
+import scala.concurrent.duration._
+
+/**
+ * Test actor driven by the messages in its companion object: it can ignore a message, throw,
+ * answer a ping, or report the timestamps at which it handled a timing request.
+ */
+class ActorMetricsTestActor extends Actor {
+  import ActorMetricsTestActor._
+
+  override def receive = {
+    case Discard =>
+      // intentionally ignored
+    case Fail =>
+      throw new ArithmeticException("Division by zero.")
+    case Ping =>
+      sender ! Pong
+    case TrackTimings(sentAt, pause) =>
+      val dequeuedAt = System.nanoTime()
+      // Optionally block to simulate processing time.
+      pause.foreach(d => Thread.sleep(d.toMillis))
+      val finishedAt = System.nanoTime()
+
+      sender ! TrackedTimings(sentAt, dequeuedAt, finishedAt)
+  }
+}
+
+/** Message protocol of [[ActorMetricsTestActor]]. */
+object ActorMetricsTestActor {
+  case object Ping
+  case object Pong
+  case object Fail
+  case object Discard
+
+  /** Timing request; `sendTimestamp` defaults to the nanoTime at construction. */
+  case class TrackTimings(sendTimestamp: Long = System.nanoTime(), sleep: Option[Duration] = None)
+
+  /** Reply to [[TrackTimings]]; all values are `System.nanoTime()` readings. */
+  case class TrackedTimings(sendTimestamp: Long, dequeueTimestamp: Long, afterReceiveTimestamp: Long) {
+    def approximateTimeInMailbox: Long = dequeueTimestamp - sendTimestamp
+    def approximateProcessingTime: Long = afterReceiveTimestamp - dequeueTimestamp
+  }
+}
diff --git a/src/test/scala/com/workday/prometheus/akka/BaseSpec.scala b/src/test/scala/com/workday/prometheus/akka/BaseSpec.scala
new file mode 100644
index 0000000..94b82e4
--- /dev/null
+++ b/src/test/scala/com/workday/prometheus/akka/BaseSpec.scala
@@ -0,0 +1,30 @@
+/*
+ * =========================================================================================
+ * Copyright © 2017 Workday, Inc.
+ * Copyright © 2013-2017 the kamon project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+package com.workday.prometheus.akka
+
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+import akka.actor.ActorSystem
+import akka.testkit.TestKit
+
+/** Common base for the specs in this package: word-spec style, matchers and suite-level hooks. */
+trait BaseSpec extends WordSpecLike with Matchers with BeforeAndAfterAll
+
+/**
+ * Base for specs that need an actor system. A system named `actorSystemName` is created for
+ * the suite and shut down after all of its tests have run.
+ *
+ * @param actorSystemName name given to the suite's actor system
+ */
+abstract class TestKitBaseSpec(actorSystemName: String) extends TestKit(ActorSystem(actorSystemName)) with BaseSpec {
+  override def afterAll(): Unit = {
+    // Shut the system down even when a stacked afterAll throws, so it is never left running.
+    try super.afterAll()
+    finally TestKit.shutdownActorSystem(system)
+  }
+}
diff --git a/src/test/scala/com/workday/prometheus/akka/DispatcherMetricsSpec.scala b/src/test/scala/com/workday/prometheus/akka/DispatcherMetricsSpec.scala
new file mode 100644
index 0000000..c9495a5
--- /dev/null
+++ b/src/test/scala/com/workday/prometheus/akka/DispatcherMetricsSpec.scala
@@ -0,0 +1,71 @@
+/*
+ * =========================================================================================
+ * Copyright © 2017 Workday, Inc.
+ * Copyright © 2013-2017 the kamon project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+package com.workday.prometheus.akka
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.collection.JavaConverters._
+
+import akka.actor._
+import akka.dispatch.MessageDispatcher
+import akka.testkit.TestProbe
+
+/**
+ * Verifies that dispatcher metrics are collected for dispatchers admitted by the configured
+ * filters and absent for the explicitly excluded one.
+ */
+class DispatcherMetricsSpec extends TestKitBaseSpec("DispatcherMetricsSpec") {
+
+  // Selects which collector findDispatcherRecorder reads from.
+  sealed trait PoolType
+  case object ForkJoinPoolType extends PoolType
+  case object ThreadPoolType extends PoolType
+
+  "the akka dispatcher metrics" should {
+    "respect the configured include and exclude filters" in {
+      val default = forceInit(system.dispatchers.lookup("akka.actor.default-dispatcher"))
+      val forkJoin = forceInit(system.dispatchers.lookup("tracked-fjp"))
+      val threadPool = forceInit(system.dispatchers.lookup("tracked-tpe"))
+      val excluded = forceInit(system.dispatchers.lookup("explicitly-excluded"))
+
+      findDispatcherRecorder(default.id, ForkJoinPoolType) shouldNot be(empty)
+      findDispatcherRecorder(forkJoin.id, ForkJoinPoolType) shouldNot be(empty)
+      findDispatcherRecorder(threadPool.id, ThreadPoolType) shouldNot be(empty)
+      findDispatcherRecorder(excluded.id, ForkJoinPoolType) should be(empty)
+    }
+  }
+
+  /**
+   * Collects from the collector matching `poolType` and returns, as sample name -> value, the
+   * samples that have a label value ending in `dispatcherName`.
+   */
+  def findDispatcherRecorder(dispatcherName: String, poolType: PoolType): Map[String, Double] = {
+    val families = poolType match {
+      case ForkJoinPoolType => ForkJoinPoolMetrics.collect().asScala.toList
+      case ThreadPoolType => ThreadPoolMetrics.collect().asScala.toList
+    }
+
+    families.flatMap { family =>
+      family.samples.asScala.collect {
+        case sample if findUsingSuffix(sample.labelValues.asScala, dispatcherName) => (sample.name, sample.value)
+      }
+    }.toMap
+  }
+
+  /** True when at least one element of `list` ends with `suffix`. */
+  def findUsingSuffix(list: Seq[String], suffix: String): Boolean = {
+    list.exists(_.endsWith(suffix))
+  }
+
+  /**
+   * Runs one task on `dispatcher` and waits for its message to arrive before handing the
+   * dispatcher back, so the dispatcher has executed work by the time the caller inspects it.
+   */
+  def forceInit(dispatcher: MessageDispatcher): MessageDispatcher = {
+    val probe = TestProbe()
+    Future {
+      probe.ref ! "init done"
+    }(dispatcher)
+    probe.expectMsg("init done")
+
+    dispatcher
+  }
+}
diff --git a/src/test/scala/com/workday/prometheus/akka/MetricsConfigSpec.scala b/src/test/scala/com/workday/prometheus/akka/MetricsConfigSpec.scala
new file mode 100644
index 0000000..ed8777a
--- /dev/null
+++ b/src/test/scala/com/workday/prometheus/akka/MetricsConfigSpec.scala
@@ -0,0 +1,41 @@
+/*
+ * =========================================================================================
+ * Copyright © 2017 Workday, Inc.
+ * Copyright © 2013-2017 the kamon project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+package com.workday.prometheus.akka
+
+/**
+ * Checks `MetricsConfig` against the filters loaded for the test run: the configured group
+ * names, group membership of actor paths, and tracking decisions per category.
+ */
+class MetricsConfigSpec extends BaseSpec {
+  "MetricsConfig" should {
+    "contain the expected group names" in {
+      // NOTE(review): the test application.conf added in this change defines only "all",
+      // "tracked" and "exclusive" under akka-actor-groups; "empty" presumably comes from
+      // another configuration source — confirm.
+      MetricsConfig.groupNames should contain allOf ("all", "tracked", "empty", "exclusive")
+    }
+    "track correct actor groups" in {
+      Seq(
+        "system1/hello/MyActor1" -> List("all", "exclusive"),
+        "system1/hello/NotMyActor1" -> List("all")
+      ).foreach { case (path, groups) =>
+        MetricsConfig.actorShouldBeTrackedUnderGroups(path) should contain theSameElementsAs groups
+      }
+    }
+    "track correct actors" in {
+      Seq(
+        "system1/user/tracked-actor1" -> true,
+        "system1/user/non-tracked-actor1" -> false
+      ).foreach { case (path, expected) =>
+        MetricsConfig.shouldTrack(MetricsConfig.Actor, path) shouldBe expected
+      }
+    }
+    "track correct routers" in {
+      Seq(
+        "system1/user/tracked-pool-router" -> true,
+        "system1/user/non-tracked-pool-router" -> false
+      ).foreach { case (path, expected) =>
+        MetricsConfig.shouldTrack(MetricsConfig.Router, path) shouldBe expected
+      }
+    }
+    "track correct dispatchers" in {
+      Seq(
+        "system1/hello/MyDispatcher1" -> true,
+        "system1/hello/explicitly-excluded" -> false
+      ).foreach { case (path, expected) =>
+        MetricsConfig.shouldTrack(MetricsConfig.Dispatcher, path) shouldBe expected
+      }
+    }
+  }
+}
diff --git a/src/test/scala/com/workday/prometheus/akka/RouterMetricsSpec.scala b/src/test/scala/com/workday/prometheus/akka/RouterMetricsSpec.scala
new file mode 100644
index 0000000..a8cf2cd
--- /dev/null
+++ b/src/test/scala/com/workday/prometheus/akka/RouterMetricsSpec.scala
@@ -0,0 +1,63 @@
+/*
+ * =========================================================================================
+ * Copyright © 2017 Workday, Inc.
+ * Copyright © 2013-2017 the kamon project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+package com.workday.prometheus.akka
+
+import akka.actor._
+import akka.monitor.instrumentation.CellInfo
+import akka.routing._
+import akka.testkit.TestProbe
+import kamon.metric.Entity
+
+/**
+ * Verifies that per-router metrics exist only for routers admitted by the configured filters.
+ */
+class RouterMetricsSpec extends TestKitBaseSpec("RouterMetricsSpec") {
+
+  import RouterMetricsTestActor._
+
+  "the router metrics" should {
+    "respect the configured include and exclude filters" in {
+      val tracked = createTestPoolRouter("tracked-pool-router")
+      val nonTracked = createTestPoolRouter("non-tracked-pool-router")
+      val excluded = createTestPoolRouter("tracked-explicitly-excluded-pool-router")
+
+      routerMetricsRecorderOf(tracked) should not be empty
+      routerMetricsRecorderOf(nonTracked) shouldBe empty
+      routerMetricsRecorderOf(excluded) shouldBe empty
+
+      routerMetricsRecorderOf(tracked).map(_.actorName) shouldEqual Some("routermetricsspec_user_tracked_pool_router")
+    }
+  }
+
+  /** Looks up the metrics recorded for `ref`; None when nothing was recorded for it. */
+  def routerMetricsRecorderOf(ref: ActorRef): Option[RouterMetrics] = {
+    val entity = Entity(CellInfo.cellName(system, ref), MetricsConfig.Router)
+    // Check first: metricsFor creates metrics on demand, which this lookup must not trigger.
+    Some(entity)
+      .filter(e => RouterMetrics.hasMetricsFor(e))
+      .map(e => RouterMetrics.metricsFor(e))
+  }
+
+  /** Starts a round-robin pool of five routees called `routerName`; returns it after one Ping/Pong. */
+  def createTestPoolRouter(routerName: String): ActorRef = {
+    val created = system.actorOf(RoundRobinPool(5).props(Props[RouterMetricsTestActor]), routerName)
+    val probe = TestProbe()
+
+    // Ensure that the router has been created before returning.
+    created.tell(Ping, probe.ref)
+    probe.expectMsg(Pong)
+
+    created
+  }
+}
diff --git a/src/test/scala/com/workday/prometheus/akka/RouterMetricsTestActor.scala b/src/test/scala/com/workday/prometheus/akka/RouterMetricsTestActor.scala
new file mode 100644
index 0000000..b7d2022
--- /dev/null
+++ b/src/test/scala/com/workday/prometheus/akka/RouterMetricsTestActor.scala
@@ -0,0 +1,49 @@
+/*
+ * =========================================================================================
+ * Copyright © 2017 Workday, Inc.
+ * Copyright © 2013-2017 the kamon project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+package com.workday.prometheus.akka
+
+import scala.concurrent.duration.Duration
+import akka.actor.Actor
+
+/**
+ * Routee used by the router specs: it can ignore a message, throw, answer a ping, or report
+ * the timestamps at which it handled a timing request.
+ */
+class RouterMetricsTestActor extends Actor {
+  import RouterMetricsTestActor._
+
+  override def receive = {
+    case Discard =>
+      // intentionally ignored
+    case Fail =>
+      throw new ArithmeticException("Division by zero.")
+    case Ping =>
+      sender ! Pong
+    case RouterTrackTimings(sentAt, pause) =>
+      val dequeuedAt = System.nanoTime()
+      // Optionally block to simulate processing time.
+      pause.foreach(d => Thread.sleep(d.toMillis))
+      val finishedAt = System.nanoTime()
+
+      sender ! RouterTrackedTimings(sentAt, dequeuedAt, finishedAt)
+  }
+}
+
+/** Message protocol of [[RouterMetricsTestActor]]. */
+object RouterMetricsTestActor {
+  case object Ping
+  case object Pong
+  case object Fail
+  case object Discard
+
+  /** Timing request; `sendTimestamp` defaults to the nanoTime at construction. */
+  case class RouterTrackTimings(sendTimestamp: Long = System.nanoTime(), sleep: Option[Duration] = None)
+
+  /** Reply to [[RouterTrackTimings]]; all values are `System.nanoTime()` readings. */
+  case class RouterTrackedTimings(sendTimestamp: Long, dequeueTimestamp: Long, afterReceiveTimestamp: Long) {
+    def approximateTimeInMailbox: Long = dequeueTimestamp - sendTimestamp
+    def approximateProcessingTime: Long = afterReceiveTimestamp - dequeueTimestamp
+  }
+}
diff --git a/version.sbt b/version.sbt
new file mode 100644
index 0000000..a3ddb27
--- /dev/null
+++ b/version.sbt
@@ -0,0 +1 @@
+version in ThisBuild := "0.7.0-SNAPSHOT"