Skip to content

Commit

Permalink
fix: wrong unmarshal method in getPrevOperation
Browse files Browse the repository at this point in the history
  • Loading branch information
buraksezer committed Nov 10, 2020
1 parent 20858db commit 195a65e
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 30 deletions.
61 changes: 31 additions & 30 deletions dmap_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/buraksezer/olric/internal/discovery"
"github.com/buraksezer/olric/internal/protocol"
"github.com/buraksezer/olric/internal/storage"
"github.com/vmihailenco/msgpack"
)

// Entry is a DMap entry with its metadata.
Expand Down Expand Up @@ -53,11 +52,23 @@ func (db *Olric) unmarshalValue(rawval []byte) (interface{}, error) {
return value, nil
}

// lookupOnOwners collects versions of a key/value pair on the partition owner
// by including previous partition owners.
func (db *Olric) lookupOnOwners(dm *dmap, hkey uint64, name, key string) []*version {
var versions []*version
func (db *Olric) lookupOnPreviousOwner(owner *discovery.Member, name, key string) (*version, error) {
req := protocol.NewDMapMessage(protocol.OpGetPrev)
req.SetDMap(name)
req.SetKey(key)

v := &version{host: owner}
resp, err := db.requestTo(owner.String(), req)
if err != nil {
return nil, err
}
data := storage.NewEntry()
data.Decode(resp.Value())
v.entry = data
return v, nil
}

func (db *Olric) lookupOnThisNode(dm *dmap, hkey uint64, name, key string) *version {
// Check on localhost, the partition owner.
value, err := dm.storage.Get(hkey)
if err != nil {
Expand All @@ -73,42 +84,38 @@ func (db *Olric) lookupOnOwners(dm *dmap, hkey uint64, name, key string) []*vers
db.log.V(3).Printf("[ERROR] Failed to get key: %s on %s could not be found: %s", key, name, err)
}
}

ver := &version{
return &version{
host: &db.this,
entry: value,
}
versions = append(versions, ver)
}

// Run a query on the previous owners.
// lookupOnOwners collects versions of a key/value pair on the partition owner
// by including previous partition owners.
func (db *Olric) lookupOnOwners(dm *dmap, hkey uint64, name, key string) []*version {
owners := db.getPartitionOwners(hkey)
if len(owners) == 0 {
panic("partition owners list cannot be empty")
}

var versions []*version
versions = append(versions, db.lookupOnThisNode(dm, hkey, name, key))

// Run a query on the previous owners.
// Traverse in reverse order. Except from the latest host, this one.
for i := len(owners) - 2; i >= 0; i-- {
owner := owners[i]
req := protocol.NewDMapMessage(protocol.OpGetPrev)
req.SetDMap(name)
req.SetKey(key)

ver := &version{host: &owner}
resp, err := db.requestTo(owner.String(), req)
v, err := db.lookupOnPreviousOwner(&owner, name, key)
if err != nil {
if db.log.V(3).Ok() {
db.log.V(3).Printf("[ERROR] Failed to call get on a previous "+
"primary owner: %s: %v", owner, err)
}
} else {
data := storage.NewEntry()
data.Decode(resp.Value())
err = msgpack.Unmarshal(resp.Value(), &data)
ver.entry = data
// Ignore failed owners. The data on those hosts will be wiped out
// by the rebalancer.
versions = append(versions, ver)
continue
}
// Ignore failed owners. The data on those hosts will be wiped out
// by the rebalancer.
versions = append(versions, v)
}
return versions
}
Expand Down Expand Up @@ -369,12 +376,6 @@ func (db *Olric) getPrevOperation(w, r protocol.EncodeDecoder) {
db.errorResponse(w, ErrKeyNotFound)
return
}

value, err := msgpack.Marshal(*entry)
if err != nil {
db.errorResponse(w, err)
return
}
w.SetStatus(protocol.StatusOK)
w.SetValue(value)
w.SetValue(entry.Encode())
}
40 changes: 40 additions & 0 deletions dmap_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package olric
import (
"bytes"
"context"
"reflect"
"testing"
"time"
)
Expand Down Expand Up @@ -386,3 +387,42 @@ func TestDMap_GetEntryOnCluster(t *testing.T) {
}
}
}

func TestDMap_OpGetPrev(t *testing.T) {
c := newTestCluster(nil)
defer c.teardown()

db, err := c.newDB()
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}

dm, err := db.NewDMap("mymap")
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
for i := 0; i < 100; i++ {
err = dm.Put(bkey(i), bval(i))
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
}

v, err := db.lookupOnPreviousOwner(&db.this, "mymap", bkey(10))
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
if !reflect.DeepEqual(v.host, &db.this) {
t.Fatalf("Returned host is different: %v", v.host)
}
val, err := db.unmarshalValue(v.entry.Value)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
if !bytes.Equal(val.([]byte), bval(10)) {
t.Fatalf("Returned value is different")
}
if v.entry.Key != bkey(10) {
t.Fatalf("Returned key is different")
}
}

0 comments on commit 195a65e

Please sign in to comment.