Skip to content

Commit 4a484f5

Browse files
Merge pull request #19 from chris-smith/singleTcprosServer
Centralize TCPROS Server
2 parents 3569fe2 + 1ef3c84 commit 4a484f5

File tree

4 files changed

+202
-201
lines changed

4 files changed

+202
-201
lines changed

lib/Publisher.js

Lines changed: 49 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -75,20 +75,13 @@ class Publisher extends EventEmitter {
7575

7676
this._subClients = {};
7777

78-
this._port = 0;
79-
80-
this._server = null;
81-
8278
this._messageHandler = options.typeClass;
8379

8480
// messages published before this publisher
8581
// was registered will be held here
8682
this._msgQueue = [];
8783

88-
this._setupTcp()
89-
.then(() => {
90-
this._register();
91-
});
84+
this._register();
9285
};
9386

9487
getTopic() {
@@ -213,110 +206,60 @@ class Publisher extends EventEmitter {
213206
this._msgQueue = [];
214207
}
215208

216-
_setupTcp() {
217-
// recursively tries to setup server on open port
218-
// calls callback when setup is done
219-
let _createServer = (callback) => {
220-
NetworkUtils.getFreePort()
221-
.then((port) => {
222-
let server = net.createServer((subscriber) => {
223-
let subName = subscriber.remoteAddress + ":"
224-
+ subscriber.remotePort;
225-
subscriber.name = subName;
226-
this._log.debug('Publisher ' + this.getTopic()
227-
+ ' got connection from ' + subName);
228-
229-
// subscriber will send us tcpros handshake before we can
230-
// start publishing to it.
231-
subscriber.$handshake =
232-
this._handleHandshake.bind(this, subscriber);
233-
234-
// handshake will be TCPROS encoded, so use a DeserializeStream to
235-
// handle any chunking
236-
let deserializeStream = new DeserializeStream();
237-
subscriber.pipe(deserializeStream);
238-
deserializeStream.on('message', subscriber.$handshake);
239-
240-
// if this publisher had the tcpNoDelay option set
241-
// disable the nagle algorithm
242-
if (this._tcpNoDelay) {
243-
subscriber.setNoDelay(true);
244-
}
209+
handleSubscriberConnection(subscriber, deserializeStream, header) {
210+
let error = TcprosUtils.validateSubHeader(
211+
header, this.getTopic(), this.getType(),
212+
this._messageHandler.md5sum());
213+
if (error !== null) {
214+
this._log.error('Unable to validate connection header '
215+
+ JSON.stringify(header));
216+
subscriber.end(Serialize(error));
217+
return;
218+
}
219+
// else
220+
this._log.debug('Pub %s got connection header %s', this.getTopic(), JSON.stringify(header));
221+
222+
// create and send response
223+
let respHeader =
224+
TcprosUtils.createPubHeader(
225+
this._nodeHandle.getNodeName(),
226+
this._messageHandler.md5sum(),
227+
this.getType(),
228+
this.getLatching());
229+
subscriber.write(respHeader);
230+
231+
// remove connections to deserializeStream - it's unnecessary now
232+
deserializeStream.removeAllListeners();
233+
234+
// if this publisher had the tcpNoDelay option set
235+
// disable the nagle algorithm
236+
if (this._tcpNoDelay) {
237+
subscriber.setNoDelay(true);
238+
}
245239

246-
subscriber.on('close', () => {
247-
this._log.info('Publisher ' + this.getTopic() + ' client '
248-
+ subscriber.name + ' disconnected!');
249-
delete this._subClients[subscriber.name];
250-
});
251-
252-
subscriber.on('end', () => {
253-
this._log.info('Sub %s sent END', subscriber.name);
254-
});
255-
256-
subscriber.on('error', () => {
257-
this._log.info('Sub %s had error', subscriber.name);
258-
});
259-
}).listen(port);
260-
261-
// it's possible the port was taken before we could use it
262-
server.on('error', (err) => {
263-
if (err.code === 'EADDRINUSE') {
264-
_createServer(callback);
265-
}
266-
});
240+
subscriber.on('close', () => {
241+
this._log.info('Publisher %s' + this.getTopic() + ' client '
242+
+ subscriber.name + ' disconnected!');
243+
delete this._subClients[subscriber.name];
244+
});
267245

268-
// the port was available
269-
server.on('listening', () => {
270-
this._log.debug('Listening on port ' + port);
271-
this._port = port;
272-
this._server = server;
273-
callback(port);
274-
});
275-
});
276-
};
246+
subscriber.on('end', () => {
247+
this._log.info('Sub %s sent END', subscriber.name);
248+
});
277249

278-
return new Promise((resolve, reject) => {
279-
_createServer(resolve);
250+
subscriber.on('error', () => {
251+
this._log.warn('Sub %s had error', subscriber.name);
280252
});
281-
}
282253

283-
_handleHandshake(subscriber, data) {
284-
if (!subscriber.$initialized) {
285-
let header = TcprosUtils.parseSubHeader(data);
286-
let valid = TcprosUtils.validateSubHeader(
287-
header, this.getTopic(), this.getType(),
288-
this._messageHandler.md5sum());
289-
if (valid !== null) {
290-
this._log.error('Unable to validate connection header '
291-
+ JSON.stringify(header));
292-
subscriber.write(Serialize(valid));
293-
return;
294-
}
295-
this._log.debug('Pub ' + this.getTopic()
296-
+ ' got connection header ' + JSON.stringify(header));
297-
298-
let respHeader =
299-
TcprosUtils.createPubHeader(
300-
this._nodeHandle.getNodeName(),
301-
this._messageHandler.md5sum(),
302-
this.getType(),
303-
this.getLatching());
304-
subscriber.write(respHeader);
305-
306-
if (this._lastSentMsg !== null) {
307-
this._log.debug('Sending latched msg to new subscriber');
308-
subscriber.write(this._lastSentMsg);
309-
}
254+
if (this._lastSentMsg !== null) {
255+
this._log.debug('Sending latched msg to new subscriber');
256+
subscriber.write(this._lastSentMsg);
257+
}
310258

311-
// if handshake good, add to list, we'll start publishing to it
312-
this._subClients[subscriber.name] = subscriber;
259+
// if handshake good, add to list, we'll start publishing to it
260+
this._subClients[subscriber.name] = subscriber;
313261

314-
this.emit('connection', subscriber.name);
315-
}
316-
else {
317-
this._log.error(
318-
'Got message from subscriber after handshake - what gives!!');
319-
}
262+
this.emit('connection', header, subscriber.name);
320263
}
321264

322265
_register() {
@@ -332,14 +275,10 @@ class Publisher extends EventEmitter {
332275
}
333276
})
334277
.catch((err, resp) => {
335-
this._log.error('reg pub err ' + err + ' resp: '
278+
this._log.error('reg pub err ' + err + ' resp: '
336279
+ JSON.stringify(resp));
337280
})
338281
}
339-
340-
getSubPort() {
341-
return this._port;
342-
}
343282
};
344283

345284
module.exports = Publisher;

lib/RosNode.js

Lines changed: 85 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ let ServiceClient = require('./ServiceClient.js');
2828
let ServiceServer = require('./ServiceServer.js');
2929
let NetworkUtils = require('../utils/network_utils.js');
3030
let messageUtils = require('../utils/message_utils.js');
31+
let tcprosUtils = require('../utils/tcpros_utils.js');
32+
let SerializationUtils = require('../utils/serialization_utils.js');
33+
let DeserializeStream = SerializationUtils.DeserializeStream;
34+
let Deserialize = SerializationUtils.Deserialize;
35+
let Serialize = SerializationUtils.Serialize;
3136
let EventEmitter = require('events');
3237
let logger = require('../utils/logger.js');
3338

@@ -43,8 +48,12 @@ class RosNode extends EventEmitter {
4348

4449
this._log = logger.createLogger({name: nodeName});
4550

51+
this._slaveApiServer = null;
4652
this._xmlrpcPort = null;
4753

54+
this._tcprosServer = null;
55+
this._tcprosPort = null;
56+
4857
this._nodeName = nodeName;
4958

5059
this._rosMasterAddress = rosMaster;
@@ -60,7 +69,8 @@ class RosNode extends EventEmitter {
6069

6170
this._services = {};
6271

63-
this._setupSlaveApi();
72+
this._setupTcprosServer()
73+
.then(this._setupSlaveApi.bind(this));
6474

6575
this._setupExitHandler();
6676
}
@@ -145,12 +155,12 @@ class RosNode extends EventEmitter {
145155
// Master API
146156
//------------------------------------------------------------------
147157

148-
registerService(service, serviceUri) {
149-
return this._masterApi.registerService(this._nodeName, service, serviceUri, this._getXmlrpcUri());
158+
registerService(service) {
159+
return this._masterApi.registerService(this._nodeName, service, NetworkUtils.formatServiceUri(this._tcprosPort), this._getXmlrpcUri());
150160
}
151161

152-
unregisterService(service, serviceUri) {
153-
return this._masterApi.unregisterService(this._nodeName, service, serviceUri);
162+
unregisterService(service) {
163+
return this._masterApi.unregisterService(this._nodeName, service, NetworkUtils.formatServiceUri(this._tcprosPort));
154164
}
155165

156166
registerSubscriber(topic, topicType) {
@@ -270,13 +280,81 @@ class RosNode extends EventEmitter {
270280
});
271281
}
272282

283+
_setupTcprosServer() {
284+
// recursively tries to setup server on open port
285+
// calls callback when setup is done
286+
let _createServer = (callback) => {
287+
NetworkUtils.getFreePort()
288+
.then((port) => {
289+
let server = net.createServer((connection) => {
290+
let conName = connection.remoteAddress + ":"
291+
+ connection.remotePort;
292+
connection.name = conName;
293+
this._log.debug('Node %s got connection from %s', this.getNodeName(), conName);
294+
295+
// data from connections will be TCPROS encoded, so use a
296+
// DeserializeStream to handle any chunking
297+
let deserializeStream = new DeserializeStream();
298+
connection.pipe(deserializeStream);
299+
300+
const checkConnectionHeader = (headerData) => {
301+
const header = tcprosUtils.parseTcpRosHeader(headerData);
302+
if (!header) {
303+
this._log.error('Unable to validate connection header %s', headerData);
304+
connection.end(Serialize(String('Unable to validate connection header')));
305+
return;
306+
}
307+
this._log.debug('Got connection header: %j', header);
308+
309+
if (header.hasOwnProperty('topic')) {
310+
// this is a subscriber, validate header and pass off connection to appropriate publisher
311+
const topic = header.topic;
312+
const pub = this._publishers[topic];
313+
if (pub) {
314+
pub.handleSubscriberConnection(connection, deserializeStream, header);
315+
}
316+
}
317+
else if (header.hasOwnProperty('service')) {
318+
// this is a service client, validate header and pass off connection to appropriate service provider
319+
const service = header.service;
320+
const serviceProvider = this._services[service];
321+
if (serviceProvider) {
322+
serviceProvider.handleClientConnection(connection, deserializeStream, header);
323+
}
324+
}
325+
};
326+
deserializeStream.on('message', checkConnectionHeader);
327+
}).listen(port);
328+
329+
// it's possible the port was taken before we could use it
330+
server.on('error', (err) => {
331+
if (err.code === 'EADDRINUSE') {
332+
_createServer(callback);
333+
}
334+
});
335+
336+
// the port was available
337+
server.on('listening', () => {
338+
this._log.debug('Listening on port ' + port);
339+
this._tcprosPort = port;
340+
this._server = server;
341+
callback(port);
342+
});
343+
});
344+
};
345+
346+
return new Promise((resolve, reject) => {
347+
_createServer(resolve);
348+
});
349+
}
350+
273351
_handleTopicRequest(err, params, callback) {
274352
this._log.debug('Got topic request ' + JSON.stringify(params));
275353
if (!err) {
276354
let topic = params[1];
277355
let pub = this._publishers[topic];
278356
if (pub) {
279-
let port = pub.getSubPort();
357+
let port = this._tcprosPort;
280358
let resp = [
281359
1,
282360
'Allocated topic connection on port ' + port,
@@ -416,7 +494,7 @@ class RosNode extends EventEmitter {
416494

417495
Object.keys(this._services).forEach((service) => {
418496
let serv = this._services[service];
419-
promises.push(this.unregisterService(service, serv.getServiceUri()));
497+
promises.push(this.unregisterService(service));
420498
});
421499

422500
if (!fromExit) {

0 commit comments

Comments
 (0)