diff --git a/package.json b/package.json index 216fc76085..f283f9f210 100644 --- a/package.json +++ b/package.json @@ -99,11 +99,11 @@ "ipfs-bitswap": "^0.26.0", "ipfs-block": "~0.8.1", "ipfs-block-service": "~0.16.0", - "ipfs-http-client": "^38.2.0", + "ipfs-http-client": "github:ipfs/js-ipfs-http-client#auhau/feat/multihash_keys_in_datastore", "ipfs-http-response": "~0.3.1", "ipfs-mfs": "^0.13.0", "ipfs-multipart": "^0.2.0", - "ipfs-repo": "^0.28.0", + "ipfs-repo": "github:ipfs/js-ipfs-repo#auhau/feat/multihash_keys_in_datastore", "ipfs-unixfs": "~0.1.16", "ipfs-unixfs-exporter": "^0.38.0", "ipfs-unixfs-importer": "^0.40.0", @@ -203,7 +203,7 @@ "execa": "^2.0.4", "form-data": "^2.5.1", "hat": "0.0.3", - "interface-ipfs-core": "^0.117.2", + "interface-ipfs-core": "github:ipfs/interface-js-ipfs-core#auhau/feat/multihash_keys_in_datastore", "ipfs-interop": "^0.1.1", "ipfsd-ctl": "^0.47.2", "libp2p-websocket-star": "~0.10.2", diff --git a/src/cli/commands/refs-local.js b/src/cli/commands/refs-local.js index c0ce2a894a..b6fef46058 100644 --- a/src/cli/commands/refs-local.js +++ b/src/cli/commands/refs-local.js @@ -5,12 +5,22 @@ module.exports = { describe: 'List all local references.', - handler ({ getIpfs, print, resolve }) { + builder (yargs) { + return yargs + .option('multihash', { + desc: 'Shows base32 encoded multihashes instead of reconstructed CIDs', + type: 'boolean', + default: false + }) + .epilog('CIDs are reconstructed therefore they might differ from those under which the blocks were originally stored.') + }, + + handler ({ getIpfs, print, resolve, multihash }) { resolve((async () => { const ipfs = await getIpfs() return new Promise((resolve, reject) => { - const stream = ipfs.refs.localReadableStream() + const stream = ipfs.refs.localReadableStream({ multihash }) stream.on('error', reject) stream.on('end', resolve) diff --git a/src/cli/commands/repo/gc.js b/src/cli/commands/repo/gc.js index b805ac9934..5eb6d7a672 100644 --- a/src/cli/commands/repo/gc.js +++ b/src/cli/commands/repo/gc.js @@ -27,7 +27,7 @@ module.exports = { if (r.err) { streamErrors && print(r.err.message, true, true) } else { - print((quiet ? '' : 'removed ') + r.cid) + print((quiet ? '' : 'removed ') + r.multihash) } } })()) diff --git a/src/core/components/files-regular/refs-local-pull-stream.js b/src/core/components/files-regular/refs-local-pull-stream.js index 77c396f58f..06ed63c124 100644 --- a/src/core/components/files-regular/refs-local-pull-stream.js +++ b/src/core/components/files-regular/refs-local-pull-stream.js @@ -1,26 +1,18 @@ 'use strict' -const CID = require('cids') -const base32 = require('base32.js') +const { keyToCid } = require('ipfs-repo/src/blockstore-utils') const itToPull = require('async-iterator-to-pull-stream') module.exports = function (self) { - return () => { + return ({ multihash }) => { return itToPull((async function * () { - for await (const result of self._repo.blocks.query({ keysOnly: true })) { - yield dsKeyToRef(result.key) + for await (const { key: k } of self._repo.blocks.query({ keysOnly: true })) { + try { + yield { ref: multihash ? k.toString().substr(1) : keyToCid(k).toString() } + } catch (err) { + yield { err: `Could not convert block with key '${k.toString()}' to CID: ${err.message}` } + } } })()) } } - -function dsKeyToRef (key) { - try { - // Block key is of the form / - const decoder = new base32.Decoder() - const buff = Buffer.from(decoder.write(key.toString().slice(1)).finalize()) - return { ref: new CID(buff).toString() } - } catch (err) { - return { err: `Could not convert block with key '${key}' to CID: ${err.message}` } - } -} diff --git a/src/core/components/files-regular/refs-local-readable-stream.js b/src/core/components/files-regular/refs-local-readable-stream.js index b73eee29bf..db4c06af84 100644 --- a/src/core/components/files-regular/refs-local-readable-stream.js +++ b/src/core/components/files-regular/refs-local-readable-stream.js @@ -3,7 +3,7 @@ const toStream = require('pull-stream-to-stream') module.exports = function (self) { - return (ipfsPath, options) => { - return toStream.source(self.refs.localPullStream()) + return (options) => { + return toStream.source(self.refs.localPullStream(options)) } } diff --git a/src/core/components/files-regular/refs-local.js b/src/core/components/files-regular/refs-local.js index 7d78388483..0e06f5a86f 100644 --- a/src/core/components/files-regular/refs-local.js +++ b/src/core/components/files-regular/refs-local.js @@ -4,9 +4,16 @@ const promisify = require('promisify-es6') const pull = require('pull-stream') module.exports = function (self) { - return promisify((callback) => { + return promisify((options, callback) => { + if (typeof options === 'function') { + callback = options + options = {} + } + + options = options || {} + pull( - self.refs.localPullStream(), + self.refs.localPullStream(options), pull.collect((err, values) => { if (err) { return callback(err) diff --git a/src/core/components/pin/gc.js b/src/core/components/pin/gc.js index a974a85de5..8c272ef83b 100644 --- a/src/core/components/pin/gc.js +++ b/src/core/components/pin/gc.js @@ -3,7 +3,6 @@ const CID = require('cids') const base32 = require('base32.js') const callbackify = require('callbackify') -const { cidToString } = require('../../../utils/cid') const log = require('debug')('ipfs:gc') const { default: Queue } = require('p-queue') // TODO: Use exported key from root when upgraded to ipfs-mfs@>=13 @@ -47,7 +46,7 @@ module.exports = function gc (self) { }) } -// Get Set of CIDs of blocks to keep +// Get Set of multihashes of blocks to keep async function createMarkedSet (ipfs) { const output = new Set() @@ -55,7 +54,8 @@ async function createMarkedSet (ipfs) { log(`Found ${pins.length} pinned blocks`) pins.forEach(pin => { - output.add(cidToString(new CID(pin), { base: 'base32' })) + const cid = new CID(pin) + output.add(base32.encode(cid.multihash)) }) } @@ -91,7 +91,6 @@ async function getDescendants (ipfs, cid) { const refs = await ipfs.refs(cid, { recursive: true }) const cids = [cid, ...refs.map(r => new CID(r.ref))] log(`Found ${cids.length} MFS blocks`) - // log(' ' + cids.join('\n ')) return cids } @@ -100,54 +99,37 @@ async function getDescendants (ipfs, cid) { async function deleteUnmarkedBlocks (ipfs, markedSet, blockKeys) { // Iterate through all blocks and find those that are not in the marked set // The blockKeys variable has the form [ { key: Key() }, { key: Key() }, ... ] - const unreferenced = [] const result = [] + let blockCounter = 0 const queue = new Queue({ concurrency: BLOCK_RM_CONCURRENCY }) for await (const { key: k } of blockKeys) { - try { - const cid = dsKeyToCid(k) - const b32 = cid.toV1().toString('base32') - if (!markedSet.has(b32)) { - unreferenced.push(cid) - - queue.add(async () => { - const res = { - cid - } - - try { - await ipfs._repo.blocks.delete(cid) - } catch (err) { - res.err = new Error(`Could not delete block with CID ${cid}: ${err.message}`) - } - - result.push(res) - }) - } - } catch (err) { - const msg = `Could not convert block with key '${k}' to CID` - log(msg, err) - result.push({ err: new Error(msg + `: ${err.message}`) }) + blockCounter++ + const multihashString = k.toString().substr(1) + if (!markedSet.has(multihashString)) { + queue.add(async () => { + const res = { + multihash: multihashString + } + + try { + await ipfs._repo.blocks.delete(base32.decode(multihashString)) + } catch (err) { + res.err = new Error(`Could not delete block with multihash ${multihashString}: ${err.message}`) + } + + result.push(res) + }) } } await queue.onIdle() - log(`Marked set has ${markedSet.size} unique blocks. Blockstore has ${blockKeys.length} blocks. ` + - `Deleted ${unreferenced.length} blocks.`) + log(`Marked set has ${markedSet.size} unique blocks. Blockstore has ${blockCounter} blocks. ` + + `Deleted ${result.filter(res => res.err === undefined).length} blocks.`) return result } - -// TODO: Use exported utility when upgrade to ipfs-repo@>=0.27.1 -// https://github.com/ipfs/js-ipfs-repo/pull/206 -function dsKeyToCid (key) { - // Block key is of the form / - const decoder = new base32.Decoder() - const buff = decoder.write(key.toString().slice(1)).finalize() - return new CID(Buffer.from(buff)) -} diff --git a/src/http/api/resources/files-regular.js b/src/http/api/resources/files-regular.js index ea711cfc18..4b745e6e2d 100644 --- a/src/http/api/resources/files-regular.js +++ b/src/http/api/resources/files-regular.js @@ -37,7 +37,7 @@ exports.parseKey = (request, h) => { const { arg } = request.query if (!arg) { - throw Boom.badRequest("Argument 'key' is required") + throw Boom.badRequest('Argument \'key\' is required') } const isArray = Array.isArray(arg) @@ -235,7 +235,7 @@ exports.add = { ) .then(() => { if (!filesParsed) { - throw new Error("File argument 'data' is required.") + throw new Error('File argument \'data\' is required.') } }) .catch(err => { @@ -342,10 +342,19 @@ exports.refs = { } exports.refs.local = { + validate: { + query: Joi.object().keys({ + multihash: Joi.boolean().default(false), + }).unknown() + }, + // main route handler handler (request, h) { const { ipfs } = request.server.app - const source = ipfs.refs.localPullStream() + + const multihash = request.query.multihash + + const source = ipfs.refs.localPullStream({ multihash }) return sendRefsReplyStream(request, h, 'local refs', source) } } diff --git a/src/http/api/resources/repo.js b/src/http/api/resources/repo.js index 108ce6abda..39768223e8 100644 --- a/src/http/api/resources/repo.js +++ b/src/http/api/resources/repo.js @@ -18,7 +18,7 @@ exports.gc = { const response = filtered.map(r => { return { Err: r.err && r.err.message, - Key: !r.err && { '/': r.cid.toString() } + Key: !r.err && r.multihash } }) return h.response(response) diff --git a/test/cli/refs-local.js b/test/cli/refs-local.js index 6b1e5fc872..cb0d8dce48 100644 --- a/test/cli/refs-local.js +++ b/test/cli/refs-local.js @@ -18,7 +18,7 @@ describe('refs-local', () => runOnAndOff((thing) => { const out = await ipfs('refs-local') const lines = out.split('\n') - expect(lines.includes('QmPkWYfSLCEBLZu7BZt4kigGDMe3cpogMbeVf97gN2xJDN')).to.eql(true) - expect(lines.includes('QmUhUuiTKkkK8J6JZ9zmj8iNHPuNfGYcszgRumzhHBxEEU')).to.eql(true) + expect(lines.includes('bafkreicjl7v3vyyv4zlryihez5xhunqmriry6styhil7z5lhd3r4prnz6y')).to.eql(true) + expect(lines.includes('bafkreidj5bovvm25wszvajfshj7m7m2efpswcs6dsz7giz52ovlquxc4o4')).to.eql(true) }) })) diff --git a/test/core/gc.spec.js b/test/core/gc.spec.js index 42022f3fed..059ffb3bbc 100644 --- a/test/core/gc.spec.js +++ b/test/core/gc.spec.js @@ -7,6 +7,8 @@ const IPFSFactory = require('ipfsd-ctl') const pEvent = require('p-event') const env = require('ipfs-utils/src/env') const IPFS = require('../../src/core') +const CID = require('cids') +const base32 = require('base32.js') const { Errors } = require('interface-datastore') // We need to detect when a readLock or writeLock is requested for the tests @@ -90,17 +92,17 @@ describe('gc', function () { name: 'add', add1: () => ipfs.add(fixtures[0], { pin: false }), add2: () => ipfs.add(fixtures[1], { pin: false }), - resToCid: (res) => res[0].hash + resToMultihash: (res) => base32.encode(new CID(res[0].hash).multihash) }, { name: 'object put', add1: () => ipfs.object.put({ Data: 'obj put 1', Links: [] }), add2: () => ipfs.object.put({ Data: 'obj put 2', Links: [] }), - resToCid: (res) => res.toString() + resToMultihash: (res) => base32.encode(res.multihash) }, { name: 'block put', add1: () => ipfs.block.put(Buffer.from('block put 1'), null), add2: () => ipfs.block.put(Buffer.from('block put 2'), null), - resToCid: (res) => res.cid.toString() + resToMultihash: (res) => base32.encode(res.cid.multihash) }] describe('locks', function () { @@ -122,9 +124,9 @@ describe('gc', function () { await gcStarted const add2 = test.add2() - const deleted = (await gc).map(i => i.cid.toString()) - const add1Res = test.resToCid(await add1) - const add2Res = test.resToCid(await add2) + const deleted = (await gc).map(i => i.multihash) + const add1Res = test.resToMultihash(await add1) + const add2Res = test.resToMultihash(await add2) // Should have garbage collected blocks from first add, because GC should // have waited for first add to finish @@ -152,9 +154,9 @@ describe('gc', function () { await gcStarted const add2 = ipfs.add(fixtures[3], { pin: true }) - const deleted = (await gc).map(i => i.cid.toString()) - const add1Res = (await add1)[0].hash - const add2Res = (await add2)[0].hash + const deleted = (await gc).map(i => i.multihash) + const add1Res = base32.encode(new CID((await add1)[0].hash).multihash) + const add2Res = base32.encode(new CID((await add2)[0].hash).multihash) // Should not have garbage collected blocks from first add, because GC should // have waited for first add + pin to finish (protected by pin) @@ -168,7 +170,9 @@ describe('gc', function () { it('garbage collection should wait for pending block rm to finish', async () => { // Add two blocks so that we can remove them const cid1 = (await ipfs.block.put(Buffer.from('block to rm 1'), null)).cid + const cid1Multihash = base32.encode(cid1.multihash) const cid2 = (await ipfs.block.put(Buffer.from('block to rm 2'), null)).cid + const cid2Multihash = base32.encode(cid2.multihash) // Remove first block from IPFS // Note: block rm will take a write lock @@ -185,33 +189,34 @@ describe('gc', function () { await gcStarted const rm2 = ipfs.block.rm(cid2) - const deleted = (await gc).map(i => i.cid.toString()) - await rm1 - - // Second rm should fail because GC has already removed that block - try { - await rm2 - } catch (err) { - expect(err.code).eql(Errors.dbDeleteFailedError().code) - } + const deleted = (await gc).map(i => i.multihash) + const rm1Out = await rm1 + expect(rm1Out[0]).to.not.have.property('error') // Confirm second block has been removed - const localRefs = (await ipfs.refs.local()).map(r => r.ref) - expect(localRefs).not.includes(cid2.toString()) + const localMultihashes = (await ipfs.refs.local()).map(r => base32.encode(new CID(r.ref).multihash)) + expect(localMultihashes).not.includes(cid2Multihash) + + // Second rm should fail because GC has already removed that block + expect((await rm2)[0]) + .to.have.property('error') + .that.has.property('code').that.equal(Errors.dbDeleteFailedError().code) // Should not have garbage collected block from first block put, because // GC should have waited for first rm (removing first block put) to finish - expect(deleted).not.includes(cid1.toString()) + expect(deleted).not.includes(cid1Multihash) // Should have garbage collected block from second block put, because GC // should have completed before second rm (removing second block put) - expect(deleted).includes(cid2.toString()) + expect(deleted).includes(cid2Multihash) }) it('garbage collection should wait for pending pin add to finish', async () => { // Add two blocks so that we can pin them - const cid1 = (await ipfs.block.put(Buffer.from('block to pin add 1'), null)).cid - const cid2 = (await ipfs.block.put(Buffer.from('block to pin add 2'), null)).cid + const cid1 = (await ipfs.block.put(Buffer.from('block to test pin add 1'), null)).cid + const cid2 = (await ipfs.block.put(Buffer.from('block to test pin add 2'), null)).cid + const cid1Multihash = base32.encode(cid1.multihash) + const cid2Multihash = base32.encode(cid2.multihash) // Pin first block // Note: pin add will take a read lock @@ -221,7 +226,7 @@ describe('gc', function () { // Once pin lock has been requested, start GC await pinLockRequested const gc = ipfs.repo.gc() - const deleted = (await gc).map(i => i.cid.toString()) + const deleted = (await gc).map(i => i.multihash) await pin1 // TODO: Adding pin for removed block never returns, which means the lock @@ -229,22 +234,24 @@ describe('gc', function () { // const pin2 = ipfs.pin.add(cid2) // Confirm second second block has been removed - const localRefs = (await ipfs.refs.local()).map(r => r.ref) - expect(localRefs).not.includes(cid2.toString()) + const localMultihashes = (await ipfs.refs.local()).map(r => base32.encode(new CID(r.ref).multihash)) + expect(localMultihashes).not.includes(cid2Multihash) // Should not have garbage collected block from first block put, because // GC should have waited for pin (protecting first block put) to finish - expect(deleted).not.includes(cid1.toString()) + expect(deleted).not.includes(cid1Multihash) // Should have garbage collected block from second block put, because GC // should have completed before second pin - expect(deleted).includes(cid2.toString()) + expect(deleted).includes(cid2Multihash) }) it('garbage collection should wait for pending pin rm to finish', async () => { // Add two blocks so that we can pin them const cid1 = (await ipfs.block.put(Buffer.from('block to pin rm 1'), null)).cid const cid2 = (await ipfs.block.put(Buffer.from('block to pin rm 2'), null)).cid + const cid1Multihash = base32.encode(cid1.multihash) + const cid2Multihash = base32.encode(cid2.multihash) // Pin blocks await ipfs.pin.add(cid1) @@ -265,17 +272,17 @@ describe('gc', function () { await gcStarted const pinRm2 = ipfs.pin.rm(cid2) - const deleted = (await gc).map(i => i.cid.toString()) + const deleted = (await gc).map(i => i.multihash) await pinRm1 await pinRm2 // Should have garbage collected block from first block put, because // GC should have waited for pin rm (unpinning first block put) to finish - expect(deleted).includes(cid1.toString()) + expect(deleted).includes(cid1Multihash) // Should not have garbage collected block from second block put, because // GC should have completed before second block was unpinned - expect(deleted).not.includes(cid2.toString()) + expect(deleted).not.includes(cid2Multihash) }) }) }) diff --git a/test/fixtures/go-ipfs-repo/version b/test/fixtures/go-ipfs-repo/version index c7930257df..301160a930 100644 --- a/test/fixtures/go-ipfs-repo/version +++ b/test/fixtures/go-ipfs-repo/version @@ -1 +1 @@ -7 \ No newline at end of file +8 \ No newline at end of file