Skip to content

Commit e838ece

Browse files
author
Brett Lawson
committed
ING-1223: Implemented GetEx operation.
1 parent e0ddb0a commit e838ece

File tree

5 files changed

+327
-0
lines changed

5 files changed

+327
-0
lines changed

memdx/opcode.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ const (
3333
OpCodeSASLAuth = OpCode(OpCodeTypeCli | 0x21)
3434
OpCodeSASLStep = OpCode(OpCodeTypeCli | 0x22)
3535
OpCodeGetAllVBSeqnos = OpCode(OpCodeTypeCli | 0x48)
36+
OpCodeGetEx = OpCode(OpCodeTypeCli | 0x49)
37+
OpCodeGetExReplica = OpCode(OpCodeTypeCli | 0x4a)
3638
OpCodeDcpOpenConnection = OpCode(OpCodeTypeCli | 0x50)
3739
OpCodeDcpAddStream = OpCode(OpCodeTypeCli | 0x51)
3840
OpCodeDcpCloseStream = OpCode(OpCodeTypeCli | 0x52)

memdx/ops_crud.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,88 @@ func (o OpsCrud) Get(d Dispatcher, req *GetRequest, cb func(*GetResponse, error)
235235
})
236236
}
237237

238+
type GetExRequest struct {
239+
CrudRequestMeta
240+
CollectionID uint32
241+
Key []byte
242+
VbucketID uint16
243+
}
244+
245+
func (r GetExRequest) OpName() string { return OpCodeGetEx.String() }
246+
247+
type GetExResponse struct {
248+
CrudResponseMeta
249+
Cas uint64
250+
Flags uint32
251+
Value []byte
252+
Datatype uint8
253+
}
254+
255+
func (o OpsCrud) GetEx(d Dispatcher, req *GetExRequest, cb func(*GetExResponse, error)) (PendingOp, error) {
256+
extFramesBuf := make([]byte, 0, 128)
257+
extFramesBuf, err := o.encodeReqExtFrames(req.OnBehalfOf, 0, 0, false, extFramesBuf)
258+
if err != nil {
259+
return nil, err
260+
}
261+
262+
reqKey, err := o.encodeCollectionAndKey(req.CollectionID, req.Key, nil)
263+
if err != nil {
264+
return nil, err
265+
}
266+
267+
return d.Dispatch(&Packet{
268+
OpCode: OpCodeGetEx,
269+
Key: reqKey,
270+
VbucketID: req.VbucketID,
271+
FramingExtras: extFramesBuf,
272+
}, func(resp *Packet, err error) bool {
273+
if err != nil {
274+
cb(nil, err)
275+
return false
276+
}
277+
278+
decompErr := OpsCore{}.maybeDecompressPacket(resp)
279+
if decompErr != nil {
280+
cb(nil, decompErr)
281+
return false
282+
}
283+
284+
if resp.Status == StatusKeyNotFound {
285+
cb(nil, ErrDocNotFound)
286+
return false
287+
}
288+
289+
if resp.Status != StatusSuccess {
290+
cb(nil, OpsCrud{}.decodeCommonError(resp))
291+
return false
292+
}
293+
294+
if len(resp.Extras) != 4 {
295+
cb(nil, protocolError{"bad extras length"})
296+
return false
297+
}
298+
299+
flags := binary.BigEndian.Uint32(resp.Extras[0:])
300+
301+
serverDuration, err := o.decodeResExtFrames(resp.FramingExtras)
302+
if err != nil {
303+
cb(nil, err)
304+
return false
305+
}
306+
307+
cb(&GetExResponse{
308+
Cas: resp.Cas,
309+
Flags: flags,
310+
Value: resp.Value,
311+
Datatype: resp.Datatype,
312+
CrudResponseMeta: CrudResponseMeta{
313+
ServerDuration: serverDuration,
314+
},
315+
}, nil)
316+
return false
317+
})
318+
}
319+
238320
type GetAndTouchRequest struct {
239321
CrudRequestMeta
240322
CollectionID uint32

memdx/ops_crud_int_test.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3785,6 +3785,110 @@ func TestOpsCrudCounterNonNumericDoc(t *testing.T) {
37853785
require.ErrorIs(t, err, memdx.ErrDeltaBadval)
37863786
}
37873787

3788+
func TestOpsCrudGetEx(t *testing.T) {
3789+
testutilsint.SkipIfShortTest(t)
3790+
testutilsint.SkipIfOlderServerVersion(t, "8.0.0")
3791+
3792+
t.Run("Basic", func(t *testing.T) {
3793+
key := []byte(uuid.NewString())
3794+
3795+
cli := createTestClient(t)
3796+
3797+
_, err := memdx.SyncUnaryCall(memdx.OpsCrud{
3798+
CollectionsEnabled: true,
3799+
ExtFramesEnabled: true,
3800+
}, memdx.OpsCrud.Set, cli, &memdx.SetRequest{
3801+
CollectionID: 0,
3802+
Key: key,
3803+
VbucketID: defaultTestVbucketID,
3804+
Value: []byte(`{"key":"value"}`),
3805+
})
3806+
require.NoError(t, err)
3807+
3808+
resp, err := memdx.SyncUnaryCall(memdx.OpsCrud{
3809+
CollectionsEnabled: true,
3810+
ExtFramesEnabled: true,
3811+
}, memdx.OpsCrud.GetEx, cli, &memdx.GetExRequest{
3812+
CollectionID: 0,
3813+
Key: key,
3814+
VbucketID: defaultTestVbucketID,
3815+
})
3816+
require.NoError(t, err)
3817+
3818+
hasXattrs := resp.Datatype&uint8(memdx.DatatypeFlagXattrs) != 0
3819+
require.False(t, hasXattrs)
3820+
3821+
require.NoError(t, err)
3822+
require.Equal(t, []byte(`{"key":"value"}`), resp.Value)
3823+
})
3824+
3825+
t.Run("WithXattrs", func(t *testing.T) {
3826+
key := []byte(uuid.NewString())
3827+
3828+
cli := createTestClient(t)
3829+
3830+
_, err := memdx.SyncUnaryCall(memdx.OpsCrud{
3831+
CollectionsEnabled: true,
3832+
ExtFramesEnabled: true,
3833+
}, memdx.OpsCrud.Set, cli, &memdx.SetRequest{
3834+
CollectionID: 0,
3835+
Key: key,
3836+
VbucketID: defaultTestVbucketID,
3837+
Value: []byte(`{"key":"value"}`),
3838+
})
3839+
require.NoError(t, err)
3840+
3841+
_, err = memdx.SyncUnaryCall(memdx.OpsCrud{
3842+
CollectionsEnabled: true,
3843+
ExtFramesEnabled: true,
3844+
}, memdx.OpsCrud.MutateIn, cli, &memdx.MutateInRequest{
3845+
CollectionID: 0,
3846+
Key: key,
3847+
VbucketID: defaultTestVbucketID,
3848+
Ops: []memdx.MutateInOp{
3849+
{
3850+
Op: memdx.MutateInOpTypeDictSet,
3851+
Path: []byte("_foo"),
3852+
Value: []byte(`{"x":"y"}`),
3853+
Flags: memdx.SubdocOpFlagXattrPath,
3854+
},
3855+
{
3856+
Op: memdx.MutateInOpTypeDictSet,
3857+
Path: []byte("_bar"),
3858+
Value: []byte(`{"y":"z"}`),
3859+
Flags: memdx.SubdocOpFlagXattrPath,
3860+
},
3861+
},
3862+
})
3863+
require.NoError(t, err)
3864+
3865+
resp, err := memdx.SyncUnaryCall(memdx.OpsCrud{
3866+
CollectionsEnabled: true,
3867+
ExtFramesEnabled: true,
3868+
}, memdx.OpsCrud.GetEx, cli, &memdx.GetExRequest{
3869+
CollectionID: 0,
3870+
Key: key,
3871+
VbucketID: defaultTestVbucketID,
3872+
})
3873+
require.NoError(t, err)
3874+
3875+
hasXattrs := resp.Datatype&uint8(memdx.DatatypeFlagXattrs) != 0
3876+
require.True(t, hasXattrs)
3877+
3878+
xattrBlob, docValue, err := memdx.SplitXattrBlob(resp.Value)
3879+
require.NoError(t, err)
3880+
3881+
xattrs := make(map[string]string)
3882+
err = memdx.IterXattrBlobEntries(xattrBlob, func(name, value string) {
3883+
xattrs[name] = value
3884+
})
3885+
require.NoError(t, err)
3886+
require.Equal(t, map[string]string{"_foo": `{"x":"y"}`, "_bar": `{"y":"z"}`}, xattrs)
3887+
3888+
require.Equal(t, []byte(`{"key":"value"}`), docValue)
3889+
})
3890+
}
3891+
37883892
type testCrudDispatcher struct {
37893893
Pak *memdx.Packet
37903894
}

memdx/xattrblob.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package memdx
2+
3+
import (
4+
"bytes"
5+
"encoding/binary"
6+
)
7+
8+
func SplitXattrBlob(value []byte) ([]byte, []byte, error) {
9+
if len(value) < 4 {
10+
return nil, nil, protocolError{"xattr blob too small"}
11+
}
12+
13+
// The first 4 bytes are the length of the xattr blob
14+
xattrLen := binary.BigEndian.Uint32(value[:4])
15+
return value[4 : 4+xattrLen], value[4+xattrLen:], nil
16+
}
17+
18+
func JoinXattrBlob(xattrEntries, docValue []byte) []byte {
19+
totalLen := 4 + len(xattrEntries) + len(docValue)
20+
buf := make([]byte, totalLen)
21+
binary.BigEndian.PutUint32(buf[:4], uint32(len(xattrEntries)))
22+
copy(buf[4:], xattrEntries)
23+
copy(buf[4+len(xattrEntries):], docValue)
24+
return buf
25+
}
26+
27+
func DecodeXattrBlobEntry(buf []byte) (string, string, int, error) {
28+
if len(buf) < 4 {
29+
return "", "", 0, protocolError{"xattr entry too small"}
30+
}
31+
32+
// The first 4 bytes are the length of the xattr blob
33+
xattrLen := binary.BigEndian.Uint32(buf[:4])
34+
if len(buf) < int(xattrLen)+4 {
35+
return "", "", 0, protocolError{"xattr blob too small"}
36+
}
37+
38+
entryBytes := buf[4 : 4+xattrLen]
39+
40+
nullDelim1Idx := bytes.IndexByte(entryBytes, 0)
41+
if nullDelim1Idx == -1 {
42+
return "", "", 0, protocolError{"xattr entry missing first null delimiter"}
43+
}
44+
45+
entryName := string(entryBytes[:nullDelim1Idx])
46+
entryBytes = entryBytes[nullDelim1Idx+1:]
47+
48+
nullDelim2Idx := bytes.IndexByte(entryBytes, 0)
49+
if nullDelim2Idx == -1 {
50+
return "", "", 0, protocolError{"xattr entry missing second null delimiter"}
51+
}
52+
53+
if nullDelim2Idx != len(entryBytes)-1 {
54+
return "", "", 0, protocolError{"xattr entry has extra data after second null delimiter"}
55+
}
56+
57+
entryValue := string(entryBytes[:nullDelim2Idx])
58+
59+
return entryName, entryValue, int(4 + xattrLen), nil
60+
}
61+
62+
func AppendXattrBlobEntry(buf []byte, name, value string) []byte {
63+
nameBytes := []byte(name)
64+
valueBytes := []byte(value)
65+
66+
entryLen := len(nameBytes) + 1 + len(valueBytes) + 1
67+
68+
entryLenBytes := make([]byte, 4)
69+
binary.BigEndian.PutUint32(entryLenBytes, uint32(entryLen))
70+
71+
buf = append(buf, entryLenBytes...)
72+
buf = append(buf, nameBytes...)
73+
buf = append(buf, 0)
74+
buf = append(buf, valueBytes...)
75+
buf = append(buf, 0)
76+
77+
return buf
78+
}
79+
80+
func IterXattrBlobEntries(buf []byte, cb func(string, string)) error {
81+
for len(buf) > 0 {
82+
name, value, n, err := DecodeXattrBlobEntry(buf)
83+
if err != nil {
84+
return err
85+
}
86+
87+
cb(name, value)
88+
89+
buf = buf[n:]
90+
}
91+
92+
return nil
93+
}

memdx/xattrblob_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package memdx
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
)
8+
9+
/*
10+
DOC: {"key":"value"}
11+
_bar: {"y":"z"}
12+
_foo: {"x":"y"}
13+
*/
14+
var TEST_VALUE []byte = []byte{
15+
0, 0, 0, 38, 0, 0, 0, 15, 95, 98, 97, 114, 0, 123, 34, 121,
16+
34, 58, 34, 122, 34, 125, 0, 0, 0, 0, 15, 95, 102, 111, 111, 0,
17+
123, 34, 120, 34, 58, 34, 121, 34, 125, 0, 123, 34, 107, 101, 121, 34,
18+
58, 34, 118, 97, 108, 117, 101, 34, 125}
19+
20+
func TestXattrBlob(t *testing.T) {
21+
xattrBlob, docValue, err := SplitXattrBlob(TEST_VALUE)
22+
require.NoError(t, err)
23+
require.Equal(t, []byte(`{"key":"value"}`), docValue)
24+
25+
remainingBytes := xattrBlob
26+
firstName, firstValue, n, err := DecodeXattrBlobEntry(remainingBytes)
27+
require.NoError(t, err)
28+
require.Equal(t, "_bar", firstName)
29+
require.Equal(t, `{"y":"z"}`, firstValue)
30+
31+
remainingBytes = remainingBytes[n:]
32+
secondName, secondValue, n, err := DecodeXattrBlobEntry(remainingBytes)
33+
require.NoError(t, err)
34+
require.Equal(t, "_foo", secondName)
35+
require.Equal(t, `{"x":"y"}`, secondValue)
36+
37+
remainingBytes = remainingBytes[n:]
38+
require.Empty(t, remainingBytes)
39+
40+
newBlob := make([]byte, 0)
41+
newBlob = AppendXattrBlobEntry(newBlob, "_bar", `{"y":"z"}`)
42+
newBlob = AppendXattrBlobEntry(newBlob, "_foo", `{"x":"y"}`)
43+
44+
newBytes := JoinXattrBlob(newBlob, docValue)
45+
require.Equal(t, TEST_VALUE, newBytes)
46+
}

0 commit comments

Comments
 (0)