Skip to content

Commit 78bd48b

Browse files
committed
Added concurrency-safety for multi-operation transactions
1 parent 8cdd67f commit 78bd48b

File tree

4 files changed

+146
-18
lines changed

4 files changed

+146
-18
lines changed

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ The adapter is subject to the fundamental [limitations](https://apple.github.io/
6868
Also, there are non-FoundationDB-specific limitations:
6969
7070
- All write operations update "last update seq" and "doc count" keys, so concurrent write transactions are effectively guaranteed to conflict and wouldn't benefit from concurrency. This can also affect read operations, eg if they need to update views.
71-
- Concurrent PouchDB actions within a single transaction are not isolated. Doing any write operations in parallel with other operations results in an undefined behavior.
7271
7372
Some of these limitations could be easily solved. If these limitations are a significant problem for you, please create an issue.
7473

lib/adapter.js

Lines changed: 131 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import assert from 'assert';
44
import crypto from 'crypto';
5+
import Deque from 'double-ended-queue';
56
import { clone, filterChange, uuid } from 'pouchdb-utils';
67
import {
78
collectConflicts,
@@ -131,13 +132,90 @@ function callbackify(fn) {
131132
};
132133
}
133134

135+
/**
136+
* @param {fdb.Transaction} db
137+
* @param {TransactionState} state
138+
*/
139+
async function executeQueue(db, state) {
140+
const { queue } = state;
141+
142+
while (queue.length > 0) {
143+
const task = /** @type {Task} */(queue.peekFront());
144+
145+
if (!task.write) {
146+
const readTasks = [task];
147+
148+
for (let i = 1; i < queue.length; i++) {
149+
const nextTask = /** @type {Task} */(queue.get(i));
150+
151+
if (nextTask.write) {
152+
break;
153+
}
154+
155+
readTasks.push(nextTask);
156+
}
157+
158+
await Promise.all(readTasks.map(
159+
({ fn, resolve, reject }) => fn(db).then(resolve, reject)
160+
));
161+
162+
readTasks.forEach(() => {
163+
queue.shift();
164+
});
165+
} else {
166+
const { fn, resolve, reject } = task;
167+
168+
await fn(db).then(resolve, reject);
169+
170+
queue.shift();
171+
}
172+
}
173+
}
174+
134175
/**
135176
* @typedef {(
136177
* | fdb.Database
137178
* | fdb.Transaction
138179
* )} Actionable
180+
* @typedef {{ queue: Deque<Task> }} TransactionState
181+
* @typedef {{
182+
* fn: (tn: fdb.Transaction) => Promise<unknown>,
183+
* resolve: (value: unknown) => void,
184+
* reject: (reason?: any) => void,
185+
* write: boolean
186+
* }} Task
139187
*/
140188

189+
/**
190+
* A quasi-global state needed for locks for when concurrent PouchDB operations are made with the
191+
* same transaction. The key is the hidden context object of the transaction.
192+
* @type {WeakMap<fdb.Transaction, Record<string, TransactionState>>}
193+
*/
194+
const transactionState = new WeakMap();
195+
196+
/**
197+
* @param {fdb.Transaction} tn
198+
* @param {string} name
199+
* @returns {TransactionState}
200+
*/
201+
function getTransactionState(tn, name) {
202+
let state = transactionState.get(/** @type {any} */(tn)._ctx);
203+
204+
if (state == null) {
205+
state = {};
206+
207+
transactionState.set(/** @type {any} */(tn)._ctx, state);
208+
}
209+
210+
if (!(name in state)) {
211+
state[name] = {
212+
queue: new Deque()
213+
};
214+
}
215+
216+
return state[name];
217+
}
218+
141219
/**
142220
* @param {any} api
143221
* @param {{ db: Actionable, name: string, revs_limit?: number }} opts
@@ -148,12 +226,46 @@ export default function FoundationdbAdapter(api, { db, name, revs_limit: revLimi
148226
* @param {(tn: fdb.Transaction) => Promise<T>} fn
149227
* @returns {Promise<T>}
150228
*/
151-
function doTn(fn) {
229+
function doReadTn(fn) {
230+
return doTn(fn, false);
231+
}
232+
233+
/**
234+
* @template T
235+
* @param {(tn: fdb.Transaction) => Promise<T>} fn
236+
* @returns {Promise<T>}
237+
*/
238+
function doWriteTn(fn) {
239+
return doTn(fn, true);
240+
}
241+
242+
/**
243+
* @template T
244+
* @param {(tn: fdb.Transaction) => Promise<T>} fn
245+
* @param {boolean} write
246+
* @returns {Promise<T>}
247+
*/
248+
function doTn(fn, write) {
152249
if ('doTn' in db) {
153250
return db.doTn(fn);
154251
}
155252

156-
return fn(db);
253+
const state = getTransactionState(db, name);
254+
255+
const promise = new Promise((resolve, reject) => {
256+
state.queue.push({
257+
fn,
258+
write,
259+
resolve,
260+
reject
261+
});
262+
});
263+
264+
if (state.queue.length === 1) {
265+
executeQueue(db, state);
266+
}
267+
268+
return promise;
157269
}
158270

159271
const subspace = db.subspace.at(fdb.tuple.pack(name), defaultTransformer, defaultTransformer);
@@ -187,9 +299,11 @@ export default function FoundationdbAdapter(api, { db, name, revs_limit: revLimi
187299

188300
api._id = callbackify(
189301
/**
302+
* TODO: That is concurrency-safe with most other read operations and, in most cases, with
303+
* itself so it might not be necessary registeer it as a write transaction.
190304
* @returns {Promise<string>}
191305
*/
192-
() => doTn(async tn => {
306+
() => doWriteTn(async tn => {
193307
let instanceId = await /** @type {Promise<undefined | string>} */(
194308
tn.at(stores.metaStore).get(UUID_KEY)
195309
);
@@ -208,7 +322,7 @@ export default function FoundationdbAdapter(api, { db, name, revs_limit: revLimi
208322
/**
209323
* @returns {Promise<{ doc_count: number, update_seq: number }>}
210324
*/
211-
async () => doTn(async tn => {
325+
async () => doReadTn(async tn => {
212326
const [docCount = 0, updateSeq = 0] = await Promise.all([
213327
/** @type {Promise<undefined | number>} */(tn.at(stores.metaStore).get(DOC_COUNT_KEY)),
214328
/** @type {Promise<undefined | number>} */(tn.at(stores.metaStore).get(UPDATE_SEQ_KEY))
@@ -226,7 +340,7 @@ export default function FoundationdbAdapter(api, { db, name, revs_limit: revLimi
226340
* @param {import('./types.js').Id} id
227341
* @param {{ rev?: import('./types.js').Rev, latest?: boolean }} param
228342
*/
229-
(id, { rev, latest } = {}) => doTn(async tn => {
343+
(id, { rev, latest } = {}) => doReadTn(async tn => {
230344
const metadata = await tn.at(stores.docStore).get(id);
231345

232346
if (metadata == null) {
@@ -278,7 +392,7 @@ export default function FoundationdbAdapter(api, { db, name, revs_limit: revLimi
278392
* @param {{ binary?: boolean }} param3
279393
* @returns {Promise<Buffer | string>}
280394
*/
281-
(docId, attachId, attachment, { binary } = {}) => doTn(async tn => {
395+
(docId, attachId, attachment, { binary } = {}) => doReadTn(async tn => {
282396
const { digest } = attachment;
283397

284398
const attach = await tn.at(stores.binaryStore).get(digest) ?? Buffer.allocUnsafe(0);
@@ -298,7 +412,7 @@ export default function FoundationdbAdapter(api, { db, name, revs_limit: revLimi
298412
* @param {{ new_edits: boolean }} opts
299413
* @returns {Promise<import('./types.js').BulkDocsResultRow[]>}
300414
*/
301-
({ docs: userDocs }, opts) => doTn(async tn => {
415+
({ docs: userDocs }, opts) => doWriteTn(async tn => {
302416
const docInfos = userDocs.map(doc => {
303417
if (isLocalDoc(doc)) {
304418
return doc;
@@ -561,7 +675,7 @@ export default function FoundationdbAdapter(api, { db, name, revs_limit: revLimi
561675
* }} opts
562676
* @returns {Promise<import('./types.js').AllDocsResult>}
563677
*/
564-
opts => doTn(async tn => {
678+
opts => doReadTn(async tn => {
565679
/** @type {[import('./types.js').AllDocsResultRow[], number, number]} */
566680
const [rows, docCount = 0, updateSeq = 0] = await Promise.all([
567681
getAllDocsRows(opts, tn),
@@ -876,7 +990,7 @@ export default function FoundationdbAdapter(api, { db, name, revs_limit: revLimi
876990

877991
for (;;) {
878992
// eslint-disable-next-line no-loop-func
879-
const { it, watch } = await doTn(async tn => {
993+
const { it, watch } = await doReadTn(async tn => {
880994
const it = await tn.at(stores.bySeqStore).getRangeAll(
881995
lastSeq,
882996
Infinity,
@@ -909,7 +1023,7 @@ export default function FoundationdbAdapter(api, { db, name, revs_limit: revLimi
9091023

9101024
if (!metadata) {
9111025
metadata = /** @type {import('./types.js').Metadata} */(
912-
await doTn(tn => tn.at(stores.docStore).get(doc._id))
1026+
await doReadTn(tn => tn.at(stores.docStore).get(doc._id))
9131027
);
9141028

9151029
if (isLocalId(metadata.id)) {
@@ -930,7 +1044,7 @@ export default function FoundationdbAdapter(api, { db, name, revs_limit: revLimi
9301044

9311045
const winningDoc = winningRev === doc._rev
9321046
? doc
933-
: await doTn(tn => tn.at(stores.bySeqStore).get(metadata.rev_map[winningRev]));
1047+
: await doReadTn(tn => tn.at(stores.bySeqStore).get(metadata.rev_map[winningRev]));
9341048

9351049
assert(winningDoc);
9361050

@@ -952,7 +1066,7 @@ export default function FoundationdbAdapter(api, { db, name, revs_limit: revLimi
9521066

9531067
// fetch attachment immediately for the benefit
9541068
// of live listeners
955-
change.doc._attachments = await doTn(async tn => Object.fromEntries(
1069+
change.doc._attachments = await doReadTn(async tn => Object.fromEntries(
9561070
await Promise.all(Object.entries(attachments).map(
9571071
async ([fileName, att]) => [
9581072
fileName,
@@ -1018,7 +1132,7 @@ export default function FoundationdbAdapter(api, { db, name, revs_limit: revLimi
10181132
* @returns {Promise<import('./types.js').RevTreePath[]>}
10191133
*/
10201134
async docId => {
1021-
const metadata = await doTn(tn => tn.at(stores.docStore).get(docId));
1135+
const metadata = await doReadTn(tn => tn.at(stores.docStore).get(docId));
10221136

10231137
if (metadata == null) {
10241138
throw createError(MISSING_DOC);
@@ -1033,7 +1147,7 @@ export default function FoundationdbAdapter(api, { db, name, revs_limit: revLimi
10331147
* @param {import('./types.js').Id} docId
10341148
* @param {import('./types.js').Rev[]} revs
10351149
*/
1036-
(docId, revs) => doTn(tn => doCompaction(docId, revs, tn))
1150+
(docId, revs) => doWriteTn(tn => doCompaction(docId, revs, tn))
10371151
);
10381152

10391153
/**
@@ -1129,7 +1243,7 @@ export default function FoundationdbAdapter(api, { db, name, revs_limit: revLimi
11291243
* @returns {Promise<import('./types.js').LocalDoc>}
11301244
*/
11311245
async id => {
1132-
const value = await doTn(tn => tn.at(stores.localStore).get(id));
1246+
const value = await doReadTn(tn => tn.at(stores.localStore).get(id));
11331247

11341248
if (value == null) {
11351249
throw createError(MISSING_DOC);
@@ -1147,7 +1261,7 @@ export default function FoundationdbAdapter(api, { db, name, revs_limit: revLimi
11471261
*/
11481262
(doc, opts) => opts?.ctx
11491263
? putLocal(doc, opts.ctx)
1150-
: doTn(tn => putLocal(doc, tn))
1264+
: doWriteTn(tn => putLocal(doc, tn))
11511265
);
11521266

11531267
/**
@@ -1187,7 +1301,7 @@ export default function FoundationdbAdapter(api, { db, name, revs_limit: revLimi
11871301
*/
11881302
(doc, opts) => opts?.ctx
11891303
? removeLocal(doc, opts.ctx)
1190-
: doTn(tn => removeLocal(doc, tn))
1304+
: doWriteTn(tn => removeLocal(doc, tn))
11911305
);
11921306

11931307
/**

package-lock.json

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@
1616
"lint": "eslint lib test eslint.config.js"
1717
},
1818
"dependencies": {
19+
"double-ended-queue": "^2.1.0-0",
1920
"foundationdb": "^2.0.1",
2021
"pouchdb-errors": "^8.0.1",
2122
"pouchdb-merge": "^8.0.1",
2223
"pouchdb-utils": "^8.0.1"
2324
},
2425
"devDependencies": {
2526
"@arbendium/eslint-config-base": "^0.2.1",
27+
"@types/double-ended-queue": "^2.1.7",
2628
"@types/node": "^20.12.7",
2729
"eslint": "^8.54.0",
2830
"express": "^4.19.2",

0 commit comments

Comments
 (0)