Skip to content

Commit f748a1d

Browse files
committed
fix: race condition when requesting the same block twice
When we call `blockstore.putMany`, some implementations will batch up all the `put`s and write them at once. This means that `blockstore.has` might not return `true` for a little while - if another request for a given block comes in before `blockstore.has` returns `true` it'll get added to the want list. If the block then finishes it's batch and finally a remote peer supplies the wanted block, the notifications that complete the second block request will never get sent and the process will hang idefinately. The change made here is to separate the sending of notifications out from putting things into the blockstore. If the blockstore has a block, but the block is still in the wantlist, send notifications that we now have the block.
1 parent 936f899 commit f748a1d

File tree

3 files changed

+71
-6
lines changed

3 files changed

+71
-6
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
"promisify-es6": "^1.0.3",
7070
"rimraf": "^3.0.0",
7171
"safe-buffer": "^5.1.2",
72+
"sinon": "^9.0.0",
7273
"stats-lite": "^2.2.0",
7374
"uuid": "^3.3.2"
7475
},

src/index.js

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ class Bitswap {
101101
this._updateReceiveCounters(peerId.toB58String(), block, has)
102102

103103
if (has || !wasWanted) {
104+
if (wasWanted) {
105+
this._sendHaveBlockNotifications(block)
106+
}
107+
104108
return
105109
}
106110

@@ -287,16 +291,25 @@ class Bitswap {
287291

288292
yield block
289293

290-
self.notifications.hasBlock(block)
291-
self.engine.receivedBlocks([block.cid])
292-
// Note: Don't wait for provide to finish before returning
293-
self.network.provide(block.cid).catch((err) => {
294-
self._log.error('Failed to provide: %s', err.message)
295-
})
294+
self._sendHaveBlockNotifications(block)
296295
}
297296
}())
298297
}
299298

299+
/**
300+
* Sends notifications about the arrival of a block
301+
*
302+
* @param {Block} block
303+
*/
304+
_sendHaveBlockNotifications (block) {
305+
this.notifications.hasBlock(block)
306+
this.engine.receivedBlocks([block.cid])
307+
// Note: Don't wait for provide to finish before returning
308+
this.network.provide(block.cid).catch((err) => {
309+
this._log.error('Failed to provide: %s', err.message)
310+
})
311+
}
312+
300313
/**
301314
* Get the current list of wants.
302315
*

test/bitswap.js

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@ const chai = require('chai')
55
chai.use(require('dirty-chai'))
66
const expect = chai.expect
77
const delay = require('delay')
8+
const PeerId = require('peer-id')
9+
const sinon = require('sinon')
810

911
const Bitswap = require('../src')
1012

1113
const createTempRepo = require('./utils/create-temp-repo-nodejs')
1214
const createLibp2pNode = require('./utils/create-libp2p-node')
1315
const makeBlock = require('./utils/make-block')
1416
const orderedFinish = require('./utils/helpers').orderedFinish
17+
const Message = require('../src/types/message')
1518

1619
// Creates a repo + libp2pNode + Bitswap with or without DHT
1720
async function createThing (dht) {
@@ -70,6 +73,54 @@ describe('bitswap without DHT', function () {
7073

7174
finish.assert()
7275
})
76+
77+
it('wants a block, receives a block, wants it again before the blockstore has it, receives it after the blockstore has it', async () => {
78+
// the block we want
79+
const block = await makeBlock()
80+
81+
// id of a peer with the block we want
82+
const peerId = await PeerId.create({ bits: 512 })
83+
84+
// incoming message with requested block from the other peer
85+
const message = new Message(false)
86+
message.addEntry(block.cid, 1, false)
87+
message.addBlock(block)
88+
89+
// slow blockstore
90+
nodes[0].bitswap.blockstore = {
91+
has: sinon.stub().withArgs(block.cid).returns(false),
92+
putMany: async function * (source) { // eslint-disable-line require-await
93+
yield * source
94+
}
95+
}
96+
97+
// add the block to our want list
98+
const wantBlockPromise1 = nodes[0].bitswap.get(block.cid)
99+
100+
// oh look, a peer has sent it to us - this will trigger a `blockstore.putMany` which
101+
// for our purposes is a batch operation so `self.blockstore.has(cid)` will still return
102+
// false even though we've just yielded a block with that cid
103+
await nodes[0].bitswap._receiveMessage(peerId, message)
104+
105+
// block store did not have it
106+
expect(nodes[0].bitswap.blockstore.has.calledWith(block.cid)).to.be.true()
107+
108+
// another context wants the same block
109+
const wantBlockPromise2 = nodes[0].bitswap.get(block.cid)
110+
111+
// meanwhile the blockstore finishes it's batch
112+
nodes[0].bitswap.blockstore.has = sinon.stub().withArgs(block.cid).returns(true)
113+
114+
// here it comes again
115+
await nodes[0].bitswap._receiveMessage(peerId, message)
116+
117+
// block store had it this time
118+
expect(nodes[0].bitswap.blockstore.has.calledWith(block.cid)).to.be.true()
119+
120+
// both requests should get the block
121+
expect(await wantBlockPromise1).to.deep.equal(block)
122+
expect(await wantBlockPromise2).to.deep.equal(block)
123+
})
73124
})
74125

75126
describe('bitswap with DHT', function () {

0 commit comments

Comments
 (0)