This repository was archived by the owner on Jan 9, 2024. It is now read-only.

Commit 5b105cb
Merge branch 'unstable'
2 parents e0b2f53 + 48ea3e7

File tree

11 files changed: +243 −233 lines changed


docs/pipelines.rst

Lines changed: 4 additions & 2 deletions
@@ -5,11 +5,13 @@ Pipelines
 How pipelining works
 --------------------
 
+In redis-py-cluster, pipelining is all about trying to achieve greater network efficiency. Transaction support is disabled in redis-py-cluster. Use pipelines to avoid extra network round-trips, not to ensure atomicity.
+
 Just like in `redis-py`, `redis-py-cluster` queues up all the commands inside the client until execute is called. But, once execute is called, `redis-py-cluster` internals work slightly differently. It still packs the commands to efficiently transmit multiple commands across the network. But since different keys may be mapped to different nodes, redis-py-cluster must first map each key to the expected node. It then packs all the commands destined for each node in the cluster into its own packed sequence of commands. It uses the redis-py library to communicate with each node in the cluster.
 
-Ideally all the commands should be sent to each node in the cluster in parallel so that all the commands can be processed as fast as possible. The naive approach is to iterate through each node and send each batch of commands sequentially to each node. If redis-py supported some sort of non-blocking i/o we could send the network requests first and multiplex the socket responses from each node. Instead, we use threads to send the requests in parallel so that the total execution time only equals the amount of time for the slowest round trip to and from the given set of nodes in the cluster needed to process the commands.
+Ideally all the commands should be sent to each node in the cluster in parallel so that all the commands can be processed as fast as possible. We do this by first writing all of the commands to the sockets sequentially before reading any of the responses. This allows us to parallelize the network i/o without the overhead of managing python threads.
 
-In previous versions of the library there were some bugs associated with threaded operations and pipelining. We were freeing connections back into the connection pool prior to reading the responses from each thread and it caused all kinds of problems. Those issues were fixed but there was a special flag to allow you to turn off threading in case you were worried about it. Since we no longer have to use threads at all to get the performance we want, that flag was removed from the client.
+In previous versions of the library there were some bugs associated with pipelining operations. In an effort to simplify the logic and lessen the likelihood of bugs, if we get back connection errors, MOVED errors, ASK errors or any other error that can safely be retried, we fall back to sending these remaining commands sequentially to each individual node just as we would in a normal redis call. We still buffer the results inside the pipeline response so there will be no change in client behavior. During normal cluster operations, pipelined commands should work nearly as efficiently as pipelined commands to a single-instance redis. When there is a disruption to the cluster topology, like when keys are being resharded, or when a slave takes over for a master, there will be a slight loss of network efficiency. Commands that are rejected by the server are tried one at a time as we rebuild the slot mappings. Once the slots table is rebuilt correctly (usually in a second or so), the client resumes efficient networking behavior. We felt it was more important to prioritize correctness of behavior and reliable error handling over networking efficiency for the rare cases where the cluster topology is in flux.
 

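The flow described in the reworked doc above can be exercised end to end with a plain pipeline call. A minimal sketch, assuming a cluster node is reachable at the placeholder address 127.0.0.1:7000:

from rediscluster import StrictRedisCluster

# placeholder startup node; any reachable cluster node works
startup_nodes = [{"host": "127.0.0.1", "port": "7000"}]
rc = StrictRedisCluster(startup_nodes=startup_nodes, decode_responses=True)

# commands are buffered client side; on execute() they are grouped
# per destination node, and all requests are written to the sockets
# before any responses are read back
pipe = rc.pipeline()
pipe.set("foo", 1)
pipe.set("bar", 2)
pipe.get("foo")
pipe.get("bar")
print(pipe.execute())  # [True, True, '1', '2']

Results come back in the order the commands were queued, regardless of which node served each command.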
docs/release-notes.rst

Lines changed: 8 additions & 0 deletions
@@ -2,6 +2,14 @@ Release Notes
 =============
 
 
+1.3.1 (Oct 13, 2016)
+--------------------
+
+* Rebuilt the broken method scan_iter. The previous tests were too small to detect the problem; the method is now tested against a bigger dataset. (korvus81, Grokzen, RedWhiteMiko)
+* Errors in a pipeline that should be retried, like connection errors, MOVED errors and ASK errors, now fall back to single-operation logic in StrictRedisCluster.execute_command. (72squared)
+* Moved reinitialize_steps and its counter into nodemanager so they can be correctly counted across pipeline operations. (72squared)
+
+
 1.3.0 (Sep 11, 2016)
 --------------------
 

docs/upgrading.rst

Lines changed: 8 additions & 0 deletions
@@ -3,6 +3,14 @@ Upgrading redis-py-cluster
 
 This document describes what must be done when upgrading between different versions to ensure that code still works.
 
+1.3.0 --> 1.3.1
+---------------
+
+Method `scan_iter` was rebuilt because it was broken and did not perform as expected. If you are using this method you should be careful with the new implementation and test it thoroughly before using it. The expanded testing for that method indicates it should work without problems. If you find any issues with the new method please open an issue on github.
+
+A major refactoring was performed in the pipeline system that improved error handling and reliability of execution. It also simplified the code a lot to make it easier to understand and to continue developing in the future. Because of this major refactoring you should test your pipeline code thoroughly to ensure that none of your code is broken by it.
+
+
 
 1.2.0 --> Next release
 ----------------------

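Since the upgrade notes ask for thorough testing of the rebuilt scan_iter, one quick smoke test is to seed some keys and iterate them back. A sketch, under the same placeholder-node assumption as above:

from rediscluster import StrictRedisCluster

rc = StrictRedisCluster(startup_nodes=[{"host": "127.0.0.1", "port": "7000"}],
                        decode_responses=True)

# seed keys that will spread across slots and nodes
for i in range(1000):
    rc.set("scan-test:{0}".format(i), i)

# the rebuilt scan_iter keeps one cursor per master node and yields
# every matching key exactly once
keys = list(rc.scan_iter(match="scan-test:*", count=100))
assert len(keys) == 1000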
rediscluster/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -16,7 +16,7 @@
 setattr(redis, "StrictClusterPipeline", StrictClusterPipeline)
 
 # Major, Minor, Fix version
-__version__ = (1, 3, 0)
+__version__ = (1, 3, 1)
 
 if sys.version_info[0:3] == (3, 4, 0):
     raise RuntimeError("CRITICAL: rediscluster do not work with python 3.4.0. Please use 3.4.1 or higher.")

rediscluster/client.py

Lines changed: 41 additions & 16 deletions
@@ -28,7 +28,7 @@
 from redis import StrictRedis
 from redis.client import list_or_args, parse_info
 from redis.connection import Token
-from redis._compat import iteritems, basestring, b, izip, nativestr
+from redis._compat import iteritems, basestring, b, izip, nativestr, long
 from redis.exceptions import RedisError, ResponseError, TimeoutError, DataError, ConnectionError, BusyLoadingError
 
@@ -154,6 +154,7 @@ def __init__(self, host=None, port=None, startup_nodes=None, max_connections=32,
             startup_nodes=startup_nodes,
             init_slot_cache=init_slot_cache,
             max_connections=max_connections,
+            reinitialize_steps=reinitialize_steps,
             max_connections_per_node=max_connections_per_node,
             **kwargs
         )
@@ -164,8 +165,6 @@ def __init__(self, host=None, port=None, startup_nodes=None, max_connections=32,
         self.nodes_flags = self.__class__.NODES_FLAGS.copy()
         self.result_callbacks = self.__class__.RESULT_CALLBACKS.copy()
         self.response_callbacks = self.__class__.RESPONSE_CALLBACKS.copy()
-        self.reinitialize_counter = 0
-        self.reinitialize_steps = reinitialize_steps or 25
         self.response_callbacks = dict_merge(self.response_callbacks, self.CLUSTER_COMMANDS_RESPONSE_CALLBACKS)
 
     def __repr__(self):
@@ -196,10 +195,8 @@ def pipeline(self, transaction=None, shard_hint=None):
         return StrictClusterPipeline(
             connection_pool=self.connection_pool,
             startup_nodes=self.connection_pool.nodes.startup_nodes,
-            refresh_table_asap=self.refresh_table_asap,
             result_callbacks=self.result_callbacks,
             response_callbacks=self.response_callbacks,
-            reinitialize_steps=self.reinitialize_steps
         )
 
     def transaction(self, *args, **kwargs):
@@ -326,14 +323,13 @@ def execute_command(self, *args, **kwargs):
                 # This counter will increase faster when the same client object
                 # is shared between multiple threads. To reduce the frequency you
                 # can set the variable 'reinitialize_steps' in the constructor.
-                self.reinitialize_counter += 1
-                if self.reinitialize_counter % self.reinitialize_steps == 0:
-                    self.refresh_table_asap = True
+                self.refresh_table_asap = True
+                self.connection_pool.nodes.increment_reinitialize_counter()
 
                 node = self.connection_pool.nodes.set_node(e.host, e.port, server_type='master')
                 self.connection_pool.nodes.slots[e.slot_id][0] = node
             except TryAgainError as e:
-                if ttl < self.COMMAND_TTL / 2:
+                if ttl < self.RedisClusterRequestTTL / 2:
                     time.sleep(0.05)
             except AskError as e:
                 redirect_addr, asking = "{0}:{1}".format(e.host, e.port), True
@@ -551,6 +547,13 @@ def cluster_slots(self):
     ##########
     # All methods that must have custom implementation
 
+    def _parse_scan(self, response, **options):
+        """
+        Borrowed from redis-py::client.py
+        """
+        cursor, r = response
+        return long(cursor), r
+
     def scan_iter(self, match=None, count=None):
         """
         Make an iterator using the SCAN command so that the client doesn't
@@ -562,13 +565,36 @@ def scan_iter(self, match=None, count=None):
         Cluster impl:
             Result from SCAN is different in cluster mode.
         """
-        cursor = '0'
-        while cursor != 0:
-            for _, node_data in self.scan(cursor=cursor, match=match, count=count).items():
-                cursor, data = node_data
+        cursors = {}
+        nodeData = {}
+        for master_node in self.connection_pool.nodes.all_masters():
+            cursors[master_node["name"]] = "0"
+            nodeData[master_node["name"]] = master_node
+
+        while not all(cursors[node] == 0 for node in cursors):
+            for node in cursors:
+                if cursors[node] == 0:
+                    continue
+
+                conn = self.connection_pool.get_connection_by_node(nodeData[node])
+
+                pieces = ['SCAN', cursors[node]]
+                if match is not None:
+                    pieces.extend([Token('MATCH'), match])
+                if count is not None:
+                    pieces.extend([Token('COUNT'), count])
+
+                conn.send_command(*pieces)
+
+                raw_resp = conn.read_response()
+
+                # if you don't release the connection, the driver will make another, and you will hate your life
+                self.connection_pool.release(conn)
+                cur, resp = self._parse_scan(raw_resp)
+                cursors[node] = cur
 
-                for item in data:
-                    yield item
+                for r in resp:
+                    yield r
@@ -1095,7 +1121,6 @@ def pipeline(self, transaction=True, shard_hint=None):
         return StrictClusterPipeline(
             connection_pool=self.connection_pool,
             startup_nodes=self.connection_pool.nodes.startup_nodes,
-            refresh_table_asap=self.refresh_table_asap,
             response_callbacks=self.response_callbacks
         )
 

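With the counter moved into the node manager, reinitialize_steps now travels from the client constructor through the connection pool. A sketch of tuning it; the value 100 and the node address are placeholders:

from rediscluster import StrictRedisCluster

# a higher value makes a MOVED response trigger a full slots-table
# rebuild less often; the library default is 25
rc = StrictRedisCluster(
    startup_nodes=[{"host": "127.0.0.1", "port": "7000"}],
    reinitialize_steps=100,
)

# the counter now lives on the node manager, shared with pipelines
print(rc.connection_pool.nodes.reinitialize_steps)  # 100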
rediscluster/connection.py

Lines changed: 2 additions & 2 deletions
@@ -70,15 +70,15 @@ class ClusterConnectionPool(ConnectionPool):
     RedisClusterDefaultTimeout = None
 
     def __init__(self, startup_nodes=None, init_slot_cache=True, connection_class=ClusterConnection,
-                 max_connections=None, max_connections_per_node=False, **connection_kwargs):
+                 max_connections=None, max_connections_per_node=False, reinitialize_steps=None, **connection_kwargs):
         """
         """
         super(ClusterConnectionPool, self).__init__(connection_class=connection_class, max_connections=max_connections)
 
         self.max_connections = max_connections or 2 ** 31
         self.max_connections_per_node = max_connections_per_node
 
-        self.nodes = NodeManager(startup_nodes, **connection_kwargs)
+        self.nodes = NodeManager(startup_nodes, reinitialize_steps=reinitialize_steps, **connection_kwargs)
         if init_slot_cache:
             self.nodes.initialize()

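The same keyword can be supplied when building the pool directly, which shows the plumbing in isolation. A short sketch with placeholder values:

from rediscluster.connection import ClusterConnectionPool

pool = ClusterConnectionPool(
    startup_nodes=[{"host": "127.0.0.1", "port": "7000"}],
    reinitialize_steps=50,
)

# the keyword is handed straight to the NodeManager
assert pool.nodes.reinitialize_steps == 50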
rediscluster/nodemanager.py

Lines changed: 10 additions & 1 deletion
@@ -19,14 +19,16 @@ class NodeManager(object):
     """
     RedisClusterHashSlots = 16384
 
-    def __init__(self, startup_nodes=None, **connection_kwargs):
+    def __init__(self, startup_nodes=None, reinitialize_steps=None, **connection_kwargs):
         """
         """
         self.connection_kwargs = connection_kwargs
         self.nodes = {}
         self.slots = {}
         self.startup_nodes = [] if startup_nodes is None else startup_nodes
         self.orig_startup_nodes = [node for node in self.startup_nodes]
+        self.reinitialize_counter = 0
+        self.reinitialize_steps = reinitialize_steps or 25
 
         if not self.startup_nodes:
             raise RedisClusterException("No startup nodes provided")
@@ -237,6 +239,13 @@ def initialize(self):
         # Set the tmp variables to the real variables
         self.slots = tmp_slots
         self.nodes = nodes_cache
+        self.reinitialize_counter = 0
+
+    def increment_reinitialize_counter(self, ct=1):
+        for i in range(ct):
+            self.reinitialize_counter += 1
+            if self.reinitialize_counter % self.reinitialize_steps == 0:
+                self.initialize()
 
     def cluster_require_full_coverage(self, nodes_cache):
         """

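The counter semantics added to NodeManager can be observed in isolation against a live cluster. A sketch with placeholder values, relying on initialize() resetting the counter as shown in the diff above:

from rediscluster.nodemanager import NodeManager

nm = NodeManager(startup_nodes=[{"host": "127.0.0.1", "port": "7000"}],
                 reinitialize_steps=5)
nm.initialize()  # builds the slots table and resets the counter

for _ in range(5):
    nm.increment_reinitialize_counter()

# the fifth increment satisfied counter % reinitialize_steps == 0,
# so initialize() ran again and reset the counter to zero
assert nm.reinitialize_counter == 0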