Skip to content
This repository has been archived by the owner on Feb 1, 2022. It is now read-only.

Commit

Permalink
refactor(common, service-common): combine both modules to common
Browse files Browse the repository at this point in the history
- introduce Persistable, Entity, Wrapper
- use kotlinx.serialization for JSON
  • Loading branch information
JonasWanke committed Dec 17, 2019
1 parent 457ef4f commit f896a10
Show file tree
Hide file tree
Showing 45 changed files with 919 additions and 378 deletions.
14 changes: 10 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
buildscript {
ext.kotlin_version = '1.3.61'

repositories {
mavenCentral()
google()
}
dependencies {
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
classpath "org.jetbrains.kotlin:kotlin-serialization:$kotlin_version"
}
}

plugins {
id 'org.jetbrains.kotlin.jvm' version'1.3.50'
}
apply plugin: 'kotlin'

subprojects {
buildscript {
repositories {
jcenter()
mavenCentral()
}
dependencies {
classpath("org.jetbrains.kotlin:kotlin-gradle-plugin:1.3.50")
classpath("org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version")
classpath("org.jetbrains.kotlin:kotlin-serialization:$kotlin_version")
classpath("com.github.jengelman.gradle.plugins:shadow:5.1.0")
}
}
Expand Down
12 changes: 11 additions & 1 deletion common/build.gradle
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
plugins {
id "idea"
id "kotlin"
id "application"
}
apply plugin: 'kotlinx-serialization'

dependencies {
api("org.jetbrains.kotlin:kotlin-stdlib:1.3.50")
api("org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version")
api("org.jetbrains.kotlin:kotlin-reflect:$kotlin_version")
api("org.jetbrains.kotlinx:kotlinx-serialization-runtime:0.14.0")

// API
api("com.google.protobuf:protobuf-java:3.6.1")
Expand All @@ -14,3 +18,9 @@ dependencies {
// Storage
api("com.couchbase.client:java-client:2.7.9")
}

compileKotlin {
kotlinOptions {
freeCompilerArgs += "-Xuse-experimental=kotlin.Experimental"
}
}
9 changes: 9 additions & 0 deletions common/src/main/kotlin/de/hpi/cloud/common/Context.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package de.hpi.cloud.common

import de.hpi.cloud.common.entity.Id
import java.util.*

data class Context(
val author: Id<Party>,
val languageRanges: List<Locale.LanguageRange>
)
17 changes: 0 additions & 17 deletions common/src/main/kotlin/de/hpi/cloud/common/Entity.kt

This file was deleted.

11 changes: 11 additions & 0 deletions common/src/main/kotlin/de/hpi/cloud/common/Party.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package de.hpi.cloud.common

import de.hpi.cloud.common.entity.Entity
import kotlinx.serialization.Serializable

@Serializable
data class Party(
val name: String
) : Entity<Party>() {
companion object : Entity.Companion<Party>("party")
}
10 changes: 10 additions & 0 deletions common/src/main/kotlin/de/hpi/cloud/common/Persistable.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package de.hpi.cloud.common

import com.google.protobuf.GeneratedMessageV3

abstract class Persistable<P : Persistable<P>> {
interface ProtoSerializer<P : Persistable<P>, Proto : GeneratedMessageV3> {
fun fromProto(proto: Proto, context: Context): P
fun toProto(persistable: P, context: Context): Proto
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package de.hpi.cloud.common
import com.couchbase.client.java.Bucket
import com.couchbase.client.java.CouchbaseCluster
import com.google.protobuf.GeneratedMessageV3
import de.hpi.cloud.common.utils.couchbase.openCouchbase
import de.hpi.cloud.common.couchbase.openCouchbase
import de.hpi.cloud.common.entity.Id
import de.hpi.cloud.common.grpc.preferredLocales
import de.hpi.cloud.common.utils.removeFirst
import io.grpc.*

Expand All @@ -17,9 +19,17 @@ class Service<S : BindableService>(
const val PORT_DEFAULT = 50051
const val PORT_VARIABLE = "HPI_CLOUD_PORT"

private val requestMetadata = mutableListOf<RequestWithMetadata<GeneratedMessageV3>>()
fun metadataForRequest(request: GeneratedMessageV3): Metadata? =
requestMetadata.firstOrNull { it.request === request }?.metadata
private val requestMetadata = mutableListOf<RequestWithMetadata>()
fun contextForRequest(request: Any): Context? {
return requestMetadata.firstOrNull { it.request === request }
?.metadata
?.let {
Context(
author = Id("0"),
languageRanges = it.preferredLocales
)
}
}
}

private val server: Server
Expand Down Expand Up @@ -55,7 +65,12 @@ class Service<S : BindableService>(
require(message is GeneratedMessageV3)

request = message
requestMetadata.add(RequestWithMetadata(message, headers))
requestMetadata.add(
RequestWithMetadata(
message,
headers
)
)
super.onMessage(message)
}

Expand Down Expand Up @@ -86,7 +101,7 @@ class Service<S : BindableService>(
}

fun stop() {
if (isStopped) throw IllegalStateException("$name is already stopped")
check(!isStopped) { "$name is already stopped" }
isStopped = true

println("Stopping $name")
Expand All @@ -105,8 +120,8 @@ class Service<S : BindableService>(
server.awaitTermination()
}

data class RequestWithMetadata<ReqT : GeneratedMessageV3>(
val request: ReqT,
data class RequestWithMetadata(
val request: Any,
val metadata: Metadata?
)
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package de.hpi.cloud.common.utils.couchbase
package de.hpi.cloud.common.couchbase

const val KEY_TYPE = "type"
const val KEY_VERSION = "version"
const val KEY_ID = "id"
const val KEY_METADATA = "meta"
const val KEY_METADATA_CREATED_AT = "createdAt"
const val KEY_VALUE = "value"

const val NESTED_SEPARATOR = "."

fun devDesignDoc(designDoc: String) = "dev_$designDoc"
const val VIEW_BY_ID = "byId"
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package de.hpi.cloud.common.utils.couchbase
package de.hpi.cloud.common.couchbase

import com.couchbase.client.java.Bucket
import com.couchbase.client.java.CouchbaseAsyncCluster
Expand Down
29 changes: 29 additions & 0 deletions common/src/main/kotlin/de/hpi/cloud/common/couchbase/Document.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package de.hpi.cloud.common.couchbase

import com.couchbase.client.java.AsyncBucket
import com.couchbase.client.java.Bucket
import com.couchbase.client.java.document.RawJsonDocument
import de.hpi.cloud.common.entity.Entity
import de.hpi.cloud.common.entity.Id
import de.hpi.cloud.common.entity.Wrapper
import de.hpi.cloud.common.entity.entityCompanion
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonConfiguration
import rx.Observable

val json = Json(JsonConfiguration.Stable)

inline fun <reified E : Entity<E>> RawJsonDocument.parseWrapper(): Wrapper<E> {
return json.parse(Wrapper.jsonSerializerFor<E>(), content())
}

inline fun <reified E : Entity<E>> Bucket.get(id: Id<E>): Wrapper<E>? {
val docId = Wrapper.createDocumentId(E::class.entityCompanion().type, id)
return get(docId, RawJsonDocument::class.java)?.parseWrapper() ?: return null
}

inline fun <reified E : Entity<E>> AsyncBucket.get(id: Id<E>): Observable<Wrapper<E>> {
val docId = Wrapper.createDocumentId(E::class.entityCompanion().type, id)
return get(docId, RawJsonDocument::class.java)
.map { it.parseWrapper<E>() }
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package de.hpi.cloud.common.utils.couchbase
package de.hpi.cloud.common.couchbase

import com.couchbase.client.java.query.dsl.Expression
import com.couchbase.client.java.query.dsl.Expression.*
import com.couchbase.client.java.query.dsl.Sort
import com.couchbase.client.java.query.dsl.Sort.asc
import com.couchbase.client.java.query.dsl.Sort.desc
import de.hpi.cloud.common.utils.protobuf.TIMESTAMP_MILLIS
import de.hpi.cloud.common.utils.protobuf.TIMESTAMP_NANOS
import de.hpi.cloud.common.types.LocalDateTime

fun and(vararg expressions: Expression?): Expression {
return expressions.filterNotNull().run {
Expand All @@ -16,18 +15,24 @@ fun and(vararg expressions: Expression?): Expression {
}

fun ascTimestamp(field: Expression): Array<Sort> {
return arrayOf(asc("$field.$TIMESTAMP_MILLIS"), asc("$field.$TIMESTAMP_NANOS"))
return arrayOf(
asc("$field.${LocalDateTime.JsonSerializer.KEY_MILLIS}"),
asc("$field.${LocalDateTime.JsonSerializer.KEY_NANOS}")
)
}

fun descTimestamp(field: Expression): Array<Sort> {
return arrayOf(desc("$field.$TIMESTAMP_MILLIS"), desc("$field.$TIMESTAMP_NANOS"))
return arrayOf(
desc("$field.${LocalDateTime.JsonSerializer.KEY_MILLIS}"),
desc("$field.${LocalDateTime.JsonSerializer.KEY_NANOS}")
)
}

/**
* Builds an expression for a *nested* field with proper escaping.
*/
fun n(vararg part: String): Expression {
return x(part.joinToString(NESTED_SEPARATOR.toString()) { i(it).toString() })
return x(part.joinToString(NESTED_SEPARATOR) { i(it).toString() })
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,36 @@
package de.hpi.cloud.common.utils.couchbase
package de.hpi.cloud.common.couchbase

import com.couchbase.client.java.Bucket
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.client.java.document.RawJsonDocument
import com.couchbase.client.java.query.N1qlParams
import com.couchbase.client.java.query.N1qlQuery
import com.couchbase.client.java.query.Select.select
import com.couchbase.client.java.query.dsl.functions.MetaFunctions.meta
import com.couchbase.client.java.query.dsl.path.AsPath
import com.couchbase.client.java.query.dsl.path.LimitPath
import com.couchbase.client.java.view.ViewQuery
import de.hpi.cloud.common.utils.grpc.throwException
import de.hpi.cloud.common.entity.Entity
import de.hpi.cloud.common.entity.Id
import de.hpi.cloud.common.entity.Wrapper
import de.hpi.cloud.common.grpc.throwException
import de.hpi.cloud.common.utils.thenTake
import io.grpc.Status
import rx.Observable


private const val PAGINATION_TOKEN_SEPARATOR = ";"
private const val PAGINATION_TOKEN_VARIANT_1 = "1" // "<startKey>;<startKeyDocId>"
private const val PAGINATION_TOKEN_VARIANT_2 = "2" // "<offset>"
const val PAGINATION_TOKEN_SEPARATOR = ";"
const val PAGINATION_TOKEN_VARIANT_1 = "1" // "<startKey>;<startKeyDocId>"
const val PAGINATION_TOKEN_VARIANT_2 = "2" // "<offset>"

fun <T : Any> ViewQuery.paginate(
const val FIELD_ID = "id"

inline fun <reified E : Entity<E>> ViewQuery.paginate(
bucket: Bucket,
reqPageSize: Int,
reqPageToken: String,
defaultPageSize: Int = 20,
maxPageSize: Int = 100,
mapper: ((JsonObject) -> T?)
): Pair<Collection<T>, String> {
maxPageSize: Int = 100
): Pair<Collection<Wrapper<E>>, String> {
val pageSize = getPageSize(reqPageSize, defaultPageSize, maxPageSize)

// Apply pagination to query
Expand All @@ -39,24 +45,23 @@ fun <T : Any> ViewQuery.paginate(
// Execute query
val items = execute(bucket).allRows()

val objects = items.take(pageSize)
.mapNotNull { mapper(it.document().content()) }
val entities = items.take(pageSize)
.map { it.document(RawJsonDocument::class.java).parseWrapper<E>() }
val nextPageToken = (items.size == pageSize + 1)
.thenTake { items.last() }
?.let { PAGINATION_TOKEN_VARIANT_1 + PAGINATION_TOKEN_SEPARATOR + it.key().toString() + PAGINATION_TOKEN_SEPARATOR + it.id() }
?: "" // Empty string makes building the protobuf easier
return objects to nextPageToken
return entities to nextPageToken
}

fun <T : Any> paginate(
inline fun <reified E : Entity<E>> paginate(
bucket: Bucket,
queryBuilder: AsPath.() -> LimitPath,
reqPageSize: Int,
reqPageToken: String,
defaultPageSize: Int = 20,
maxPageSize: Int = 100,
mapper: ((JsonObject) -> T?)
): Pair<Collection<T>, String> {
maxPageSize: Int = 100
): Pair<Collection<Wrapper<E>>, String> {
val pageSize = getPageSize(reqPageSize, defaultPageSize, maxPageSize)
val off = (reqPageToken.isNotBlank()).thenTake {
if (!reqPageToken.startsWith(PAGINATION_TOKEN_VARIANT_2)) invalidPaginationToken()
Expand All @@ -65,23 +70,27 @@ fun <T : Any> paginate(
} ?: 0

// Build query
val query = select("*").from(bucket.name()).queryBuilder()
val query = select(meta(bucket.name())[FIELD_ID]).from(bucket.name()).queryBuilder()
.run { limit(pageSize + 1) }
.run { offset(off) }
.let { N1qlQuery.simple(it, N1qlParams.build().adhoc(false)) }

// Execute query
val items = bucket.query(query).allRows()

val objects = items.take(pageSize)
.mapNotNull { mapper(it.value().getObject(bucket.name())) }
val ids = items.take(pageSize)
.map { it.value().getString(FIELD_ID) }
val entities = Observable.from(ids)
.flatMap { bucket.async().get<E>(Id(it)) }
.toList()
.toBlocking()
.single()
val nextPageToken = (items.size == pageSize + 1)
.thenTake { PAGINATION_TOKEN_VARIANT_2 + PAGINATION_TOKEN_SEPARATOR + (off + pageSize) }
?: "" // Empty string makes building the protobuf easier
return objects to nextPageToken
return entities to nextPageToken
}

private fun getPageSize(
fun getPageSize(
reqPageSize: Int,
defaultPageSize: Int = 20,
maxPageSize: Int = 100
Expand All @@ -97,6 +106,6 @@ private fun getPageSize(
}
}

private fun invalidPaginationToken(): Nothing {
fun invalidPaginationToken(): Nothing {
Status.INVALID_ARGUMENT.throwException("Invalid pagination token")
}
Loading

0 comments on commit f896a10

Please sign in to comment.