Skip to content
This repository was archived by the owner on Aug 23, 2019. It is now read-only.

TransportManager Refactoring to fix #300 #302

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ function pre (done) {
peerA.multiaddrs.add(maA)
switchA = new Switch(peerA, new PeerBook())

switchA.transport.add('ws', new WebSockets())
switchA.transport.listen('ws', {}, echo, cb)
switchA.transportManager.add('ws', new WebSockets())
switchA.transportManager.listen('ws', {}, echo, cb)
})
}

Expand All @@ -53,7 +53,7 @@ function pre (done) {
peerB.multiaddrs.add(maB)
switchB = new Switch(peerB, new PeerBook())

switchB.transport.add('ws', new WebSockets())
switchB.transportManager.add('ws', new WebSockets())
switchB.connection.addStreamMuxer(spdy)
switchB.connection.reuse()
switchB.handle('/echo/1.0.0', echo)
Expand All @@ -72,7 +72,7 @@ function pre (done) {

function post (done) {
parallel([
(cb) => switchA.transport.close('ws', cb),
(cb) => switchA.transportManager.close('ws', cb),
(cb) => switchB.stop(cb),
(cb) => sigS.stop(cb)
], done)
Expand Down
4 changes: 2 additions & 2 deletions src/connection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class ConnectionFSM extends BaseConnection {

const tKeys = this.switch.availableTransports(this.theirPeerInfo)

const circuitEnabled = Boolean(this.switch.transports[Circuit.tag])
const circuitEnabled = Boolean(this.switch.transportManager.get(Circuit.tag))
let circuitTried = false

const nextTransport = (key) => {
Expand All @@ -221,7 +221,7 @@ class ConnectionFSM extends BaseConnection {
}

this.log(`dialing transport ${transport}`)
this.switch.transport.dial(transport, this.theirPeerInfo, (err, _conn) => {
this.switch.transportManager.dial(transport, this.theirPeerInfo, (err, _conn) => {
if (err) {
this.emit('error:connection_attempt_failed', err.errors || [err])
this.log(err)
Expand Down
2 changes: 1 addition & 1 deletion src/connection/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ class ConnectionManager {
Object.assign(config, { hop: { enabled: false, active: false } })
}

this.switch.transport.add(Circuit.tag, new Circuit(this.switch, config))
this.switch.transportManager.add(Circuit.tag, new Circuit(this.switch, config))
}
}

Expand Down
19 changes: 9 additions & 10 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ class Switch extends EventEmitter {
this._options = options || {}

this.setMaxListeners(Infinity)
// transports --
// { key: transport }; e.g { tcp: <tcp> }
this.transports = {}

// connections --
// { peerIdB58: { conn: <conn> }}
Expand All @@ -58,7 +55,7 @@ class Switch extends EventEmitter {

this.protector = this._options.protector || null

this.transport = new TransportManager(this)
this.transportManager = new TransportManager(this)
this.connection = new ConnectionManager(this)

this.observer = Observer(this)
Expand Down Expand Up @@ -121,10 +118,11 @@ class Switch extends EventEmitter {
*/
availableTransports (peerInfo) {
const myAddrs = peerInfo.multiaddrs.toArray()
const myTransports = Object.keys(this.transports)
const myTransports = this.transportManager.getAll()

// Only listen on transports we actually have addresses for
return myTransports.filter((ts) => this.transports[ts].filter(myAddrs).length > 0)
return [...myTransports.keys()]
.filter((ts) => myTransports.get(ts).filter(myAddrs).length > 0)
// push Circuit to be the last proto to be dialed
.sort((a) => {
return a === 'Circuit' ? 1 : 0
Expand Down Expand Up @@ -184,7 +182,7 @@ class Switch extends EventEmitter {
* @returns {boolean}
*/
hasTransports () {
const transports = Object.keys(this.transports).filter((t) => t !== 'Circuit')
const transports = [...this.transportManager.getAll().keys()].filter((t) => t !== 'Circuit')
return transports && transports.length > 0
}

Expand Down Expand Up @@ -224,7 +222,7 @@ class Switch extends EventEmitter {
this.stats.start()
eachSeries(this.availableTransports(this._peerInfo), (ts, cb) => {
// Listen on the given transport
this.transport.listen(ts, {}, null, cb)
this.transportManager.listen(ts, {}, null, cb)
}, (err) => {
if (err) {
log.error(err)
Expand All @@ -244,8 +242,9 @@ class Switch extends EventEmitter {
this.stats.stop()
series([
(cb) => {
each(this.transports, (transport, cb) => {
each(transport.listeners, (listener, cb) => {
const myTransports = this.transportManager.getAll()
each(myTransports.keys(), (key, cb) => {
each(this.transportManager.getListeners(key), (listener, cb) => {
listener.close(cb)
}, cb)
}, cb)
Expand Down
78 changes: 62 additions & 16 deletions src/transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,19 @@ class TransportManager {
constructor (_switch) {
this.switch = _switch
this.dialer = new LimitDialer(defaultPerPeerRateLimit, dialTimeout)

// transports --
// Map <<key>, {
// transport: <transport>,
// listeners: [<Listener>, <Listener>, ...]
// }>
// e.g:
// Map { "tcp" => {
// transport: <tcp>,
// listeners: [<Listener>, <Listener>, ...]
// }
// }
this._transports = new Map()
}

/**
Expand All @@ -35,13 +48,50 @@ class TransportManager {
*/
add (key, transport) {
log('adding %s', key)
if (this.switch.transports[key]) {
if (this._transports.get(key)) {
throw new Error('There is already a transport with this key')
}

this.switch.transports[key] = transport
if (!this.switch.transports[key].listeners) {
this.switch.transports[key].listeners = []
this._transports.set(key, {
transport: transport,
listeners: []
})
}

/**
* Returns the `Transport` for the given key
*
* @param {String} key
* @returns {Transport}
*/
get (key) {
if (this._transports.has(key)) {
return this._transports.get(key).transport
} else {
return null
}
}

/**
* Returns a map of all `Transports` on the form { key: transport }; e.g { tcp: <tcp> }
*
* @returns {Map<String,Transport>}
*/
getAll () {
return new Map([...this._transports].map(([k, v]) => [k, v.transport]))
}

/**
* Returns an array of listeners for the `Transport` with the given key
*
* @param {String} key
* @returns {Array<listener>}
*/
getListeners (key) {
if (!this._transports.has(key)) {
return []
} else {
return this._transports.get(key).listeners
}
}

Expand All @@ -56,12 +106,12 @@ class TransportManager {
remove (key, callback) {
callback = callback || function () {}

if (!this.switch.transports[key]) {
if (!this._transports.get(key)) {
return callback()
}

this.close(key, (err) => {
delete this.switch.transports[key]
this._transports.delete(key)
callback(err)
})
}
Expand All @@ -73,7 +123,7 @@ class TransportManager {
* @returns {void}
*/
removeAll (callback) {
const tasks = Object.keys(this.switch.transports).map((key) => {
const tasks = [...this._transports.keys()].map((key) => {
return (cb) => {
this.remove(key, cb)
}
Expand All @@ -91,7 +141,7 @@ class TransportManager {
* @returns {void}
*/
dial (key, peerInfo, callback) {
const transport = this.switch.transports[key]
const transport = this._transports.get(key).transport
let multiaddrs = peerInfo.multiaddrs.toArray()

if (!Array.isArray(multiaddrs)) {
Expand Down Expand Up @@ -127,16 +177,12 @@ class TransportManager {
listen (key, _options, handler, callback) {
handler = this.switch._connectionHandler(key, handler)

const transport = this.switch.transports[key]
const transport = this._transports.get(key).transport
const multiaddrs = TransportManager.dialables(
transport,
this.switch._peerInfo.multiaddrs.distinct()
)

if (!transport.listeners) {
transport.listeners = []
}

let freshMultiaddrs = []

const createListeners = multiaddrs.map((ma) => {
Expand All @@ -155,7 +201,7 @@ class TransportManager {
return done(err)
}
freshMultiaddrs = freshMultiaddrs.concat(addrs)
transport.listeners.push(listener)
this._transports.get(key).listeners.push(listener)
done()
})
})
Expand All @@ -181,13 +227,13 @@ class TransportManager {
* @returns {void}
*/
close (key, callback) {
const transport = this.switch.transports[key]
const transport = this._transports.get(key)

if (!transport) {
return callback(new Error(`Trying to close non existing transport: ${key}`))
}

parallel(transport.listeners.map((listener) => {
parallel(this._transports.get(key).listeners.map((listener) => {
return (cb) => {
listener.close(cb)
}
Expand Down
28 changes: 14 additions & 14 deletions test/circuit-relay.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ describe(`circuit`, function () {
swarmB = new Swarm(peerB, new PeerBook())
swarmC = new Swarm(peerC, new PeerBook())

swarmA.transport.add('tcp', new TCP())
swarmA.transport.add('ws', new WS())
swarmB.transport.add('ws', new WS())
swarmA.transportManager.add('tcp', new TCP())
swarmA.transportManager.add('ws', new WS())
swarmB.transportManager.add('ws', new WS())

dialSpyA = sinon.spy(swarmA.transport, 'dial')
dialSpyA = sinon.spy(swarmA.transportManager, 'dial')

done()
}))
Expand All @@ -67,15 +67,15 @@ describe(`circuit`, function () {

it('.enableCircuitRelay', () => {
swarmA.connection.enableCircuitRelay({ enabled: true })
expect(Object.keys(swarmA.transports).length).to.equal(3)
expect(swarmA.transportManager.getAll().size).to.equal(3)

swarmB.connection.enableCircuitRelay({ enabled: true })
expect(Object.keys(swarmB.transports).length).to.equal(2)
expect(swarmB.transportManager.getAll().size).to.equal(2)
})

it('listed on the transports map', () => {
expect(swarmA.transports.Circuit).to.exist()
expect(swarmB.transports.Circuit).to.exist()
expect(swarmA.transportManager.get('Circuit')).to.exist()
expect(swarmB.transportManager.get('Circuit')).to.exist()
})

it('add /p2p-circuit addrs on start', (done) => {
Expand Down Expand Up @@ -222,12 +222,12 @@ describe(`circuit`, function () {
enabled: true
})

bootstrapSwitch.transport.add('tcp', new TCP())
bootstrapSwitch.transport.add('ws', new WS())
tcpSwitch1.transport.add('tcp', new TCP())
tcpSwitch2.transport.add('tcp', new TCP())
wsSwitch1.transport.add('ws', new WS())
wsSwitch2.transport.add('ws', new WS())
bootstrapSwitch.transportManager.add('tcp', new TCP())
bootstrapSwitch.transportManager.add('ws', new WS())
tcpSwitch1.transportManager.add('tcp', new TCP())
tcpSwitch2.transportManager.add('tcp', new TCP())
wsSwitch1.transportManager.add('ws', new WS())
wsSwitch2.transportManager.add('ws', new WS())

series([
// start the nodes
Expand Down
8 changes: 4 additions & 4 deletions test/connection.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,19 @@ describe('ConnectionFSM', () => {
dialerSwitch._peerInfo.multiaddrs.add('/ip4/0.0.0.0/tcp/15451/ws')
dialerSwitch.connection.crypto(secio.tag, secio.encrypt)
dialerSwitch.connection.addStreamMuxer(multiplex)
dialerSwitch.transport.add('ws', new WS())
dialerSwitch.transportManager.add('ws', new WS())

listenerSwitch = new Switch(infos.shift(), new PeerBook())
listenerSwitch._peerInfo.multiaddrs.add('/ip4/0.0.0.0/tcp/15452/ws')
listenerSwitch.connection.crypto(secio.tag, secio.encrypt)
listenerSwitch.connection.addStreamMuxer(multiplex)
listenerSwitch.transport.add('ws', new WS())
listenerSwitch.transportManager.add('ws', new WS())

spdySwitch = new Switch(infos.shift(), new PeerBook())
spdySwitch._peerInfo.multiaddrs.add('/ip4/0.0.0.0/tcp/15453/ws')
spdySwitch.connection.crypto(secio.tag, secio.encrypt)
spdySwitch.connection.addStreamMuxer(spdy)
spdySwitch.transport.add('ws', new WS())
spdySwitch.transportManager.add('ws', new WS())

parallel([
(cb) => dialerSwitch.start(cb),
Expand Down Expand Up @@ -132,7 +132,7 @@ describe('ConnectionFSM', () => {
peerInfo: listenerSwitch._peerInfo
})

const stub = sinon.stub(dialerSwitch.transport, 'dial').callsArgWith(2, {
const stub = sinon.stub(dialerSwitch.transportManager, 'dial').callsArgWith(2, {
errors: [
new Error('address in use')
]
Expand Down
12 changes: 6 additions & 6 deletions test/dial-fsm.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ describe('dialFSM', () => {
switchB = new Switch(peerB, new PeerBook())
switchC = new Switch(peerC, new PeerBook())

switchA.transport.add('tcp', new TCP())
switchB.transport.add('tcp', new TCP())
switchC.transport.add('ws', new WS())
switchA.transportManager.add('tcp', new TCP())
switchB.transportManager.add('tcp', new TCP())
switchC.transportManager.add('ws', new WS())

switchA.connection.crypto(secio.tag, secio.encrypt)
switchB.connection.crypto(secio.tag, secio.encrypt)
Expand All @@ -57,9 +57,9 @@ describe('dialFSM', () => {
switchC.connection.reuse()

parallel([
(cb) => switchA.transport.listen('tcp', {}, null, cb),
(cb) => switchB.transport.listen('tcp', {}, null, cb),
(cb) => switchC.transport.listen('ws', {}, null, cb)
(cb) => switchA.transportManager.listen('tcp', {}, null, cb),
(cb) => switchB.transportManager.listen('tcp', {}, null, cb),
(cb) => switchC.transportManager.listen('ws', {}, null, cb)
], done)
}))

Expand Down
2 changes: 1 addition & 1 deletion test/dialSelf.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ describe(`dial self`, () => {

swarmA = new Swarm(peerA, new PeerBook())

swarmA.transport.add('tcp', new MockTransport())
swarmA.transportManager.add('tcp', new MockTransport())

done()
}))
Expand Down
Loading