Skip to content
This repository was archived by the owner on Mar 23, 2023. It is now read-only.

Commit 807e425

Browse files
committed
fix: wrap operations in transactions
To get a cursor to operate over a set of keys from an idb store, you have to start a transaction. That transaction will remain open as long as there are tasks in the microtask queue - when it empties the transaction is automatically closed. Transactions operate on an ObjectStore, puts and gets not to the object store also seem to close the transaction. This change adds a `_getObjectStore` method to the datastore which creates a new transaction if there was no previous transation, or if the previous transaction was closed. All operations then take place as part of this transaction.
1 parent 417af5b commit 807e425

File tree

3 files changed

+95
-59
lines changed

3 files changed

+95
-59
lines changed

.aegir.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
module.exports = {
2-
bundlesize: { maxSize: '12.1kB' }
2+
bundlesize: { maxSize: '13.7kB' }
33
}

src/index.js

Lines changed: 93 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -43,41 +43,88 @@ const str2ab = (str) => {
4343
return buf
4444
}
4545

46-
const queryIt = async function * (q, store, location) {
47-
const range = q.prefix ? self.IDBKeyRange.bound(str2ab(q.prefix), str2ab(q.prefix + '\xFF'), false, true) : undefined
48-
let cursor = await store.transaction(location).store.openCursor(range)
49-
let limit = 0
46+
class IdbDatastore extends Adapter {
47+
constructor (location, options = {}) {
48+
super()
5049

51-
if (cursor && q.offset && q.offset > 0) {
52-
cursor = await cursor.advance(q.offset)
50+
this.store = null
51+
this.options = options
52+
this.location = options.prefix + location
53+
this.version = options.version || 1
5354
}
5455

55-
while (cursor) {
56-
// limit
57-
if (q.limit !== undefined && q.limit === limit) {
58-
return
56+
_getStore () {
57+
if (this.store === null) {
58+
throw new Error('Datastore needs to be opened.')
5959
}
60-
limit++
6160

62-
const key = new Key(Buffer.from(cursor.key))
63-
if (q.keysOnly) {
64-
yield { key }
65-
} else {
66-
const value = Buffer.from(cursor.value)
67-
yield { key, value }
61+
if (!this._tx) {
62+
let cleanup
63+
64+
// idb gives us an `tx.done` promise, but awaiting on it then doing other
65+
// work can add tasks to the microtask queue which extends the life of
66+
// the transaction which may not be what the caller intended.
67+
const done = new Promise(resolve => {
68+
cleanup = () => {
69+
// make sure we don't accidentally reuse the 'finished' transaction
70+
this._tx = null
71+
72+
// resolve on the next iteration of the event loop to ensure that
73+
// we are actually, really done, the microtask queue has been emptied
74+
// and the transaction has been auto-committed
75+
setImmediate(() => {
76+
resolve()
77+
})
78+
}
79+
})
80+
81+
const tx = this.store.transaction(this.location, 'readwrite')
82+
tx.oncomplete = cleanup
83+
tx.onerror = cleanup
84+
tx.onabort = cleanup
85+
86+
this._tx = {
87+
tx,
88+
done
89+
}
6890
}
69-
cursor = await cursor.continue()
91+
92+
// we only operate on one object store so the tx.store property is set
93+
return this._tx.tx.store
7094
}
71-
}
7295

73-
class IdbDatastore extends Adapter {
74-
constructor (location, options = {}) {
75-
super()
96+
async * _queryIt (q) {
97+
if (this._tx) {
98+
await this._tx.done
99+
}
76100

77-
this.store = null
78-
this.options = options
79-
this.location = options.prefix + location
80-
this.version = options.version || 1
101+
const range = q.prefix ? self.IDBKeyRange.bound(str2ab(q.prefix), str2ab(q.prefix + '\xFF'), false, true) : undefined
102+
const store = this._getStore()
103+
let cursor = await store.openCursor(range)
104+
let limit = 0
105+
106+
if (cursor && q.offset && q.offset > 0) {
107+
cursor = await cursor.advance(q.offset)
108+
}
109+
110+
while (cursor) {
111+
// limit
112+
if (q.limit !== undefined && q.limit === limit) {
113+
return
114+
}
115+
limit++
116+
117+
const key = new Key(Buffer.from(cursor.key))
118+
if (q.keysOnly) {
119+
yield { key }
120+
} else {
121+
const value = Buffer.from(cursor.value)
122+
yield { key, value }
123+
}
124+
cursor = await cursor.continue()
125+
}
126+
127+
await this._tx.done
81128
}
82129

83130
async open () {
@@ -98,23 +145,17 @@ class IdbDatastore extends Adapter {
98145
}
99146

100147
async put (key, val) {
101-
if (this.store === null) {
102-
throw new Error('Datastore needs to be opened.')
103-
}
104148
try {
105-
await this.store.put(this.location, val, key.toBuffer())
149+
await this._getStore().put(val, key.toBuffer())
106150
} catch (err) {
107151
throw Errors.dbWriteFailedError(err)
108152
}
109153
}
110154

111155
async get (key) {
112-
if (this.store === null) {
113-
throw new Error('Datastore needs to be opened.')
114-
}
115156
let value
116157
try {
117-
value = await this.store.get(this.location, key.toBuffer())
158+
value = await this._getStore().get(key.toBuffer())
118159
} catch (err) {
119160
throw Errors.dbWriteFailedError(err)
120161
}
@@ -127,24 +168,19 @@ class IdbDatastore extends Adapter {
127168
}
128169

129170
async has (key) {
130-
if (this.store === null) {
131-
throw new Error('Datastore needs to be opened.')
132-
}
133171
try {
134-
await this.get(key)
172+
const res = await this._getStore().getKey(key.toBuffer())
173+
174+
return Boolean(res)
135175
} catch (err) {
136176
if (err.code === 'ERR_NOT_FOUND') return false
137177
throw err
138178
}
139-
return true
140179
}
141180

142181
async delete (key) {
143-
if (this.store === null) {
144-
throw new Error('Datastore needs to be opened.')
145-
}
146182
try {
147-
await this.store.delete(this.location, key.toBuffer())
183+
await this._getStore().delete(key.toBuffer())
148184
} catch (err) {
149185
throw Errors.dbDeleteFailedError(err)
150186
}
@@ -162,23 +198,20 @@ class IdbDatastore extends Adapter {
162198
dels.push(key.toBuffer())
163199
},
164200
commit: async () => {
165-
if (this.store === null) {
166-
throw new Error('Datastore needs to be opened.')
201+
if (this._tx) {
202+
await this._tx.done
167203
}
168-
const tx = this.store.transaction(this.location, 'readwrite')
169-
const store = tx.store
204+
205+
const store = this._getStore()
170206
await Promise.all(puts.map(p => store.put(p[1], p[0])))
171207
await Promise.all(dels.map(p => store.delete(p)))
172-
await tx.done
208+
await this._tx.done
173209
}
174210
}
175211
}
176212

177213
query (q) {
178-
if (this.store === null) {
179-
throw new Error('Datastore needs to be opened.')
180-
}
181-
let it = queryIt(q, this.store, this.location)
214+
let it = this._queryIt(q)
182215

183216
if (Array.isArray(q.filters)) {
184217
it = q.filters.reduce((it, f) => filter(it, f), it)
@@ -191,12 +224,15 @@ class IdbDatastore extends Adapter {
191224
return it
192225
}
193226

194-
close () {
195-
if (this.store === null) {
196-
throw new Error('Datastore needs to be opened.')
227+
async close () {
228+
if (this._tx) {
229+
await this._tx.done
230+
}
231+
232+
if (this.store) {
233+
this.store.close()
234+
this.store = null
197235
}
198-
this.store.close()
199-
this.store = null
200236
}
201237

202238
destroy () {

test/index.spec.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ const { Key } = require('interface-datastore')
66
const { isNode } = require('ipfs-utils/src/env')
77
const IDBStore = require('../src')
88

9-
describe('LevelDatastore', function () {
9+
describe('IndexedDB Datastore', function () {
1010
if (isNode) {
1111
return
1212
}

0 commit comments

Comments
 (0)