From 0c98fd437cda403c3a179a4161960e20c80e04c7 Mon Sep 17 00:00:00 2001 From: Andy Steed Date: Mon, 17 Aug 2020 04:56:00 -0700 Subject: [PATCH 1/4] Add support for Legacy AttachmentStore to support data migration --- .../core/database/ArtifactStoreProvider.scala | 11 + .../CosmosDBMigrationArtifactStore.scala | 600 ++++++++++++++++++ ...smosDBMigrationArtifactStoreProvider.scala | 127 ++++ 3 files changed, 738 insertions(+) create mode 100644 common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStore.scala create mode 100644 common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStoreProvider.scala diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactStoreProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactStoreProvider.scala index d9622a40dcf..78787f5bebd 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactStoreProvider.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactStoreProvider.scala @@ -48,4 +48,15 @@ trait ArtifactStoreProvider extends Spi { None } } + + protected def getLegacyAttachmentStore[D <: DocumentSerializer: ClassTag]()( + implicit actorSystem: ActorSystem, + logging: Logging, + materializer: ActorMaterializer): Option[AttachmentStore] = { + if (ConfigFactory.load().hasPath("whisk.spi.LegacyAttachmentStoreProvider")) { + Some(SpiLoader.get[AttachmentStoreProvider].makeStore[D]()) + } else { + None + } + } } diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStore.scala new file mode 100644 index 00000000000..ab32ed70c74 --- /dev/null +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStore.scala @@ -0,0 +1,600 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.openwhisk.core.database.cosmosdb + +import _root_.rx.RxReactiveStreams +import akka.actor.ActorSystem +import akka.event.Logging.InfoLevel +import akka.http.scaladsl.model.{ContentType, StatusCodes, Uri} +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{Sink, Source} +import akka.util.ByteString +import com.microsoft.azure.cosmosdb._ +import com.microsoft.azure.cosmosdb.internal.Constants.Properties +import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient +import com.typesafe.config.ConfigFactory +import kamon.metric.MeasurementUnit +import org.apache.openwhisk.common.{LogMarkerToken, Logging, LoggingMarkers, MetricEmitter, Scheduler, TransactionId} +import org.apache.openwhisk.core.database.StoreUtils._ +import org.apache.openwhisk.core.database._ +import org.apache.openwhisk.core.database.cosmosdb.CosmosDBArtifactStoreProvider.DocumentClientRef +import org.apache.openwhisk.core.database.cosmosdb.CosmosDBConstants._ +import org.apache.openwhisk.core.entity.Attachments.Attached +import org.apache.openwhisk.core.entity._ +import org.apache.openwhisk.http.Messages +import org.apache.openwhisk.spi.SpiLoader +import spray.json._ + +import scala.collection.JavaConverters._ +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration._ +import scala.util.Success + +class CosmosDBMigrationArtifactStore[DocumentAbstraction <: DocumentSerializer](protected val collName: String, + protected val config: CosmosDBConfig, + clientRef: DocumentClientRef, + documentHandler: DocumentHandler, + protected val viewMapper: CosmosDBViewMapper, + val inliningConfig: InliningConfig, + val attachmentStore: Option[AttachmentStore], + val legacyAttachmentStore: Option[AttachmentStore])( + implicit system: ActorSystem, + val logging: Logging, + jsonFormat: RootJsonFormat[DocumentAbstraction], + val materializer: ActorMaterializer, + docReader: DocumentReader) + extends ArtifactStore[DocumentAbstraction] + with DefaultJsonProtocol + with DocumentProvider + with CosmosDBSupport + with AttachmentSupport[DocumentAbstraction] { + + private val cosmosScheme = "cosmos" + val attachmentScheme: String = attachmentStore.map(_.scheme).getOrElse(cosmosScheme) + + protected val client: AsyncDocumentClient = clientRef.get.client + private[cosmosdb] val (database, collection) = initialize() + + private val putToken = createToken("put", read = false) + private val delToken = createToken("del", read = false) + private val getToken = createToken("get") + private val queryToken = createToken("query") + private val countToken = createToken("count") + private val docSizeToken = createDocSizeToken() + + private val documentsSizeToken = createUsageToken("documentsSize", MeasurementUnit.information.kilobytes) + private val indexSizeToken = createUsageToken("indexSize", MeasurementUnit.information.kilobytes) + private val documentCountToken = createUsageToken("documentCount") + + private val softDeleteTTL = config.softDeleteTTL.map(_.toSeconds.toInt) + + private val clusterIdValue = config.clusterId.map(JsString(_)) + + logging.info( + this, + s"Initializing CosmosDBArtifactStore for collection [$collName]. 
Service endpoint [${client.getServiceEndpoint}], " + + s"Read endpoint [${client.getReadEndpoint}], Write endpoint [${client.getWriteEndpoint}], Connection Policy [${client.getConnectionPolicy}], " + + s"Time to live [${collection.getDefaultTimeToLive} secs, clusterId [${config.clusterId}], soft delete TTL [${config.softDeleteTTL}], " + + s"Consistency Level [${config.consistencyLevel}], Usage Metric Frequency [${config.recordUsageFrequency}]") + + private val usageMetricRecorder = config.recordUsageFrequency.map { f => + Scheduler.scheduleWaitAtLeast(f, 10.seconds)(() => recordResourceUsage()) + } + + //Clone the returned instance as these are mutable + def documentCollection(): DocumentCollection = new DocumentCollection(collection.toJson) + + override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher + + override protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo] = { + val asJson = d.toDocumentRecord + + val (doc, docSize) = toCosmosDoc(asJson) + val id = doc.getId + val docinfoStr = s"id: $id, rev: ${doc.getETag}" + val start = transid.started(this, LoggingMarkers.DATABASE_SAVE, s"[PUT] '$collName' saving document: '$docinfoStr'") + + val o = if (isNewDocument(doc)) { + client.createDocument(collection.getSelfLink, doc, newRequestOption(id), true) + } else { + client.replaceDocument(doc, matchRevOption(id, doc.getETag)) + } + + val f = o + .head() + .recoverWith { + case e: DocumentClientException if isConflict(e) && isNewDocument(doc) => + val docId = DocId(asJson.fields(_id).convertTo[String]) + //Fetch existing document and check if its deleted + getRaw(docId).flatMap { + case Some(js) => + if (isSoftDeleted(js)) { + //Existing document is soft deleted. So can be replaced. Use the etag of document + //and replace it with document we are trying to add + val etag = js.fields(Properties.E_TAG).convertTo[String] + client.replaceDocument(doc, matchRevOption(id, etag)).head() + } else { + //Trying to create a new document and found an existing + //Document which is valid (not soft delete) then conflict is a valid outcome + throw e + } + case None => + //Document not found. 
Should not happen unless someone else removed + //Propagate existing exception + throw e + } + } + .transform( + { r => + docSizeToken.histogram.record(docSize) + transid.finished( + this, + start, + s"[PUT] '$collName' completed document: '$docinfoStr', size=$docSize, ru=${r.getRequestCharge}${extraLogs(r)}", + InfoLevel) + collectMetrics(putToken, r.getRequestCharge) + toDocInfo(r.getResource) + }, { + case e: DocumentClientException if isConflict(e) => + transid.finished(this, start, s"[PUT] '$collName', document: '$docinfoStr'; conflict.") + DocumentConflictException("conflict on 'put'") + case e => e + }) + + reportFailure(f, start, failure => s"[PUT] '$collName' internal error, failure: '${failure.getMessage}'") + } + + override protected[database] def del(doc: DocInfo)(implicit transid: TransactionId): Future[Boolean] = { + checkDocHasRevision(doc) + val start = transid.started(this, LoggingMarkers.DATABASE_DELETE, s"[DEL] '$collName' deleting document: '$doc'") + val f = softDeleteTTL match { + case Some(_) => softDelete(doc) + case None => hardDelete(doc) + } + val g = f + .transform( + { r => + transid.finished(this, start, s"[DEL] '$collName' completed document: '$doc'${extraLogs(r)}", InfoLevel) + true + }, { + case e: DocumentClientException if isNotFound(e) => + transid.finished(this, start, s"[DEL] '$collName', document: '$doc'; not found.") + NoDocumentException("not found on 'delete'") + case e: DocumentClientException if isConflict(e) => + transid.finished(this, start, s"[DEL] '$collName', document: '$doc'; conflict.") + DocumentConflictException("conflict on 'delete'") + case e => e + }) + + reportFailure( + g, + start, + failure => s"[DEL] '$collName' internal error, doc: '$doc', failure: '${failure.getMessage}'") + } + + private def hardDelete(doc: DocInfo) = { + val f = client + .deleteDocument(selfLinkOf(doc.id), matchRevOption(doc)) + .head() + f.foreach(r => collectMetrics(delToken, r.getRequestCharge)) + f + } + + private def softDelete(doc: DocInfo)(implicit transid: TransactionId) = { + for { + js <- getAsWhiskJson(doc.id) + r <- softDeletePut(doc, js) + } yield r + } + + private def softDeletePut(docInfo: DocInfo, js: JsObject)(implicit transid: TransactionId) = { + val deletedJs = transform(js, Seq((deleted, Some(JsTrue)))) + val (doc, _) = toCosmosDoc(deletedJs) + softDeleteTTL.foreach(doc.setTimeToLive(_)) + val f = client.replaceDocument(doc, matchRevOption(docInfo)).head() + f.foreach(r => collectMetrics(putToken, r.getRequestCharge)) + f + } + + override protected[database] def get[A <: DocumentAbstraction](doc: DocInfo, + attachmentHandler: Option[(A, Attached) => A] = None)( + implicit transid: TransactionId, + ma: Manifest[A]): Future[A] = { + val start = transid.started(this, LoggingMarkers.DATABASE_GET, s"[GET] '$collName' finding document: '$doc'") + + require(doc != null, "doc undefined") + val f = + client + .readDocument(selfLinkOf(doc.id), newRequestOption(doc.id)) + .head() + .transform( + { rr => + collectMetrics(getToken, rr.getRequestCharge) + if (isSoftDeleted(rr.getResource)) { + transid.finished(this, start, s"[GET] '$collName', document: '$doc'; not found.") + // for compatibility + throw NoDocumentException("not found on 'get'") + } else { + val (js, docSize) = getResultToWhiskJsonDoc(rr.getResource) + transid + .finished( + this, + start, + s"[GET] '$collName' completed: found document '$doc',size=$docSize, ru=${rr.getRequestCharge}${extraLogs(rr)}", + InfoLevel) + deserialize[A, DocumentAbstraction](doc, js) + } + }, { + case e: 
DocumentClientException if isNotFound(e) => + transid.finished(this, start, s"[GET] '$collName', document: '$doc'; not found.") + // for compatibility + throw NoDocumentException("not found on 'get'") + case e => e + }) + .recoverWith { + case _: DeserializationException => throw DocumentUnreadable(Messages.corruptedEntity) + } + + reportFailure( + f, + start, + failure => s"[GET] '$collName' internal error, doc: '$doc', failure: '${failure.getMessage}'") + + } + + override protected[database] def get(id: DocId)(implicit transid: TransactionId): Future[Option[JsObject]] = { + val start = transid.started(this, LoggingMarkers.DATABASE_GET, s"[GET_BY_ID] '$collName' finding document: '$id'") + + val f = client + .readDocument(selfLinkOf(id), newRequestOption(id)) + .head() + .map { rr => + collectMetrics(getToken, rr.getRequestCharge) + if (isSoftDeleted(rr.getResource)) { + transid.finished(this, start, s"[GET_BY_ID] '$collName' completed: '$id' not found") + None + } else { + val (js, _) = getResultToWhiskJsonDoc(rr.getResource) + transid.finished(this, start, s"[GET_BY_ID] '$collName' completed: found document '$id'") + Some(js) + } + } + .recoverWith { + case e: DocumentClientException if isNotFound(e) => + transid.finished(this, start, s"[GET_BY_ID] '$collName' completed: '$id' not found") + Future.successful(None) + } + + reportFailure( + f, + start, + failure => s"[GET_BY_ID] '$collName' internal error, doc: '$id', failure: '${failure.getMessage}'") + } + + /** + * Method exposed for test cases to access the raw json returned by CosmosDB + */ + private[cosmosdb] def getRaw(id: DocId): Future[Option[JsObject]] = { + client + .readDocument(selfLinkOf(id), newRequestOption(id)) + .head() + .map { rr => + val js = rr.getResource.toJson.parseJson.asJsObject + Some(js) + } + .recoverWith { + case e: DocumentClientException if isNotFound(e) => Future.successful(None) + } + } + + private def getAsWhiskJson(id: DocId): Future[JsObject] = { + client + .readDocument(selfLinkOf(id), newRequestOption(id)) + .head() + .map { rr => + val (js, _) = getResultToWhiskJsonDoc(rr.getResource) + collectMetrics(getToken, rr.getRequestCharge) + js + } + } + + override protected[core] def query(table: String, + startKey: List[Any], + endKey: List[Any], + skip: Int, + limit: Int, + includeDocs: Boolean, + descending: Boolean, + reduce: Boolean, + stale: StaleParameter)(implicit transid: TransactionId): Future[List[JsObject]] = { + require(!(reduce && includeDocs), "reduce and includeDocs cannot both be true") + require(!reduce, "Reduce scenario not supported") //TODO Investigate reduce + require(skip >= 0, "skip should be non negative") + require(limit >= 0, "limit should be non negative") + documentHandler.checkIfTableSupported(table) + + val Array(ddoc, viewName) = table.split("/") + + val start = transid.started(this, LoggingMarkers.DATABASE_QUERY, s"[QUERY] '$collName' searching '$table'") + val realIncludeDocs = includeDocs | documentHandler.shouldAlwaysIncludeDocs(ddoc, viewName) + val realLimit = if (limit > 0) skip + limit else limit + + val querySpec = viewMapper.prepareQuery(ddoc, viewName, startKey, endKey, realLimit, realIncludeDocs, descending) + + val options = newFeedOptions() + val queryMetrics = scala.collection.mutable.Buffer[QueryMetrics]() + if (transid.meta.extraLogging) { + options.setPopulateQueryMetrics(true) + options.setEmitVerboseTracesInQuery(true) + } + + def collectQueryMetrics(r: FeedResponse[Document]): Unit = { + collectMetrics(queryToken, r.getRequestCharge) + 
queryMetrics.appendAll(r.getQueryMetrics.values().asScala) + } + + val publisher = + RxReactiveStreams.toPublisher(client.queryDocuments(collection.getSelfLink, querySpec, options)) + val f = Source + .fromPublisher(publisher) + .wireTap(collectQueryMetrics(_)) + .mapConcat(asVector) + .drop(skip) + .map(queryResultToWhiskJsonDoc) + .map(js => + documentHandler + .transformViewResult(ddoc, viewName, startKey, endKey, realIncludeDocs, js, CosmosDBMigrationArtifactStore.this)) + .mapAsync(1)(identity) + .mapConcat(identity) + .runWith(Sink.seq) + .map(_.toList) + .map(l => if (limit > 0) l.take(limit) else l) + + val g = f.andThen { + case Success(queryResult) => + if (queryMetrics.nonEmpty) { + val combinedMetrics = QueryMetrics.ZERO.add(queryMetrics.toSeq: _*) + logging.debug( + this, + s"[QueryMetricsEnabled] Collection [$collName] - Query [${querySpec.getQueryText}].\nQueryMetrics\n[$combinedMetrics]") + } + val stats = viewMapper.recordQueryStats(ddoc, viewName, descending, querySpec.getParameters, queryResult) + val statsToLog = stats.map(s => " " + s).getOrElse("") + transid.finished( + this, + start, + s"[QUERY] '$collName' completed: matched ${queryResult.size}$statsToLog", + InfoLevel) + } + reportFailure(g, start, failure => s"[QUERY] '$collName' internal error, failure: '${failure.getMessage}'") + } + + override protected[core] def count(table: String, + startKey: List[Any], + endKey: List[Any], + skip: Int, + stale: StaleParameter)(implicit transid: TransactionId): Future[Long] = { + require(skip >= 0, "skip should be non negative") + val Array(ddoc, viewName) = table.split("/") + + val start = transid.started(this, LoggingMarkers.DATABASE_QUERY, s"[COUNT] '$collName' searching '$table") + val querySpec = viewMapper.prepareCountQuery(ddoc, viewName, startKey, endKey) + + //For aggregates the value is in _aggregates fields + val f = client + .queryDocuments(collection.getSelfLink, querySpec, newFeedOptions()) + .head() + .map { r => + val count = r.getResults.asScala.head.getLong(aggregate).longValue + transid.finished(this, start, s"[COUNT] '$collName' completed: count $count") + collectMetrics(countToken, r.getRequestCharge) + if (count > skip) count - skip else 0L + } + + reportFailure(f, start, failure => s"[COUNT] '$collName' internal error, failure: '${failure.getMessage}'") + } + + override protected[database] def putAndAttach[A <: DocumentAbstraction]( + doc: A, + update: (A, Attached) => A, + contentType: ContentType, + docStream: Source[ByteString, _], + oldAttachment: Option[Attached])(implicit transid: TransactionId): Future[(DocInfo, Attached)] = { + attachmentStore match { + case Some(as) => + attachToExternalStore(doc, update, contentType, docStream, oldAttachment, as) + case None => + Future.failed(new IllegalArgumentException( + s" '$cosmosScheme' is now not supported. 
You must configure an external AttachmentStore for storing attachments")) + } + } + + override protected[core] def readAttachment[T](doc: DocInfo, attached: Attached, sink: Sink[ByteString, Future[T]])( + implicit transid: TransactionId): Future[T] = { + val name = attached.attachmentName + val attachmentUri = Uri(name) + attachmentUri.scheme match { + case AttachmentSupport.MemScheme => + memorySource(attachmentUri).runWith(sink) + case s if s == cosmosScheme || attachmentUri.isRelative => + //relative case is for compatibility with earlier naming approach where attachment name would be like 'jarfile' + //Compared to current approach of ':' + Future.failed(new IllegalArgumentException( + s" '$cosmosScheme' is now not supported. You must configure an external AttachmentStore for storing attachments")) + case s if attachmentStore.isDefined && attachmentStore.get.scheme == s => + attachmentStore.get.readAttachment(doc.id, attachmentUri.path.toString, sink) + case s if legacyAttachmentStore.isDefined && legacyAttachmentStore.get.scheme == s => + legacyAttachmentStore.get.readAttachment(doc.id, attachmentUri.path.toString, sink) + case _ => + throw new IllegalArgumentException(s"Unknown attachment scheme in attachment uri $attachmentUri") + } + } + + override protected[core] def deleteAttachments[T](doc: DocInfo)(implicit transid: TransactionId): Future[Boolean] = + attachmentStore + .map(as => as.deleteAttachments(doc.id)) + .getOrElse(Future.successful(true)) // For CosmosDB it is expected that the entire document is deleted. + + override def shutdown(): Unit = { + //Its async so a chance exist for next scheduled job to still trigger + usageMetricRecorder.foreach(system.stop) + attachmentStore.foreach(_.shutdown()) + clientRef.close() + } + + def getResourceUsage(): Future[Option[CollectionResourceUsage]] = { + val opts = new RequestOptions + opts.setPopulateQuotaInfo(true) + client + .readCollection(collection.getSelfLink, opts) + .head() + .map(rr => CollectionResourceUsage(rr.getResponseHeaders.asScala.toMap)) + } + + private def recordResourceUsage() = { + getResourceUsage().map { o => + o.foreach { u => + u.documentsCount.foreach(documentCountToken.gauge.update(_)) + u.documentsSize.foreach(ds => documentsSizeToken.gauge.update(ds.toKB)) + u.indexSize.foreach(is => indexSizeToken.gauge.update(is.toKB)) + logging.info(this, s"Collection usage stats for [$collName] are ${u.asString}") + u.indexingProgress.foreach { i => + if (i < 100) logging.info(this, s"Indexing for collection [$collName] is at $i%") + } + } + o + } + } + + private def isNotFound[A <: DocumentAbstraction](e: DocumentClientException) = + e.getStatusCode == StatusCodes.NotFound.intValue + + private def isConflict(e: DocumentClientException) = { + e.getStatusCode == StatusCodes.Conflict.intValue || e.getStatusCode == StatusCodes.PreconditionFailed.intValue + } + + private def toCosmosDoc(json: JsObject): (Document, Int) = { + val computedJs = documentHandler.computedFields(json) + val computedOpt = if (computedJs.fields.nonEmpty) Some(computedJs) else None + val fieldsToAdd = + Seq( + (cid, Some(JsString(escapeId(json.fields(_id).convertTo[String])))), + (etag, json.fields.get(_rev)), + (computed, computedOpt), + (clusterId, clusterIdValue)) + val fieldsToRemove = Seq(_id, _rev) + val mapped = transform(json, fieldsToAdd, fieldsToRemove) + val jsonString = mapped.compactPrint + val doc = new Document(jsonString) + doc.set(selfLink, createSelfLink(doc.getId)) + doc.setTimeToLive(null) //Disable any TTL if in effect for 
earlier revision + (doc, jsonString.length) + } + + private def queryResultToWhiskJsonDoc(doc: Document): JsObject = { + val docJson = doc.toJson.parseJson.asJsObject + //If includeDocs is true then document json is to be used + val js = if (doc.has(alias)) docJson.fields(alias).asJsObject else docJson + val id = js.fields(cid).convertTo[String] + toWhiskJsonDoc(js, id, None) + } + + private def getResultToWhiskJsonDoc(doc: Document): (JsObject, Int) = { + checkDoc(doc) + val jsString = doc.toJson + val js = jsString.parseJson.asJsObject + val whiskDoc = toWhiskJsonDoc(js, doc.getId, Some(JsString(doc.getETag))) + (whiskDoc, jsString.length) + } + + private def toDocInfo[T <: Resource](doc: T) = { + checkDoc(doc) + DocInfo(DocId(unescapeId(doc.getId)), DocRevision(doc.getETag)) + } + + private def selfLinkOf(id: DocId) = createSelfLink(escapeId(id.id)) + + private def createSelfLink(id: String) = s"dbs/${database.getId}/colls/${collection.getId}/docs/$id" + + private def matchRevOption(info: DocInfo): RequestOptions = matchRevOption(escapeId(info.id.id), info.rev.rev) + + private def matchRevOption(id: String, etag: String): RequestOptions = { + val options = newRequestOption(id) + val condition = new AccessCondition + condition.setCondition(etag) + options.setAccessCondition(condition) + options + } + + //Using DummyImplicit to allow overloading work with type erasure of DocId AnyVal + private def newRequestOption(id: DocId)(implicit i: DummyImplicit): RequestOptions = newRequestOption(escapeId(id.id)) + + private def newRequestOption(id: String) = { + val options = new RequestOptions + options.setPartitionKey(new PartitionKey(id)) + options + } + + private def newFeedOptions() = { + val options = new FeedOptions() + options.setEnableCrossPartitionQuery(true) + options + } + + private def checkDoc[T <: Resource](doc: T): Unit = { + require(doc.getId != null, s"$doc does not have id field set") + require(doc.getETag != null, s"$doc does not have etag field set") + } + + private def collectMetrics(token: LogMarkerToken, charge: Double): Unit = { + MetricEmitter.emitCounterMetric(token, Math.round(charge)) + } + + private def createToken(action: String, read: Boolean = true): LogMarkerToken = { + val mode = if (read) "read" else "write" + val tags = Map("action" -> action, "mode" -> mode, "collection" -> collName) + if (TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", "ru", "used", tags = tags)(MeasurementUnit.none) + else LogMarkerToken("cosmosdb", "ru", collName, Some(action))(MeasurementUnit.none) + } + + private def createUsageToken(name: String, unit: MeasurementUnit = MeasurementUnit.none): LogMarkerToken = { + val tags = Map("collection" -> collName) + if (TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", name, "used", tags = tags)(unit) + else LogMarkerToken("cosmosdb", name, collName)(unit) + } + + private def createDocSizeToken(): LogMarkerToken = { + val unit = MeasurementUnit.information.bytes + val name = "doc" + val tags = Map("collection" -> collName) + if (TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", name, "size", tags = tags)(unit) + else LogMarkerToken("cosmosdb", name, collName)(unit) + } + + private def isSoftDeleted(doc: Document) = doc.getBoolean(deleted) == true + + private def isSoftDeleted(js: JsObject) = js.fields.get(deleted).contains(JsTrue) + + private def isNewDocument(doc: Document) = doc.getETag == null + + private def extraLogs(r: ResourceResponse[_])(implicit tid: TransactionId): String = { + if (tid.meta.extraLogging) { + " 
" + r.getRequestDiagnosticsString + } else "" + } +} diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStoreProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStoreProvider.scala new file mode 100644 index 00000000000..e8d75a08232 --- /dev/null +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStoreProvider.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.database.cosmosdb + +import java.io.Closeable + +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient +import com.typesafe.config.ConfigFactory +import org.apache.openwhisk.common.Logging +import org.apache.openwhisk.core.ConfigKeys +import org.apache.openwhisk.core.database._ +import org.apache.openwhisk.core.entity.size._ +import org.apache.openwhisk.core.entity.{DocumentReader, WhiskActivation, WhiskAuth, WhiskEntity} +import pureconfig._ +import pureconfig.generic.auto._ +import spray.json.RootJsonFormat + +import scala.reflect.ClassTag + +case class ClientHolder(client: AsyncDocumentClient) extends Closeable { + override def close(): Unit = client.close() +} + +object CosmosDBMigrationArtifactStoreProvider extends ArtifactStoreProvider { + type DocumentClientRef = ReferenceCounted[ClientHolder]#CountedReference + private val clients = collection.mutable.Map[CosmosDBConfig, ReferenceCounted[ClientHolder]]() + + RetryMetricsCollector.registerIfEnabled() + + override def makeStore[D <: DocumentSerializer: ClassTag](useBatching: Boolean)( + implicit jsonFormat: RootJsonFormat[D], + docReader: DocumentReader, + actorSystem: ActorSystem, + logging: Logging, + materializer: ActorMaterializer): ArtifactStore[D] = { + val tag = implicitly[ClassTag[D]] + val config = CosmosDBConfig(ConfigFactory.load(), tag.runtimeClass.getSimpleName) + makeStoreForClient(config, getOrCreateReference(config), getAttachmentStore(), getLegacyAttachmentStore()) + } + + def makeArtifactStore[D <: DocumentSerializer: ClassTag](config: CosmosDBConfig, + attachmentStore: Option[AttachmentStore])( + implicit jsonFormat: RootJsonFormat[D], + docReader: DocumentReader, + actorSystem: ActorSystem, + logging: Logging, + materializer: ActorMaterializer): CosmosDBMigrationArtifactStore[D] = { + + makeStoreForClient(config, createReference(config).reference(), attachmentStore) + } + + private def makeStoreForClient[D <: DocumentSerializer: ClassTag](config: CosmosDBConfig, + clientRef: DocumentClientRef, + attachmentStore: Option[AttachmentStore], + legacyAttachmentStore: Option[AttachmentStore])( + implicit jsonFormat: 
RootJsonFormat[D], + docReader: DocumentReader, + actorSystem: ActorSystem, + logging: Logging, + materializer: ActorMaterializer): CosmosDBMigrationArtifactStore[D] = { + + val classTag = implicitly[ClassTag[D]] + val (dbName, handler, viewMapper) = handlerAndMapper(classTag) + + new CosmosDBMigrationArtifactStore( + dbName, + config, + clientRef, + handler, + viewMapper, + loadConfigOrThrow[InliningConfig](ConfigKeys.db), + attachmentStore, + legacyAttachmentStore) + } + + private def handlerAndMapper[D](entityType: ClassTag[D])( + implicit actorSystem: ActorSystem, + logging: Logging, + materializer: ActorMaterializer): (String, DocumentHandler, CosmosDBViewMapper) = { + val entityClass = entityType.runtimeClass + if (entityClass == classOf[WhiskEntity]) ("whisks", WhisksHandler, WhisksViewMapper) + else if (entityClass == classOf[WhiskActivation]) ("activations", ActivationHandler, ActivationViewMapper) + else if (entityClass == classOf[WhiskAuth]) ("subjects", SubjectHandler, SubjectViewMapper) + else throw new IllegalArgumentException(s"Unsupported entity type $entityType") + } + + /* + * This method ensures that all store instances share same client instance and thus the underlying connection pool. + * Synchronization is required to ensure concurrent init of various store instances share same ref instance + */ + private def getOrCreateReference[D <: DocumentSerializer: ClassTag](config: CosmosDBConfig) = synchronized { + val clientRef = clients.getOrElseUpdate(config, createReference(config)) + if (clientRef.isClosed) { + val newRef = createReference(config) + clients.put(config, newRef) + newRef.reference() + } else { + clientRef.reference() + } + } + + private def createReference[D <: DocumentSerializer: ClassTag](config: CosmosDBConfig) = { + val clazz = implicitly[ClassTag[D]].runtimeClass + if (clazz != classOf[WhiskActivation]) { + require(config.timeToLive.isEmpty, s"'timeToLive' should not be specified for ${clazz.getSimpleName}") + } + new ReferenceCounted[ClientHolder](ClientHolder(config.createClient())) + } + +} From e158cb52fb389db8636ce88c773163f339ed564c Mon Sep 17 00:00:00 2001 From: Andy Steed Date: Mon, 17 Aug 2020 06:58:51 -0700 Subject: [PATCH 2/4] scalafmt fixes --- .../CosmosDBMigrationArtifactStore.scala | 40 +++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStore.scala index ab32ed70c74..fa5f3b82ad3 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStore.scala @@ -45,19 +45,19 @@ import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ import scala.util.Success -class CosmosDBMigrationArtifactStore[DocumentAbstraction <: DocumentSerializer](protected val collName: String, - protected val config: CosmosDBConfig, - clientRef: DocumentClientRef, - documentHandler: DocumentHandler, - protected val viewMapper: CosmosDBViewMapper, - val inliningConfig: InliningConfig, - val attachmentStore: Option[AttachmentStore], - val legacyAttachmentStore: Option[AttachmentStore])( - implicit system: ActorSystem, - val logging: Logging, - jsonFormat: RootJsonFormat[DocumentAbstraction], - val materializer: ActorMaterializer, - docReader: 
DocumentReader) +class CosmosDBMigrationArtifactStore[DocumentAbstraction <: DocumentSerializer]( + protected val collName: String, + protected val config: CosmosDBConfig, + clientRef: DocumentClientRef, + documentHandler: DocumentHandler, + protected val viewMapper: CosmosDBViewMapper, + val inliningConfig: InliningConfig, + val attachmentStore: Option[AttachmentStore], + val legacyAttachmentStore: Option[AttachmentStore])(implicit system: ActorSystem, + val logging: Logging, + jsonFormat: RootJsonFormat[DocumentAbstraction], + val materializer: ActorMaterializer, + docReader: DocumentReader) extends ArtifactStore[DocumentAbstraction] with DefaultJsonProtocol with DocumentProvider @@ -356,9 +356,17 @@ class CosmosDBMigrationArtifactStore[DocumentAbstraction <: DocumentSerializer]( .mapConcat(asVector) .drop(skip) .map(queryResultToWhiskJsonDoc) - .map(js => - documentHandler - .transformViewResult(ddoc, viewName, startKey, endKey, realIncludeDocs, js, CosmosDBMigrationArtifactStore.this)) + .map( + js => + documentHandler + .transformViewResult( + ddoc, + viewName, + startKey, + endKey, + realIncludeDocs, + js, + CosmosDBMigrationArtifactStore.this)) .mapAsync(1)(identity) .mapConcat(identity) .runWith(Sink.seq) From e31944dc8619e686b42283f75f29ceb4d319d692 Mon Sep 17 00:00:00 2001 From: Andy Steed Date: Mon, 17 Aug 2020 07:33:06 -0700 Subject: [PATCH 3/4] Moved ClientHolder to it's own file and removed all unnecessary imports --- .../core/database/cosmosdb/ClientHolder.scala | 25 +++++++++++++++++++ .../CosmosDBArtifactStoreProvider.scala | 7 ------ .../CosmosDBMigrationArtifactStore.scala | 2 -- ...smosDBMigrationArtifactStoreProvider.scala | 9 +------ 4 files changed, 26 insertions(+), 17 deletions(-) create mode 100644 common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/ClientHolder.scala diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/ClientHolder.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/ClientHolder.scala new file mode 100644 index 00000000000..fa493e07a11 --- /dev/null +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/ClientHolder.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.openwhisk.core.database.cosmosdb + +import java.io.Closeable +import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient + +case class ClientHolder(client: AsyncDocumentClient) extends Closeable { + override def close(): Unit = client.close() +} diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala index d71e4586fa6..abaf5889973 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala @@ -17,11 +17,8 @@ package org.apache.openwhisk.core.database.cosmosdb -import java.io.Closeable - import akka.actor.ActorSystem import akka.stream.ActorMaterializer -import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient import com.typesafe.config.ConfigFactory import org.apache.openwhisk.common.Logging import org.apache.openwhisk.core.ConfigKeys @@ -34,10 +31,6 @@ import spray.json.RootJsonFormat import scala.reflect.ClassTag -case class ClientHolder(client: AsyncDocumentClient) extends Closeable { - override def close(): Unit = client.close() -} - object CosmosDBArtifactStoreProvider extends ArtifactStoreProvider { type DocumentClientRef = ReferenceCounted[ClientHolder]#CountedReference private val clients = collection.mutable.Map[CosmosDBConfig, ReferenceCounted[ClientHolder]]() diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStore.scala index fa5f3b82ad3..ceb578bdb40 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStore.scala @@ -27,7 +27,6 @@ import akka.util.ByteString import com.microsoft.azure.cosmosdb._ import com.microsoft.azure.cosmosdb.internal.Constants.Properties import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient -import com.typesafe.config.ConfigFactory import kamon.metric.MeasurementUnit import org.apache.openwhisk.common.{LogMarkerToken, Logging, LoggingMarkers, MetricEmitter, Scheduler, TransactionId} import org.apache.openwhisk.core.database.StoreUtils._ @@ -37,7 +36,6 @@ import org.apache.openwhisk.core.database.cosmosdb.CosmosDBConstants._ import org.apache.openwhisk.core.entity.Attachments.Attached import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.http.Messages -import org.apache.openwhisk.spi.SpiLoader import spray.json._ import scala.collection.JavaConverters._ diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStoreProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStoreProvider.scala index e8d75a08232..34b9a1066f5 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStoreProvider.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStoreProvider.scala @@ -17,11 +17,8 @@ package org.apache.openwhisk.core.database.cosmosdb -import java.io.Closeable - import akka.actor.ActorSystem import 
akka.stream.ActorMaterializer -import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient import com.typesafe.config.ConfigFactory import org.apache.openwhisk.common.Logging import org.apache.openwhisk.core.ConfigKeys @@ -34,10 +31,6 @@ import spray.json.RootJsonFormat import scala.reflect.ClassTag -case class ClientHolder(client: AsyncDocumentClient) extends Closeable { - override def close(): Unit = client.close() -} - object CosmosDBMigrationArtifactStoreProvider extends ArtifactStoreProvider { type DocumentClientRef = ReferenceCounted[ClientHolder]#CountedReference private val clients = collection.mutable.Map[CosmosDBConfig, ReferenceCounted[ClientHolder]]() @@ -63,7 +56,7 @@ object CosmosDBMigrationArtifactStoreProvider extends ArtifactStoreProvider { logging: Logging, materializer: ActorMaterializer): CosmosDBMigrationArtifactStore[D] = { - makeStoreForClient(config, createReference(config).reference(), attachmentStore) + makeStoreForClient(config, createReference(config).reference(), attachmentStore, None) } private def makeStoreForClient[D <: DocumentSerializer: ClassTag](config: CosmosDBConfig, From 1334528dfe9894955d10ebe3a8fcb46659fa57d4 Mon Sep 17 00:00:00 2001 From: Andy Steed Date: Thu, 27 Aug 2020 22:38:25 -0700 Subject: [PATCH 4/4] Reworked to use explicit AttachmentStore for S3 and AZ instead of picking up from config --- .../core/database/ArtifactStoreProvider.scala | 11 ----- .../CosmosDBMigrationArtifactStore.scala | 47 +++++++++---------- ...smosDBMigrationArtifactStoreProvider.scala | 12 ++--- 3 files changed, 26 insertions(+), 44 deletions(-) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactStoreProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactStoreProvider.scala index 78787f5bebd..d9622a40dcf 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactStoreProvider.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactStoreProvider.scala @@ -48,15 +48,4 @@ trait ArtifactStoreProvider extends Spi { None } } - - protected def getLegacyAttachmentStore[D <: DocumentSerializer: ClassTag]()( - implicit actorSystem: ActorSystem, - logging: Logging, - materializer: ActorMaterializer): Option[AttachmentStore] = { - if (ConfigFactory.load().hasPath("whisk.spi.LegacyAttachmentStoreProvider")) { - Some(SpiLoader.get[AttachmentStoreProvider].makeStore[D]()) - } else { - None - } - } } diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStore.scala index ceb578bdb40..f5bdfa81b36 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStore.scala @@ -30,6 +30,8 @@ import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient import kamon.metric.MeasurementUnit import org.apache.openwhisk.common.{LogMarkerToken, Logging, LoggingMarkers, MetricEmitter, Scheduler, TransactionId} import org.apache.openwhisk.core.database.StoreUtils._ +import org.apache.openwhisk.core.database.s3._ +import org.apache.openwhisk.core.database.azblob._ import org.apache.openwhisk.core.database._ import org.apache.openwhisk.core.database.cosmosdb.CosmosDBArtifactStoreProvider.DocumentClientRef import 
org.apache.openwhisk.core.database.cosmosdb.CosmosDBConstants._ @@ -41,21 +43,20 @@ import spray.json._ import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ +import scala.reflect.ClassTag import scala.util.Success -class CosmosDBMigrationArtifactStore[DocumentAbstraction <: DocumentSerializer]( +class CosmosDBMigrationArtifactStore[DocumentAbstraction <: DocumentSerializer: ClassTag]( protected val collName: String, protected val config: CosmosDBConfig, clientRef: DocumentClientRef, documentHandler: DocumentHandler, protected val viewMapper: CosmosDBViewMapper, - val inliningConfig: InliningConfig, - val attachmentStore: Option[AttachmentStore], - val legacyAttachmentStore: Option[AttachmentStore])(implicit system: ActorSystem, - val logging: Logging, - jsonFormat: RootJsonFormat[DocumentAbstraction], - val materializer: ActorMaterializer, - docReader: DocumentReader) + val inliningConfig: InliningConfig)(implicit system: ActorSystem, + val logging: Logging, + jsonFormat: RootJsonFormat[DocumentAbstraction], + val materializer: ActorMaterializer, + docReader: DocumentReader) extends ArtifactStore[DocumentAbstraction] with DefaultJsonProtocol with DocumentProvider @@ -63,7 +64,10 @@ class CosmosDBMigrationArtifactStore[DocumentAbstraction <: DocumentSerializer]( with AttachmentSupport[DocumentAbstraction] { private val cosmosScheme = "cosmos" - val attachmentScheme: String = attachmentStore.map(_.scheme).getOrElse(cosmosScheme) + private val attachmentS3Store = S3AttachmentStoreProvider.makeStore() + private val attachmentAzureStore = AzureBlobAttachmentStoreProvider.makeStore() + + val attachmentScheme: String = attachmentAzureStore.scheme protected val client: AsyncDocumentClient = clientRef.get.client private[cosmosdb] val (database, collection) = initialize() @@ -85,7 +89,7 @@ class CosmosDBMigrationArtifactStore[DocumentAbstraction <: DocumentSerializer]( logging.info( this, - s"Initializing CosmosDBArtifactStore for collection [$collName]. Service endpoint [${client.getServiceEndpoint}], " + + s"Initializing CosmosDBMigrationArtifactStore for collection [$collName]. Service endpoint [${client.getServiceEndpoint}], " + s"Read endpoint [${client.getReadEndpoint}], Write endpoint [${client.getWriteEndpoint}], Connection Policy [${client.getConnectionPolicy}], " + s"Time to live [${collection.getDefaultTimeToLive} secs, clusterId [${config.clusterId}], soft delete TTL [${config.softDeleteTTL}], " + s"Consistency Level [${config.consistencyLevel}], Usage Metric Frequency [${config.recordUsageFrequency}]") @@ -421,13 +425,7 @@ class CosmosDBMigrationArtifactStore[DocumentAbstraction <: DocumentSerializer]( contentType: ContentType, docStream: Source[ByteString, _], oldAttachment: Option[Attached])(implicit transid: TransactionId): Future[(DocInfo, Attached)] = { - attachmentStore match { - case Some(as) => - attachToExternalStore(doc, update, contentType, docStream, oldAttachment, as) - case None => - Future.failed(new IllegalArgumentException( - s" '$cosmosScheme' is now not supported. 
You must configure an external AttachmentStore for storing attachments")) - } + attachToExternalStore(doc, update, contentType, docStream, oldAttachment, attachmentAzureStore) } override protected[core] def readAttachment[T](doc: DocInfo, attached: Attached, sink: Sink[ByteString, Future[T]])( @@ -442,24 +440,23 @@ class CosmosDBMigrationArtifactStore[DocumentAbstraction <: DocumentSerializer]( //Compared to current approach of ':' Future.failed(new IllegalArgumentException( s" '$cosmosScheme' is now not supported. You must configure an external AttachmentStore for storing attachments")) - case s if attachmentStore.isDefined && attachmentStore.get.scheme == s => - attachmentStore.get.readAttachment(doc.id, attachmentUri.path.toString, sink) - case s if legacyAttachmentStore.isDefined && legacyAttachmentStore.get.scheme == s => - legacyAttachmentStore.get.readAttachment(doc.id, attachmentUri.path.toString, sink) + case s if attachmentS3Store.scheme == s => + attachmentS3Store.readAttachment(doc.id, attachmentUri.path.toString, sink) + case s if attachmentAzureStore.scheme == s => + attachmentAzureStore.readAttachment(doc.id, attachmentUri.path.toString, sink) case _ => throw new IllegalArgumentException(s"Unknown attachment scheme in attachment uri $attachmentUri") } } override protected[core] def deleteAttachments[T](doc: DocInfo)(implicit transid: TransactionId): Future[Boolean] = - attachmentStore - .map(as => as.deleteAttachments(doc.id)) - .getOrElse(Future.successful(true)) // For CosmosDB it is expected that the entire document is deleted. + attachmentAzureStore.deleteAttachments(doc.id) override def shutdown(): Unit = { //Its async so a chance exist for next scheduled job to still trigger usageMetricRecorder.foreach(system.stop) - attachmentStore.foreach(_.shutdown()) + attachmentS3Store.shutdown() + attachmentAzureStore.shutdown() clientRef.close() } diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStoreProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStoreProvider.scala index 34b9a1066f5..651a6f2348d 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStoreProvider.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBMigrationArtifactStoreProvider.scala @@ -45,7 +45,7 @@ object CosmosDBMigrationArtifactStoreProvider extends ArtifactStoreProvider { materializer: ActorMaterializer): ArtifactStore[D] = { val tag = implicitly[ClassTag[D]] val config = CosmosDBConfig(ConfigFactory.load(), tag.runtimeClass.getSimpleName) - makeStoreForClient(config, getOrCreateReference(config), getAttachmentStore(), getLegacyAttachmentStore()) + makeStoreForClient(config, getOrCreateReference(config)) } def makeArtifactStore[D <: DocumentSerializer: ClassTag](config: CosmosDBConfig, @@ -56,13 +56,11 @@ object CosmosDBMigrationArtifactStoreProvider extends ArtifactStoreProvider { logging: Logging, materializer: ActorMaterializer): CosmosDBMigrationArtifactStore[D] = { - makeStoreForClient(config, createReference(config).reference(), attachmentStore, None) + makeStoreForClient(config, createReference(config).reference()) } private def makeStoreForClient[D <: DocumentSerializer: ClassTag](config: CosmosDBConfig, - clientRef: DocumentClientRef, - attachmentStore: Option[AttachmentStore], - legacyAttachmentStore: Option[AttachmentStore])( + clientRef: DocumentClientRef)( implicit 
jsonFormat: RootJsonFormat[D], docReader: DocumentReader, actorSystem: ActorSystem, @@ -78,9 +76,7 @@ object CosmosDBMigrationArtifactStoreProvider extends ArtifactStoreProvider { clientRef, handler, viewMapper, - loadConfigOrThrow[InliningConfig](ConfigKeys.db), - attachmentStore, - legacyAttachmentStore) + loadConfigOrThrow[InliningConfig](ConfigKeys.db)) } private def handlerAndMapper[D](entityType: ClassTag[D])(
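
For readers following the migration read path across these patches, here is a minimal standalone sketch of the scheme-based routing that CosmosDBMigrationArtifactStore.readAttachment performs. It is illustrative only and not part of the patches: it assumes only the AttachmentStore API already visible in the diff (a scheme value and readAttachment(docId, name, sink)), and the same dispatch applies whether the second store is the config-selected legacy store of patch 1 or the explicit S3 store used alongside the Azure Blob store in patch 4.

import akka.http.scaladsl.model.Uri
import akka.stream.scaladsl.Sink
import akka.util.ByteString
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.database.AttachmentStore
import org.apache.openwhisk.core.entity.DocId

import scala.concurrent.Future

// Illustrative sketch (hypothetical helper, not in the patch): parse the attachment name as a
// URI and let its scheme decide which AttachmentStore serves the read.
def readViaSchemeRouting[T](current: AttachmentStore, legacy: AttachmentStore)(
  docId: DocId,
  attachmentName: String,
  sink: Sink[ByteString, Future[T]])(implicit transid: TransactionId): Future[T] = {
  val attachmentUri = Uri(attachmentName)
  attachmentUri.scheme match {
    case s if s == current.scheme =>
      // Attachment was written after the migration; serve it from the current store.
      current.readAttachment(docId, attachmentUri.path.toString, sink)
    case s if s == legacy.scheme =>
      // Attachment still lives in the old store; fall back to it until the data is migrated.
      legacy.readAttachment(docId, attachmentUri.path.toString, sink)
    case _ =>
      Future.failed(
        new IllegalArgumentException(s"Unknown attachment scheme in attachment uri $attachmentUri"))
  }
}

Because reads fall back by scheme while writes always go to the current store, existing documents keep resolving their attachments during the migration window and newly written attachments land only in the new store.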