Cursor.getmulti() initial implementation (#201) (#252)
Add a getmulti() method on Cursor that retrieves multiple values at a time.

Contributed by willsthompson
willsthompson authored Sep 29, 2020
1 parent 62a36e8 commit 509ebd7
Showing 4 changed files with 272 additions and 0 deletions.
50 changes: 50 additions & 0 deletions lmdb/cffi.py
@@ -2060,6 +2060,56 @@ def get(self, key, default=None):
            return self.value()
        return default

    def getmulti(self, keys, dupdata=False, dupfixed_bytes=None):
        """Returns an iterable of `(key, value)` 2-tuples containing results
        for each key in the iterable `keys`.

            `keys`:
                Iterable to read keys from.

            `dupdata`:
                If ``True`` and the database was opened with `dupsort=True`,
                read all duplicate values for each matching key.

            `dupfixed_bytes`:
                If the database was opened with `dupsort=True` and
                `dupfixed=True`, accepts the size of each value, in bytes,
                and applies an optimization reducing the number of database
                lookups.
        """
        if dupfixed_bytes and dupfixed_bytes < 0:
            raise _error("dupfixed_bytes must be a positive integer.")
        elif dupfixed_bytes and not dupdata:
            raise _error("dupdata is required for dupfixed_bytes.")

        if dupfixed_bytes:
            get_op = _lib.MDB_GET_MULTIPLE
            next_op = _lib.MDB_NEXT_MULTIPLE
        else:
            get_op = _lib.MDB_GET_CURRENT
            next_op = _lib.MDB_NEXT_DUP

        for key in keys:
            if self.set_key(key):
                while self._valid:
                    self._cursor_get(get_op)
                    preload(self._val)
                    key = self._to_py(self._key)
                    val = self._to_py(self._val)

                    if dupfixed_bytes:
                        gen = (
                            (key, val[i:i + dupfixed_bytes])
                            for i in range(0, len(val), dupfixed_bytes))
                        for k, v in gen:
                            yield k, v
                    else:
                        yield key, val

                    if dupdata:
                        self._cursor_get(next_op)
                    else:
                        break

def set_range(self, key):
"""Seek to the first key greater than or equal to `key`, returning
``True`` on success, or ``False`` to indicate key was past end of
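For context, a minimal usage sketch of the new method (not part of the commit; the path, database name, and keys below are made up for illustration, and assume a database opened with `dupsort=True` and `dupfixed=True`):

import lmdb

env = lmdb.open('/tmp/getmulti-demo', max_dbs=1)   # illustrative path
db = env.open_db(b'demo', dupsort=True, dupfixed=True)

with env.begin(write=True) as txn:
    # Two duplicate 1-byte values under b'k1', one under b'k2'.
    txn.put(b'k1', b'a', db=db)
    txn.put(b'k1', b'b', db=db)
    txn.put(b'k2', b'c', db=db)

    cur = txn.cursor(db=db)
    # Missing keys are simply skipped rather than raising.
    pairs = list(cur.getmulti([b'k1', b'k2', b'missing'],
                              dupdata=True, dupfixed_bytes=1))
    # pairs == [(b'k1', b'a'), (b'k1', b'b'), (b'k2', b'c')]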
147 changes: 147 additions & 0 deletions lmdb/cpython.c
@@ -2094,6 +2094,152 @@ cursor_first_dup(CursorObject *self)
static PyObject *
cursor_value(CursorObject *self);

/**
* Cursor.getmulti() -> Iterable of (key, value)
*/
static PyObject *
cursor_get_multi(CursorObject *self, PyObject *args, PyObject *kwds)
{
    struct cursor_get {
        PyObject *keys;
        int dupdata;
        int dupfixed_bytes;
    } arg = {Py_None, 0, 0};

    int i, as_buffer;
    PyObject *iter, *item, *tup, *key, *val;
    PyObject *ret = PyList_New(0);
    MDB_cursor_op get_op, next_op;
    bool done;

    static const struct argspec argspec[] = {
        {"keys", ARG_OBJ, OFFSET(cursor_get, keys)},
        {"dupdata", ARG_BOOL, OFFSET(cursor_get, dupdata)},
        {"dupfixed_bytes", ARG_INT, OFFSET(cursor_get, dupfixed_bytes)} // ARG_SIZE?
    };

    static PyObject *cache = NULL;
    if(parse_args(self->valid, SPECSIZE(), argspec, &cache, args, kwds, &arg)) {
        return NULL;
    }

    if(arg.dupfixed_bytes < 0) {
        return type_error("dupfixed_bytes must be a positive integer.");
    } else if(arg.dupfixed_bytes > 0 && !arg.dupdata) {
        return type_error("dupdata is required for dupfixed_bytes.");
    }

    if(! ((iter = PyObject_GetIter(arg.keys)))) {
        return NULL;
    }

    /* Choose ops for dupfixed vs standard */
    if(arg.dupfixed_bytes) {
        get_op = MDB_GET_MULTIPLE;
        next_op = MDB_NEXT_MULTIPLE;
    } else {
        get_op = MDB_GET_CURRENT;
        next_op = MDB_NEXT_DUP;
    }

    as_buffer = self->trans->flags & TRANS_BUFFERS;

    while((item = PyIter_Next(iter))) {
        MDB_val mkey;

        // validate item?

        if(val_from_buffer(&mkey, item)) {
            Py_DECREF(item);
            Py_DECREF(iter);
            return NULL;
        } /* val_from_buffer sets exception */

        self->key = mkey;
        if(_cursor_get_c(self, MDB_SET_KEY)) { // MDB_SET?
            Py_DECREF(item);
            Py_DECREF(iter);
            return NULL;
        }

        done = false;
        while (!done) {
            // TODO valid cursor check?

            if(! self->positioned) {
                done = true;
            }
            // TODO check for mutation and refresh key?
            else if(_cursor_get_c(self, get_op)) {
                Py_DECREF(item);
                Py_DECREF(iter);
                return NULL;
            } else {
                key = obj_from_val(&self->key, as_buffer);
                PRELOAD_UNLOCKED(0, self->val.mv_data, self->val.mv_size);

                if(!arg.dupfixed_bytes) {
                    /* Not dupfixed, MDB_GET_CURRENT returns a single item */
                    val = obj_from_val(&self->val, as_buffer);
                    tup = PyTuple_New(2);

                    if(tup && key && val) {
                        PyTuple_SET_ITEM(tup, 0, key);
                        PyTuple_SET_ITEM(tup, 1, val);
                        PyList_Append(ret, tup);
                        Py_DECREF(tup);
                    } else {
                        Py_XDECREF(key);
                        Py_XDECREF(val);
                        Py_XDECREF(tup);
                    }
                } else {
                    /* dupfixed, MDB_GET_MULTIPLE returns a batch; iterate values */
                    int len = (int) self->val.mv_size / arg.dupfixed_bytes; // size_t?
                    for(i = 0; i < len; i++) {
                        // TODO Handle as_buffer?
                        val = PyBytes_FromStringAndSize(
                            (char *) self->val.mv_data + (i * arg.dupfixed_bytes),
                            (Py_ssize_t) arg.dupfixed_bytes);
                        tup = PyTuple_New(2);

                        if(tup && key && val) {
                            Py_INCREF(key); // Hold key across loop iterations
                            PyTuple_SET_ITEM(tup, 0, key);
                            PyTuple_SET_ITEM(tup, 1, val);
                            PyList_Append(ret, tup);
                            Py_DECREF(tup);
                        } else {
                            Py_XDECREF(val);
                            Py_XDECREF(tup);
                        }
                    }
                    Py_XDECREF(key); // Release key
                }

                if(arg.dupdata) {
                    if(_cursor_get_c(self, next_op)) {
                        Py_DECREF(item);
                        Py_DECREF(iter);
                        return NULL;
                    }
                } else {
                    done = true;
                }
            }
        }
        Py_DECREF(item);
    }

    Py_DECREF(iter);
    if(PyErr_Occurred()) {
        return NULL;
    }

    return ret;
}

/**
* Cursor.get() -> result
*/
@@ -2829,6 +2975,7 @@ static struct PyMethodDef cursor_methods[] = {
    {"first", (PyCFunction)cursor_first, METH_NOARGS},
    {"first_dup", (PyCFunction)cursor_first_dup, METH_NOARGS},
    {"get", (PyCFunction)cursor_get, METH_VARARGS|METH_KEYWORDS},
    {"getmulti", (PyCFunction)cursor_get_multi, METH_VARARGS|METH_KEYWORDS},
    {"item", (PyCFunction)cursor_item, METH_NOARGS},
    {"iternext", (PyCFunction)cursor_iternext, METH_VARARGS|METH_KEYWORDS},
    {"iternext_dup", (PyCFunction)cursor_iternext_dup, METH_VARARGS|METH_KEYWORDS},
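As a side note on the dupfixed path: MDB_GET_MULTIPLE hands back one contiguous buffer holding as many same-sized values as fit in a page, and both implementations above simply slice it into individual records. A standalone sketch of that slicing (illustrative only, not part of the commit):

def split_fixed(key, buf, value_size):
    """Split one MDB_GET_MULTIPLE buffer of fixed-size values into
    (key, value) pairs, mirroring the generator/loop above."""
    if value_size <= 0 or len(buf) % value_size:
        raise ValueError("buffer length must be a multiple of value_size")
    return [(key, buf[i:i + value_size])
            for i in range(0, len(buf), value_size)]

# split_fixed(b'k1', b'abc', 1) == [(b'k1', b'a'), (b'k1', b'b'), (b'k1', b'c')]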
64 changes: 64 additions & 0 deletions tests/getmulti_test.py
@@ -0,0 +1,64 @@
from __future__ import absolute_import
from __future__ import with_statement
import unittest

import testlib
from testlib import KEYS2, ITEMS2_MULTI
from testlib import putBigDataMulti

class GetMultiTestBase(unittest.TestCase):

    def tearDown(self):
        testlib.cleanup()

    def setUp(self, dupsort=None, dupfixed=None):
        self.db_key = "testdb".encode('utf-8')
        self.path, self.env = testlib.temp_env(max_dbs=1)
        self.txn = self.env.begin(write=True)
        self.db = self.env.open_db(
            key=self.db_key, txn=self.txn,
            dupsort=dupsort,
            dupfixed=dupfixed
        )
        putBigDataMulti(self.txn, db=self.db)
        self.c = self.txn.cursor(db=self.db)

    def matchList(self, ls_a, ls_b):
        return ((not (ls_a or ls_b)) or
                (ls_a and ls_b and all(map(lambda x, y: x == y, ls_a, ls_b))))


class GetMultiTestNoDupsortNoDupfixed(GetMultiTestBase):

    ITEMS2_MULTI_NODUP = ITEMS2_MULTI[1::2]

    def setUp(self, dupsort=False, dupfixed=False):
        super(GetMultiTestNoDupsortNoDupfixed, self).setUp(dupsort=dupsort, dupfixed=dupfixed)

    def testGetMulti(self):
        test_list = list(self.c.getmulti(KEYS2))
        self.assertEqual(self.matchList(test_list, self.ITEMS2_MULTI_NODUP), True)


class GetMultiTestDupsortNoDupfixed(GetMultiTestBase):

    def setUp(self, dupsort=True, dupfixed=False):
        super(GetMultiTestDupsortNoDupfixed, self).setUp(dupsort=dupsort, dupfixed=dupfixed)

    def testGetMulti(self):
        test_list = list(self.c.getmulti(KEYS2, dupdata=True))
        self.assertEqual(self.matchList(test_list, ITEMS2_MULTI), True)


class GetMultiTestDupsortDupfixed(GetMultiTestBase):

    def setUp(self, dupsort=True, dupfixed=True):
        super(GetMultiTestDupsortDupfixed, self).setUp(dupsort=dupsort, dupfixed=dupfixed)

    def testGetMulti(self):
        test_list = list(self.c.getmulti(KEYS2, dupdata=True, dupfixed_bytes=1))
        self.assertEqual(self.matchList(test_list, ITEMS2_MULTI), True)


if __name__ == '__main__':
    unittest.main()
11 changes: 11 additions & 0 deletions tests/testlib.py
@@ -137,6 +137,10 @@ def debug_collect():
ITEMS2 = [(k, B('')) for k in KEYS2]
REV_ITEMS2 = ITEMS2[::-1]
VALUES2 = [B('') for k in KEYS2]
VALUES2_MULTI = [(B('r'), B('s')) for k in KEYS2]
ITEMS2_MULTI = [
    (kv[0], v) for kv in list(zip(KEYS2, VALUES2_MULTI)) for v in kv[1]
]

def putData(t, db=None):
    for k, v in ITEMS:
@@ -151,3 +155,10 @@ def putBigData(t, db=None):
            t.put(k, v, db=db)
        else:
            t.put(k, v)

def putBigDataMulti(t, db=None):
    for k, v in ITEMS2_MULTI:
        if db:
            t.put(k, v, db=db)  # defaults to dupdata=True
        else:
            t.put(k, v)
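For readability of the fixtures: ITEMS2_MULTI pairs every key in KEYS2 with each of its two duplicate values, in order. A quick illustration (not part of the commit; the keys below are stand-ins for whatever KEYS2 actually contains):

KEYS2_EXAMPLE = [b'a', b'b']  # stand-in for testlib.KEYS2, illustration only
VALUES2_MULTI_EXAMPLE = [(b'r', b's') for k in KEYS2_EXAMPLE]
ITEMS2_MULTI_EXAMPLE = [
    (kv[0], v) for kv in zip(KEYS2_EXAMPLE, VALUES2_MULTI_EXAMPLE) for v in kv[1]
]
assert ITEMS2_MULTI_EXAMPLE == [
    (b'a', b'r'), (b'a', b's'), (b'b', b'r'), (b'b', b's')
]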
