Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

Commit b852460

Browse files
author
Alan Shaw
authored
refactor: convert preload to async/await (#2590)
...and removes `async` from dependencies! And half the code. License: MIT Signed-off-by: Alan Shaw <[email protected]>
1 parent cf1c3b5 commit b852460

12 files changed

+314
-600
lines changed

.aegir.js

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ const IPFSFactory = require('ipfsd-ctl')
44
const parallel = require('async/parallel')
55
const MockPreloadNode = require('./test/utils/mock-preload-node')
66
const EchoServer = require('interface-ipfs-core/src/utils/echo-http-server')
7+
const callbackify = require('callbackify')
78

89
const ipfsdServer = IPFSFactory.createServer()
910
const preloadNode = MockPreloadNode.createNode()
@@ -30,13 +31,13 @@ module.exports = {
3031
node: {
3132
pre: (cb) => {
3233
parallel([
33-
(cb) => preloadNode.start(cb),
34+
(cb) => callbackify(preloadNode.start)(cb),
3435
(cb) => echoServer.start(cb)
3536
], cb)
3637
},
3738
post: (cb) => {
3839
parallel([
39-
(cb) => preloadNode.stop(cb),
40+
(cb) => callbackify(preloadNode.stop)(cb),
4041
(cb) => echoServer.stop(cb)
4142
], cb)
4243
}
@@ -48,7 +49,7 @@ module.exports = {
4849
ipfsdServer.start()
4950
cb()
5051
},
51-
(cb) => preloadNode.start(cb),
52+
(cb) => callbackify(preloadNode.start)(cb),
5253
(cb) => echoServer.start(cb)
5354
], cb)
5455
},
@@ -58,7 +59,7 @@ module.exports = {
5859
ipfsdServer.stop()
5960
cb()
6061
},
61-
(cb) => preloadNode.stop(cb),
62+
(cb) => callbackify(preloadNode.stop)(cb),
6263
(cb) => echoServer.stop(cb)
6364
], cb)
6465
}

