diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..c078e2b --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,14 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Dart", + "program": "bin/main.dart", + "request": "launch", + "type": "dart" + } + ] +} \ No newline at end of file diff --git a/lib/impl/plugin_vm.dart b/lib/impl/plugin_vm.dart index cb8e932..7967758 100644 --- a/lib/impl/plugin_vm.dart +++ b/lib/impl/plugin_vm.dart @@ -4,40 +4,51 @@ library stomp_impl_plugin_vm; import "dart:async"; -import "dart:io"; - -import "plugin.dart" show BytesStompConnector; +import 'package:web_socket_channel/io.dart'; +import "plugin.dart" show StringStompConnector; +import 'package:web_socket_channel/status.dart' as status; /** The implementation on top of [Socket]. */ -class SocketStompConnector extends BytesStompConnector { - final Socket _socket; +class SocketStompConnector extends StringStompConnector { + final IOWebSocketChannel _socket; + StreamSubscription _listen; SocketStompConnector(this._socket) { _init(); } void _init() { - _socket.listen((List data) { - if (data != null && !data.isEmpty) - onBytes(data); - }, onError: (error, stackTrace) { - onError(error, stackTrace); - }, onDone: () { + _listen = _socket.stream.listen((data) { + if (data != null) { + final String sdata = data.toString(); + if (sdata.isNotEmpty) onString(sdata); + } + }); + + _listen.onError((err) => onError(err, null)); + _listen.onDone(() => onClose()); + + _socket.stream.handleError((error) => onError(error, null)); + + _socket.sink.done.then((v) { onClose(); }); } @override Future close() { - _socket.destroy(); + _listen.cancel(); + _socket.sink.close(status.goingAway); return new Future.value(); } + + @override + Future writeStream_(Stream> stream) + => _socket.sink.addStream(stream); @override - void writeBytes_(List bytes) { - _socket.add(bytes); + void writeString_(String string) { + _socket.sink.add(string); + // TODO: implement writeString_ } - @override - Future writeStream_(Stream> stream) - => _socket.addStream(stream); } diff --git a/lib/src/impl/util_write.dart b/lib/src/impl/util_write.dart index 12da1ad..e57bf5f 100644 --- a/lib/src/impl/util_write.dart +++ b/lib/src/impl/util_write.dart @@ -5,7 +5,7 @@ part of stomp_impl_util; //Commands// const String CONNECT = "CONNECT"; -const String STOMP = "STOMP"; +const String STOMP = "CONNECT"; const String CONNECTED = "CONNECTED"; const String DISCONNECT = "DISCONNECT"; const String SEND = "SEND"; @@ -57,7 +57,6 @@ void writeDataFrame(StompConnector connector, String command, Map headers, String string, [List bytes]) { writeHeaders(connector, command, headers, endOfHeaders: false); - if (headers == null || headers[CONTENT_LENGTH] == null) { int len = 0; if (bytes != null) { @@ -78,6 +77,10 @@ void writeDataFrame(StompConnector connector, String command, connector.writeEof(); } +void pongMessage(StompConnector connector) { + writeSimpleFrame(connector,SEND,null); +} + ///Write a frame from the given stream Future writeStreamFrame(StompConnector connector, String command, Map headers, Stream> stream) { diff --git a/lib/src/stomp_impl.dart b/lib/src/stomp_impl.dart index 93149df..68ee9b3 100644 --- a/lib/src/stomp_impl.dart +++ b/lib/src/stomp_impl.dart @@ -54,7 +54,7 @@ class _StompClient implements StompClient { final _DisconnectCallback _onDisconnect; final _ErrorCallback _onError; final _FaultCallback _onFault; - + DateTime lastMessageDate = new DateTime.now(); /// final Map _subscribers = new HashMap(); @@ -81,6 +81,7 @@ class _StompClient implements StompClient { static Future connect( StompConnector connector, String host, + Map customHeaders, String login, String passcode, List heartbeat, @@ -105,6 +106,7 @@ class _StompClient implements StompClient { } else { client.heartbeat[0] = client.heartbeat[1] = 0; } + if(customHeaders != null) headers.addAll(customHeaders); writeSimpleFrame(connector, STOMP, headers); return client._connecting.future; @@ -127,9 +129,11 @@ class _StompClient implements StompClient { _connector ..onBytes = (List data) { + lastMessageDate = DateTime.now(); _parser.addBytes(data); } ..onString = (String data) { + lastMessageDate = DateTime.now(); _parser.addString(data); } ..onError = (error, stackTrace) { @@ -139,6 +143,7 @@ class _StompClient implements StompClient { _disconnected = true; _subscribers.clear(); _receipts.clear(); + cleanTimers(); if (_onDisconnect != null) _onDisconnect(this); }; } @@ -155,7 +160,6 @@ class _StompClient implements StompClient { Future disconnect({String receipt}) { _checkSend(); _disconnected = true; - Completer completer; Map headers; diff --git a/lib/src/stomp_util.dart b/lib/src/stomp_util.dart index 747780f..67c6380 100644 --- a/lib/src/stomp_util.dart +++ b/lib/src/stomp_util.dart @@ -5,6 +5,7 @@ part of stomp; const int _SUB_BYTES = 0, _SUB_STRING = 1, _SUB_JSON = 2, _SUB_BLOB = 3; +Timer outgoingTimer,incomingTimer; ///The information of a subscriber class _Subscriber { final String id; @@ -79,10 +80,36 @@ void _handleHeartbeat(_StompClient client, String heartbeat) { sy = int.parse(heartbeat.substring(i + 1)); client.heartbeat[0] = _calcHeartbeat(client.heartbeat[0], sy); client.heartbeat[1] = _calcHeartbeat(client.heartbeat[1], sx); + final int ttlOutgoing = client.heartbeat[0]; + if (ttlOutgoing != 0) { + outgoingTimer = Timer.periodic(new Duration(milliseconds: ttlOutgoing), (_) { + if (!client.isDisconnected) { + print("pong"); + pongMessage(client._connector); + } + }); + } + final int ttlIncoming = client.heartbeat[1]; + if (ttlIncoming != 0) { + incomingTimer = Timer.periodic(new Duration(milliseconds: ttlIncoming), (_) { + int delta = new DateTime.now() + .difference(client.lastMessageDate) + .inMilliseconds; + if (delta > (ttlIncoming * 2)) { + client.disconnect(); + } + }); + } } catch (ex) { // ignore silently } } } +cleanTimers(){ + if(outgoingTimer != null && outgoingTimer.isActive) + outgoingTimer.cancel(); + if(incomingTimer != null && incomingTimer.isActive) + incomingTimer.cancel(); +} int _calcHeartbeat(int a, int b) => a == 0 || b == 0 ? 0 : max(a, b); diff --git a/lib/stomp.dart b/lib/stomp.dart index 1e55d93..91722c7 100644 --- a/lib/stomp.dart +++ b/lib/stomp.dart @@ -79,6 +79,7 @@ abstract class StompClient { */ static Future connect(StompConnector connector, {String host, + Map customHeaders, String login, String passcode, List heartbeat, @@ -91,7 +92,7 @@ abstract class StompClient { throw new ArgumentError( "Required: connector. Use stomp_vm's connect() instead."); - return _StompClient.connect(connector, host, login, passcode, heartbeat, + return _StompClient.connect(connector, host, customHeaders,login, passcode, heartbeat, onConnect, onDisconnect, onError, onFault); } diff --git a/lib/vm.dart b/lib/vm.dart index 5a6b99c..f4aa20d 100644 --- a/lib/vm.dart +++ b/lib/vm.dart @@ -4,9 +4,8 @@ library stomp_vm; import "dart:async"; -import "dart:io"; - import "stomp.dart" show StompClient; +import 'package:web_socket_channel/io.dart'; import "impl/plugin_vm.dart" show SocketStompConnector; /** Connects a STOMP server, and instantiates a [StompClient] @@ -32,13 +31,34 @@ import "impl/plugin_vm.dart" show SocketStompConnector; * * [onFault] -- callback when an exception is received. */ Future connect(address, {int port: 61626, - String host, String login, String passcode, List heartbeat, + String host,Map customHeaders, String login, String passcode, List heartbeat, void onConnect(StompClient client, Map headers), void onDisconnect(StompClient client), void onError(StompClient client, String message, String detail, Map headers), void onFault(StompClient client, error, stackTrace)}) -=> Socket.connect(address, port).then((Socket socket) - => StompClient.connect(new SocketStompConnector(socket), - host: host, login: login, passcode: passcode, heartbeat: heartbeat, - onConnect: onConnect, onDisconnect: onDisconnect, - onError: onError, onFault: onFault)); +async => connectWith(await IOWebSocketChannel.connect(address), + host: host, + customHeaders: customHeaders, + login: login, + passcode: passcode, + heartbeat: heartbeat, + onConnect: onConnect, + onDisconnect: onDisconnect, + onError: onError, + onFault: onFault); + +Future connectWith(IOWebSocketChannel channel, + {String host, + Map customHeaders, + String login, + String passcode, + List heartbeat, + void onConnect(StompClient client, Map headers), + void onDisconnect(StompClient client), + void onError(StompClient client, String message, String detail, + Map headers), + void onFault(StompClient client, error, stackTrace)})=> + StompClient.connect(new SocketStompConnector(channel), + host: host,customHeaders: customHeaders, login: login, passcode: passcode, heartbeat: heartbeat, + onConnect: onConnect, onDisconnect: onDisconnect, + onError: onError, onFault: onFault); diff --git a/lib/websocket.dart b/lib/websocket.dart index 60c99f0..843af63 100644 --- a/lib/websocket.dart +++ b/lib/websocket.dart @@ -31,6 +31,7 @@ import "impl/plugin.dart" show StringStompConnector; */ Future connect(String url, {String host, + Map customHeaders, String login, String passcode, List heartbeat, @@ -41,6 +42,7 @@ Future connect(String url, void onFault(StompClient client, error, stackTrace)}) => connectWith(new WebSocket(url), host: host, + customHeaders: customHeaders, login: login, passcode: passcode, heartbeat: heartbeat, @@ -56,6 +58,7 @@ Future connect(String url, */ Future connectWith(WebSocket socket, {String host, + Map customHeaders, String login, String passcode, List heartbeat, @@ -67,6 +70,7 @@ Future connectWith(WebSocket socket, _WSStompConnector.startWith(socket).then((_WSStompConnector connector) => StompClient.connect(connector, host: host, + customHeaders: customHeaders, login: login, passcode: passcode, heartbeat: heartbeat, diff --git a/test/_echo_test.dart b/test/_echo_test.dart index 623e879..baf0806 100644 --- a/test/_echo_test.dart +++ b/test/_echo_test.dart @@ -6,16 +6,20 @@ part of echo_test; /** It is part of both echo_vm_test.dart and echo_ws_test.dart * so we can test it on both VM and browser. */ -Future testEcho(address) -=> connect(address, onDisconnect: (_) { - print("Disconnected"); +Future testEcho({address,headers,heartbeat}) +=> connect(address,customHeaders: headers,heartbeat: heartbeat, +onConnect: ( client, Map headers){ +} , onDisconnect: (_) { + print("disconnect"); +},onError: ( client, String message, String detail, Map headers){ + }).then((client) { - test("echo test", () { + test("echo test", () { final String destination = "/foo"; final List sends = ["1. apple", "2. orange\nand 2nd line", "3. mango"]; final List sendExtraHeader = ["123", "abc:", "xyz"]; final List receives = [], receiveExtraHeader = []; - +/* client.subscribeString("0", destination, (headers, message) { //print("< customHeaders = new LinkedHashMap(); + customHeaders["userid"]="D7t7G8989y3"; + customHeaders["platform"]="mobile"; + testEcho(address: address,headers: customHeaders,heartbeat: [10000,10000]) + .catchError((ex) { + print("Unable to connect $address\n" + "Check if the server has been started\n\nCause:\n$ex"); + }, test: (ex) => ex is SocketException); +}