diff --git a/.gitignore b/.gitignore index ff20996..41607a0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,7 @@ *.pyc /*.egg-info/ /dist/ +.cache +.idea +venv +venv* diff --git a/.travis.yml b/.travis.yml index 04b6215..887c04d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,19 +1,18 @@ language: python python: - - "2.6" +# Twisted not supported in Python 2.6. +# Even with Twisted 15.4, there are issues with autobahn. +# - "2.6" - "2.7" # Disable testing 3.2 since autobahn uses unicode literal syntax that wasn't # re-added into 3.3. See PEP 414 # - "3.2" - "3.3" - "3.4" + - "3.5" + - "3.5-dev" # 3.5 development branch + - "nightly" # currently points to 3.6-dev install: - - pip install . - # Our test server uses autobahn - - pip install autobahn - # For 2.x, our server needs twisted - - pip install twisted - # For 3.x where x < 4, our server needs trollius - - pip install trollius -script: python tests/test_pusherclient.py + - pip install -r requirements_tests.txt +script: nosetests diff --git a/MANIFEST.in b/MANIFEST.in index 04f196a..4542392 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,2 +1,3 @@ +include README.rst include README.md include LICENSE diff --git a/README.rst b/README.rst new file mode 100644 index 0000000..ff8c5b7 --- /dev/null +++ b/README.rst @@ -0,0 +1,75 @@ +|Build Status| + +pusherclient +============ + +pusherclient is a python module for handling pusher websockets + +Installation +------------ + +Simply run "python setup.py install". + +This module depends on websocket-client module available from: +http://github.com/liris/websocket-client + +Example +------- + +Example of using this pusher client to consume websockets:: + +:: + + import pusherclient + + # Add a logging handler so we can see the raw communication data + import logging + root = logging.getLogger() + root.setLevel(logging.INFO) + ch = logging.StreamHandler(sys.stdout) + root.addHandler(ch) + + global pusher + + # We can't subscribe until we've connected, so we use a callback handler + # to subscribe when able + def connect_handler(data): + channel = pusher.subscribe('mychannel') + channel.bind('myevent', callback) + + pusher = pusherclient.Pusher(appkey) + pusher.connection.bind('pusher:connection_established', connect_handler) + pusher.connect() + + while True: + # Do other things in the meantime here... + time.sleep(1) + +Sending pusher events to a channel can be done simply using the pusher +client supplied by pusher. You can get it here: +http://github.com/newbamboo/pusher_client_python + +:: + + import pusher + pusher.app_id = app_id + pusher.key = appkey + + p = pusher.Pusher() + p['mychannel'].trigger('myevent', 'mydata') + +Thanks +------ + +Built using the websocket-client module from +http://github.com/liris/websocket-client. The ruby gem by Logan Koester +which provides a similar service was also very helpful for a reference. +Take a look at it here: http://github.com/logankoester/pusher-client. + +Copyright +--------- + +MTI License - See LICENSE for details. + +.. |Build Status| image:: https://travis-ci.org/ekulyk/PythonPusherClient.svg?branch=master + :target: https://travis-ci.org/ekulyk/PythonPusherClient diff --git a/pusherclient/connection.py b/pusherclient/connection.py index 75fe53f..3478d1a 100755 --- a/pusherclient/connection.py +++ b/pusherclient/connection.py @@ -4,6 +4,7 @@ import time try: + # noinspection PyPackageRequirements import simplejson as json except ImportError: import json @@ -36,10 +37,14 @@ def __init__(self, event_handler, url, log_level=logging.INFO, daemon=True, reco self.state = "initialized" - self.logger = logging.getLogger(self.__module__) # create a new logger + self.root_logger = logging.getLogger(self.__module__) # create a new logger + self.connection_logger = logging.getLogger('{}.connection'.format(self.__module__)) + self.events_logger = logging.getLogger('{}.events'.format(self.__module__)) if log_level == logging.DEBUG: websocket.enableTrace(True) - self.logger.setLevel(log_level) + + if log_level is not None: + self.root_logger.setLevel(log_level) # From Martyn's comment at: # https://pusher.tenderapp.com/discussions/problems/36-no-messages-received-after-1-idle-minute-heartbeat @@ -84,7 +89,7 @@ def reconnect(self, reconnect_interval=None): if reconnect_interval is None: reconnect_interval = self.default_reconnect_interval - self.logger.info("Connection: Reconnect in %s" % reconnect_interval) + self.connection_logger.info("Connection: Reconnect in %s" % reconnect_interval) self.reconnect_interval = reconnect_interval self.needs_reconnect = True @@ -108,8 +113,9 @@ def _connect(self): self.socket.run_forever() while self.needs_reconnect and not self.disconnect_called: - self.logger.info("Attempting to connect again in %s seconds." - % self.reconnect_interval) + self.connection_logger.info( + "Attempting to connect again in %s seconds.", + self.reconnect_interval) self.state = "unavailable" time.sleep(self.reconnect_interval) @@ -118,21 +124,21 @@ def _connect(self): self.socket.keep_running = True self.socket.run_forever() - def _on_open(self, ws): - self.logger.info("Connection: Connection opened") + def _on_open(self, _): + self.connection_logger.info("Connection: Connection opened") # Send a ping right away to inform that the connection is alive. If you # don't do this, it takes the ping interval to subcribe to channel and # events self.send_ping() self._start_timers() - def _on_error(self, ws, error): - self.logger.info("Connection: Error - %s" % error) + def _on_error(self, _, error): + self.connection_logger.error("Connection: Error - %s" % error) self.state = "failed" self.needs_reconnect = True - def _on_message(self, ws, message): - self.logger.info("Connection: Message - %s" % message) + def _on_message(self, _, message): + self.connection_logger.info("Connection: Message - %s" % message) # Stop our timeout timer, since we got some data self._stop_timers() @@ -144,12 +150,13 @@ def _on_message(self, ws, message): # We've got a connection event. Lets handle it. if params['event'] in self.event_callbacks.keys(): for callback in self.event_callbacks[params['event']]: + # noinspection PyBroadException try: callback(params['data']) - except Exception: - self.logger.exception("Callback raised unhandled") + except: + self.events_logger.exception("Callback raised unhandled") else: - self.logger.info("Connection: Unhandled event") + self.events_logger.warning("Connection: Unhandled event") else: # We've got a channel event. Lets pass it up to the pusher # so it can be handled by the appropriate channel. @@ -162,8 +169,8 @@ def _on_message(self, ws, message): # We've handled our data, so restart our connection timeout handler self._start_timers() - def _on_close(self, ws, *args): - self.logger.info("Connection: Connection closed") + def _on_close(self, *_): + self.connection_logger.info("Connection: Connection closed") self.state = "disconnected" self._stop_timers() @@ -195,27 +202,28 @@ def send_event(self, event_name, data, channel_name=None): if channel_name: event['channel'] = channel_name - self.logger.info("Connection: Sending event - %s" % event) + self.events_logger.info("Connection: Sending event - %s" % event) try: self.socket.send(json.dumps(event)) except Exception as e: - self.logger.error("Failed send event: %s" % e) + self.events_logger.error("Failed send event: %s" % e) def send_ping(self): - self.logger.info("Connection: ping to pusher") + self.connection_logger.info("Connection: ping to pusher") try: self.socket.send(json.dumps({'event': 'pusher:ping', 'data': ''})) except Exception as e: - self.logger.error("Failed send ping: %s" % e) + self.connection_logger.error("Failed send ping: %s" % e) self.pong_timer = Timer(self.pong_timeout, self._check_pong) self.pong_timer.start() def send_pong(self): - self.logger.info("Connection: pong to pusher") + self.connection_logger.info("Connection: pong to pusher") + # noinspection PyBroadException try: self.socket.send(json.dumps({'event': 'pusher:pong', 'data': ''})) - except Exception as e: - self.logger.error("Failed send pong: %s" % e) + except: + self.connection_logger.exception("Failed send pong") def _check_pong(self): self.pong_timer.cancel() @@ -223,7 +231,7 @@ def _check_pong(self): if self.pong_received: self.pong_received = False else: - self.logger.info("Did not receive pong in time. Will attempt to reconnect.") + self.connection_logger.info("Did not receive pong in time. Will attempt to reconnect.") self.state = "failed" self.reconnect() @@ -232,33 +240,34 @@ def _connect_handler(self, data): self.socket_id = parsed['socket_id'] self.state = "connected" - def _failed_handler(self, data): + def _failed_handler(self, _): self.state = "failed" - def _ping_handler(self, data): + def _ping_handler(self, _): self.send_pong() # Restart our timers since we received something on the connection self._start_timers() - def _pong_handler(self, data): - self.logger.info("Connection: pong from pusher") + def _pong_handler(self, _): + self.connection_logger.info("Connection: pong from pusher") self.pong_received = True def _pusher_error_handler(self, data): if 'code' in data: error_code = None + # noinspection PyBroadException try: error_code = int(data['code']) except: pass if error_code is not None: - self.logger.error("Connection: Received error %s" % error_code) + self.connection_logger.error("Connection: Received error %s" % error_code) if (error_code >= 4000) and (error_code <= 4099): # The connection SHOULD NOT be re-established unchanged - self.logger.info("Connection: Error is unrecoverable. Disconnecting") + self.connection_logger.warning("Connection: Error is unrecoverable. Disconnecting") self.disconnect() elif (error_code >= 4100) and (error_code <= 4199): # The connection SHOULD be re-established after backing off @@ -269,11 +278,11 @@ def _pusher_error_handler(self, data): else: pass else: - self.logger.error("Connection: Unknown error code") + self.connection_logger.error("Connection: Unknown error code") else: - self.logger.error("Connection: No error code supplied") + self.connection_logger.error("Connection: No error code supplied") def _connection_timed_out(self): - self.logger.info("Did not receive any data in time. Reconnecting.") + self.connection_logger.info("Did not receive any data in time. Reconnecting.") self.state = "failed" self.reconnect() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..6b93e79 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +websocket-client==0.37.0 diff --git a/requirements_tests.txt b/requirements_tests.txt new file mode 100644 index 0000000..2cb1fb0 --- /dev/null +++ b/requirements_tests.txt @@ -0,0 +1,6 @@ +autobahn==0.16.1 +mock==2.0.0 +nose==1.3.7 +trollius==2.1 +Twisted==16.5.0 +websocket-client==0.37.0 diff --git a/setup.py b/setup.py index 8d0fefa..c095d5b 100644 --- a/setup.py +++ b/setup.py @@ -1,13 +1,20 @@ from setuptools import setup +import sys VERSION = "0.3.0" -requirements = ["websocket-client"] +major, minor1, minor2, release, serial = sys.version_info +def readfile(filename): + with open(filename, **readfile_kwargs) as fp: + contents = fp.read() + return contents + def readme(): - with open('README.md') as f: + with open("README.rst") as f: return f.read() + setup( name="pusherclient", version=VERSION, @@ -18,21 +25,21 @@ def readme(): author_email="e.kulyk@gmail.com", license="MIT", url="https://github.com/ekulyk/PythonPusherClient", - install_requires=requirements, + install_requires=readfile(os.path.join(os.path.dirname(__file__), "requirements.txt")), packages=["pusherclient"], classifiers=[ - 'Development Status :: 3 - Alpha', - 'Environment :: Web Environment', - 'License :: OSI Approved :: MIT License', - 'Programming Language :: Python', - 'Programming Language :: Python :: 2', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.2', - 'Programming Language :: Python :: 3.3', - 'Programming Language :: Python :: 3.4', - 'Topic :: Internet', - 'Topic :: Software Development :: Libraries ', - 'Topic :: Software Development :: Libraries :: Python Modules', + "Development Status :: 3 - Alpha", + "Environment :: Web Environment", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python", + "Programming Language :: Python :: 2", + "Programming Language :: Python :: 2.7", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.3", + "Programming Language :: Python :: 3.4", + "Programming Language :: Python :: 3.5", + "Topic :: Internet", + "Topic :: Software Development :: Libraries ", + "Topic :: Software Development :: Libraries :: Python Modules", ] ) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_pusherclient.py b/tests/test_pusherclient.py index e3ff3f7..76ad9f3 100644 --- a/tests/test_pusherclient.py +++ b/tests/test_pusherclient.py @@ -1,89 +1,73 @@ -#!/usr/bin/env python - -import sys +import json import time -import threading - -import pusherclient -import pusherserver +from unittest import TestCase try: - import simplejson as json + from unittest import mock except ImportError: - import json + # noinspection PyUnresolvedReferences + import mock -# Add a logging handler so we can see the raw communication data -import logging -root = logging.getLogger() -root.setLevel(logging.INFO) -ch = logging.StreamHandler(sys.stdout) -root.addHandler(ch) -PORT = 9000 - -SUCCESS = 0 -ERR_FAILED = 1 -ERR_TIMEOUT = 2 - -global client -global server -global exit_code +import pusherclient +from tests import pusherserver -def test_channel_callback(data): - global exit_code - print("Client: %s" % data) - data = json.loads(data) +class TestPusherClient(TestCase): + _PORT = 9000 + _MAX_ELAPSED_TIME_SECS = 10 - if 'message' in data: - if data['message'] == "test": - # Test successful - exit_code = SUCCESS - server.stop() - sys.exit(exit_code) + def test(self): + server = pusherserver.Pusher(pusherserver.PusherTestServerProtocol, port=self._PORT) + def _stop_test(): + print('Stop timer') + client.disconnect() + server.stop(fromThread=True) -def connect_handler(data): - channel = client.subscribe("test_channel") - channel.bind('test_event', test_channel_callback) + stop_test = mock.Mock() + stop_test.side_effect = _stop_test + success_mock = mock.Mock() -def stop_test(): - global exit_code + def _test_channel_callback(data): + print("Client: %s" % data) - exit_code = ERR_TIMEOUT + data = json.loads(data) - client.disconnect() - server.stop(fromThread=True) + if 'message' in data: + if data['message'] == "test": + # Test successful + success_mock() + server.stop() + client.disconnect() -if __name__ == '__main__': - global client - global server - global exit_code + test_channel_callback = mock.Mock() + test_channel_callback.side_effect = _test_channel_callback - exit_code = ERR_FAILED + def _connect_handler(_): + channel = client.subscribe("test_channel") + channel.bind('test_event', test_channel_callback) - # If testing taking longer than N seconds, we have an issue. This time - # depends on the client reconnect interval most of all. - timer = threading.Timer(10, stop_test) - timer.daemon = True - timer.start() + connect_handler = mock.Mock() + connect_handler.side_effect = _connect_handler - # Set up our client and attempt to connect to the server - appkey = 'appkey' - pusherclient.Pusher.host = "127.0.0.1" - client = pusherclient.Pusher(appkey, port=PORT, secure=False, reconnect_interval=1) + # Set up our client and attempt to connect to the server + appkey = 'appkey' + pusherclient.Pusher.host = "127.0.0.1" + client = pusherclient.Pusher(appkey, port=self._PORT, secure=False, reconnect_interval=1) - print(client._build_url("mykey", False, port=PORT)) - client.connection.bind('pusher:connection_established', connect_handler) - client.connect() + print(client._build_url("mykey", False, port=self._PORT)) + client.connection.bind('pusher:connection_established', connect_handler) + client.connect() - # Sleep a bit before starting the server - this will cause the clients - # initial connect to fail, forcing it to use the retry mechanism - time.sleep(2) + # Sleep a bit before starting the server - this will cause the clients + # initial connect to fail, forcing it to use the retry mechanism + time.sleep(2) - # Start our pusher server on localhost - server = pusherserver.Pusher(pusherserver.PusherTestServerProtocol, port=PORT) - server.run() + start = time.time() + # Start our pusher server on localhost + server.run() - sys.exit(exit_code) + if not success_mock.call_count or time.time() - start > self._MAX_ELAPSED_TIME_SECS: + self.fail('Successful callback not called')