diff --git a/.aegir.js b/.aegir.js index 9e9b7f6..3afc327 100644 --- a/.aegir.js +++ b/.aegir.js @@ -1,3 +1,3 @@ module.exports = { - bundlesize: { maxSize: '12.1kB' } + bundlesize: { maxSize: '16kB' } } diff --git a/package.json b/package.json index 55da5e9..932894a 100644 --- a/package.json +++ b/package.json @@ -32,7 +32,8 @@ "dependencies": { "buffer": "^5.5.0", "idb": "^5.0.2", - "interface-datastore": "^1.0.2" + "interface-datastore": "ipfs/interface-datastore#test/add-tests-for-mutating-datastore-during-query", + "p-queue": "^6.4.0" }, "devDependencies": { "aegir": "^22.0.0", diff --git a/src/index.js b/src/index.js index fadb011..2b57162 100644 --- a/src/index.js +++ b/src/index.js @@ -4,6 +4,7 @@ const { Buffer } = require('buffer') const { openDB, deleteDB } = require('idb') const { Key, Errors, utils, Adapter } = require('interface-datastore') const { filter, sortAll } = utils +const { default: PQueue } = require('p-queue') const isStrictTypedArray = (arr) => { return ( @@ -43,30 +44,44 @@ const str2ab = (str) => { return buf } -const queryIt = async function * (q, store, location) { - const range = q.prefix ? self.IDBKeyRange.bound(str2ab(q.prefix), str2ab(q.prefix + '\xFF'), false, true) : undefined - let cursor = await store.transaction(location).store.openCursor(range) - let limit = 0 +class TaskQueue { + constructor (store) { + this._defaultStore = store + this._tasks = [] + } + + push (task) { + let ok + let fail + + this._tasks.push(async (store) => { + try { + ok(await task(store)) + } catch (err) { + if (err.message.includes('finished')) { + try { + return ok(await task(this._defaultStore)) + } catch (err) { + return fail(err) + } + } + + fail(err) + } + }) - if (cursor && q.offset && q.offset > 0) { - cursor = await cursor.advance(q.offset) + return new Promise((resolve, reject) => { + ok = resolve + fail = reject + }) } - while (cursor) { - // limit - if (q.limit !== undefined && q.limit === limit) { - return - } - limit++ + async drain (store) { + while (this._tasks.length) { + const task = this._tasks.shift() - const key = new Key(Buffer.from(cursor.key)) - if (q.keysOnly) { - yield { key } - } else { - const value = Buffer.from(cursor.value) - yield { key, value } + await task(store || this._defaultStore) } - cursor = await cursor.continue() } } @@ -76,8 +91,108 @@ class IdbDatastore extends Adapter { this.store = null this.options = options - this.location = options.prefix + location + this.location = (options.prefix || '') + location this.version = options.version || 1 + + this.transactionQueue = new PQueue({ concurrency: 1 }) + this._lastTransactionFinished = null + } + + _getStore () { + if (this.store === null) { + throw new Error('Datastore needs to be opened.') + } + + return this.transactionQueue.add(async () => { + await this._lastTransactionFinished + + let next + + this._lastTransactionFinished = new Promise(resolve => { + next = () => { + // not using this transaction any more + this._tx = null + resolve() + } + }) + + this._tx = this.store.transaction('readwrite') + this._tx.oncomplete = () => { + if (this._tx) { + this._tx.active = false + } + } + this._tx.onerror = () => { + if (this._tx) { + this._tx.active = false + } + } + this._tx.onabort = () => { + if (this._tx) { + this._tx.active = false + } + } + + return { + store: this._tx.store, + done: next + } + }) + } + + async * _queryIt (q) { + const range = q.prefix ? self.IDBKeyRange.bound(str2ab(q.prefix), str2ab(q.prefix + '\xFF'), false, true) : undefined + const { + store, + done + } = await this._getStore() + + try { + let cursor = await store.openCursor(range) + + let limit = 0 + + if (cursor && q.offset && q.offset > 0) { + cursor = await cursor.advance(q.offset) + } + + while (cursor) { + // the transaction is only active *after* we've opened the cursor, so stop any interleaved + // read/writes from encountering 'transaction is not active' errors while we open the cursor. + // Sigh. This is why we can't have nice things. + this._tx.active = true + + // process any requests that occured while the cursor was moving + await this.taskQueue.drain(store) + + // limit + if (q.limit !== undefined && q.limit === limit) { + break + } + limit++ + + const key = new Key(Buffer.from(cursor.key)) + let value + + if (!q.keysOnly) { + value = Buffer.from(cursor.value) + } + + if (q.keysOnly) { + yield { key } + } else { + yield { key, value } + } + + // the transaction can end before the cursor promise has resolved + this._tx.active = false + cursor = await cursor.continue() + } + + await this.taskQueue.drain() + } finally { + done() + } } async open () { @@ -87,24 +202,50 @@ class IdbDatastore extends Adapter { const location = this.location try { - this.store = await openDB(this.location, this.version, { + const store = await openDB(location, this.version, { upgrade (db) { db.createObjectStore(location) } }) + + // this store requires a `location` arg but the transaction stores + // do not so make the API the same + this.store = { + get: (key) => store.get(location, key), + getKey: (key) => store.getKey(location, key), + put: (key, value) => store.put(location, key, value), + delete: (key) => store.delete(location, key), + transaction: (type) => store.transaction(location, type), + close: () => store.close() + } } catch (err) { throw Errors.dbOpenFailedError(err) } + + this.taskQueue = new TaskQueue(this.store) } async put (key, val) { if (this.store === null) { throw new Error('Datastore needs to be opened.') } + try { - await this.store.put(this.location, val, key.toBuffer()) + if (this._tx) { + if (this._tx.active) { + await this._tx.store.put(val, key.toBuffer()) + } else { + await this.taskQueue.push((store) => store.put(val, key.toBuffer())) + } + } else { + await this.store.put(val, key.toBuffer()) + } } catch (err) { - throw Errors.dbWriteFailedError(err) + if (err.name === 'TransactionInactiveError') { + await this.taskQueue.push((store) => store.put(val, key.toBuffer())) + } else { + throw Errors.dbWriteFailedError(err) + } } } @@ -112,11 +253,24 @@ class IdbDatastore extends Adapter { if (this.store === null) { throw new Error('Datastore needs to be opened.') } + let value try { - value = await this.store.get(this.location, key.toBuffer()) + if (this._tx) { + if (this._tx.active) { + value = await this._tx.store.get(key.toBuffer()) + } else { + value = await this.taskQueue.push((store) => store.get(key.toBuffer())) + } + } else { + value = await this.store.get(key.toBuffer()) + } } catch (err) { - throw Errors.dbWriteFailedError(err) + if (err.name === 'TransactionInactiveError') { + value = await this.taskQueue.push((store) => store.get(key.toBuffer())) + } else { + throw Errors.dbWriteFailedError(err) + } } if (!value) { @@ -130,23 +284,53 @@ class IdbDatastore extends Adapter { if (this.store === null) { throw new Error('Datastore needs to be opened.') } + + let res + try { - await this.get(key) + if (this._tx) { + if (this._tx.active) { + res = await this._tx.store.getKey(key.toBuffer()) + } else { + res = await this.taskQueue.push((store) => store.getKey(key.toBuffer())) + } + } else { + res = await this.store.getKey(key.toBuffer()) + } } catch (err) { - if (err.code === 'ERR_NOT_FOUND') return false - throw err + if (err.name === 'TransactionInactiveError') { + res = await this.taskQueue.push((store) => store.getKey(key.toBuffer())) + } else if (err.code === 'ERR_NOT_FOUND') { + return false + } else { + throw err + } } - return true + + return Boolean(res) } async delete (key) { if (this.store === null) { throw new Error('Datastore needs to be opened.') } + try { - await this.store.delete(this.location, key.toBuffer()) + if (this._tx) { + if (this._tx.active) { + await this._tx.store.delete(key.toBuffer()) + } else { + await this.taskQueue.push((store) => store.delete(key.toBuffer())) + } + } else { + await this.store.delete(key.toBuffer()) + } } catch (err) { - throw Errors.dbDeleteFailedError(err) + if (err.name === 'TransactionInactiveError') { + await this.taskQueue.push((store) => store.delete(key.toBuffer())) + } else { + throw Errors.dbDeleteFailedError(err) + } } } @@ -162,23 +346,30 @@ class IdbDatastore extends Adapter { dels.push(key.toBuffer()) }, commit: async () => { - if (this.store === null) { - throw new Error('Datastore needs to be opened.') + const { + store, + done + } = await this._getStore() + + try { + this._tx.active = true + + // process any requests that occured while the transaction was opening + await this.taskQueue.drain(store) + + await Promise.all(puts.map(p => store.put(p[1], p[0]))) + await Promise.all(dels.map(p => store.delete(p))) + + await this.taskQueue.drain(store) + } finally { + done() } - const tx = this.store.transaction(this.location, 'readwrite') - const store = tx.store - await Promise.all(puts.map(p => store.put(p[1], p[0]))) - await Promise.all(dels.map(p => store.delete(p))) - await tx.done } } } query (q) { - if (this.store === null) { - throw new Error('Datastore needs to be opened.') - } - let it = queryIt(q, this.store, this.location) + let it = this._queryIt(q) if (Array.isArray(q.filters)) { it = q.filters.reduce((it, f) => filter(it, f), it) @@ -191,12 +382,15 @@ class IdbDatastore extends Adapter { return it } - close () { - if (this.store === null) { - throw new Error('Datastore needs to be opened.') + async close () { + if (this._tx) { + await this._tx.done + } + + if (this.store) { + this.store.close() + this.store = null } - this.store.close() - this.store = null } destroy () { diff --git a/test/index.spec.js b/test/index.spec.js index 8b88892..8f9969c 100644 --- a/test/index.spec.js +++ b/test/index.spec.js @@ -5,8 +5,9 @@ const { MountDatastore } = require('datastore-core') const { Key } = require('interface-datastore') const { isNode } = require('ipfs-utils/src/env') const IDBStore = require('../src') +const { expect } = require('aegir/utils/chai') -describe('LevelDatastore', function () { +describe('IndexedDB Datastore', function () { if (isNode) { return } @@ -51,4 +52,71 @@ describe('LevelDatastore', function () { } }) }) + + describe('concurrency', () => { + let store + + before(async () => { + store = new IDBStore('hello') + await store.open() + }) + + it('should not explode under unreasonable load', function (done) { + this.timeout(10000) + + const updater = setInterval(async () => { + try { + const key = new Key('/a-' + Date.now()) + + await store.put(key, Buffer.from([0, 1, 2, 3])) + await store.has(key) + await store.get(key) + } catch (err) { + clearInterval(updater) + clearInterval(mutatorQuery) + clearInterval(readOnlyQuery) + done(err) + } + }, 0) + + const mutatorQuery = setInterval(async () => { + try { + for await (const { key } of store.query({})) { + await store.get(key) + + const otherKey = new Key('/b-' + Date.now()) + const otherValue = Buffer.from([0, 1, 2, 3]) + await store.put(otherKey, otherValue) + const res = await store.get(otherKey) + expect(res).to.deep.equal(otherValue) + } + } catch (err) { + clearInterval(updater) + clearInterval(mutatorQuery) + clearInterval(readOnlyQuery) + done(err) + } + }, 0) + + const readOnlyQuery = setInterval(async () => { + try { + for await (const { key } of store.query({})) { + await store.has(key) + } + } catch (err) { + clearInterval(updater) + clearInterval(mutatorQuery) + clearInterval(readOnlyQuery) + done(err) + } + }, 0) + + setTimeout(() => { + clearInterval(updater) + clearInterval(mutatorQuery) + clearInterval(readOnlyQuery) + done() + }, 5000) + }) + }) })