package.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@
6464
"@hapi/boom": "^7.4.3",
6565
"@hapi/hapi": "^18.3.2",
6666
"@hapi/joi": "^15.0.0",
67+
"abort-controller": "^3.0.0",
6768
"array-shuffle": "^1.0.1",
68-
"async": "^2.6.1",
6969
"async-iterator-all": "^1.0.0",
7070
"async-iterator-first": "^1.0.0",
7171
"async-iterator-to-pull-stream": "^1.3.0",
@@ -146,7 +146,6 @@
146146
"libp2p-webrtc-star": "~0.16.0",
147147
"libp2p-websocket-star-multi": "~0.4.3",
148148
"libp2p-websockets": "~0.12.3",
149-
"lodash": "^4.17.15",
150149
"lodash.flatten": "^4.4.0",
151150
"mafmt": "^6.0.10",
152151
"merge-options": "^2.0.0",
@@ -196,6 +195,7 @@
196195
},
197196
"devDependencies": {
198197
"aegir": "^20.4.1",
198+
"async": "^2.6.3",
199199
"base64url": "^3.0.1",
200200
"clear-module": "^4.0.0",
201201
"delay": "^4.1.0",
@@ -208,6 +208,7 @@
208208
"ipfs-interop": "^0.1.1",
209209
"ipfsd-ctl": "^0.47.2",
210210
"libp2p-websocket-star": "~0.10.2",
211+
"lodash": "^4.17.15",
211212
"ncp": "^2.0.0",
212213
"p-event": "^4.1.0",
213214
"qs": "^6.5.2",

src/core/components/init-assets.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ module.exports = async function addDefaultAssets (self, log) {
88
const initDocsPath = path.join(__dirname, '../../init-files/init-docs')
99

1010
const results = await self.addFromFs(initDocsPath, {
11-
recursive: true
11+
recursive: true,
12+
preload: false
1213
})
1314

1415
const dir = results.filter(file => file.path === 'init-docs').pop()

src/core/components/init.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ async function addRepoAssets (self, privateKey, opts) {
116116
const cid = await self.dag.put(node, {
117117
version: 0,
118118
format: multicodec.DAG_PB,
119-
hashAlg: multicodec.SHA2_256
119+
hashAlg: multicodec.SHA2_256,
120+
preload: false
120121
})
121122

122123
await self._ipns.initializeKeyspace(privateKey, cid.toBaseEncodedString())

src/core/mfs-preload.js

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,20 @@ module.exports = (self) => {
1919
let rootCid
2020
let timeoutId
2121

22-
const preloadMfs = () => {
23-
self.files.stat('/')
24-
.then((stats) => {
25-
if (rootCid !== stats.hash) {
26-
log(`preloading updated MFS root ${rootCid} -> ${stats.hash}`)
27-
28-
return self._preload(stats.hash, (err) => {
29-
timeoutId = setTimeout(preloadMfs, options.interval)
30-
if (err) return log.error(`failed to preload MFS root ${stats.hash}`, err)
31-
rootCid = stats.hash
32-
})
33-
}
22+
const preloadMfs = async () => {
23+
try {
24+
const stats = await self.files.stat('/')
3425

35-
timeoutId = setTimeout(preloadMfs, options.interval)
36-
}, (err) => {
37-
timeoutId = setTimeout(preloadMfs, options.interval)
38-
log.error('failed to stat MFS root for preload', err)
39-
})
26+
if (rootCid !== stats.hash) {
27+
log(`preloading updated MFS root ${rootCid} -> ${stats.hash}`)
28+
await self._preload(stats.hash)
29+
rootCid = stats.hash
30+
}
31+
} catch (err) {
32+
log.error('failed to preload MFS root', err)
33+
} finally {
34+
timeoutId = setTimeout(preloadMfs, options.interval)
35+
}
4036
}
4137

4238
return {

src/core/preload.js

Lines changed: 33 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,23 @@
11
'use strict'
22

3-
const setImmediate = require('async/setImmediate')
4-
const retry = require('async/retry')
53
const toUri = require('multiaddr-to-uri')
64
const debug = require('debug')
75
const CID = require('cids')
86
const shuffle = require('array-shuffle')
7+
const AbortController = require('abort-controller')
98
const preload = require('./runtime/preload-nodejs')
109

1110
const log = debug('ipfs:preload')
1211
log.error = debug('ipfs:preload:error')
1312

14-
const noop = (err) => { if (err) log.error(err) }
15-
1613
module.exports = self => {
1714
const options = self._options.preload || {}
1815
options.enabled = Boolean(options.enabled)
1916
options.addresses = options.addresses || []
2017

2118
if (!options.enabled || !options.addresses.length) {
2219
log('preload disabled')
23-
const api = (_, callback) => {
24-
if (callback) {
25-
setImmediate(() => callback())
26-
}
27-
}
20+
const api = () => {}
2821
api.start = () => {}
2922
api.stop = () => {}
3023
return api
@@ -34,41 +27,40 @@ module.exports = self => {
3427
let requests = []
3528
const apiUris = options.addresses.map(toUri)
3629

37-
const api = (path, callback) => {
38-
callback = callback || noop
30+
const api = async path => {
31+
try {
32+
if (stopped) throw new Error(`preload ${path} but preloader is not started`)
3933

40-
if (typeof path !== 'string') {
41-
try {
42-
path = new CID(path).toBaseEncodedString()
43-
} catch (err) {
44-
return setImmediate(() => callback(err))
34+
if (typeof path !== 'string') {
35+
path = new CID(path).toString()
4536
}
46-
}
47-
48-
const fallbackApiUris = shuffle(apiUris)
49-
let request
50-
const now = Date.now()
5137

52-
retry({ times: fallbackApiUris.length }, (cb) => {
53-
if (stopped) return cb(new Error(`preload aborted for ${path}`))
54-
55-
// Remove failed request from a previous attempt
56-
requests = requests.filter(r => r !== request)
57-
58-
const apiUri = fallbackApiUris.shift()
59-
60-
request = preload(`${apiUri}/api/v0/refs?r=true&arg=${encodeURIComponent(path)}`, cb)
61-
requests = requests.concat(request)
62-
}, (err) => {
63-
requests = requests.filter(r => r !== request)
64-
65-
if (err) {
66-
return callback(err)
38+
const fallbackApiUris = shuffle(apiUris)
39+
let success = false
40+
const now = Date.now()
41+
42+
for (const uri of fallbackApiUris) {
43+
if (stopped) throw new Error(`preload aborted for ${path}`)
44+
let controller
45+
46+
try {
47+
controller = new AbortController()
48+
requests = requests.concat(controller)
49+
await preload(`${uri}/api/v0/refs?r=true&arg=${encodeURIComponent(path)}`, { signal: controller.signal })
50+
success = true
51+
} catch (err) {
52+
if (err.type !== 'aborted') log.error(err)
53+
} finally {
54+
requests = requests.filter(r => r !== controller)
55+
}
56+
57+
if (success) break
6758
}
6859

69-
log(`preloaded ${path} in ${Date.now() - now}ms`)
70-
callback()
71-
})
60+
log(`${success ? '' : 'un'}successfully preloaded ${path} in ${Date.now() - now}ms`)
61+
} catch (err) {
62+
log.error(err)
63+
}
7264
}
7365

7466
api.start = () => {
@@ -77,8 +69,8 @@ module.exports = self => {
7769

7870
api.stop = () => {
7971
stopped = true
80-
log(`canceling ${requests.length} pending preload request(s)`)
81-
requests.forEach(r => r.cancel())
72+
log(`aborting ${requests.length} pending preload request(s)`)
73+
requests.forEach(r => r.abort())
8274
requests = []
8375
}
8476

src/core/runtime/preload-browser.js

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,22 @@ log.error = debug('ipfs:preload:error')
1212
// we don't want preload calls to exhaust the limit (~6)
1313
const httpQueue = new PQueue({ concurrency: 4 })
1414

15-
module.exports = function preload (url, callback) {
15+
module.exports = function preload (url, options) {
1616
log(url)
17+
options = options || {}
1718

18-
const controller = new AbortController()
19-
const signal = controller.signal
20-
const cb = () => setImmediate(callback) // https://github.com/ipfs/js-ipfs/pull/2304#discussion_r320700893
19+
return httpQueue.add(async () => {
20+
const res = await ky.get(url, { signal: options.signal })
21+
const reader = res.body.getReader()
2122

22-
httpQueue.add(() => ky.get(url, { signal })).then(cb, cb)
23-
24-
return {
25-
cancel: () => controller.abort()
26-
}
23+
try {
24+
while (true) {
25+
const { done } = await reader.read()
26+
if (done) return
27+
// Read to completion but do not cache
28+
}
29+
} finally {
30+
reader.releaseLock()
31+
}
32+
})
2733
}

src/core/runtime/preload-nodejs.js

Lines changed: 6 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,18 @@
11
'use strict'
22

3-
const http = require('http')
4-
const https = require('https')
5-
const { URL } = require('url')
3+
const { default: ky } = require('ky-universal')
64
const debug = require('debug')
7-
const setImmediate = require('async/setImmediate')
85

96
const log = debug('ipfs:preload')
107
log.error = debug('ipfs:preload:error')
118

12-
module.exports = function preload (url, callback = () => {}) {
9+
module.exports = async function preload (url, options) {
1310
log(url)
11+
options = options || {}
1412

15-
try {
16-
url = new URL(url)
17-
} catch (err) {
18-
return setImmediate(() => callback(err))
19-
}
20-
21-
const transport = url.protocol === 'https:' ? https : http
22-
23-
const req = transport.get({
24-
hostname: url.hostname,
25-
port: url.port,
26-
path: url.pathname + url.search
27-
}, (res) => {
28-
if (res.statusCode < 200 || res.statusCode >= 300) {
29-
res.resume()
30-
log.error('failed to preload', url.href, res.statusCode, res.statusMessage)
31-
return callback(new Error(`failed to preload ${url}`))
32-
}
33-
34-
res.on('data', chunk => log(`data ${chunk}`))
35-
36-
res.on('abort', () => {
37-
callback(new Error('request aborted'))
38-
})
39-
40-
res.on('error', err => {
41-
log.error('response error preloading', url.href, err)
42-
callback(err)
43-
})
44-
45-
res.on('end', () => {
46-
// If aborted, callback is called in the abort handler
47-
if (!res.aborted) callback()
48-
})
49-
})
50-
51-
req.on('error', err => {
52-
log.error('request error preloading', url.href, err)
53-
callback(err)
54-
})
13+
const res = await ky.get(url, { signal: options.signal })
5514

56-
return {
57-
cancel: () => {
58-
// No need to call callback here
59-
// before repsonse - called in req error handler
60-
// after response - called in res abort hander
61-
req.abort()
62-
}
15+
for await (const _ of res.body) { // eslint-disable-line no-unused-vars
16+
// Read to completion but do not cache
6317
}
6418
}

test/core/mfs-preload.spec.js

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,7 @@ const createMockFilesStat = (cids = []) => {
1515
}
1616

1717
const createMockPreload = () => {
18-
const preload = (cid, cb = () => {}) => {
19-
preload.cids.push(cid)
20-
21-
cb()
22-
}
18+
const preload = cid => preload.cids.push(cid)
2319
preload.cids = []
2420
return preload
2521
}

0 commit comments

Comments
 (0)