Skip to content

Commit

Permalink
lazily order writes during a transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
wernst committed Dec 21, 2023
1 parent eb4adb9 commit af59c89
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 12 deletions.
22 changes: 19 additions & 3 deletions src/database/async/AsyncReactivityTracker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { maybePromiseAll } from "../../helpers/maybeWaitForPromises"
import { Bounds } from "../../helpers/sortedTupleArray"
import { ScanStorageArgs, WriteOps } from "../../storage/types"
import {
KeyValuePair,
ScanStorageArgs,
Tuple,
WriteOps,
} from "../../storage/types"
import { TxId } from "../types"
import { AsyncCallback } from "./asyncTypes"
import * as SortedTupleValue from "../../helpers/sortedTupleValuePairs"
Expand Down Expand Up @@ -42,8 +47,19 @@ function getReactivityEmits(listenersDb: Listeners, writes: WriteOps) {
const emits: ReactivityEmits = new Map()

for (const [callback, bounds] of listenersDb) {
const matchingWrites = SortedTupleValue.scan(writes.set || [], bounds)
const matchingRemoves = SortedTuple.scan(writes.remove || [], bounds)
const matchingWrites: KeyValuePair[] = []
const matchingRemoves: Tuple[] = []
// Found it to be slightly faster to not assume this is sorted and check bounds individually instead of using scan(writes.set, bounds)
for (const kv of writes.set || []) {
if (SortedTuple.isTupleWithinBounds(kv.key, bounds)) {
matchingWrites.push(kv)
}
}
for (const tuple of writes.remove || []) {
if (SortedTuple.isTupleWithinBounds(tuple, bounds)) {
matchingRemoves.push(tuple)
}
}
if (matchingWrites.length > 0 || matchingRemoves.length > 0) {
emits.set(callback, { set: matchingWrites, remove: matchingRemoves })
}
Expand Down
35 changes: 33 additions & 2 deletions src/database/async/AsyncTupleDatabaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
removePrefixFromTupleValuePairs,
removePrefixFromWriteOps,
} from "../../helpers/subspaceHelpers"
import { compareTuple } from "../../main"
import { KeyValuePair, Tuple, WriteOps } from "../../storage/types"
import { TupleDatabaseApi } from "../sync/types"
import {
Expand Down Expand Up @@ -129,6 +130,29 @@ export class AsyncTupleRootTransaction<S extends KeyValuePair>
canceled = false
writes: Required<WriteOps<S>>

// Track whether writes are dirty and need to be sorted prior to reading
private setsDirty = false
private removesDirty = false

private cleanWrites() {
this.cleanSets()
this.cleanRemoves()
}

private cleanSets() {
if (this.setsDirty) {
this.writes.set = this.writes.set.sort(tv.compareTupleValuePair)
this.setsDirty = false
}
}

private cleanRemoves() {
if (this.removesDirty) {
this.writes.remove = this.writes.remove.sort(compareTuple)
this.removesDirty = false
}
}

private checkActive() {
if (this.committed) throw new Error("Transaction already committed")
if (this.canceled) throw new Error("Transaction already canceled")
Expand All @@ -138,6 +162,7 @@ export class AsyncTupleRootTransaction<S extends KeyValuePair>
args: ScanArgs<T, P> = {}
): Promise<FilterTupleValuePairByPrefix<S, P>[]> {
this.checkActive()
this.cleanWrites()

const { limit: resultLimit, ...scanArgs } = normalizeSubspaceScanArgs(
this.subspacePrefix,
Expand Down Expand Up @@ -179,6 +204,7 @@ export class AsyncTupleRootTransaction<S extends KeyValuePair>
tuple: T
): Promise<ValueForTuple<S, T> | undefined> {
this.checkActive()
this.cleanWrites()
const fullTuple = prependPrefixToTuple(this.subspacePrefix, tuple)

if (tv.exists(this.writes.set, fullTuple)) {
Expand All @@ -200,6 +226,7 @@ export class AsyncTupleRootTransaction<S extends KeyValuePair>

async exists<T extends S["key"]>(tuple: T): Promise<boolean> {
this.checkActive()
this.cleanWrites()
const fullTuple = prependPrefixToTuple(this.subspacePrefix, tuple)

if (tv.exists(this.writes.set, fullTuple)) {
Expand All @@ -222,17 +249,21 @@ export class AsyncTupleRootTransaction<S extends KeyValuePair>
value: T["value"]
): AsyncTupleRootTransactionApi<S> {
this.checkActive()
this.cleanRemoves()
const fullTuple = prependPrefixToTuple(this.subspacePrefix, tuple)
t.remove(this.writes.remove, fullTuple)
tv.set(this.writes.set, fullTuple, value)
this.writes.set.push({ key: fullTuple, value: value } as S)
this.setsDirty = true
return this
}

remove(tuple: S["key"]): AsyncTupleRootTransactionApi<S> {
this.checkActive()
this.cleanSets()
const fullTuple = prependPrefixToTuple(this.subspacePrefix, tuple)
tv.remove(this.writes.set, fullTuple)
t.set(this.writes.remove, fullTuple)
this.writes.remove.push(fullTuple)
this.removesDirty = true
return this
}

Expand Down
23 changes: 19 additions & 4 deletions src/database/sync/ReactivityTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ type Identity<T> = T
import { maybePromiseAll } from "../../helpers/maybeWaitForPromises"
import * as SortedTuple from "../../helpers/sortedTupleArray"
import { Bounds } from "../../helpers/sortedTupleArray"
import * as SortedTupleValue from "../../helpers/sortedTupleValuePairs"
import { ScanStorageArgs, WriteOps } from "../../storage/types"
import {
KeyValuePair,
ScanStorageArgs,
Tuple,
WriteOps,
} from "../../storage/types"
import { TxId } from "../types"
import { Callback } from "./types"

Expand Down Expand Up @@ -50,8 +54,19 @@ function getReactivityEmits(listenersDb: Listeners, writes: WriteOps) {
const emits: ReactivityEmits = new Map()

for (const [callback, bounds] of listenersDb) {
const matchingWrites = SortedTupleValue.scan(writes.set || [], bounds)
const matchingRemoves = SortedTuple.scan(writes.remove || [], bounds)
const matchingWrites: KeyValuePair[] = []
const matchingRemoves: Tuple[] = []
// Found it to be slightly faster to not assume this is sorted and check bounds individually instead of using scan(writes.set, bounds)
for (const kv of writes.set || []) {
if (SortedTuple.isTupleWithinBounds(kv.key, bounds)) {
matchingWrites.push(kv)
}
}
for (const tuple of writes.remove || []) {
if (SortedTuple.isTupleWithinBounds(tuple, bounds)) {
matchingRemoves.push(tuple)
}
}
if (matchingWrites.length > 0 || matchingRemoves.length > 0) {
emits.set(callback, { set: matchingWrites, remove: matchingRemoves })
}
Expand Down
35 changes: 33 additions & 2 deletions src/database/sync/TupleDatabaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
removePrefixFromTupleValuePairs,
removePrefixFromWriteOps,
} from "../../helpers/subspaceHelpers"
import { compareTuple } from "../../main"
import { KeyValuePair, Tuple, WriteOps } from "../../storage/types"
import { TupleDatabaseApi } from "../sync/types"
import {
Expand Down Expand Up @@ -131,6 +132,29 @@ export class TupleRootTransaction<S extends KeyValuePair>
canceled = false
writes: Required<WriteOps<S>>

// Track whether writes are dirty and need to be sorted prior to reading
private setsDirty = false
private removesDirty = false

private cleanWrites() {
this.cleanSets()
this.cleanRemoves()
}

private cleanSets() {
if (this.setsDirty) {
this.writes.set = this.writes.set.sort(tv.compareTupleValuePair)
this.setsDirty = false
}
}

private cleanRemoves() {
if (this.removesDirty) {
this.writes.remove = this.writes.remove.sort(compareTuple)
this.removesDirty = false
}
}

private checkActive() {
if (this.committed) throw new Error("Transaction already committed")
if (this.canceled) throw new Error("Transaction already canceled")
Expand All @@ -140,6 +164,7 @@ export class TupleRootTransaction<S extends KeyValuePair>
args: ScanArgs<T, P> = {}
): Identity<FilterTupleValuePairByPrefix<S, P>[]> {
this.checkActive()
this.cleanWrites()

const { limit: resultLimit, ...scanArgs } = normalizeSubspaceScanArgs(
this.subspacePrefix,
Expand Down Expand Up @@ -179,6 +204,7 @@ export class TupleRootTransaction<S extends KeyValuePair>

get<T extends S["key"]>(tuple: T): Identity<ValueForTuple<S, T> | undefined> {
this.checkActive()
this.cleanWrites()
const fullTuple = prependPrefixToTuple(this.subspacePrefix, tuple)

if (tv.exists(this.writes.set, fullTuple)) {
Expand All @@ -197,6 +223,7 @@ export class TupleRootTransaction<S extends KeyValuePair>

exists<T extends S["key"]>(tuple: T): Identity<boolean> {
this.checkActive()
this.cleanWrites()
const fullTuple = prependPrefixToTuple(this.subspacePrefix, tuple)

if (tv.exists(this.writes.set, fullTuple)) {
Expand All @@ -216,17 +243,21 @@ export class TupleRootTransaction<S extends KeyValuePair>
value: T["value"]
): TupleRootTransactionApi<S> {
this.checkActive()
this.cleanRemoves()
const fullTuple = prependPrefixToTuple(this.subspacePrefix, tuple)
t.remove(this.writes.remove, fullTuple)
tv.set(this.writes.set, fullTuple, value)
this.writes.set.push({ key: fullTuple, value: value } as S)
this.setsDirty = true
return this
}

remove(tuple: S["key"]): TupleRootTransactionApi<S> {
this.checkActive()
this.cleanSets()
const fullTuple = prependPrefixToTuple(this.subspacePrefix, tuple)
tv.remove(this.writes.set, fullTuple)
t.set(this.writes.remove, fullTuple)
this.writes.remove.push(fullTuple)
this.removesDirty = true
return this
}

Expand Down
2 changes: 1 addition & 1 deletion src/helpers/sortedTupleValuePairs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { compareTuple } from "./compareTuple"
import * as sortedList from "./sortedList"
import { normalizeTupleBounds } from "./sortedTupleArray"

function compareTupleValuePair(a: KeyValuePair, b: KeyValuePair) {
export function compareTupleValuePair(a: KeyValuePair, b: KeyValuePair) {
return compareTuple(a.key, b.key)
}

Expand Down

0 comments on commit af59c89

Please sign in to comment.