Skip to content

Commit a5c2986

Browse files
committed
Config runtime
1 parent 66a9417 commit a5c2986

File tree

19 files changed

+566
-93
lines changed

19 files changed

+566
-93
lines changed

ansible/group_vars/all

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ controller:
105105
authentication:
106106
spi: "{{ controller_authentication_spi | default('') }}"
107107
loglevel: "{{ controller_loglevel | default(whisk_loglevel) | default('INFO') }}"
108+
username: "{{ controller_username | default('controller.user') }}"
109+
password: "{{ controller_password | default('controller.pass') }}"
108110
entitlement:
109111
spi: "{{ controller_entitlement_spi | default('') }}"
110112
protocol: "{{ controller_protocol | default('https') }}"

ansible/roles/controller/tasks/deploy.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,9 @@
203203
"CONFIG_whisk_db_activationsFilterDdoc": "{{ db_whisk_activations_filter_ddoc | default() }}"
204204
"CONFIG_whisk_userEvents_enabled": "{{ user_events | default(false) | lower }}"
205205

206+
"CONFIG_whisk_credentials_controller_username": "{{ controller.username }}"
207+
"CONFIG_whisk_credentials_controller_password": "{{ controller.password }}"
208+
206209
"LIMITS_ACTIONS_INVOKES_PERMINUTE": "{{ limits.invocationsPerMinute }}"
207210
"LIMITS_ACTIONS_INVOKES_CONCURRENT": "{{ limits.concurrentInvocations }}"
208211
"LIMITS_TRIGGERS_FIRES_PERMINUTE": "{{ limits.firesPerMinute }}"
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.common
19+
20+
case class ControllerCredentials(username: String, password: String)

common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,4 +275,6 @@ object ConfigKeys {
275275
val parameterStorage = "whisk.parameter-storage"
276276

277277
val azBlob = "whisk.azure-blob"
278+
279+
val controllerCredentials = "whisk.credentials.controller"
278280
}

common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -426,3 +426,30 @@ object EventMessage extends DefaultJsonProtocol {
426426

427427
def parse(msg: String) = Try(format.read(msg.parseJson))
428428
}
429+
430+
case class RuntimeMessage(runtime: String) extends Message {
431+
override def serialize = RuntimeMessage.serdes.write(this).compactPrint
432+
}
433+
434+
object RuntimeMessage extends DefaultJsonProtocol {
435+
def parse(msg: String) = Try(serdes.read(msg.parseJson))
436+
implicit val serdes = jsonFormat(RuntimeMessage.apply _, "runtime")
437+
}
438+
439+
case class PrewarmContainerData(kind: String, memory: Long, var number: Int) extends Message {
440+
override def serialize: String = PrewarmContainerData.serdes.write(this).compactPrint
441+
}
442+
443+
object PrewarmContainerData extends DefaultJsonProtocol {
444+
implicit val serdes = jsonFormat(PrewarmContainerData.apply _, "kind", "memory", "number")
445+
}
446+
447+
case class PrewarmContainerDataList(items: List[PrewarmContainerData])
448+
449+
object PrewarmContainerDataProtocol extends DefaultJsonProtocol {
450+
implicit val prewarmContainerDataFormat = jsonFormat(PrewarmContainerData.apply _, "kind", "memory", "number")
451+
implicit object prewarmContainerDataListJsonFormat extends RootJsonFormat[PrewarmContainerDataList] {
452+
def read(value: JsValue) = PrewarmContainerDataList(value.convertTo[List[PrewarmContainerData]])
453+
def write(f: PrewarmContainerDataList) = ???
454+
}
455+
}

common/scala/src/main/scala/org/apache/openwhisk/core/entity/ExecManifest.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,20 @@ protected[core] object ExecManifest {
5858
mf
5959
}
6060

