Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions benchmarks/put-get.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

const Benchmark = require('benchmark')
const assert = require('assert')
const all = require('async-iterator-all')
const all = require('it-all')
const drain = require('it-drain')
const makeBlock = require('../test/utils/make-block')
const genBitswapNetwork = require('../test/utils/mocks').genBitswapNetwork

Expand All @@ -24,7 +25,7 @@ const blockSizes = [10, 1024, 10 * 1024]
suite.add(`put-get ${n} blocks of size ${k}`, async (defer) => {
const blocks = await makeBlock(n, k)

await bitswap.putMany(blocks)
await drain(bitswap.putMany(blocks))

const res = await all(bitswap.getMany(blocks.map(block => block.cid)))

Expand Down
14 changes: 8 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,15 @@
"homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme",
"devDependencies": {
"@nodeutils/defaults-deep": "^1.1.0",
"aegir": "^21.10.1",
"async-iterator-all": "^1.0.0",
"aegir": "^22.0.0",
"benchmark": "^2.1.4",
"buffer": "^5.6.0",
"chai": "^4.2.0",
"delay": "^4.3.0",
"dirty-chai": "^2.0.1",
"ipfs-repo": "^2.0.0",
"ipfs-repo": "^3.0.1",
"ipfs-utils": "^2.2.0",
"iso-random-stream": "^1.1.1",
"it-all": "^1.0.2",
"it-drain": "^1.0.1",
"libp2p": "^0.27.0",
"libp2p-kad-dht": "^0.18.3",
"libp2p-mplex": "^0.9.2",
Expand All @@ -71,10 +70,13 @@
"peer-info": "^0.17.0",
"promisify-es6": "^1.0.3",
"rimraf": "^3.0.0",
"sinon": "^9.0.0",
"stats-lite": "^2.2.0",
"uuid": "^3.3.2"
"uuid": "^8.0.0"
},
"dependencies": {
"abort-controller": "^3.0.0",
"any-signal": "^1.1.0",
"bignumber.js": "^9.0.0",
"cids": "~0.8.0",
"debug": "^4.1.0",
Expand Down
164 changes: 95 additions & 69 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ const DecisionEngine = require('./decision-engine')
const Notifications = require('./notifications')
const logger = require('./utils').logger
const Stats = require('./stats')
const AbortController = require('abort-controller')
const anySignal = require('any-signal')

const defaultOptions = {
statsEnabled: false,
Expand Down Expand Up @@ -101,9 +103,14 @@ class Bitswap {
this._log('received block')

const has = await this.blockstore.has(block.cid)

this._updateReceiveCounters(peerId.toB58String(), block, has)

if (has || !wasWanted) {
if (wasWanted) {
this._sendHaveBlockNotifications(block)
}

return
}

Expand Down Expand Up @@ -176,65 +183,88 @@ class Bitswap {
* blockstore it is returned, otherwise the block is added to the wantlist and returned once another node sends it to us.
*
* @param {CID} cid
* @param {Object} options
* @param {AbortSignal} options.abortSignal
* @returns {Promise<Block>}
*/
async get (cid) {
for await (const block of this.getMany([cid])) {
return block
async get (cid, options = {}) { // eslint-disable-line require-await
const fetchFromNetwork = (cid, options) => {
// add it to the want list - n.b. later we will abort the AbortSignal
// so no need to remove the blocks from the wantlist after we have it
this.wm.wantBlocks([cid], options)

return this.notifications.wantBlock(cid, options)
}
}

/**
* Fetch a a list of blocks by cid. If the blocks are in the local
* blockstore they are returned, otherwise the blocks are added to the wantlist and returned once another node sends them to us.
*
* @param {Iterable<CID>} cids
* @returns {Promise<AsyncIterator<Block>>}
*/
async * getMany (cids) {
let pendingStart = cids.length
const wantList = []
let promptedNetwork = false

const fetchFromNetwork = async (cid) => {
wantList.push(cid)
const loadOrFetchFromNetwork = async (cid, options) => {
try {
// have to await here as we want to handle ERR_NOT_FOUND
const block = await this.blockstore.get(cid, options)

const blockP = this.notifications.wantBlock(cid)
return block
} catch (err) {
if (err.code !== 'ERR_NOT_FOUND') {
throw err
}

if (!pendingStart) {
this.wm.wantBlocks(wantList)
}
if (!promptedNetwork) {
promptedNetwork = true

const block = await blockP
this.wm.cancelWants([cid])
this.network.findAndConnect(cid)
.catch((err) => this._log.error(err))
}

return block
// we don't have the block locally so fetch it from the network
return fetchFromNetwork(cid, options)
}
}

for (const cid of cids) {
const has = await this.blockstore.has(cid)
pendingStart--
if (has) {
if (!pendingStart) {
this.wm.wantBlocks(wantList)
}
yield this.blockstore.get(cid)
// depending on implementation it's possible for blocks to come in while
// we do the async operations to get them from the blockstore leading to
// a race condition, so register for incoming block notifications as well
// as trying to get it from the datastore
const controller = new AbortController()
const signal = anySignal([options.signal, controller.signal])

const block = await Promise.race([
this.notifications.wantBlock(cid, {
signal
}),
loadOrFetchFromNetwork(cid, {
signal
})
])

continue
}
// since we have the block we can now remove our listener
controller.abort()

if (!promptedNetwork) {
promptedNetwork = true
this.network.findAndConnect(cids[0]).catch((err) => this._log.error(err))
}
return block
}

// we don't have the block locally so fetch it from the network
yield fetchFromNetwork(cid)
/**
* Fetch a a list of blocks by cid. If the blocks are in the local
* blockstore they are returned, otherwise the blocks are added to the wantlist and returned once another node sends them to us.
*
* @param {AsyncIterator<CID>} cids
* @param {Object} options
* @param {AbortSignal} options.abortSignal
* @returns {Promise<AsyncIterator<Block>>}
*/
async * getMany (cids, options = {}) {
for await (const cid of cids) {
yield this.get(cid, options)
}
}

/**
* Removes the given CIDs from the wantlist independent of any ref counts
* Removes the given CIDs from the wantlist independent of any ref counts.
*
* This will cause all outstanding promises for a given block to reject.
*
* If you want to cancel the want for a block without doing that, pass an
* AbortSignal in to `.get` or `.getMany` and abort it.
*
* @param {Iterable<CID>} cids
* @returns {void}
Expand All @@ -249,7 +279,9 @@ class Bitswap {
}

/**
* Removes the given keys from the want list
* Removes the given keys from the want list. This may cause pending promises
* for blocks to never resolve. If you wish these promises to abort instead
* call `unwant(cids)` instead.
*
* @param {Iterable<CID>} cids
* @returns {void}
Expand All @@ -269,45 +301,39 @@ class Bitswap {
* @returns {Promise<void>}
*/
async put (block) { // eslint-disable-line require-await
return this.putMany([block])
await this.blockstore.put(block)
this._sendHaveBlockNotifications(block)
}

/**
* Put the given blocks to the underlying blockstore and
* send it to nodes that have it them their wantlist.
*
* @param {AsyncIterable<Block>|Iterable<Block>} blocks
* @returns {Promise<void>}
* @param {AsyncIterable<Block>} blocks
* @returns {AsyncIterable<Block>}
*/
async putMany (blocks) { // eslint-disable-line require-await
const self = this

// Add any new blocks to the blockstore
const newBlocks = []
await this.blockstore.putMany(async function * () {
for await (const block of blocks) {
if (await self.blockstore.has(block.cid)) {
continue
}
async * putMany (blocks) { // eslint-disable-line require-await
for await (const block of this.blockstore.putMany(blocks)) {
this._sendHaveBlockNotifications(block)

yield block
newBlocks.push(block)
}
}())

// Notify engine that we have new blocks
this.engine.receivedBlocks(newBlocks)

// Notify listeners that we have received the new blocks
for (const block of newBlocks) {
this.notifications.hasBlock(block)
// Note: Don't wait for provide to finish before returning
this.network.provide(block.cid).catch((err) => {
self._log.error('Failed to provide: %s', err.message)
})
yield block
}
}

/**
* Sends notifications about the arrival of a block
*
* @param {Block} block
*/
_sendHaveBlockNotifications (block) {
this.notifications.hasBlock(block)
this.engine.receivedBlocks([block])
// Note: Don't wait for provide to finish before returning
this.network.provide(block.cid).catch((err) => {
this._log.error('Failed to provide: %s', err.message)
})
}

/**
* Get the current list of wants.
*
Expand Down
39 changes: 27 additions & 12 deletions src/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,17 @@ class Network {
*
* @param {CID} cid
* @param {number} maxProviders
* @returns {Promise<Result<Array>>}
* @param {Object} options
* @param {AbortSignal} options.abortSignal
* @returns {AsyncIterable<PeerInfo>}
*/
findProviders (cid, maxProviders) {
findProviders (cid, maxProviders, options = {}) {
return this.libp2p.contentRouting.findProviders(
cid,
{
maxTimeout: CONSTANTS.providerRequestTimeout,
maxNumProviders: maxProviders
maxNumProviders: maxProviders,
signal: options.signal
}
)
}
Expand All @@ -121,19 +124,29 @@ class Network {
* Find the providers of a given `cid` and connect to them.
*
* @param {CID} cid
* @param {Object} options
* @param {AbortSignal} options.abortSignal
* @returns {void}
*/
async findAndConnect (cid) {
async findAndConnect (cid, options) {
const connectAttempts = []
for await (const provider of this.findProviders(cid, CONSTANTS.maxProvidersPerRequest)) {
for await (const provider of this.findProviders(cid, CONSTANTS.maxProvidersPerRequest, options)) {
this._log('connecting to providers', provider.id.toB58String())
connectAttempts.push(this.connectTo(provider))
connectAttempts.push(this.connectTo(provider, options))
}
await Promise.all(connectAttempts)
}

async provide (cid) {
await this.libp2p.contentRouting.provide(cid)
/**
* Tell the network we can provide content for the passed CID
*
* @param {CID} cid
* @param {Object} options
* @param {AbortSignal} options.abortSignal
* @returns {Promise<void>}
*/
async provide (cid, options) {
await this.libp2p.contentRouting.provide(cid, options)
}

// Connect to the given peer
Expand Down Expand Up @@ -169,19 +182,21 @@ class Network {
* Connects to another peer
*
* @param {PeerInfo|PeerId|Multiaddr} peer
* @returns {Promise.<Connection>}
* @param {Object} options
* @param {AbortSignal} options.abortSignal
* @returns {Promise<Connection>}
*/
async connectTo (peer) { // eslint-disable-line require-await
async connectTo (peer, options) { // eslint-disable-line require-await
if (!this._running) {
throw new Error('network isn\'t running')
}

return this.libp2p.dial(peer)
return this.libp2p.dial(peer, options)
}

// Dial to the peer and try to use the most recent Bitswap
_dialPeer (peer) {
return this.libp2p.dialProtocol(peer, [BITSWAP110, BITSWAP100])
return this.libp2p.dialProtocol(peer, [BITSWAP120, BITSWAP110, BITSWAP100])
}

_updateSentStats (peer, blocks) {
Expand Down
Loading