Skip to content

Commit 8c386c7

Browse files
committed
fix: return blocks from putmany as blocks are passed in
1 parent 4b4974a commit 8c386c7

File tree

3 files changed

+25
-59
lines changed

3 files changed

+25
-59
lines changed

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@
7373
"ipfs-repo-migrations": "^0.2.1",
7474
"ipfs-utils": "^2.2.0",
7575
"ipld-block": "^0.9.1",
76+
"it-map": "^1.0.2",
77+
"it-pipe": "^1.1.0",
7678
"just-safe-get": "^2.0.0",
7779
"just-safe-set": "^2.1.0",
7880
"multibase": "^0.7.0",

src/blockstore.js

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
const core = require('datastore-core')
44
const ShardingStore = core.ShardingDatastore
55
const Block = require('ipld-block')
6-
const { cidToKey } = require('./blockstore-utils')
6+
const { cidToKey, keyToCid } = require('./blockstore-utils')
7+
const map = require('it-map')
8+
const pipe = require('it-pipe')
79

810
module.exports = async (filestore, options) => {
911
const store = await maybeWithSharding(filestore, options)
@@ -77,7 +79,7 @@ function createBaseStore (store) {
7779
*
7880
* @param {Block} block
7981
* @param {Object} options
80-
* @returns {Promise<void>}
82+
* @returns {Promise<Block>}
8183
*/
8284
async put (block, options) {
8385
if (!Block.isBlock(block)) {
@@ -100,20 +102,28 @@ function createBaseStore (store) {
100102
*
101103
* @param {AsyncIterable<Block>|Iterable<Block>} blocks
102104
* @param {Object} options
103-
* @returns {Promise<void>}
105+
* @returns {AsyncIterable<Block>}
104106
*/
105107
async * putMany (blocks, options) { // eslint-disable-line require-await
106-
yield * store.putMany((async function * () {
107-
for await (const block of blocks) {
108-
const key = cidToKey(block.cid)
109-
110-
if (await store.has(key, options)) {
111-
continue
112-
}
113-
114-
yield { key, value: block.data }
108+
yield * pipe(
109+
blocks,
110+
(source) => {
111+
// turn them into a key/value pair
112+
return map(source, (block) => {
113+
return { key: cidToKey(block.cid), value: block.data }
114+
})
115+
},
116+
(source) => {
117+
// put them into the datastore
118+
return store.putMany(source, options)
119+
},
120+
(source) => {
121+
// map the returned key/value back into a block
122+
return map(source, ({ key, value }) => {
123+
return new Block(value, keyToCid(key))
124+
})
115125
}
116-
}()), options)
126+
)
117127
},
118128
/**
119129
* Does the store contain block with this cid?

test/blockstore-test.js

Lines changed: 0 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -73,52 +73,6 @@ module.exports = (repo) => {
7373
}
7474
})
7575

76-
it('should not .putMany when block is already present', async () => {
77-
const data = Buffer.from(`TEST${Date.now()}`)
78-
const hash = await multihashing(data, 'sha2-256')
79-
const cid = new CID(hash)
80-
const sent = []
81-
otherRepo = new IPFSRepo(tempDir(), {
82-
storageBackends: {
83-
blocks: class ExplodingBlockStore {
84-
open () {
85-
}
86-
87-
close () {
88-
89-
}
90-
91-
has () {
92-
return true
93-
}
94-
95-
async * putMany (source) {
96-
for await (const thing of source) {
97-
sent.push(thing)
98-
99-
yield thing
100-
}
101-
}
102-
}
103-
},
104-
storageBackendOptions: {
105-
blocks: {
106-
sharding: false
107-
}
108-
}
109-
})
110-
111-
await otherRepo.init({})
112-
await otherRepo.open()
113-
114-
await drain(otherRepo.blocks.putMany([{
115-
cid,
116-
data
117-
}]))
118-
119-
expect(sent).to.have.lengthOf(0)
120-
})
121-
12276
it('returns an error on invalid block', () => {
12377
return expect(repo.blocks.put('hello')).to.eventually.be.rejected()
12478
})

0 commit comments

Comments
 (0)