61+
/**
62+
* Reads runtimes manifest from runtime string
63+
*
64+
* @param runtime
65+
* @return the manifest if initialized successfully, or an failure
66+
*/
67+
protected[core] def initialize(runtime: String): Try[Runtimes] = {
68+
val rmc = loadConfigOrThrow[RuntimeManifestConfig](ConfigKeys.runtimes)
69+
val mf = Try(runtime.parseJson.asJsObject).flatMap(runtimes(_, rmc))
70+
var manifest: Option[Runtimes] = None
71+
mf.foreach(m => manifest = Some(m))
72+
mf
73+
}
74+
6175
/**
6276
* Gets existing runtime manifests.
6377
*

core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ import akka.Done
2121
import akka.actor.{ActorSystem, CoordinatedShutdown}
2222
import akka.event.Logging.InfoLevel
2323
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
24+
import akka.http.scaladsl.model.{StatusCodes, Uri}
2425
import akka.http.scaladsl.model.StatusCodes._
25-
import akka.http.scaladsl.model.Uri
26+
import akka.http.scaladsl.model.headers.BasicHttpCredentials
2627
import akka.http.scaladsl.server.Route
2728
import akka.stream.ActorMaterializer
2829
import kamon.Kamon
@@ -31,8 +32,15 @@ import pureconfig.generic.auto._
3132
import spray.json.DefaultJsonProtocol._
3233
import spray.json._
3334
import org.apache.openwhisk.common.Https.HttpsConfig
34-
import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging, LoggingMarkers, TransactionId}
35-
import org.apache.openwhisk.core.WhiskConfig
35+
import org.apache.openwhisk.common.{
36+
AkkaLogging,
37+
ConfigMXBean,
38+
ControllerCredentials,
39+
Logging,
40+
LoggingMarkers,
41+
TransactionId
42+
}
43+
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
3644
import org.apache.openwhisk.core.connector.MessagingProvider
3745
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
3846
import org.apache.openwhisk.core.database.{ActivationStoreProvider, CacheChangeNotification, RemoteCacheInvalidation}
@@ -97,7 +105,7 @@ class Controller(val instance: ControllerInstanceId,
97105
(pathEndOrSingleSlash & get) {
98106
complete(info)
99107
}
100-
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth
108+
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth ~ configRuntime
101109
}
102110

103111
// initialize datastores
@@ -176,6 +184,58 @@ class Controller(val instance: ControllerInstanceId,
176184
LogLimit.config,
177185
runtimes,
178186
List(apiV1.basepath()))
187+
188+
private val controllerCredentials = loadConfigOrThrow[ControllerCredentials](ConfigKeys.controllerCredentials)
189+
190+
/**
191+
* config runtime
192+
*/
193+
private val configRuntime = {
194+
implicit val executionContext = actorSystem.dispatcher
195+
(path("config" / "runtime") & post) {
196+
extractCredentials {
197+
case Some(BasicHttpCredentials(username, password)) =>
198+
if (username == controllerCredentials.username && password == controllerCredentials.password) {
199+
entity(as[String]) { runtime =>
200+
val execManifest = ExecManifest.initialize(runtime)
201+
if (execManifest.isFailure) {
202+
logging.info(this, s"received invalid runtimes manifest")
203+
complete(StatusCodes.BadRequest)
204+
} else {
205+
parameter('limit.?) { limit =>
206+
limit match {
207+
case Some(targetValue) =>
208+
val pattern = """\d+:\d"""
209+
if (targetValue.matches(pattern)) {
210+
val invokerArray = targetValue.split(":")
211+
val beginIndex = invokerArray(0).toInt
212+
val finishIndex = invokerArray(1).toInt
213+
if (finishIndex < beginIndex) {
214+
complete(StatusCodes.BadRequest, "finishIndex can't be less than beginIndex")
215+
} else {
216+
val targetInvokers = (beginIndex to finishIndex).toList
217+
loadBalancer.sendRuntimeToInvokers(runtime, Some(targetInvokers))
218+
logging.info(this, "config runtime request is already sent to target invokers")
219+
complete(StatusCodes.Accepted)
220+
}
221+
} else {
222+
complete(StatusCodes.BadRequest, "limit value can't match [beginIndex:finishIndex]")
223+
}
224+
case None =>
225+
loadBalancer.sendRuntimeToInvokers(runtime, None)
226+
logging.info(this, "config runtime request is already sent to all managed invokers")
227+
complete(StatusCodes.Accepted)
228+
}
229+
}
230+
}
231+
}
232+
} else {
233+
complete(StatusCodes.Unauthorized, "username or password is wrong")
234+
}
235+
case _ => complete(StatusCodes.Unauthorized)
236+
}
237+
}
238+
}
179239
}
180240

