diff --git a/python/DESCRIPTION.md b/python/DESCRIPTION.md new file mode 100644 index 0000000..15ff9e9 --- /dev/null +++ b/python/DESCRIPTION.md @@ -0,0 +1,34 @@ +# Locally Optimized Product Quantization + +This is Python training and testing code for Locally Optimized Product Quantization (LOPQ) models, as well as Spark scripts to scale training to hundreds of millions of vectors. The resulting model can be used in Python with code provided here or deployed via a Protobuf format to, e.g., search backends for high performance approximate nearest neighbor search. + +### Overview + +Locally Optimized Product Quantization (LOPQ) [1] is a hierarchical quantization algorithm that produces codes of configurable length for data points. These codes are efficient representations of the original vector and can be used in a variety of ways depending on application, including as hashes that preserve locality, as a compressed vector from which an approximate vector in the data space can be reconstructed, and as a representation from which to compute an approximation of the Euclidean distance between points. + +Conceptually, the LOPQ quantization process can be broken into 4 phases. The training process also fits these phases to the data in the same order. + +1. The raw data vector is PCA'd to `D` dimensions (possibly the original dimensionality). This allows subsequent quantization to more efficiently represent the variation present in the data. +2. The PCA'd data is then product quantized [2] by two k-means quantizers. This means that each vector is split into two subvectors each of dimension `D / 2`, and each of the two subspaces is quantized independently with a vocabulary of size `V`. Since the two quantizations occur independently, the dimensions of the vectors are permuted such that the total variance in each of the two subspaces is approximately equal, which allows the two vocabularies to be equally important in terms of capturing the total variance of the data. This results in a pair of cluster ids that we refer to as "coarse codes". +3. The residuals of the data after coarse quantization are computed. The residuals are then locally projected independently for each coarse cluster. This projection is another application of PCA and dimension permutation on the residuals and it is "local" in the sense that there is a different projection for each cluster in each of the two coarse vocabularies. These local rotations make the next and final step, another application of product quantization, very efficient in capturing the variance of the residuals. +4. The locally projected data is then product quantized a final time by `M` subquantizers, resulting in `M` "fine codes". Usually the vocabulary for each of these subquantizers will be a power of 2 for effective storage in a search index. With vocabularies of size 256, the fine codes for each indexed vector will require `M` bytes to store in the index. + +The final LOPQ code for a vector is a `(coarse codes, fine codes)` pair, e.g. `((3, 2), (14, 164, 83, 49, 185, 29, 196, 250))`. + +### Nearest Neighbor Search + +A nearest neighbor index can be built from these LOPQ codes by indexing each document into its corresponding coarse code bucket. That is, each pair of coarse codes (which we refer to as a "cell") will index a bucket of the vectors quantizing to that cell. + +At query time, an incoming query vector undergoes substantially the same process. First, the query is split into coarse subvectors and the distance to each coarse centroid is computed. 
These distances can be used to efficiently compute a priority-ordered sequence of cells [3] such that cells later in the sequence are less likely to have near neighbors of the query than earlier cells. The items in cell buckets are retrieved in this order until some desired quota has been met. + +After this retrieval phase, the fine codes are used to rank by approximate Euclidean distance. The query is projected into each local space and the distance to each indexed item is estimated as the sum of the squared distances of the query subvectors to the corresponding subquantizer centroid indexed by the fine codes. + +NN search with LOPQ is highly scalable and has excellent properties in terms of both index storage requirements and query-time latencies when implemented well. + +#### References + +More information and performance benchmarks can be found at http://image.ntua.gr/iva/research/lopq/. + +1. Y. Kalantidis, Y. Avrithis. [Locally Optimized Product Quantization for Approximate Nearest Neighbor Search.](http://image.ntua.gr/iva/files/lopq.pdf) CVPR 2014. +2. H. Jegou, M. Douze, and C. Schmid. [Product quantization for nearest neighbor search.](https://lear.inrialpes.fr/pubs/2011/JDS11/jegou_searching_with_quantization.pdf) PAMI, 33(1), 2011. +3. A. Babenko and V. Lempitsky. [The inverted multi-index.](http://www.computer.org/csdl/trans/tp/preprint/06915715.pdf) CVPR 2012. \ No newline at end of file diff --git a/python/lopq/__init__.py b/python/lopq/__init__.py index 33be870..5576293 100644 --- a/python/lopq/__init__.py +++ b/python/lopq/__init__.py @@ -1,5 +1,6 @@ # Copyright 2015, Yahoo Inc. -# Licensed under the terms of the Apache License, Version 2.0. See the LICENSE file associated with the project for terms. +# Licensed under the terms of the Apache License, Version 2.0. +# See the LICENSE file associated with the project for terms. import model import search import utils diff --git a/python/lopq/eval.py b/python/lopq/eval.py index 5d5ae48..122b7e9 100644 --- a/python/lopq/eval.py +++ b/python/lopq/eval.py @@ -1,13 +1,13 @@ # Copyright 2015, Yahoo Inc. -# Licensed under the terms of the Apache License, Version 2.0. See the LICENSE file associated with the project for terms. +# Licensed under the terms of the Apache License, Version 2.0. +# See the LICENSE file associated with the project for terms. import time import numpy as np def compute_all_neighbors(data1, data2=None, just_nn=True): - """ - For each point in data1, compute a ranked list of neighbor indices from data2. - If data2 is not provided, compute neighbors relative to data1 + """ For each point in data1, compute a ranked list of neighbor indices + from data2. If data2 is not provided, compute neighbors relative to data1 :param ndarray data1: an m1 x n dim matrix with observations on the rows @@ -15,7 +15,8 @@ def compute_all_neighbors(data1, data2=None, just_nn=True): an m2 x n dim matrix with observations on the rows :returns ndarray: - an m1 x m2 dim matrix with the distance-sorted indices of neighbors on the rows + an m1 x m2 dim matrix with the distance-sorted indices of neighbors + on the rows """ from scipy.spatial.distance import cdist @@ -89,10 +90,11 @@ def get_proportion_of_reconstructions_with_same_codes(data, model): return float(count) / N -def get_recall(searcher, queries, nns, thresholds=[1, 10, 100, 1000], normalize=True, verbose=False): - """ - Given a LOPQSearcher object with indexed data and groundtruth nearest neighbors for a set of test - query vectors, collect and return recall statistics.
+def get_recall(searcher, queries, nns, thresholds=[1, 10, 100, 1000], + normalize=True, verbose=False): + """ Given a LOPQSearcher object with indexed data and groundtruth nearest + neighbors for a set of test query vectors, collect and return recall + statistics. :param LOPQSearcher searcher: a searcher that contains the indexed nearest neighbors @@ -101,10 +103,11 @@ def get_recall(searcher, queries, nns, thresholds=[1, 10, 100, 1000], normalize= :param ndarray nns: a list of true nearest neighbor ids for each vector in queries :param list thresholds: - the recall thresholds to evaluate - the last entry defines the number of - results to retrieve before ranking + the recall thresholds to evaluate - the last entry defines the number + of results to retrieve before ranking :param bool normalize: - flag to indicate whether the result should be normalized by the number of queries + flag to indicate whether the result should be normalized by the number + of queries :param bool verbose: flag to print every 50th search to visualize progress @@ -156,6 +159,9 @@ def get_subquantizer_distortion(data, model): pall = np.concatenate((p1, p2), axis=1) suball = model.subquantizers[0] + model.subquantizers[1] - dists = np.array([sum(np.linalg.norm(compute_residuals(d, c)[0], ord=2, axis=1) ** 2) for c, d in zip(suball, np.split(pall, 8, axis=1))]) + dists = np.array([ + sum(np.linalg.norm(compute_residuals(d, c)[0], ord=2, axis=1) ** 2) + for c, d in zip(suball, np.split(pall, 8, axis=1)) + ]) return dists / data.shape[0] diff --git a/python/lopq/lopq_model_pb2.py b/python/lopq/lopq_model_pb2.py index 2109a8c..bc25fa7 100644 --- a/python/lopq/lopq_model_pb2.py +++ b/python/lopq/lopq_model_pb2.py @@ -2,7 +2,9 @@ # source: lopq_model.proto import sys -_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +_b = (sys.version_info[0] < 3 and (lambda x: x) or ( + lambda x: x.encode('latin1'))) + from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message from google.protobuf import reflection as _reflection @@ -13,161 +15,149 @@ _sym_db = _symbol_database.Default() - - DESCRIPTOR = _descriptor.FileDescriptor( - name='lopq_model.proto', - package='com.flickr.vision.lopq', - serialized_pb=_b('\n\x10lopq_model.proto\x12\x16\x63om.flickr.vision.lopq\"\x1c\n\x06Vector\x12\x12\n\x06values\x18\x01 \x03(\x02\x42\x02\x10\x01\"+\n\x06Matrix\x12\x12\n\x06values\x18\x01 \x03(\x02\x42\x02\x10\x01\x12\r\n\x05shape\x18\x02 \x03(\r\"\x80\x02\n\x0fLOPQModelParams\x12\t\n\x01\x44\x18\x01 \x01(\r\x12\t\n\x01V\x18\x02 \x01(\r\x12\t\n\x01M\x18\x03 \x01(\r\x12\x19\n\x11num_subquantizers\x18\x04 \x01(\r\x12*\n\x02\x43s\x18\x05 \x03(\x0b\x32\x1e.com.flickr.vision.lopq.Matrix\x12*\n\x02Rs\x18\x06 \x03(\x0b\x32\x1e.com.flickr.vision.lopq.Matrix\x12+\n\x03mus\x18\x07 \x03(\x0b\x32\x1e.com.flickr.vision.lopq.Vector\x12,\n\x04subs\x18\x08 \x03(\x0b\x32\x1e.com.flickr.vision.lopq.MatrixB/\n\x16\x63om.flickr.vision.lopqB\x13LOPQModelParametersH\x01') + name='lopq_model.proto', + package='com.flickr.vision.lopq', + serialized_pb=_b('\n\x10lopq_model.proto\x12\x16\x63om.flickr.vision.lopq\"\x1c\n\x06Vector\x12\x12\n\x06values\x18\x01 \x03(\x02\x42\x02\x10\x01\"+\n\x06Matrix\x12\x12\n\x06values\x18\x01 \x03(\x02\x42\x02\x10\x01\x12\r\n\x05shape\x18\x02 \x03(\r\"\x80\x02\n\x0fLOPQModelParams\x12\t\n\x01\x44\x18\x01 \x01(\r\x12\t\n\x01V\x18\x02 \x01(\r\x12\t\n\x01M\x18\x03 \x01(\r\x12\x19\n\x11num_subquantizers\x18\x04 \x01(\r\x12*\n\x02\x43s\x18\x05 
\x03(\x0b\x32\x1e.com.flickr.vision.lopq.Matrix\x12*\n\x02Rs\x18\x06 \x03(\x0b\x32\x1e.com.flickr.vision.lopq.Matrix\x12+\n\x03mus\x18\x07 \x03(\x0b\x32\x1e.com.flickr.vision.lopq.Vector\x12,\n\x04subs\x18\x08 \x03(\x0b\x32\x1e.com.flickr.vision.lopq.MatrixB/\n\x16\x63om.flickr.vision.lopqB\x13LOPQModelParametersH\x01') ) _sym_db.RegisterFileDescriptor(DESCRIPTOR) - - _VECTOR = _descriptor.Descriptor( - name='Vector', - full_name='com.flickr.vision.lopq.Vector', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='values', full_name='com.flickr.vision.lopq.Vector.values', index=0, - number=1, type=2, cpp_type=6, label=3, - has_default_value=False, default_value=[], - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=_descriptor._ParseOptions(descriptor_pb2.FieldOptions(), _b('\020\001'))), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - oneofs=[ - ], - serialized_start=44, - serialized_end=72, + name='Vector', + full_name='com.flickr.vision.lopq.Vector', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='values', full_name='com.flickr.vision.lopq.Vector.values', + index=0, number=1, type=2, cpp_type=6, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=_descriptor._ParseOptions(descriptor_pb2.FieldOptions(), + _b('\020\001')) + ), ], + extensions=[], + nested_types=[], + enum_types=[], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[], + serialized_start=44, + serialized_end=72, ) _MATRIX = _descriptor.Descriptor( - name='Matrix', - full_name='com.flickr.vision.lopq.Matrix', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='values', full_name='com.flickr.vision.lopq.Matrix.values', index=0, - number=1, type=2, cpp_type=6, label=3, - has_default_value=False, default_value=[], - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=_descriptor._ParseOptions(descriptor_pb2.FieldOptions(), _b('\020\001'))), - _descriptor.FieldDescriptor( - name='shape', full_name='com.flickr.vision.lopq.Matrix.shape', index=1, - number=2, type=13, cpp_type=3, label=3, - has_default_value=False, default_value=[], - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - oneofs=[ - ], - serialized_start=74, - serialized_end=117, + name='Matrix', + full_name='com.flickr.vision.lopq.Matrix', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='values', full_name='com.flickr.vision.lopq.Matrix.values', index=0, + number=1, type=2, cpp_type=6, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=_descriptor._ParseOptions(descriptor_pb2.FieldOptions(), _b('\020\001'))), + _descriptor.FieldDescriptor( + name='shape', full_name='com.flickr.vision.lopq.Matrix.shape', index=1, + number=2, type=13, cpp_type=3, label=3, + has_default_value=False, default_value=[], 
+ message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[], + nested_types=[], + enum_types=[], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[], + serialized_start=74, + serialized_end=117, ) _LOPQMODELPARAMS = _descriptor.Descriptor( - name='LOPQModelParams', - full_name='com.flickr.vision.lopq.LOPQModelParams', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='D', full_name='com.flickr.vision.lopq.LOPQModelParams.D', index=0, - number=1, type=13, cpp_type=3, label=1, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - _descriptor.FieldDescriptor( - name='V', full_name='com.flickr.vision.lopq.LOPQModelParams.V', index=1, - number=2, type=13, cpp_type=3, label=1, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - _descriptor.FieldDescriptor( - name='M', full_name='com.flickr.vision.lopq.LOPQModelParams.M', index=2, - number=3, type=13, cpp_type=3, label=1, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - _descriptor.FieldDescriptor( - name='num_subquantizers', full_name='com.flickr.vision.lopq.LOPQModelParams.num_subquantizers', index=3, - number=4, type=13, cpp_type=3, label=1, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - _descriptor.FieldDescriptor( - name='Cs', full_name='com.flickr.vision.lopq.LOPQModelParams.Cs', index=4, - number=5, type=11, cpp_type=10, label=3, - has_default_value=False, default_value=[], - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - _descriptor.FieldDescriptor( - name='Rs', full_name='com.flickr.vision.lopq.LOPQModelParams.Rs', index=5, - number=6, type=11, cpp_type=10, label=3, - has_default_value=False, default_value=[], - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - _descriptor.FieldDescriptor( - name='mus', full_name='com.flickr.vision.lopq.LOPQModelParams.mus', index=6, - number=7, type=11, cpp_type=10, label=3, - has_default_value=False, default_value=[], - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - _descriptor.FieldDescriptor( - name='subs', full_name='com.flickr.vision.lopq.LOPQModelParams.subs', index=7, - number=8, type=11, cpp_type=10, label=3, - has_default_value=False, default_value=[], - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - oneofs=[ - ], - serialized_start=120, - serialized_end=376, + name='LOPQModelParams', + full_name='com.flickr.vision.lopq.LOPQModelParams', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='D', full_name='com.flickr.vision.lopq.LOPQModelParams.D', index=0, + number=1, type=13, cpp_type=3, label=1, + 
has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='V', full_name='com.flickr.vision.lopq.LOPQModelParams.V', index=1, + number=2, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='M', full_name='com.flickr.vision.lopq.LOPQModelParams.M', index=2, + number=3, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='num_subquantizers', full_name='com.flickr.vision.lopq.LOPQModelParams.num_subquantizers', index=3, + number=4, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='Cs', full_name='com.flickr.vision.lopq.LOPQModelParams.Cs', index=4, + number=5, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='Rs', full_name='com.flickr.vision.lopq.LOPQModelParams.Rs', index=5, + number=6, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='mus', full_name='com.flickr.vision.lopq.LOPQModelParams.mus', index=6, + number=7, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='subs', full_name='com.flickr.vision.lopq.LOPQModelParams.subs', index=7, + number=8, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[], + nested_types=[], + enum_types=[], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[], + serialized_start=120, + serialized_end=376, ) _LOPQMODELPARAMS.fields_by_name['Cs'].message_type = _MATRIX @@ -179,31 +169,34 @@ DESCRIPTOR.message_types_by_name['LOPQModelParams'] = _LOPQMODELPARAMS Vector = _reflection.GeneratedProtocolMessageType('Vector', (_message.Message,), dict( - DESCRIPTOR = _VECTOR, - __module__ = 'lopq_model_pb2' - # @@protoc_insertion_point(class_scope:com.flickr.vision.lopq.Vector) - )) + DESCRIPTOR=_VECTOR, + __module__='lopq_model_pb2' + # @@protoc_insertion_point(class_scope:com.flickr.vision.lopq.Vector) +)) _sym_db.RegisterMessage(Vector) Matrix = _reflection.GeneratedProtocolMessageType('Matrix', (_message.Message,), dict( - DESCRIPTOR = _MATRIX, - __module__ = 'lopq_model_pb2' - # @@protoc_insertion_point(class_scope:com.flickr.vision.lopq.Matrix) - )) + DESCRIPTOR=_MATRIX, + __module__='lopq_model_pb2' + # @@protoc_insertion_point(class_scope:com.flickr.vision.lopq.Matrix) +)) _sym_db.RegisterMessage(Matrix) LOPQModelParams = 
_reflection.GeneratedProtocolMessageType('LOPQModelParams', (_message.Message,), dict( - DESCRIPTOR = _LOPQMODELPARAMS, - __module__ = 'lopq_model_pb2' - # @@protoc_insertion_point(class_scope:com.flickr.vision.lopq.LOPQModelParams) - )) + DESCRIPTOR=_LOPQMODELPARAMS, + __module__='lopq_model_pb2' + # @@protoc_insertion_point(class_scope:com.flickr.vision.lopq.LOPQModelParams) +)) _sym_db.RegisterMessage(LOPQModelParams) DESCRIPTOR.has_options = True -DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n\026com.flickr.vision.lopqB\023LOPQModelParametersH\001')) +DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b( + '\n\026com.flickr.vision.lopqB\023LOPQModelParametersH\001')) _VECTOR.fields_by_name['values'].has_options = True -_VECTOR.fields_by_name['values']._options = _descriptor._ParseOptions(descriptor_pb2.FieldOptions(), _b('\020\001')) +_VECTOR.fields_by_name['values']._options = _descriptor._ParseOptions( + descriptor_pb2.FieldOptions(), _b('\020\001')) _MATRIX.fields_by_name['values'].has_options = True -_MATRIX.fields_by_name['values']._options = _descriptor._ParseOptions(descriptor_pb2.FieldOptions(), _b('\020\001')) +_MATRIX.fields_by_name['values']._options = _descriptor._ParseOptions( + descriptor_pb2.FieldOptions(), _b('\020\001')) # @@protoc_insertion_point(module_scope) diff --git a/python/lopq/model.py b/python/lopq/model.py index 7c89f0c..a331030 100644 --- a/python/lopq/model.py +++ b/python/lopq/model.py @@ -1,5 +1,6 @@ # Copyright 2015, Yahoo Inc. -# Licensed under the terms of the Apache License, Version 2.0. See the LICENSE file associated with the project for terms. +# Licensed under the terms of the Apache License, Version 2.0. +# See the LICENSE file associated with the project for terms. import numpy as np from sklearn.cluster import KMeans import logging @@ -11,23 +12,24 @@ logger.setLevel(logging.WARNING) logger.addHandler(logging.StreamHandler(sys.stdout)) - -######################################## +# ============================================================================ # Core training algo -######################################## +# ============================================================================ + def eigenvalue_allocation(num_buckets, eigenvalues): """ Compute a permutation of eigenvalues to balance variance accross buckets of dimensions. - Described in section 3.2.4 in http://research.microsoft.com/pubs/187499/cvpr13opq.pdf + Described in section 3.2.4 in: - Note, the following slides indicate this function will break when fed eigenvalues < 1 - without the scaling trick implemented below: + - http://research.microsoft.com/pubs/187499/cvpr13opq.pdf - https://www.robots.ox.ac.uk/~vgg/rg/slides/ge__cvpr2013__optimizedpq.pdf + Note, the following slides indicate this function will break when fed + eigenvalues < 1 without the scaling trick implemented below: + - https://www.robots.ox.ac.uk/~vgg/rg/slides/ge__cvpr2013__optimizedpq.pdf :param int num_buckets: the number of dimension buckets over which to allocate eigenvalues @@ -46,7 +48,8 @@ def eigenvalue_allocation(num_buckets, eigenvalues): # We first must scale the eigenvalues by dividing by their # smallets non-zero value to avoid problems with the algorithm # when eigenvalues are less than 1. 
- min_non_zero_eigenvalue = np.min(np.abs(eigenvalues[np.nonzero(eigenvalues)])) + min_non_zero_eigenvalue = np.min( + np.abs(eigenvalues[np.nonzero(eigenvalues)])) eigenvalues = eigenvalues / min_non_zero_eigenvalue # Iterate eigenvalues in descending order @@ -82,11 +85,13 @@ def compute_local_rotations(data, C, num_buckets): :param ndarray C: a VxD matrix of cluster centroids :param int num_buckets: - the number of buckets accross which to balance variance in the rotation matrix + the number of buckets across which to balance variance + in the rotation matrix :returns ndarray: a VxDxD tensor containing the rotation matrix for each cluster - where V is the number of clusters and D the dimension of the data (which is split here) + where V is the number of clusters and D the dimension of the data + (which is split here) :returns ndarray: a Nx1 vector of cluster assignments for each data point :returns ndarray: @@ -97,7 +102,8 @@ def compute_local_rotations(data, C, num_buckets): logger.info('Fitting local rotations...') - A, mu, count, assignments, residuals = accumulate_covariance_estimators(data, C) + A, mu, count, assignments, residuals = accumulate_covariance_estimators( + data, C) R, mu = compute_rotations_from_accumulators(A, mu, count, num_buckets) @@ -107,8 +113,8 @@ def compute_local_rotations(data, C, num_buckets): def accumulate_covariance_estimators(data, C): - """ - Accumulate covariance estimators for each cluster with a pass through the data. + """ Accumulate covariance estimators for each cluster with a pass + through the data. :param ndarray data: NxD array - observations on the rows @@ -132,11 +138,13 @@ def accumulate_covariance_estimators(data, C): D = data.shape[1] # Essential variables - A = np.zeros((V, D, D)) # accumulators for covariance estimator per cluster + # accumulators for covariance estimator per cluster + A = np.zeros((V, D, D)) mu = np.zeros((V, D)) # residual means count = np.zeros(V, dtype=int) # count of points per cluster assignments = np.zeros(N, dtype=int) # point cluster assignments - residuals = np.zeros((N, D)) # residual for data points given cluster assignment + # residual for data points given cluster assignment + residuals = np.zeros((N, D)) # Iterate data points, accumulate estimators for i in xrange(N): @@ -160,8 +168,8 @@ def accumulate_covariance_estimators(data, C): def compute_rotations_from_accumulators(A, mu, count, num_buckets): """ Given accumulators computed on cluster residuals, compute the optimal - rotation matrix. The A and mu variables are modified in place and returned to - avoid memory allocation. + rotation matrix. The A and mu variables are modified in place and returned + to avoid memory allocation.
:param ndarray A: a VxDxD array - total sum of outer products of residuals per cluster @@ -173,7 +181,8 @@ def compute_rotations_from_accumulators(A, mu, count, num_buckets): the number of subvectors to balance variance across :returns ndarray A: - a VxDxD array - per cluster local rotations of size DxD on the first dimension + a VxDxD array - per cluster local rotations of size DxD on the first + dimension :returns ndarray mu: a VxD array of mean residuals per cluster """ @@ -189,11 +198,13 @@ def compute_rotations_from_accumulators(A, mu, count, num_buckets): mu[i] /= num_points # Compute covariance estimator - cov = (A[i] + A[i].transpose()) / (2 * (num_points - 1)) - np.outer(mu[i], mu[i]) + cov = (A[i] + A[i].transpose()) / \ + (2 * (num_points - 1)) - np.outer(mu[i], mu[i]) # Compute eigenvalues, reuse A matrix if num_points < D: - logger.warn('Fewer points (%d) than dimensions (%d) in rotation computation for cluster %d' % (num_points, D, i)) + logger.warn('Fewer points (%d) than dimensions (%d) in rotation' + ' computation for cluster %d' % (num_points, D, i)) eigenvalues = np.ones(D) A[i] = np.eye(D) else: @@ -240,7 +251,8 @@ def compute_residuals(data, C): return residuals, assignments -def train_coarse(data, V=8, kmeans_coarse_iters=10, n_init=10, random_state=None, n_jobs=1): +def train_coarse(data, V=8, kmeans_coarse_iters=10, n_init=10, + random_state=None, n_jobs=1): """ Train a kmeans model. @@ -262,7 +274,9 @@ def train_coarse(data, V=8, kmeans_coarse_iters=10, n_init=10, random_state=None logger.info('Fitting coarse quantizer...') # Fit coarse model - model = KMeans(n_clusters=V, init="k-means++", max_iter=kmeans_coarse_iters, n_init=n_init, n_jobs=n_jobs, verbose=False, random_state=random_state) + model = KMeans(n_clusters=V, init="k-means++", n_init=n_init, + n_jobs=n_jobs, max_iter=kmeans_coarse_iters, + verbose=False, random_state=random_state) model.fit(data) logger.info('Done fitting coarse quantizer.') @@ -270,15 +284,18 @@ def train_coarse(data, V=8, kmeans_coarse_iters=10, n_init=10, random_state=None return model.cluster_centers_ -def train_subquantizers(data, num_buckets, subquantizer_clusters=256, kmeans_local_iters=20, n_init=10, random_state=None, n_jobs=1): +def train_subquantizers(data, num_buckets, subquantizer_clusters=256, + kmeans_local_iters=20, n_init=10, random_state=None, + n_jobs=1): """ Fit a set of num_buckets subquantizers for corresponding subvectors. """ subquantizers = list() for i, d in enumerate(np.split(data, num_buckets, axis=1)): - model = KMeans(n_clusters=subquantizer_clusters, init="k-means++", max_iter=kmeans_local_iters, - n_init=n_init, n_jobs=n_jobs, verbose=False, random_state=random_state) + model = KMeans(n_clusters=subquantizer_clusters, init="k-means++", + max_iter=kmeans_local_iters, n_init=n_init, + n_jobs=n_jobs, verbose=False, random_state=random_state) model.fit(d) subquantizers.append(model.cluster_centers_) logger.info('Fit subquantizer %d of %d.' % (i + 1, num_buckets)) @@ -311,10 +328,12 @@ def train(data, V=8, M=4, subquantizer_clusters=256, parameters=None, :param int kmeans_local_iters: kmeans iterations for subquantizers :param int n_init: - the number of independent kmeans runs for all kmeans when training - set low for faster training + the number of independent kmeans runs for all kmeans when training. 
+ Set a low n_init for faster training :param float subquantizer_sample_ratio: - the proportion of the training data to sample for training subquantizers - since the number of - subquantizer clusters is much smaller then the number of coarse clusters, less data is needed + the proportion of the training data to sample for training + subquantizers - since the number of subquantizer clusters is much + smaller than the number of coarse clusters, less data is needed :param int random_state: a random seed used in all random operations during training if provided :param bool verbose: @@ -323,7 +342,8 @@ def train(data, V=8, M=4, subquantizer_clusters=256, parameters=None, the number of jobs to use for KMeans cluster computation :returns tuple: - a tuple of model parameters that can be used to instantiate an LOPQModel object + a tuple of model parameters that can be used to instantiate + an LOPQModel object """ # Set logging level for verbose mode @@ -348,8 +368,10 @@ def train(data, V=8, M=4, subquantizer_clusters=256, parameters=None, logger.info('Using existing coarse quantizers.') C1, C2 = Cs else: - C1 = train_coarse(first_half, V, kmeans_coarse_iters, n_init, random_state, n_jobs) - C2 = train_coarse(second_half, V, kmeans_coarse_iters, n_init, random_state, n_jobs) + C1 = train_coarse(first_half, V, kmeans_coarse_iters, + n_init, random_state, n_jobs) + C2 = train_coarse(second_half, V, kmeans_coarse_iters, + n_init, random_state, n_jobs) # Compute local rotations if Rs is not None and mus is not None: @@ -358,15 +380,18 @@ def train(data, V=8, M=4, subquantizer_clusters=256, parameters=None, mu1, mu2 = mus assignments1 = assignments2 = residuals1 = residuals2 = None else: - Rs1, mu1, assignments1, residuals1 = compute_local_rotations(first_half, C1, M / 2) - Rs2, mu2, assignments2, residuals2 = compute_local_rotations(second_half, C2, M / 2) + Rs1, mu1, assignments1, residuals1 = compute_local_rotations( + first_half, C1, M / 2) + Rs2, mu2, assignments2, residuals2 = compute_local_rotations( + second_half, C2, M / 2) # Subquantizers don't need as much data, so we could sample here subquantizer_sample_ratio = min(subquantizer_sample_ratio, 1.0) N = data.shape[0] N2 = int(np.floor(subquantizer_sample_ratio * N)) sample_inds = np.random.RandomState(random_state).choice(N, N2, False) - logger.info('Sampled training data for subquantizers with %f proportion (%d points).' % (subquantizer_sample_ratio, N2)) + logger.info('Sampled training data for subquantizers with %f proportion' + ' (%d points).'
% (subquantizer_sample_ratio, N2)) # Use assignments and residuals from rotation computation if available if assignments1 is not None: @@ -375,8 +400,10 @@ def train(data, V=8, M=4, subquantizer_clusters=256, parameters=None, assignments1 = assignments1[sample_inds] assignments2 = assignments2[sample_inds] else: - residuals1, assignments1 = compute_residuals(first_half[sample_inds], C1) - residuals2, assignments2 = compute_residuals(second_half[sample_inds], C2) + residuals1, assignments1 = compute_residuals( + first_half[sample_inds], C1) + residuals2, assignments2 = compute_residuals( + second_half[sample_inds], C2) # Project residuals logger.info('Projecting residuals to local frame...') @@ -384,15 +411,20 @@ def train(data, V=8, M=4, subquantizer_clusters=256, parameters=None, projected2 = project_residuals_to_local(residuals2, assignments2, Rs2, mu2) logger.info('Fitting subquantizers...') - subquantizers1 = train_subquantizers(projected1, M / 2, subquantizer_clusters, kmeans_local_iters, n_init, random_state=random_state, n_jobs=n_jobs) - subquantizers2 = train_subquantizers(projected2, M / 2, subquantizer_clusters, kmeans_local_iters, n_init, random_state=random_state, n_jobs=n_jobs) + subquantizers1 = train_subquantizers( + projected1, M / 2, subquantizer_clusters, kmeans_local_iters, n_init, + random_state=random_state, n_jobs=n_jobs) + subquantizers2 = train_subquantizers( + projected2, M / 2, subquantizer_clusters, kmeans_local_iters, n_init, + random_state=random_state, n_jobs=n_jobs) logger.info('Done fitting subquantizers.') return (C1, C2), (Rs1, Rs2), (mu1, mu2), (subquantizers1, subquantizers2) -######################################## + +# ============================================================================ # Model class -######################################## +# ============================================================================ # Named tuple type for LOH codes LOPQCode = namedtuple('LOPQCode', ['coarse', 'fine']) @@ -401,12 +433,14 @@ def train(data, V=8, M=4, subquantizer_clusters=256, parameters=None, class LOPQModel(object): def __init__(self, V=8, M=4, subquantizer_clusters=256, parameters=None): """ - Create an LOPQModel instance that encapsulates a complete LOPQ model with parameters and hyperparameters. + Create an LOPQModel instance that encapsulates a complete LOPQ model + with parameters and hyperparameters. :param int V: the number of clusters per a coarse split :param int M: - the total number of subvectors (equivalent to the total number of subquantizers) + the total number of subvectors (equivalent to the total number + of subquantizers) :param int subquantizer_clusters: the number of clusters for each subquantizer :param tuple parameters: @@ -414,21 +448,31 @@ def __init__(self, V=8, M=4, subquantizer_clusters=256, parameters=None): the tuple will look like the following - ((C1, C2), (Rs1, Rs2), (mu1, mu2), (subquantizers1, subquantizers2)) + ((C1, C2), + (Rs1, Rs2), + (mu1, mu2), + (subquantizers1, subquantizers2) + ) - where each element is itself a pair with one split of parameters for the each of the coarse splits. + where each element is itself a pair with one split of parameters + for each of the coarse splits.
- the parameters have the following data types (V and M have the meaning described above, - D is the total dimension of the data, and S is the number of subquantizer clusters): + The parameters have the following data types (V and M have + the meaning described above, D is the total dimension of the data, + and S is the number of subquantizer clusters): C: VxD/2 ndarray of coarse centroids - R: VxD/2xD/2 ndarray of fitted rotation matrices for each coarse cluster + R: VxD/2xD/2 ndarray of fitted rotation matrices for each + coarse cluster mu: VxD/2 ndarray of mean residuals for each coar cluster - subquantizer: length M/2 list of SxD/M ndarrays of cluster centroids for each subvector + subquantizer: length M/2 list of SxD/M ndarrays of cluster + centroids for each subvector """ - # If learned parameters are passed in explicitly, derive the model params by inspection. - self.Cs, self.Rs, self.mus, self.subquantizers = parameters if parameters is not None else (None, None, None, None) + # If learned parameters are passed in explicitly, derive the model + # params by inspection. + self.Cs, self.Rs, self.mus, self.subquantizers = parameters or ( + None, None, None, None) if self.Cs is not None: self.V = self.Cs[0].shape[0] @@ -446,22 +490,26 @@ def __init__(self, V=8, M=4, subquantizer_clusters=256, parameters=None): self.M = M self.subquantizer_clusters = subquantizer_clusters - def fit(self, data, kmeans_coarse_iters=10, kmeans_local_iters=20, n_init=10, subquantizer_sample_ratio=1.0, random_state=None, verbose=False, n_jobs=1): - """ - Fit a model with the current model parameters. This method will use existing parameters and only - train missing parameters. + def fit(self, data, kmeans_coarse_iters=10, kmeans_local_iters=20, + n_init=10, subquantizer_sample_ratio=1.0, random_state=None, + verbose=False, n_jobs=1): + """ Fit a model with the current model parameters. This method will + use existing parameters and only train missing parameters. :param int kmeans_coarse_iters: the number of kmeans iterations for coarse quantizer training :param int kmeans_local_iters: the number of kmeans iterations for subquantizer taining :param int n_init: - the number of independent kmeans runs for all kmeans when training - set low for faster training + The number of independent kmeans runs for all kmeans when training. + Set low for faster training :param float subquantizer_sample_ratio: - the proportion of the training data to sample for training subquantizers - since the number of - subquantizer clusters is much smaller then the number of coarse clusters, less data is needed + The proportion of the training data to sample for training + subquantizers. 
Since the number of subquantizer clusters is much + smaller than the number of coarse clusters, less data is needed :param int random_state: - a random seed used in all random operations during training if provided + a random seed used in all random operations during training + if provided :param bool verbose: a bool enabling verbose output during training :param int n_jobs: @@ -469,8 +517,10 @@ def fit(self, data, kmeans_coarse_iters=10, kmeans_local_iters=20, n_init=10, su """ existing_parameters = (self.Cs, self.Rs, self.mus, self.subquantizers) - parameters = train(data, self.V, self.M, self.subquantizer_clusters, existing_parameters, - kmeans_coarse_iters, kmeans_local_iters, n_init, subquantizer_sample_ratio, + parameters = train(data, self.V, self.M, self.subquantizer_clusters, + existing_parameters, kmeans_coarse_iters, + kmeans_local_iters, n_init, + subquantizer_sample_ratio, random_state, verbose, n_jobs) self.Cs, self.Rs, self.mus, self.subquantizers = parameters @@ -489,12 +539,15 @@ def get_split_parameters(self, split): :returns list: a list of rotation matrices for each cluster :returns list: - a list of centroid matrices for each subquantizer in this coarse split + a list of centroid matrices for each subquantizer + in this coarse split """ - return self.Cs[split] if self.Cs is not None else None, \ - self.Rs[split] if self.Rs is not None else None, \ - self.mus[split] if self.mus is not None else None, \ - self.subquantizers[split] if self.subquantizers is not None else None + return ( + self.Cs[split] if self.Cs else None, + self.Rs[split] if self.Rs else None, + self.mus[split] if self.mus else None, + self.subquantizers[split] if self.subquantizers else None + ) def predict(self, x): """ @@ -526,7 +579,10 @@ def predict_coarse(self, x): :returns tuple: a tuple of coarse codes """ - return tuple([predict_cluster(cx, self.Cs[split]) for cx, split in iterate_splits(x, self.num_coarse_splits)]) + return tuple([ + predict_cluster(cx, self.Cs[split]) + for cx, split in iterate_splits(x, self.num_coarse_splits) + ]) def predict_fine(self, x, coarse_codes=None): """ @@ -553,21 +609,23 @@ def predict_fine(self, x, coarse_codes=None): _, _, _, subC = self.get_split_parameters(split) # Compute subquantizer codes - fine_codes += [predict_cluster(fx, subC[sub_split]) for fx, sub_split in iterate_splits(cx, self.num_fine_splits)] + fine_codes += [ + predict_cluster(fx, subC[sub_split]) + for fx, sub_split in iterate_splits(cx, self.num_fine_splits)] return tuple(fine_codes) def project(self, x, coarse_codes, coarse_split=None): - """ - Project this vector to its local residual space defined by the coarse codes. + """ Project this vector to its local residual space defined + by the coarse codes. :param ndarray x: the point to project :param ndarray coarse_codes: the coarse codes defining the local space :param int coarse_split: - index of the coarse split to get distances for - if None then all splits - are computed + index of the coarse split to get distances for.
+ If None then all splits are computed :returns ndarray: the projected vector @@ -577,7 +635,8 @@ def project(self, x, coarse_codes, coarse_split=None): if coarse_split is None: split_iter = iterate_splits(x, self.num_coarse_splits) else: - split_iter = [(np.split(x, self.num_coarse_splits)[coarse_split], coarse_split)] + split_iter = [(np.split(x, self.num_coarse_splits) + [coarse_split], coarse_split)] for cx, split in split_iter: @@ -615,7 +674,8 @@ def reconstruct(self, codes): C, R, mu, subC = self.get_split_parameters(split) # Concatenate the cluster centroids for this split of fine codes - sx = reduce(lambda acc, c: np.concatenate((acc, subC[c[0]][c[1]])), enumerate(fc), []) + sx = reduce(lambda acc, c: np.concatenate( + (acc, subC[c[0]][c[1]])), enumerate(fc), []) # Project residual out of local space cluster = coarse_codes[split] @@ -628,20 +688,21 @@ def reconstruct(self, codes): def get_subquantizer_distances(self, x, coarse_codes, coarse_split=None): """ - Project a given query vector to the local space of the given coarse codes - and compute the distances of each subvector to the corresponding subquantizer - clusters. + Project a given query vector to the local space of the given coarse + codes and compute the distances of each subvector to the corresponding + subquantizer clusters. :param ndarray x: a query vector :param tuple coarse_codes: the coarse codes defining which local space to project to :param int coarse_split: - index of the coarse split to get distances for - if None then all splits - are computed + index of the coarse split to get distances for. + If None then all splits are computed :returns list: - a list of distances to each subquantizer cluster for each subquantizer + A list of distances to each subquantizer cluster + for each subquantizer. """ px = self.project(x, coarse_codes) @@ -650,12 +711,15 @@ def get_subquantizer_distances(self, x, coarse_codes, coarse_split=None): if coarse_split is None: split_iter = iterate_splits(px, self.num_coarse_splits) else: - split_iter = [(np.split(px, self.num_coarse_splits)[coarse_split], coarse_split)] + split_iter = [(np.split(px, self.num_coarse_splits) + [coarse_split], coarse_split)] # for cx, split in iterate_splits(px, self.num_coarse_splits): for cx, split in split_iter: _, _, _, subC = self.get_split_parameters(split) - subquantizer_dists += [((fx - subC[sub_split]) ** 2).sum(axis=1) for fx, sub_split in iterate_splits(cx, self.num_fine_splits)] + subquantizer_dists += [ + ((fx - subC[sub_split]) ** 2).sum(axis=1) + for fx, sub_split in iterate_splits(cx, self.num_fine_splits)] return subquantizer_dists @@ -666,12 +730,13 @@ def get_coarse_codes_for_cell_id(self, cell_id): return (int(np.floor(float(cell_id) / self.V)), cell_id % self.V) def export_mat(self, filename): - """ - Export model parameters in .mat file format. + """ Export model parameters in .mat file format. - Splits in the parameters (coarse splits and fine splits) are concatenated together in the - resulting arrays. For example, the Cs paramaters become a 2 x V x D array where the first dimension - indexes the split. The subquantizer centroids are encoded similarly as a 2 x (M/2) x 256 x (D/M) array. + Splits in the parameters (coarse splits and fine splits) + are concatenated together in the resulting arrays. For example, the Cs + parameters become a 2 x V x D array where the first dimension indexes + the split. The subquantizer centroids are encoded similarly + as a 2 x (M/2) x 256 x (D/M) array.
""" from scipy.io import savemat from .utils import concat_new_first @@ -681,12 +746,14 @@ def export_mat(self, filename): mus = concat_new_first(self.mus) subs = concat_new_first(map(concat_new_first, self.subquantizers)) - savemat(filename, {'Cs': Cs, 'Rs': Rs, 'mus': mus, 'subs': subs, 'V': self.V, 'M': self.M}) + savemat(filename, {'Cs': Cs, 'Rs': Rs, 'mus': mus, + 'subs': subs, 'V': self.V, 'M': self.M}) @staticmethod def load_mat(filename): """ - Reconstitute an LOPQModel in the format exported by the `export_mat` method above. + Reconstitute an LOPQModel in the format exported + by the `export_mat` method above. """ from scipy.io import loadmat @@ -697,7 +764,9 @@ def load_mat(filename): Rs = tuple(map(np.squeeze, np.split(d['Rs'], 2, axis=0))) mus = tuple(map(np.squeeze, np.split(d['mus'], 2, axis=0))) - subs = tuple([map(np.squeeze, np.split(half, M / 2, axis=0)) for half in map(np.squeeze, np.split(d['subs'], 2, axis=0))]) + subs = tuple([ + map(np.squeeze, np.split(half, M / 2, axis=0)) + for half in map(np.squeeze, np.split(d['subs'], 2, axis=0))]) return LOPQModel(parameters=(Cs, Rs, mus, subs)) @@ -723,16 +792,16 @@ def vector_from_ndarray(m, a): m.values.extend(map(float, np.nditer(a, order='C'))) return m - if self.Cs != None: + if self.Cs is not None: for C in self.Cs: matrix_from_ndarray(lopq_params.Cs.add(), C) - if self.Rs != None: + if self.Rs is not None: for R in chain(*self.Rs): matrix_from_ndarray(lopq_params.Rs.add(), R) - if self.mus != None: + if self.mus is not None: for mu in chain(*self.mus): vector_from_ndarray(lopq_params.mus.add(), mu) - if self.subquantizers != None: + if self.subquantizers is not None: for sub in chain(*self.subquantizers): matrix_from_ndarray(lopq_params.subs.add(), sub) @@ -763,14 +832,17 @@ def halves(arr): if len(lopq_params.Cs) != 0: Cs = [np.reshape(C.values, C.shape) for C in lopq_params.Cs] if len(lopq_params.Rs) != 0: - Rs = map(concat_new_first, halves([np.reshape(R.values, R.shape) for R in lopq_params.Rs])) + Rs = map(concat_new_first, halves( + [np.reshape(R.values, R.shape) for R in lopq_params.Rs])) if len(lopq_params.mus) != 0: - mus = map(concat_new_first, halves([np.array(mu.values) for mu in lopq_params.mus])) + mus = map(concat_new_first, halves( + [np.array(mu.values) for mu in lopq_params.mus])) if len(lopq_params.subs) != 0: - subs = halves([np.reshape(sub.values, sub.shape) for sub in lopq_params.subs]) + subs = halves([np.reshape(sub.values, sub.shape) + for sub in lopq_params.subs]) return LOPQModel(parameters=(Cs, Rs, mus, subs)) except IOError: - print filename + ": Could not open file." + print(filename + ": Could not open file.") return None diff --git a/python/lopq/search.py b/python/lopq/search.py index 4fbd425..5b6fdcc 100644 --- a/python/lopq/search.py +++ b/python/lopq/search.py @@ -1,5 +1,6 @@ # Copyright 2015, Yahoo Inc. -# Licensed under the terms of the Apache License, Version 2.0. See the LICENSE file associated with the project for terms. +# Licensed under the terms of the Apache License, Version 2.0. +# See the LICENSE file associated with the project for terms. import heapq from collections import defaultdict, namedtuple from itertools import count @@ -12,7 +13,8 @@ def multisequence(x, centroids): """ Implementation of multi-sequence algorithm for traversing a multi-index. - The algorithm is described in http://download.yandex.ru/company/cvpr2012.pdf. + The algorithm is described in: + - http://download.yandex.ru/company/cvpr2012.pdf. 
:param ndarray x: a query vector @@ -100,8 +102,8 @@ def add_data(self, data, ids=None, num_procs=1): def get_result_quota(self, x, quota=10): """ - Given a query vector and result quota, retrieve as many cells as necessary - to fill the quota. + Given a query vector and result quota, retrieve as many cells + as necessary to fill the quota. :param ndarray x: a query vector @@ -126,9 +128,10 @@ def get_result_quota(self, x, quota=10): def compute_distances(self, x, items): """ - Given a query and a list of index items, compute the approximate distance of the query - to each item and return a list of tuples that contain the distance and the item. - Memoize subquantizer distances per coarse cluster to save work. + Given a query and a list of index items, compute the approximate + distance of the query to each item and return a list of tuples + that contain the distance and the item. Memoize subquantizer + distances per coarse cluster to save work. :param ndarray x: a query vector @@ -146,10 +149,12 @@ def get_subquantizer_distances(x, coarse): c0, c1 = coarse if c0 not in d0: - d0[c0] = self.model.get_subquantizer_distances(x, coarse, coarse_split=0) + d0[c0] = self.model.get_subquantizer_distances( + x, coarse, coarse_split=0) if c1 not in d1: - d1[c1] = self.model.get_subquantizer_distances(x, coarse, coarse_split=1) + d1[c1] = self.model.get_subquantizer_distances( + x, coarse, coarse_split=1) return d0[c0] + d1[c1] @@ -160,7 +165,8 @@ def get_subquantizer_distances(x, coarse): coarse, fine = codes subquantizer_distances = get_subquantizer_distances(x, coarse) - dist = sum([subquantizer_distances[i][fc] for i, fc in enumerate(fine)]) + dist = sum([subquantizer_distances[i][fc] + for i, fc in enumerate(fine)]) results.append((dist, item)) @@ -168,8 +174,8 @@ def get_subquantizer_distances(x, coarse): def search(self, x, quota=10, limit=None, with_dists=False): """ - Return euclidean distance ranked results, along with the number of cells - traversed to fill the quota. + Return euclidean distance ranked results, along with the number + of cells traversed to fill the quota. :param ndarray x: a query vector @@ -178,7 +184,8 @@ def search(self, x, quota=10, limit=None, with_dists=False): :param int limit: the number of desired results to return - defaults to quota :param bool with_dists: - boolean indicating whether result items should be returned with their distance + boolean indicating whether result items should be returned + with their distance :returns list results: the list of ranked results @@ -232,12 +239,13 @@ def get_cell(self, cell): """ raise NotImplementedError() + class LOPQSearcher(LOPQSearcherBase): def __init__(self, model): """ - Create an LOPQSearcher instance that encapsulates retrieving and ranking - with LOPQ. Requires an LOPQModel instance. This class uses a Python dict - to implement the index. + Create an LOPQSearcher instance that encapsulates retrieving + and ranking with LOPQ. Requires an LOPQModel instance. This class + uses a Python dict to implement the index. :param LOPQModel model: the model for indexing and ranking @@ -279,17 +287,17 @@ def get_cell(self, cell): class LOPQSearcherLMDB(LOPQSearcherBase): def __init__(self, model, lmdb_path, id_lambda=int): """ - Create an LOPQSearcher instance that encapsulates retrieving and ranking - with LOPQ. Requires an LOPQModel instance. This class uses an lmbd database - to implement the index. + Create an LOPQSearcher instance that encapsulates retrieving + and ranking with LOPQ. Requires an LOPQModel instance. 
+ This class uses an lmdb database to implement the index. :param LOPQModel model: the model for indexing and ranking :param str lmdb_path: path for the lmdb database; if it does not exist it is created :param callable id_lambda: - a lambda function to reconstruct item ids from their string representation - (computed by calling `bytes`) during retrieval + a lambda function to reconstruct item ids from their string + representation (computed by calling `bytes`) during retrieval """ import lmdb self.model = model @@ -297,7 +305,11 @@ def __init__(self, model, lmdb_path, id_lambda=int): self.lmdb_path = lmdb_path self.id_lambda = id_lambda - self.env = lmdb.open(self.lmdb_path, map_size=1024*2000000*2, writemap=False, map_async=True, max_dbs=1) + self.env = lmdb.open(self.lmdb_path, + map_size=1024 * 2000000 * 2, + writemap=False, + map_async=True, + max_dbs=1) self.index_db = self.env.open_db("index") def encode_cell(self, cell): diff --git a/python/lopq/utils.py b/python/lopq/utils.py index 7275985..7a7c315 100644 --- a/python/lopq/utils.py +++ b/python/lopq/utils.py @@ -1,5 +1,6 @@ # Copyright 2015, Yahoo Inc. -# Licensed under the terms of the Apache License, Version 2.0. See the LICENSE file associated with the project for terms. +# Licensed under the terms of the Apache License, Version 2.0. +# See the LICENSE file associated with the project for terms. import numpy as np import multiprocessing from itertools import chain @@ -79,7 +80,8 @@ def load_xvecs(filename, base_type='f', max_num=None): if j == 0: np.uint32(struct.unpack(format_code, f.read(4))) else: - A[i, j - 1] = py_type(struct.unpack(format_code, f.read(format_size))[0]) + A[i, j - 1] = py_type(struct.unpack(format_code, + f.read(format_size))[0]) f.close() return np.squeeze(A) @@ -117,7 +119,8 @@ def save_xvecs(data, filename, base_type='f'): def parmap(f, X, nprocs=multiprocessing.cpu_count()): """ - Parallel map implementation adapted from http://stackoverflow.com/questions/3288595/multiprocessing-using-pool-map-on-a-function-defined-in-a-class + Parallel map implementation adapted from + http://stackoverflow.com/questions/3288595/multiprocessing-using-pool-map-on-a-function-defined-in-a-class # noqa """ def func_wrap(f, q_in, q_out): @@ -130,7 +133,8 @@ def func_wrap(f, q_in, q_out): q_in = multiprocessing.Queue(1) q_out = multiprocessing.Queue() - proc = [multiprocessing.Process(target=func_wrap, args=(f, q_in, q_out)) for _ in range(nprocs)] + proc = [multiprocessing.Process(target=func_wrap, args=( + f, q_in, q_out)) for _ in range(nprocs)] for p in proc: p.daemon = True p.start() @@ -147,21 +151,25 @@ def func_wrap(f, q_in, q_out): def get_chunk_ranges(N, num_procs): """ - A helper that given a number N representing the size of an iterable and the num_procs over which to - divide the data return a list of (start_index, end_index) pairs that divide the data as evenly as possible - into num_procs buckets. + A helper that given a number N representing the size of an iterable + and the num_procs over which to divide the data return a list + of (start_index, end_index) pairs that divide the data as evenly + as possible into num_procs buckets.
""" per_thread = N / num_procs allocation = [per_thread] * num_procs allocation[0] += N - num_procs * per_thread - data_ranges = [0] + reduce(lambda acc, num: acc + [num + (acc[-1] if len(acc) else 0)], allocation, []) - data_ranges = [(data_ranges[i], data_ranges[i + 1]) for i in range(len(data_ranges) - 1)] + data_ranges = [0] + reduce( + lambda acc, num: acc + [num + (acc[-1] if len(acc) else 0)], + allocation, []) + data_ranges = [(data_ranges[i], data_ranges[i + 1]) + for i in range(len(data_ranges) - 1)] return data_ranges def compute_codes_parallel(data, model, num_procs=4): """ - A helper function that parallelizes the computation of LOPQ codes in + A helper function that parallelizes the computation of LOPQ codes in a configurable number of processes. :param ndarray data: diff --git a/python/setup.py b/python/setup.py index f7aa601..d00a874 100644 --- a/python/setup.py +++ b/python/setup.py @@ -1,7 +1,8 @@ #!/usr/bin/env python # Copyright 2015, Yahoo Inc. -# Licensed under the terms of the Apache License, Version 2.0. See the LICENSE file associated with the project for terms. +# Licensed under the terms of the Apache License, Version 2.0. +# See the LICENSE file associated with the project for terms. import os import json from setuptools import setup @@ -13,42 +14,12 @@ # Long description of package -LONG_DESCRIPTION = """ -# Locally Optimized Product Quantization +def get_package_description(): + with open("DESCRIPTION.md", "r") as description: + return "".join(description.readlines()) -This is Python training and testing code for Locally Optimized Product Quantization (LOPQ) models, as well as Spark scripts to scale training to hundreds of millions of vectors. The resulting model can be used in Python with code provided here or deployed via a Protobuf format to, e.g., search backends for high performance approximate nearest neighbor search. -### Overview - -Locally Optimized Product Quantization (LOPQ) [1] is a hierarchical quantization algorithm that produces codes of configurable length for data points. These codes are efficient representations of the original vector and can be used in a variety of ways depending on application, including as hashes that preserve locality, as a compressed vector from which an approximate vector in the data space can be reconstructed, and as a representation from which to compute an approximation of the Euclidean distance between points. - -Conceptually, the LOPQ quantization process can be broken into 4 phases. The training process also fits these phases to the data in the same order. - -1. The raw data vector is PCA'd to `D` dimensions (possibly the original dimensionality). This allows subsequent quantization to more efficiently represent the variation present in the data. -2. The PCA'd data is then product quantized [2] by two k-means quantizers. This means that each vector is split into two subvectors each of dimension `D / 2`, and each of the two subspaces is quantized independently with a vocabulary of size `V`. Since the two quantizations occur independently, the dimensions of the vectors are permuted such that the total variance in each of the two subspaces is approximately equal, which allows the two vocabularies to be equally important in terms of capturing the total variance of the data. This results in a pair of cluster ids that we refer to as "coarse codes". -3. The residuals of the data after coarse quantization are computed. The residuals are then locally projected independently for each coarse cluster. 
This projection is another application of PCA and dimension permutation on the residuals and it is "local" in the sense that there is a different projection for each cluster in each of the two coarse vocabularies. These local rotations make the next and final step, another application of product quantization, very efficient in capturing the variance of the residuals. -4. The locally projected data is then product quantized a final time by `M` subquantizers, resulting in `M` "fine codes". Usually the vocabulary for each of these subquantizers will be a power of 2 for effective storage in a search index. With vocabularies of size 256, the fine codes for each indexed vector will require `M` bytes to store in the index. - -The final LOPQ code for a vector is a `(coarse codes, fine codes)` pair, e.g. `((3, 2), (14, 164, 83, 49, 185, 29, 196, 250))`. - -### Nearest Neighbor Search - -A nearest neighbor index can be built from these LOPQ codes by indexing each document into its corresponding coarse code bucket. That is, each pair of coarse codes (which we refer to as a "cell") will index a bucket of the vectors quantizing to that cell. - -At query time, an incoming query vector undergoes substantially the same process. First, the query is split into coarse subvectors and the distance to each coarse centroid is computed. These distances can be used to efficiently compute a priority-ordered sequence of cells [3] such that cells later in the sequence are less likely to have near neighbors of the query than earlier cells. The items in cell buckets are retrieved in this order until some desired quota has been met. - -After this retrieval phase, the fine codes are used to rank by approximate Euclidean distance. The query is projected into each local space and the distance to each indexed item is estimated as the sum of the squared distances of the query subvectors to the corresponding subquantizer centroid indexed by the fine codes. - -NN search with LOPQ is highly scalable and has excellent properties in terms of both index storage requirements and query-time latencies when implemented well. - -#### References - -For more information and performance benchmarks can be found at http://image.ntua.gr/iva/research/lopq/. - -1. Y. Kalantidis, Y. Avrithis. [Locally Optimized Product Quantization for Approximate Nearest Neighbor Search.](http://image.ntua.gr/iva/files/lopq.pdf) CVPR 2014. -2. H. Jegou, M. Douze, and C. Schmid. [Product quantization for nearest neighbor search.](https://lear.inrialpes.fr/pubs/2011/JDS11/jegou_searching_with_quantization.pdf) PAMI, 33(1), 2011. -3. A. Babenko and V. Lempitsky. [The inverted multi-index.](http://www.computer.org/csdl/trans/tp/preprint/06915715.pdf) CVPR 2012. -""" +LONG_DESCRIPTION = get_package_description() # Create a dictionary of our arguments, this way this script can be imported # without running setup() to allow external scripts to see the setup settings. 
@@ -59,10 +30,14 @@ 'author_email': 'clayton@yahoo-inc.com', 'url': 'http://github.com/yahoo/lopq', 'license': 'Apache-2.0', - 'keywords': ['lopq', 'locally optimized product quantization', 'product quantization', 'compression', 'ann', 'approximate nearest neighbor', 'similarity search'], + 'keywords': ['lopq', 'locally optimized product quantization', + 'product quantization', 'compression', 'ann', + 'approximate nearest neighbor', 'similarity search'], 'packages': ['lopq'], 'long_description': LONG_DESCRIPTION, - 'description': 'Python code for training and deploying Locally Optimized Product Quantization (LOPQ) for approximate nearest neighbor search of high dimensional data.', + 'description': 'Python code for training and deploying Locally Optimized ' + 'Product Quantization (LOPQ) for approximate nearest ' + 'neighbor search of high dimensional data.', 'classifiers': [ 'Development Status :: 5 - Production/Stable', 'License :: OSI Approved :: Apache Software License', @@ -83,7 +58,11 @@ }, 'platforms': 'Windows,Linux,Solaris,Mac OS-X,Unix', 'include_package_data': True, - 'install_requires': ['protobuf>=2.6', 'numpy>=1.9', 'scipy>=0.14', 'scikit-learn>=0.18', 'lmdb>=0.87'] + 'install_requires': ['protobuf>=2.6', + 'numpy>=1.9', + 'scipy>=0.14', + 'scikit-learn>=0.18', + 'lmdb>=0.87'] } @@ -153,7 +132,7 @@ def add_scripts_to_package(): def get_and_update_package_metadata(): """ - Update the package metadata for this package if we are building the package. + Update the package metadata when the package is being built. :return:metadata - Dictionary of metadata information """ global setup_arguments diff --git a/python/test/tests.py b/python/test/tests.py index 96832d0..4980d35 100644 --- a/python/test/tests.py +++ b/python/test/tests.py @@ -1,5 +1,6 @@ # Copyright 2015, Yahoo Inc. -# Licensed under the terms of the Apache License, Version 2.0. See the LICENSE file associated with the project for terms. +# Licensed under the terms of the Apache License, Version 2.0. +# See the LICENSE file associated with the project for terms.
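The wrapped 'description' string in setup.py above relies on Python's implicit concatenation of adjacent string literals, which inserts no separator, so each wrapped fragment needs an explicit trailing space; a quick illustration:

    # Adjacent string literals are joined with nothing between them.
    broken = ('approximate nearest' 'neighbor search')
    fixed = ('approximate nearest ' 'neighbor search')
    print(broken)  # approximate nearestneighbor search
    print(fixed)   # approximate nearest neighbor search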
from nose.tools import assert_true, assert_equal import pickle as pkl @@ -8,17 +9,20 @@ import numpy as np from sklearn.model_selection import train_test_split -sys.path.insert(1, os.path.abspath('..')) -from lopq.model import LOPQModel, eigenvalue_allocation, accumulate_covariance_estimators, compute_rotations_from_accumulators +sys.path.insert(1, os.path.abspath('..')) # noqa +from lopq.model import (LOPQModel, eigenvalue_allocation, + accumulate_covariance_estimators, + compute_rotations_from_accumulators) from lopq.search import LOPQSearcher, LOPQSearcherLMDB from lopq.eval import compute_all_neighbors, get_cell_histogram, get_recall -######################################## +# ============================================================================ # Helpers -######################################## +# ============================================================================ -relpath = lambda x: os.path.abspath(os.path.join(os.path.dirname(__file__), x)) +def relpath(x): + return os.path.abspath(os.path.join(os.path.dirname(__file__), x)) def load_oxford_data(): @@ -33,13 +37,14 @@ def make_random_model(): m.fit(np.random.RandomState(42).rand(200, 8), n_init=1) return m -######################################## +# ============================================================================ # Tests -######################################## +# ============================================================================ def test_eigenvalue_allocation(): - a = pkl.load(open(relpath('./testdata/test_eigenvalue_allocation_input.pkl'))) + a = pkl.load( + open(relpath('./testdata/test_eigenvalue_allocation_input.pkl'))) vals, vecs = np.linalg.eigh(a) res = eigenvalue_allocation(4, vals) @@ -73,8 +78,10 @@ def test_eigenvalue_allocation_normalized_features(): def test_accumulate_covariance_estimators(): - data, centroids = pkl.load(open(relpath('./testdata/test_accumulate_covariance_estimators_input.pkl'))) - expected = pkl.load(open(relpath('./testdata/test_accumulate_covariance_estimators_output.pkl'))) + data, centroids = pkl.load(open(relpath( + './testdata/test_accumulate_covariance_estimators_input.pkl'))) + expected = pkl.load(open(relpath( + './testdata/test_accumulate_covariance_estimators_output.pkl'))) actual = accumulate_covariance_estimators(data, centroids) @@ -96,8 +103,10 @@ def test_accumulate_covariance_estimators(): def test_compute_rotations_from_accumulators(): - A, mu, count, num_buckets = pkl.load(open(relpath('./testdata/test_compute_rotations_from_accumulators_input.pkl'))) - expected = pkl.load(open(relpath('./testdata/test_compute_rotations_from_accumulators_output.pkl'))) + A, mu, count, num_buckets = pkl.load(open(relpath( + './testdata/test_compute_rotations_from_accumulators_input.pkl'))) + expected = pkl.load(open(relpath( + './testdata/test_compute_rotations_from_accumulators_output.pkl'))) actual = compute_rotations_from_accumulators(A, mu, count, num_buckets) @@ -113,7 +122,8 @@ def test_reconstruction(): code = ((0, 1), (0, 1, 2, 3)) r = m.reconstruct(code) - expected = [-2.27444688, 6.47126941, 4.5042611, 4.76683476, 0.83671082, 9.36027283, 8.11780532, 6.34846377] + expected = [-2.27444688, 6.47126941, 4.5042611, 4.76683476, + 0.83671082, 9.36027283, 8.11780532, 6.34846377] assert_true(np.allclose(expected, r)) @@ -122,9 +132,11 @@ def test_oxford5k(): random_state = 40 data = load_oxford_data() - train, test = train_test_split(data, test_size=0.2, random_state=random_state) + train, test = train_test_split( + data, test_size=0.2, 
random_state=random_state) - # Compute distance-sorted neighbors in training set for each point in test set + # Compute distance-sorted neighbors in training set for each point + # in test set nns = compute_all_neighbors(test, train) # Fit model @@ -132,7 +144,8 @@ m.fit(train, n_init=1, random_state=random_state) # Assert correct code computation - assert_equal(m.predict(test[0]), ((3, 2), (14, 164, 83, 49, 185, 29, 196, 250))) + assert_equal(m.predict(test[0]), + ((3, 2), (14, 164, 83, 49, 185, 29, 196, 250))) # Assert low number of empty cells h = get_cell_histogram(train, m) @@ -277,7 +290,7 @@ def test_proto_partial(): import os filename = './temp_proto_partial.lopq' - c = (np.random.rand(8, 8), np.random.rand(8,8)) + c = (np.random.rand(8, 8), np.random.rand(8, 8)) m = LOPQModel(parameters=(c, None, None, None)) m.export_proto(filename) m2 = LOPQModel.load_proto(filename) @@ -289,6 +302,6 @@ def test_proto_partial(): assert_true(np.allclose(m.Cs[0], m2.Cs[0])) assert_true(m.Rs == m2.Rs) assert_true(m.mus == m2.mus) - assert_true(m.subquantizers == m.subquantizers) + assert_true(m.subquantizers == m2.subquantizers) os.remove(filename) diff --git a/scripts/example.py b/scripts/example.py index 02b21e8..dcb0e92 100644 --- a/scripts/example.py +++ b/scripts/example.py @@ -1,10 +1,12 @@ # Copyright 2015, Yahoo Inc. -# Licensed under the terms of the Apache License, Version 2.0. See the LICENSE file associated with the project for terms. +# Licensed under the terms of the Apache License, Version 2.0. +# See the LICENSE file associated with the project for terms. import sys import os -# Add the lopq module - not needed if they are available in the python environment -sys.path.append(os.path.abspath('../python')) +# Add the lopq module - not needed if it is already available +# in the python environment +sys.path.append(os.path.abspath('../python')) # noqa import numpy as np from sklearn.cross_validation import train_test_split @@ -26,8 +28,8 @@ def pca(data): A simple PCA implementation that demonstrates how eigenvalue allocation is used to permute dimensions in order to balance the variance across subvectors. There are plenty of PCA implementations elsewhere. What is - important is that the eigenvalues can be used to compute a variance-balancing - dimension permutation. + important is that the eigenvalues can be used to compute + a variance-balancing dimension permutation. """ # Compute mean @@ -35,13 +37,15 @@ def pca(data): mu = data.sum(axis=0) / float(count) # Compute covariance - summed_covar = reduce(lambda acc, x: acc + np.outer(x, x), data, np.zeros((D, D))) + summed_covar = reduce(lambda acc, x: acc + np.outer(x, x), + data, np.zeros((D, D))) A = summed_covar / (count - 1) - np.outer(mu, mu) # Compute eigen decomposition eigenvalues, P = np.linalg.eigh(A) - # Compute a permutation of dimensions to balance variance among 2 subvectors + # Compute a permutation of dimensions to balance variance among + # 2 subvectors permuted_inds = eigenvalue_allocation(2, eigenvalues) # Build the permutation into the rotation matrix. One can alternately keep @@ -77,27 +81,30 @@ def main(): # a set of queries for which we will compute the true nearest neighbors. train, test = train_test_split(data, test_size=0.2) - # Compute distance-sorted neighbors in training set for each point in test set. - # These will be our groundtruth for recall evaluation. + # Compute distance-sorted neighbors in training set for each point + # in test set.
These will be our groundtruth for recall evaluation. nns = compute_all_neighbors(test, train) # Fit model m = LOPQModel(V=16, M=8) m.fit(train, n_init=1) - # Note that we didn't specify a random seed for fitting the model, so different - # runs will be different. You may also see a warning that some local projections - # can't be estimated because too few points fall in a cluster. This is ok for the - # purposes of this demo, but you might want to avoid this by increasing the amount - # of training data or decreasing the number of clusters (the V hyperparameter). - - # With a model in hand, we can test it's recall. We populate a LOPQSearcher - # instance with data and get recall stats. By default, we will retrieve 1000 - # ranked results for each query vector for recall evaluation. + # Note that we didn't specify a random seed for fitting the model, + # so different runs will be different. You may also see a warning that + # some local projections can't be estimated because too few points fall + # in a cluster. This is ok for the purposes of this demo, but you might + # want to avoid this by increasing the amount of training data + # or decreasing the number of clusters (the V hyperparameter). + + # With a model in hand, we can test its recall. We populate + # a LOPQSearcher instance with data and get recall stats. By default, + # we will retrieve 1000 ranked results for each query vector for recall + # evaluation. searcher = LOPQSearcher(m) searcher.add_data(train) recall, _ = get_recall(searcher, test, nns) - print 'Recall (V=%d, M=%d, subquants=%d): %s' % (m.V, m.M, m.subquantizer_clusters, str(recall)) + print 'Recall (V=%d, M=%d, subquants=%d): %s' % ( + m.V, m.M, m.subquantizer_clusters, str(recall)) # We can experiment with other hyperparameters without discarding all # parameters everytime. Here we train a new model that uses the same coarse @@ -109,25 +116,29 @@ def main(): searcher = LOPQSearcher(m2) searcher.add_data(train) recall, _ = get_recall(searcher, test, nns) - print 'Recall (V=%d, M=%d, subquants=%d): %s' % (m2.V, m2.M, m2.subquantizer_clusters, str(recall)) + print 'Recall (V=%d, M=%d, subquants=%d): %s' % ( + m2.V, m2.M, m2.subquantizer_clusters, str(recall)) - # The recall is probably higher. We got better recall with a finer quantization - # at the expense of more data required for index items. + # The recall is probably higher. We got better recall with a finer + # quantization at the expense of more data required for index items. # We can also hold both coarse quantizers and rotations fixed and see what # increasing the number of subquantizer clusters does to performance. - m3 = LOPQModel(V=16, M=8, subquantizer_clusters=512, parameters=(m.Cs, m.Rs, m.mus, None)) + m3 = LOPQModel(V=16, M=8, subquantizer_clusters=512, + parameters=(m.Cs, m.Rs, m.mus, None)) m3.fit(train, n_init=1) searcher = LOPQSearcher(m3) searcher.add_data(train) recall, _ = get_recall(searcher, test, nns) - print 'Recall (V=%d, M=%d, subquants=%d): %s' % (m3.V, m3.M, m3.subquantizer_clusters, str(recall)) - - # The recall is probably better than the first but worse than the second. We increased recall - # only a little by increasing the number of model parameters (double the subquatizer centroids), - # the index storage requirement (another bit for each fine code), and distance computation time - # (double the subquantizer centroids).
+ print 'Recall (V=%d, M=%d, subquants=%d): %s' % ( + m3.V, m3.M, m3.subquantizer_clusters, str(recall)) + + # The recall is probably better than the first but worse than the second. + # We increased recall only a little by increasing the number of model + # parameters (double the subquantizer centroids), the index storage + # requirement (another bit for each fine code), and distance computation + # time (double the subquantizer centroids). if __name__ == '__main__': diff --git a/scripts/query_runtime.py b/scripts/query_runtime.py index 795fd6a..2ad3c4f 100644 --- a/scripts/query_runtime.py +++ b/scripts/query_runtime.py @@ -1,5 +1,6 @@ # Copyright 2015, Yahoo Inc. -# Licensed under the terms of the Apache License, Version 2.0. See the LICENSE file associated with the project for terms. +# Licensed under the terms of the Apache License, Version 2.0. +# See the LICENSE file associated with the project for terms. from math import sqrt, ceil @@ -9,7 +10,11 @@ def multiseq_flops(V, D): compute the number of flops to required to rank each coarse vocabulary to the query. """ - # (total coarse vocabulary) * (dims per coarse split) * (flops per squared distance) + # ( + # (total coarse vocabulary) * + # (dims per coarse split) * + # (flops per squared distance) + # ) return (2 * V) * (D / 2) * 2 @@ -28,7 +33,12 @@ def subquantizer_flops(D, M, clusters=256): subquantizer vocabulary size, compute the number of flops to compute a projected query's LOPQ distance for a single half of the query. """ - # (subquants per half) * (dims per subquant) * (cluster per subquant) * (flops per squared distance) + # ( + # (subquants per half) * + # (dims per subquant) * + # (cluster per subquant) * + # (flops per squared distance) + # ) return (M / 2) * (D / M) * clusters * 2 @@ -37,10 +47,11 @@ def total_rank_flops(D, M, N, cells, badness=0.5): Given the dimension of the data, the number of subquantizers, the number of results to rank, the number of multi-index cells retrieved by the query, and a badness measure that interpolates between best case and worst case - in terms of reusable (cacheable) subquantizer distance computations, compute - the number of flops to rank the N results. + in terms of reusable (cacheable) subquantizer distance computations, + compute the number of flops to rank the N results. - The 'badness' will vary query to query and is determined by the data distribution. + The 'badness' will vary query to query and is determined by the data + distribution. """ # Corresponds to traversing a row or column of the multi-index grid worst_case = cells + 1 @@ -51,8 +62,12 @@ def total_rank_flops(D, M, N, cells, badness=0.5): # Interpolated number of clusters num_clusters = ceil(worst_case * badness + best_case * (1 - badness)) - # (total local projections required) + (total number of sums to compute distance) - return num_clusters * (cluster_rotation_flops(D) + subquantizer_flops(D, M)) + N * (M - 1) + # ( + # (total local projections required) + + # (total number of sums to compute distance) + # ) + return num_clusters * (cluster_rotation_flops(D) + + subquantizer_flops(D, M)) + N * (M - 1) def brute_force_flops(D, N): diff --git a/spark/compute_codes.py b/spark/compute_codes.py index b9c3412..9e23e56 100644 --- a/spark/compute_codes.py +++ b/spark/compute_codes.py @@ -1,5 +1,6 @@ # Copyright 2015, Yahoo Inc. -# Licensed under the terms of the Apache License, Version 2.0. See the LICENSE file associated with the project for terms. +# Licensed under the terms of the Apache License, Version 2.0.
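As a rough sanity check of the flop-count helpers in query_runtime.py, with illustrative values V=16, D=128, M=8 (not taken from the script) and run from the scripts directory:

    from query_runtime import multiseq_flops, subquantizer_flops

    # (2 * 16) coarse centroids * (128 / 2) dims per half * 2 flops per squared-difference term
    print(multiseq_flops(16, 128))     # -> 4096
    # (8 / 2) subquantizers per half * (128 / 8) dims each * 256 clusters * 2 flops
    print(subquantizer_flops(128, 8))  # -> 32768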
+# See the LICENSE file associated with the project for terms. from pyspark.context import SparkContext import cPickle as pkl @@ -10,21 +11,24 @@ def default_data_loading(sc, data_path, sampling_ratio, seed): + """ This function loads data from a text file, sampling it by the provided + ratio and random seed, and interprets each line as a tab-separated + (id, data) pair where 'data' is assumed to be a base64-encoded pickled + numpy array. The data is returned as an RDD of (id, numpy array) tuples. """ - This function loads data from a text file, sampling it by the provided - ratio and random seed, and interprets each line as a tab-separated (id, data) pair - where 'data' is assumed to be a base64-encoded pickled numpy array. - The data is returned as an RDD of (id, numpy array) tuples. - """ - # Compute the number of cores in our cluster - used below to heuristically set the number of partitions - total_cores = int(sc._conf.get('spark.executor.instances')) * int(sc._conf.get('spark.executor.cores')) + # Compute the number of cores in our cluster. + # used below to heuristically set the number of partitions + total_cores = int(sc._conf.get('spark.executor.instances') + ) * int(sc._conf.get('spark.executor.cores')) # Load and sample down the dataset - d = sc.textFile(data_path, total_cores * 3).sample(False, sampling_ratio, seed) + d = (sc.textFile(data_path, total_cores * 3) + .sample(False, sampling_ratio, seed)) # The data is (id, vector) tab-delimited pairs where each vector is # a base64-encoded pickled numpy array - d = d.map(lambda x: x.split('\t')).map(lambda x: (x[0], pkl.loads(base64.decodestring(x[1])))) + d = d.map(lambda x: x.split('\t')).map( + lambda x: (x[0], pkl.loads(base64.decodestring(x[1])))) return d @@ -45,24 +49,40 @@ def main(sc, args, data_load_fn=default_data_loading): m = sc.broadcast(model) # Compute codes and convert to string - codes = d.map(lambda x: (x[0], m.value.predict(x[1]))).map(lambda x: '%s\t%s' % (x[0], json.dumps(x[1]))) + codes = d.map(lambda x: (x[0], m.value.predict(x[1]))).map( + lambda x: '%s\t%s' % (x[0], json.dumps(x[1]))) codes.saveAsTextFile(args.output) + if __name__ == "__main__": from argparse import ArgumentParser parser = ArgumentParser() # Data handling parameters - parser.add_argument('--data', dest='data', type=str, default=None, required=True, help='hdfs path to input data') - parser.add_argument('--data_udf', dest='data_udf', type=str, default=None, help='module name from which to load a data loading UDF') - parser.add_argument('--seed', dest='seed', type=int, default=None, help='optional random seed for sampling') - parser.add_argument('--sampling_ratio', dest='sampling_ratio', type=float, default=1.0, help='proportion of data to sample for model application') - parser.add_argument('--output', dest='output', type=str, default=None, required=True, help='hdfs path to output data') + parser.add_argument( + '--data', dest='data', type=str, default=None, + required=True, help='hdfs path to input data') + parser.add_argument( + '--data_udf', dest='data_udf', type=str, default=None, + help='module name from which to load a data loading UDF') + parser.add_argument( + '--seed', dest='seed', type=int, + default=None, help='optional random seed for sampling') + parser.add_argument( + '--sampling_ratio', dest='sampling_ratio', type=float, + default=1.0, help='proportion of data to sample for model application') + parser.add_argument( + '--output', dest='output', type=str, + default=None, required=True, help='hdfs path to output data') 
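For reference, one way to produce an input record in the tab-separated format that default_data_loading above expects (the id and vector here are placeholders):

    import base64
    import cPickle as pkl
    import numpy as np

    vec = np.random.rand(128)  # placeholder feature vector
    record = '%s\t%s' % ('item-42', base64.b64encode(pkl.dumps(vec, -1)))
    # The loader recovers the pair with:
    #   item_id, data = record.split('\t')
    #   vec = pkl.loads(base64.decodestring(data))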
existing_model_group = parser.add_mutually_exclusive_group(required=True) - existing_model_group.add_argument('--model_pkl', dest='model_pkl', type=str, default=None, help='a pickled LOPQModel to evaluate on the data') - existing_model_group.add_argument('--model_proto', dest='model_proto', type=str, default=None, help='a protobuf LOPQModel to evaluate on the data') + existing_model_group.add_argument( + '--model_pkl', dest='model_pkl', type=str, + default=None, help='a pickled LOPQModel to evaluate on the data') + existing_model_group.add_argument( + '--model_proto', dest='model_proto', type=str, + default=None, help='a protobuf LOPQModel to evaluate on the data') args = parser.parse_args() diff --git a/spark/example_udf.py b/spark/example_udf.py index 1944579..81ea3d8 100644 --- a/spark/example_udf.py +++ b/spark/example_udf.py @@ -1,22 +1,26 @@ # Copyright 2015, Yahoo Inc. -# Licensed under the terms of the Apache License, Version 2.0. See the LICENSE file associated with the project for terms. +# Licensed under the terms of the Apache License, Version 2.0. +# See the LICENSE file associated with the project for terms. import cPickle as pkl import base64 def udf(sc, data_path, sampling_ratio, seed): """ - This is an example UDF function to load training data. It loads data from a text file - with base64-encoded pickled numpy arrays on each line. + This is an example UDF function to load training data. It loads data from + a text file with base64-encoded pickled numpy arrays on each line. """ - # Compute the number of cores in our cluster - used below to heuristically set the number of partitions - total_cores = int(sc._conf.get('spark.executor.instances')) * int(sc._conf.get('spark.executor.cores')) + # Compute the number of cores in our cluster. + # used below to heuristically set the number of partitions + total_cores = int(sc._conf.get('spark.executor.instances') + ) * int(sc._conf.get('spark.executor.cores')) # Load and sample down the dataset - d = sc.textFile(data_path, total_cores * 3).sample(False, sampling_ratio, seed) + d = (sc.textFile(data_path, total_cores * 3) + .sample(False, sampling_ratio, seed)) - deserialize_vec = lambda s: pkl.loads(base64.decodestring(s)) - vecs = d.map(deserialize_vec) + def deserialize_vec(s): + return pkl.loads(base64.decodestring(s)) - return vecs + return d.map(deserialize_vec) diff --git a/spark/pca_preparation.py b/spark/pca_preparation.py index 849899a..32c327b 100644 --- a/spark/pca_preparation.py +++ b/spark/pca_preparation.py @@ -1,15 +1,17 @@ # Copyright 2015, Yahoo Inc. -# Licensed under the terms of the Apache License, Version 2.0. See the LICENSE file associated with the project for terms. +# Licensed under the terms of the Apache License, Version 2.0. +# See the LICENSE file associated with the project for terms. """ -This script illustrates how to prepare PCA parameters before using them in the LOPQ pipeline. - -The `pca_params` argument is the path to a pickle file containing PCA parameters like that -produced as a result of `train_pca.py`, and the `D` argument is the desired dimension of the -final feature. This script truncates then permutes the dimensions of the PCA matrix to balance -the variance across the the two halves of the final vector. +This script illustrates how to prepare PCA parameters before using them +in the LOPQ pipeline. 
+ +The `pca_params` argument is the path to a pickle file containing PCA +parameters like those produced by `train_pca.py`, and the `D` +argument is the desired dimension of the final feature. This script truncates +then permutes the dimensions of the PCA matrix to balance the variance across +the two halves of the final vector. """ import cPickle as pkl -import base64 import numpy as np from lopq.model import eigenvalue_allocation @@ -23,14 +25,14 @@ def main(args): # Reduce dimension - eigenvalues assumed in ascending order E = E[-args.D:] - P = P[:,-args.D:] + P = P[:, -args.D:] # Balance variance across halves permuted_inds = eigenvalue_allocation(2, E) P = P[:, permuted_inds] # Save new params - pkl.dump({ 'P': P, 'mu': mu }, open(args.output, 'w')) + pkl.dump({'P': P, 'mu': mu}, open(args.output, 'w')) def apply_PCA(x, mu, P): @@ -44,9 +46,15 @@ from argparse import ArgumentParser parser = ArgumentParser() - parser.add_argument('--pca_params', dest='pca_params', type=str, required=True, help='path to pickle file of PCA parameters') - parser.add_argument('--D', dest='D', type=int, default=128, help='desired final feature dimension') - parser.add_argument('--output', dest='output', type=str, required=True, help='path to pickle file of new PCA parameters') + parser.add_argument( + '--pca_params', dest='pca_params', type=str, + required=True, help='path to pickle file of PCA parameters') + parser.add_argument( + '--D', dest='D', type=int, default=128, + help='desired final feature dimension') + parser.add_argument( + '--output', dest='output', type=str, + required=True, help='path to pickle file of new PCA parameters') args = parser.parse_args() main(args) diff --git a/spark/train_model.py b/spark/train_model.py index d31c14c..8115617 100644 --- a/spark/train_model.py +++ b/spark/train_model.py @@ -1,5 +1,6 @@ # Copyright 2015, Yahoo Inc. -# Licensed under the terms of the Apache License, Version 2.0. See the LICENSE file associated with the project for terms. +# Licensed under the terms of the Apache License, Version 2.0. +# See the LICENSE file associated with the project for terms. from pyspark.context import SparkContext import numpy as np @@ -21,21 +22,26 @@ def default_data_loading(sc, data_path, sampling_ratio, seed): - """ - This function loads training data from a text file, sampling it by the provided - ratio and random seed, and interprets each line as a tab-separated (id, data) pair - where 'data' is assumed to be a base64-encoded pickled numpy array. The ids are discarded. + """ This function loads training data from a text file, sampling it by + the provided ratio and random seed, and interprets each line as + a tab-separated (id, data) pair where 'data' is assumed to be + a base64-encoded pickled numpy array. The ids are discarded. The data is returned as an RDD of numpy arrays. """ - # Compute the number of cores in our cluster - used below to heuristically set the number of partitions - total_cores = int(sc._conf.get('spark.executor.instances')) * int(sc._conf.get('spark.executor.cores')) + + # Compute the number of cores in our cluster.
+ # Used below to heuristically set the number of partitions + total_cores = int(sc._conf.get('spark.executor.instances') + ) * int(sc._conf.get('spark.executor.cores')) # Load and sample down the dataset - d = sc.textFile(data_path, total_cores * 3).sample(False, sampling_ratio, seed) + d = (sc.textFile(data_path, total_cores * 3) + .sample(False, sampling_ratio, seed)) # The data is (id, vector) tab-delimited pairs where each vector is # a base64-encoded pickled numpy array - deserialize_vec = lambda s: pkl.loads(base64.decodestring(s.split('\t')[1])) + def deserialize_vec(s): + return pkl.loads(base64.decodestring(s.split('\t')[1])) vecs = d.map(deserialize_vec) return vecs @@ -64,7 +70,8 @@ def train_coarse(sc, split_vecs, V, seed=None): first.cache() print 'Total training set size: %d' % first.count() print 'Starting training coarse quantizer...' - C0 = KMeans.train(first, V, initializationMode='random', maxIterations=10, seed=seed) + C0 = KMeans.train(first, V, initializationMode='random', + maxIterations=10, seed=seed) print '... done training coarse quantizer.' first.unpersist() @@ -72,7 +79,8 @@ def train_coarse(sc, split_vecs, V, seed=None): second = split_vecs.map(lambda x: x[1]) second.cache() print 'Starting training coarse quantizer...' - C1 = KMeans.train(second, V, initializationMode='random', maxIterations=10, seed=seed) + C1 = KMeans.train(second, V, initializationMode='random', + maxIterations=10, seed=seed) print '... done training coarse quantizer.' second.unpersist() @@ -80,8 +88,8 @@ def train_coarse(sc, split_vecs, V, seed=None): def train_rotations(sc, split_vecs, M, Cs): - """ - For compute rotations for each split of the data using given coarse quantizers. + """ Compute rotations for each split of the data using the given coarse + quantizers. """ Rs = [] @@ -147,16 +155,15 @@ def seq_op(acc, x): def dict_to_ndarray(d, N): - """ - Helper for collating a dict with int keys into an ndarray. The value for a key - becomes the value at the corresponding index in the ndarray and indices missing - from the dict become zero ndarrays of the same dimension. + """ Helper for collating a dict with int keys into an ndarray. The value + for a key becomes the value at the corresponding index in the ndarray and + indices missing from the dict become zero ndarrays of the same dimension. :param dict d: a dict of (int, ndarray) or (int, number) key/values :param int N: - the size of the first dimension of the new ndarray (the rest of the dimensions - are determined by the shape of elements in d) + the size of the first dimension of the new ndarray (the rest of the + dimensions are determined by the shape of elements in d) """ el = d.values()[0] @@ -199,10 +206,10 @@ def compute_local_rotations(sc, data, model, num_buckets): return R, mu, count -def train_subquantizers(sc, split_vecs, M, subquantizer_clusters, model, seed=None): - """ - Project each data point into it's local space and compute subquantizers by clustering - each fine split of the locally projected data. +def train_subquantizers(sc, split_vecs, M, subquantizer_clusters, model, + seed=None): + """Project each data point into its local space and compute subquantizers + by clustering each fine split of the locally projected data.
""" b = sc.broadcast(model) @@ -221,19 +228,22 @@ def project_local(x): for split in xrange(M): data = split_vecs.map(lambda x: x[split]) data.cache() - sub = KMeans.train(data, subquantizer_clusters, initializationMode='random', maxIterations=10, seed=seed) + sub = KMeans.train(data, subquantizer_clusters, + initializationMode='random', + maxIterations=10, seed=seed) data.unpersist() subquantizers.append(np.vstack(sub.clusterCenters)) - return (subquantizers[:len(subquantizers) / 2], subquantizers[len(subquantizers) / 2:]) + return (subquantizers[:len(subquantizers) / 2], + subquantizers[len(subquantizers) / 2:]) def save_hdfs_pickle(m, pkl_path): """ - Given a python object and a path on hdfs, save the object as a pickle file locally and copy the file - to the hdfs path. + Given a python object and a path on hdfs, save the object as a pickle + file locally and copy the file to the hdfs path. """ - print 'Saving pickle to temp file...' + print('Saving pickle to temp file...') f = NamedTemporaryFile(delete=False) pkl.dump(m, f, -1) f.close() @@ -244,16 +254,16 @@ def save_hdfs_pickle(m, pkl_path): def save_hdfs_proto(m, proto_path): + """ Given an LOPQModel object and a path on hdfs, save the model + parameters as a protobuf file locally and copy the file + to the hdfs path. """ - Given an LOPQModel object and a path on hdfs, save the model parameters as a protobuf file locally and - copy the file to the hdfs path. - """ - print 'Saving protobuf to temp file...' + print('Saving protobuf to temp file...') f = NamedTemporaryFile(delete=False) m.export_proto(f) f.close() - print 'Copying proto file to hdfs...' + print('Copying proto file to hdfs...') copy_to_hdfs(f, proto_path) os.remove(f.name) @@ -263,8 +273,8 @@ def copy_to_hdfs(f, hdfs_path): def validate_arguments(args, model): - """ - Check provided command line arguments to ensure they are coherent. Provide feedback for potential errors. + """ Check provided command line arguments to ensure they are coherent. + Provide feedback for potential errors. """ # Parse steps @@ -272,95 +282,141 @@ def validate_arguments(args, model): # Check that the steps make sense if STEP_ROTATION not in args.steps and len(args.steps) == 2: - print 'Training steps invalid' + print('Training steps invalid') sys.exit(1) # Find parameters and warn of possibly unintentional discrepancies if args.V is None: if model is not None: args.V = model.V - print 'Parameter V not specified: using V=%d from provided model.' % model.V + print('Parameter V not specified: using V=%d from provided' + ' model.' % model.V) else: - print 'Parameter V not specified and no existing model provided. Exiting.' + print('Parameter V not specified and no existing model provided.' + ' Exiting.') sys.exit(1) else: if model is not None and model.V != args.V: if STEP_COARSE in args.steps: - print 'Parameter V differs between command line argument and provided model: ' + \ - 'coarse quantizers will be trained with V=%d' % args.V + print('Parameter V differs between command line argument and ' + 'provided model: coarse quantizers will be trained with' + ' V=%d' % args.V) else: - print 'Parameter V differs between command line argument and provided model: ' + \ - 'coarse quantizers must be retrained or this discrepancy corrected. Exiting.' + print('Parameter V differs between command line argument and ' + 'provided model: coarse quantizers must be retrained or' + ' this discrepancy corrected. 
Exiting.') sys.exit(1) if STEP_ROTATION in args.steps or STEP_SUBQUANT in args.steps: if args.M is None: if model is not None: args.M = model.M - print 'Parameter M not specified: using M=%d from provided model.' % model.M + print('Parameter M not specified: using M=%d from provided ' + 'model.' % model.M) else: - print 'Parameter M not specified and no existing model provided. Exiting.' + print('Parameter M not specified and no existing model ' + 'provided. Exiting.') sys.exit(1) else: if model is not None and model.M != args.M: if STEP_ROTATION in args.steps: - print 'Parameter M differs between command line argument and provided model: ' + \ - 'model will be trained with M=%d' % args.M + print('Parameter M differs between command line argument ' + 'and provided model: model will be trained with ' + 'M=%d' % args.M) else: - print 'Parameter M differs between command line argument and provided model: ' + \ - 'rotations must be retrained or this discrepancy corrected. Exiting.' + print('Parameter M differs between command line argument ' + 'and provided model: rotations must be retrained or' + ' this discrepancy corrected. Exiting.') sys.exit(1) if STEP_ROTATION in args.steps: - if STEP_COARSE not in args.steps and (model is None or model.Cs is None): - print 'Cannot train rotations without coarse quantizers. Either train coarse quantizers or provide an existing model. Exiting.' + is_invalid_model = (model is None or model.Cs is None) + if STEP_COARSE not in args.steps and is_invalid_model: + print('Cannot train rotations without coarse quantizers. Either ' + 'train coarse quantizers or provide an existing model. ' + 'Exiting.') sys.exit(1) if STEP_SUBQUANT in args.steps: - if STEP_COARSE not in args.steps and (model is None or model.Cs is None): - print 'Cannot train subquantizers without coarse quantizers. Either train coarse quantizers or provide an existing model. Exiting.' + is_invalid_model = (model is None or model.Cs is None) + if STEP_COARSE not in args.steps and is_invalid_model: + print('Cannot train subquantizers without coarse quantizers. ' + 'Either train coarse quantizers or provide an existing' + ' model. Exiting.') sys.exit(1) - if STEP_ROTATION not in args.steps and (model is None or model.Rs is None or model.mus is None): - print 'Cannot train subquantizers without rotations. Either train rotations or provide an existing model. Exiting.' + + is_invalid_model = (model is None or model.Rs is None + or model.mus is None) + if STEP_ROTATION not in args.steps and is_invalid_model: + print('Cannot train subquantizers without rotations. Either train' + ' rotations or provide an existing model. 
Exiting.') sys.exit(1) return args + if __name__ == "__main__": from argparse import ArgumentParser parser = ArgumentParser() # Data handling parameters - parser.add_argument('--data', dest='data', type=str, required=True, help='hdfs path to input data') - parser.add_argument('--data_udf', dest='data_udf', type=str, default=None, help='module name from which to load a data loading UDF') - parser.add_argument('--seed', dest='seed', type=int, default=None, help='optional random seed') - parser.add_argument('--sampling_ratio', dest='sampling_ratio', type=float, default=1.0, help='proportion of data to sample for training') - parser.add_argument('--subquantizer_sampling_ratio', dest='subquantizer_sampling_ratio', type=float, default=1.0, - help='proportion of data to subsample for subquantizer training') + parser.add_argument( + '--data', dest='data', type=str, + required=True, help='hdfs path to input data') + parser.add_argument( + '--data_udf', dest='data_udf', type=str, default=None, + help='module name from which to load a data loading UDF') + parser.add_argument( + '--seed', dest='seed', type=int, + default=None, help='optional random seed') + parser.add_argument( + '--sampling_ratio', dest='sampling_ratio', type=float, + default=1.0, help='proportion of data to sample for training') + parser.add_argument( + '--subquantizer_sampling_ratio', dest='subquantizer_sampling_ratio', + type=float, default=1.0, + help='proportion of data to subsample for subquantizer training') # Model parameters existing_model_group = parser.add_mutually_exclusive_group() - existing_model_group.add_argument('--existing_model_pkl', dest='existing_model_pkl', type=str, default=None, - help='a pickled LOPQModel from which to extract existing parameters') - existing_model_group.add_argument('--existing_model_proto', dest='existing_model_proto', type=str, default=None, - help='a protobuf of existing model parameters') + existing_model_group.add_argument( + '--existing_model_pkl', dest='existing_model_pkl', + type=str, default=None, + help='a pickled LOPQModel from which to extract existing parameters') + existing_model_group.add_argument( + '--existing_model_proto', dest='existing_model_proto', + type=str, default=None, + help='a protobuf of existing model parameters') # Model hyperparameters - parser.add_argument('--V', dest='V', type=int, default=None, help='number of coarse clusters') - parser.add_argument('--M', dest='M', type=int, default=None, help='total number of subquantizers') - parser.add_argument('--subquantizer_clusters', dest='subquantizer_clusters', type=int, default=256, help='number of subquantizer clusters') + parser.add_argument( + '--V', dest='V', type=int, + default=None, help='number of coarse clusters') + parser.add_argument( + '--M', dest='M', type=int, default=None, + help='total number of subquantizers') + parser.add_argument( + '--subquantizer_clusters', dest='subquantizer_clusters', + type=int, default=256, help='number of subquantizer clusters') # Training and output directives - parser.add_argument('--steps', dest='steps', type=str, default='0,1,2', - help='comma-separated list of integers indicating which steps of training to perform') - parser.add_argument('--model_pkl', dest='model_pkl', type=str, default=None, help='hdfs path to save pickle file of resulting LOPQModel') - parser.add_argument('--model_proto', dest='model_proto', type=str, default=None, help='hdfs path to save protobuf file of resulting model parameters') + parser.add_argument( + '--steps', dest='steps', type=str, 
default='0,1,2', + help='comma-separated list of integers indicating which ' + 'steps of training to perform') + parser.add_argument( + '--model_pkl', dest='model_pkl', type=str, default=None, + help='hdfs path to save pickle file of resulting LOPQModel') + parser.add_argument( + '--model_proto', dest='model_proto', type=str, default=None, + help='hdfs path to save protobuf file of resulting model parameters') args = parser.parse_args() # Check that some output format was provided if args.model_pkl is None and args.model_proto is None: - parser.error('at least one of --model_pkl and --model_proto is required') + parser.error( + 'at least one of --model_pkl and --model_proto is required') # Load existing model if provided model = None @@ -372,8 +428,14 @@ def validate_arguments(args, model): args = validate_arguments(args, model) # Build descriptive app name - get_step_name = lambda x: {STEP_COARSE: 'coarse', STEP_ROTATION: 'rotations', STEP_SUBQUANT: 'subquantizers'}.get(x, None) - steps_str = ', '.join(filter(lambda x: x is not None, map(get_step_name, sorted(args.steps)))) + def get_step_name(x): + mapping = {STEP_COARSE: 'coarse', + STEP_ROTATION: 'rotations', + STEP_SUBQUANT: 'subquantizers'} + return mapping.get(x, None) + + steps_str = ', '.join(filter(lambda x: x is not None, + map(get_step_name, sorted(args.steps)))) APP_NAME = 'LOPQ{V=%d,M=%d}; training %s' % (args.V, args.M, steps_str) sc = SparkContext(appName=APP_NAME) @@ -404,15 +466,23 @@ def validate_arguments(args, model): # Get subquantizers if STEP_SUBQUANT in args.steps: - model = LOPQModel(V=args.V, M=args.M, subquantizer_clusters=args.subquantizer_clusters, parameters=(Cs, Rs, mus, None)) + model = LOPQModel( + V=args.V, M=args.M, + subquantizer_clusters=args.subquantizer_clusters, + parameters=(Cs, Rs, mus, None)) if args.subquantizer_sampling_ratio != 1.0: - data = data.sample(False, args.subquantizer_sampling_ratio, args.seed) + data = data.sample( + False, args.subquantizer_sampling_ratio, args.seed) - subs = train_subquantizers(sc, data, args.M, args.subquantizer_clusters, model, seed=args.seed) + subs = train_subquantizers( + sc, data, args.M, args.subquantizer_clusters, model, + seed=args.seed) # Final output model - model = LOPQModel(V=args.V, M=args.M, subquantizer_clusters=args.subquantizer_clusters, parameters=(Cs, Rs, mus, subs)) + model = LOPQModel( + V=args.V, M=args.M, subquantizer_clusters=args.subquantizer_clusters, + parameters=(Cs, Rs, mus, subs)) if args.model_pkl: save_hdfs_pickle(model, args.model_pkl) diff --git a/spark/train_pca.py b/spark/train_pca.py index 40ede09..659a23a 100644 --- a/spark/train_pca.py +++ b/spark/train_pca.py @@ -1,5 +1,6 @@ # Copyright 2015, Yahoo Inc. -# Licensed under the terms of the Apache License, Version 2.0. See the LICENSE file associated with the project for terms. +# Licensed under the terms of the Apache License, Version 2.0. +# See the LICENSE file associated with the project for terms. from pyspark.context import SparkContext import numpy as np @@ -13,20 +14,26 @@ def default_data_loading(sc, data_path, sampling_ratio, seed): """ - This function loads training data from a text file, sampling it by the provided - ratio and random seed, and interprets each line as a tab-separated (id, data) pair - where 'data' is assumed to be a base64-encoded pickled numpy array. The ids are discarded.
+ This function loads training data from a text file, + sampling it by the provided ratio and random seed, and interprets + each line as a tab-separated (id, data) pair where 'data' is assumed + to be a base64-encoded pickled numpy array. The ids are discarded. The data is returned as an RDD of numpy arrays. """ - # Compute the number of cores in our cluster - used below to heuristically set the number of partitions - total_cores = int(sc._conf.get('spark.executor.instances')) * int(sc._conf.get('spark.executor.cores')) + # Compute the number of cores in our cluster - used below + # to heuristically set the number of partitions + total_cores = int(sc._conf.get('spark.executor.instances') + ) * int(sc._conf.get('spark.executor.cores')) # Load and sample down the dataset - d = sc.textFile(data_path, total_cores * 3).sample(False, sampling_ratio, seed) + d = (sc.textFile(data_path, total_cores * 3) + .sample(False, sampling_ratio, seed) + ) # The data is (id, vector) tab-delimited pairs where each vector is # a base64-encoded pickled numpy array - deserialize_vec = lambda s: pkl.loads(base64.decodestring(s.split('\t')[1])) + def deserialize_vec(s): + return pkl.loads(base64.decodestring(s.split('\t')[1])) vecs = d.map(deserialize_vec) return vecs @@ -55,7 +62,8 @@ def combOp(a, b): mu = mu / float(count) # Compute covariance estimator - summed_covar = d.treeAggregate(np.zeros((D, D)), seqOp, combOp, depth=args.agg_depth) + summed_covar = d.treeAggregate( + np.zeros((D, D)), seqOp, combOp, depth=args.agg_depth) A = summed_covar / (count - 1) - np.outer(mu, mu) E, P = np.linalg.eigh(A) @@ -73,8 +81,8 @@ def combOp(a, b): def save_hdfs_pickle(m, pkl_path): """ - Given a python object and a path on hdfs, save the object as a pickle file locally and copy the file - to the hdfs path. + Given a python object and a path on hdfs, save the object as a pickle + file locally and copy the file to the hdfs path. """ print 'Saving pickle to temp file...' 
f = NamedTemporaryFile(delete=False) @@ -95,13 +103,24 @@ def copy_to_hdfs(f, hdfs_path): parser = ArgumentParser() # Data handling parameters - parser.add_argument('--data', dest='data', type=str, required=True, help='hdfs path to input data') - parser.add_argument('--data_udf', dest='data_udf', type=str, default=None, help='module name from which to load a data loading UDF') - parser.add_argument('--seed', dest='seed', type=int, default=None, help='optional random seed') - parser.add_argument('--sampling_ratio', dest='sampling_ratio', type=float, default=1.0, help='proportion of data to sample for training') - parser.add_argument('--agg_depth', dest='agg_depth', type=int, default=4, help='depth of tree aggregation to compute covariance estimator') - - parser.add_argument('--output', dest='output', type=str, default=None, help='hdfs path to output pickle file of parameters') + parser.add_argument( + '--data', dest='data', type=str, + required=True, help='hdfs path to input data') + parser.add_argument( + '--data_udf', dest='data_udf', type=str, default=None, + help='module name from which to load a data loading UDF') + parser.add_argument( + '--seed', dest='seed', type=int, + default=None, help='optional random seed') + parser.add_argument( + '--sampling_ratio', dest='sampling_ratio', type=float, + default=1.0, help='proportion of data to sample for training') + parser.add_argument( + '--agg_depth', dest='agg_depth', type=int, default=4, + help='depth of tree aggregation to compute covariance estimator') + + parser.add_argument('--output', dest='output', type=str, default=None, + help='hdfs path to output pickle file of parameters') args = parser.parse_args()
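To close the loop between train_pca.py and pca_preparation.py above: the prepared parameters are a pickled dict with keys 'P' and 'mu', and applying them is presumably a mean subtraction followed by a projection onto the truncated, permuted eigenvector columns; a minimal sketch under that assumption, with placeholder paths and data:

    import cPickle as pkl
    import numpy as np

    # Placeholder path: the file written by pca_preparation.py via --output
    params = pkl.load(open('prepared_pca_params.pkl'))
    P, mu = params['P'], params['mu']

    x = np.random.rand(P.shape[0])  # placeholder raw feature in the original dimensionality
    x_reduced = np.dot(x - mu, P)   # assumed to mirror what apply_PCA does before LOPQ training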