33const debug = require ( 'debug' )
44const mh = require ( 'multihashes' )
55const pull = require ( 'pull-stream' )
6- const generate = require ( 'pull-generate' )
6+ const whilst = require ( 'async/whilst' )
7+ const setImmediate = require ( 'async/setImmediate' )
8+ const each = require ( 'async/each' )
9+ const debounce = require ( 'lodash.debounce' )
710
811const log = debug ( 'bitswap:engine' )
912log . error = debug ( 'bitswap:engine:error' )
@@ -26,37 +29,45 @@ module.exports = class Engine {
2629 this . peerRequestQueue = new PeerRequestQueue ( )
2730
2831 this . _running = false
32+
33+ this . _outbox = debounce ( this . _outboxExec . bind ( this ) , 100 )
2934 }
3035
3136 _sendBlock ( env , cb ) {
3237 const msg = new Message ( false )
33- msg . addBlock ( env . block )
34-
35- log ( 'Sending block to %s' , env . peer . toB58String ( ) , env . block . data . toString ( ) )
36-
37- this . network . sendMessage ( env . peer , msg , ( err ) => {
38+ msg . addBlock ( env . block , ( err ) => {
3839 if ( err ) {
39- log ( 'sendblock error: %s' , err . message )
40+ return cb ( err )
4041 }
41- cb ( null , 'done' )
42+
43+ log ( 'Sending block to %s' , env . peer . toB58String ( ) , env . block . data . toString ( ) )
44+
45+ this . network . sendMessage ( env . peer , msg , ( err ) => {
46+ if ( err ) {
47+ log ( 'sendblock error: %s' , err . message )
48+ }
49+ cb ( null , 'done' )
50+ } )
4251 } )
4352 }
4453
45- _outbox ( ) {
46- if ( ! this . _running ) return
54+ _outboxExec ( ) {
55+ let nextTask
56+ log ( 'outbox' )
4757
48- const doIt = ( cb ) => pull (
49- generate ( null , ( state , cb ) => {
50- log ( 'generating' , this . _running )
58+ whilst (
59+ ( ) => {
5160 if ( ! this . _running ) {
52- return cb ( true )
61+ return
5362 }
5463
55- const nextTask = this . peerRequestQueue . pop ( )
64+ nextTask = this . peerRequestQueue . pop ( )
65+ log ( 'check' , this . _running && nextTask )
66+ return Boolean ( nextTask )
67+ } ,
68+ ( next ) => {
69+ log ( 'generating' )
5670 log ( 'got task' , nextTask )
57- if ( ! nextTask ) {
58- return cb ( true )
59- }
6071
6172 pull (
6273 this . blockstore . getStream ( nextTask . entry . key ) ,
@@ -65,31 +76,20 @@ module.exports = class Engine {
6576 const block = blocks [ 0 ]
6677 if ( err || ! block ) {
6778 nextTask . done ( )
68- return cb ( null , false )
79+ return next ( )
6980 }
7081
71- cb ( null , {
82+ this . _sendBlock ( {
7283 peer : nextTask . target ,
7384 block : block ,
74- sent : ( ) => {
85+ sent ( ) {
7586 nextTask . done ( )
7687 }
77- } )
88+ } , next )
7889 } )
7990 )
80- } ) ,
81- pull . filter ( Boolean ) ,
82- pull . asyncMap ( this . _sendBlock . bind ( this ) ) ,
83- pull . onEnd ( cb )
91+ }
8492 )
85-
86- if ( ! this . _timer ) {
87- this . _timer = setTimeout ( ( ) => {
88- doIt ( ( ) => {
89- this . _timer = null
90- } )
91- } , 50 )
92- }
9393 }
9494
9595 wantlistForPeer ( peerId ) {
@@ -118,20 +118,25 @@ module.exports = class Engine {
118118 ledger . wantlist = new Wantlist ( )
119119 }
120120
121- this . _processBlocks ( msg . blocks , ledger )
122- log ( 'wantlist' , Array . from ( msg . wantlist . values ( ) ) . map ( ( e ) => e . toString ( ) ) )
123-
124- pull (
125- pull . values ( Array . from ( msg . wantlist . values ( ) ) ) ,
126- pull . asyncMap ( ( entry , cb ) => {
127- this . _processWantlist ( ledger , peerId , entry , cb )
128- } ) ,
129- pull . onEnd ( ( err ) => {
130- if ( err ) return cb ( err )
131- this . _outbox ( )
132- cb ( )
133- } )
134- )
121+ this . _processBlocks ( msg . blocks , ledger , ( err ) => {
122+ if ( err ) {
123+ log . error ( `failed to process blocks: ${ err . message } ` )
124+ }
125+
126+ log ( 'wantlist' , Array . from ( msg . wantlist . values ( ) ) . map ( ( e ) => e . toString ( ) ) )
127+
128+ pull (
129+ pull . values ( Array . from ( msg . wantlist . values ( ) ) ) ,
130+ pull . asyncMap ( ( entry , cb ) => {
131+ this . _processWantlist ( ledger , peerId , entry , cb )
132+ } ) ,
133+ pull . onEnd ( ( err ) => {
134+ if ( err ) return cb ( err )
135+ this . _outbox ( )
136+ cb ( )
137+ } )
138+ )
139+ } )
135140 }
136141
137142 receivedBlock ( key ) {
@@ -173,23 +178,36 @@ module.exports = class Engine {
173178 }
174179 }
175180
176- _processBlocks ( blocks , ledger ) {
177- for ( let block of blocks . values ( ) ) {
178- log ( 'got block %s (%s bytes)' , mh . toB58String ( block . key ) , block . data . length )
179- ledger . receivedBytes ( block . data . length )
181+ _processBlocks ( blocks , ledger , callback ) {
182+ each ( blocks . values ( ) , ( block , cb ) => {
183+ block . key ( ( err , key ) => {
184+ if ( err ) {
185+ return cb ( err )
186+ }
187+ log ( 'got block %s (%s bytes)' , mh . toB58String ( key ) , block . data . length )
188+ ledger . receivedBytes ( block . data . length )
180189
181- this . receivedBlock ( block . key )
182- }
190+ this . receivedBlock ( key )
191+ cb ( )
192+ } )
193+ } , callback )
183194 }
184195
185196 // Clear up all accounting things after message was sent
186- messageSent ( peerId , msg ) {
197+ messageSent ( peerId , msg , callback ) {
187198 const ledger = this . _findOrCreate ( peerId )
188- for ( let block of msg . blocks . values ( ) ) {
199+ each ( msg . blocks . values ( ) , ( block , cb ) => {
189200 ledger . sentBytes ( block . data . length )
190- ledger . wantlist . remove ( block . key )
191- this . peerRequestQueue . remove ( block . key , peerId )
192- }
201+ block . key ( ( err , key ) => {
202+ if ( err ) {
203+ return cb ( err )
204+ }
205+
206+ ledger . wantlist . remove ( key )
207+ this . peerRequestQueue . remove ( key , peerId )
208+ cb ( )
209+ } )
210+ } , callback )
193211 }
194212
195213 numBytesSentTo ( peerId ) {
0 commit comments