181241
/**

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,14 @@ trait LoadBalancer {
6060
def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
6161
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]]
6262

63+
/**
64+
* send runtime to invokers
65+
*
66+
* @param runtime
67+
* @param targetInvokers
68+
*/
69+
def sendRuntimeToInvokers(runtime: String, targetInvokers: Option[List[Int]]): Unit = {}
70+
6371
/**
6472
* Returns a message indicating the health of the containers and/or container pool in general.
6573
*

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import org.apache.openwhisk.spi.SpiLoader
4343
import scala.annotation.tailrec
4444
import scala.concurrent.Future
4545
import scala.concurrent.duration.FiniteDuration
46+
import scala.util.{Failure, Success}
4647

4748
/**
4849
* A loadbalancer that schedules workload based on a hashing-algorithm.
@@ -316,6 +317,22 @@ class ShardingContainerPoolBalancer(
316317
}
317318
}
318319

320+
/** send runtime to invokers*/
321+
override def sendRuntimeToInvokers(runtime: String, targetInvokers: Option[List[Int]]): Unit = {
322+
val runtimeMessage = RuntimeMessage(runtime)
323+
schedulingState.managedInvokers.filter { manageInvoker =>
324+
targetInvokers.getOrElse(schedulingState.managedInvokers.map(_.id.instance)).contains(manageInvoker.id.instance)
325+
} foreach { invokerHealth =>
326+
val topic = s"invoker${invokerHealth.id.toInt}"
327+
messageProducer.send(topic, runtimeMessage).andThen {
328+
case Success(_) =>
329+
logging.info(this, s"Successfully posted runtime to topic $topic")
330+
case Failure(_) =>
331+
logging.error(this, s"Failed posted runtime to topic $topic")
332+
}
333+
}
334+
}
335+
319336
override val invokerPool =
320337
invokerPoolFactory.createInvokerPool(
321338
actorSystem,

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@ package org.apache.openwhisk.core.containerpool
1919

2020
import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
2121
import org.apache.openwhisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
22-
import org.apache.openwhisk.core.connector.MessageFeed
22+
import org.apache.openwhisk.core.connector.{MessageFeed, PrewarmContainerData}
2323
import org.apache.openwhisk.core.entity.ExecManifest.ReactivePrewarmingConfig
2424
import org.apache.openwhisk.core.entity._
2525
import org.apache.openwhisk.core.entity.size._
2626

2727
import scala.annotation.tailrec
2828
import scala.collection.immutable
29+
import scala.collection.mutable.ListBuffer
2930
import scala.concurrent.duration._
3031
import scala.util.{Random, Try}
3132

@@ -37,6 +38,9 @@ case class ColdStartKey(kind: String, memory: ByteSize)
3738

3839
case class WorkerData(data: ContainerData, state: WorkerState)
3940

41+
case class PreWarmConfigList(list: List[PrewarmingConfig])
42+
object PrewarmQuery
43+
4044
case object EmitMetrics
4145

4246
case object AdjustPrewarmedContainer
@@ -74,6 +78,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
7478
var busyPool = immutable.Map.empty[ActorRef, ContainerData]
7579
var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmedData]
7680
var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)]
81+
var latestPrewarmConfig = prewarmConfig
7782
// If all memory slots are occupied and if there is currently no container to be removed, than the actions will be
7883
// buffered here to keep order of computation.
7984
// Otherwise actions with small memory-limits could block actions with large memory limits.
@@ -305,6 +310,33 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
305310
case RescheduleJob =>
306311
freePool = freePool - sender()
307312
busyPool = busyPool - sender()
313+
case prewarmConfigList: PreWarmConfigList =>
314+
val passedPrewarmConfig = prewarmConfigList.list
315+
var newPrewarmConfig: List[PrewarmingConfig] = List.empty
316+
latestPrewarmConfig foreach { config =>
317+
newPrewarmConfig = newPrewarmConfig :+ passedPrewarmConfig
318+
.find(passedConfig =>
319+
passedConfig.exec.kind == config.exec.kind && passedConfig.memoryLimit == config.memoryLimit)
320+
.getOrElse(config)
321+
}
322+
latestPrewarmConfig = newPrewarmConfig
323+
// Delete prewarmedPool firstly
324+
prewarmedPool foreach { element =>
325+
val actor = element._1
326+
actor ! Remove
327+
prewarmedPool = prewarmedPool - actor
328+
}
329+
latestPrewarmConfig foreach { config =>
330+
logging.info(
331+
this,
332+
s"add pre-warming ${config.initialCount} ${config.exec.kind} ${config.memoryLimit.toString}")(
333+
TransactionId.invokerWarmup)
334+
(1 to config.initialCount).foreach { _ =>
335+
prewarmContainer(config.exec, config.memoryLimit, config.reactive.map(_.ttl))
336+
}
337+
}
338+
case PrewarmQuery =>
339+
sender() ! getPrewarmContainer()
308340
case EmitMetrics =>
309341
emitMetrics()
310342

@@ -335,7 +367,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
335367
def adjustPrewarmedContainer(init: Boolean, scheduled: Boolean): Unit = {
336368
if (scheduled) {
337369
//on scheduled time, remove expired prewarms
338-
ContainerPool.removeExpired(poolConfig, prewarmConfig, prewarmedPool).foreach { p =>
370+
ContainerPool.removeExpired(poolConfig, latestPrewarmConfig, prewarmedPool).foreach { p =>
339371
prewarmedPool = prewarmedPool - p
340372
p ! Remove
341373
}
@@ -348,7 +380,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
348380
}
349381
//fill in missing prewarms (replaces any deletes)
350382
ContainerPool
351-
.increasePrewarms(init, scheduled, coldStartCount, prewarmConfig, prewarmedPool, prewarmStartingPool)
383+
.increasePrewarms(init, scheduled, coldStartCount, latestPrewarmConfig, prewarmedPool, prewarmStartingPool)
352384
.foreach { c =>
353385
val config = c._1
354386
val currentCount = c._2._1
@@ -382,7 +414,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
382414

383415
/** this is only for cold start statistics of prewarm configs, e.g. not blackbox or other configs. */
384416
def incrementColdStartCount(kind: String, memoryLimit: ByteSize): Unit = {
385-
prewarmConfig
417+
latestPrewarmConfig
386418
.filter { config =>
387419
kind == config.exec.kind && memoryLimit == config.memoryLimit
388420
}
@@ -423,7 +455,9 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
423455

424456
//get the appropriate ttl from prewarm configs
425457
val ttl =
426-
prewarmConfig.find(pc => pc.memoryLimit == memory && pc.exec.kind == kind).flatMap(_.reactive.map(_.ttl))
458+
latestPrewarmConfig
459+
.find(pc => pc.memoryLimit == memory && pc.exec.kind == kind)
460+
.flatMap(_.reactive.map(_.ttl))
427461
prewarmContainer(action.exec, memory, ttl)
428462
(ref, data)
429463
}
@@ -436,6 +470,31 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
436470
busyPool = busyPool - toDelete
437471
}
438472

473+
/**
474+
* get the prewarm container
475+
* @return
476+
*/
477+
def getPrewarmContainer(): ListBuffer[PrewarmContainerData] = {
478+
val containerDataList = prewarmedPool.values.toList
479+
480+
var resultList: ListBuffer[PrewarmContainerData] = new ListBuffer[PrewarmContainerData]()
481+
containerDataList.foreach { prewarmData =>
482+
val isInclude = resultList.filter { resultData =>
483+
prewarmData.kind == resultData.kind && prewarmData.memoryLimit.toMB == resultData.memory
484+
}.size > 0
485+
486+
if (isInclude) {
487+
var resultData = resultList.filter { resultData =>
488+
prewarmData.kind == resultData.kind && prewarmData.memoryLimit.toMB == resultData.memory
489+
}.head
490+
resultData.number += 1
491+
} else {
492+
resultList += PrewarmContainerData(prewarmData.kind, prewarmData.memoryLimit.toMB, 1)
493+
}
494+
}
495+
resultList
496+
}
497+
439498
/**
440499
* Calculate if there is enough free memory within a given pool.
441500
*

0 commit comments

Comments
 (0)