From d6af61f64e63e5ee1b75234e045cc6bfdde6ce1a Mon Sep 17 00:00:00 2001 From: Burak Sezer Date: Thu, 8 Oct 2020 19:24:32 +0300 Subject: [PATCH] fix: Race condition in DMap creation refactor: More consistent logging --- dmap.go | 23 ++++------------------- dmap_atomic.go | 4 ++-- dmap_destroy.go | 2 +- dmap_eviction.go | 4 ++-- dmap_expire.go | 4 ++-- dmap_get.go | 2 +- dmap_lock.go | 2 +- dmap_put.go | 4 ++-- rebalancer.go | 40 +++++++++++++++++++--------------------- routing.go | 7 ++++--- 10 files changed, 38 insertions(+), 54 deletions(-) diff --git a/dmap.go b/dmap.go index 3d15f1f6..0f24ba17 100644 --- a/dmap.go +++ b/dmap.go @@ -51,42 +51,31 @@ func (db *Olric) NewDMap(name string) (*DMap, error) { }, nil } -// createDMap creates and returns a new dmap, internal representation of a dmap. +// createDMap creates and returns a new dmap, internal representation of a dmap. This function is not thread-safe. func (db *Olric) createDMap(part *partition, name string, str *storage.Storage) (*dmap, error) { - // We need to protect storage.New - part.Lock() - defer part.Unlock() - - // Try to load one more time. Another goroutine may have created the dmap. - dm, ok := part.m.Load(name) - if ok { - return dm.(*dmap), nil - } - // create a new map here. nm := &dmap{ storage: str, } - if db.config.Cache != nil { err := db.setCacheConfiguration(nm, name) if err != nil { return nil, err } } - // rebalancer code may send a storage instance for the new dmap. Just use it. if nm.storage != nil { nm.storage = str } else { nm.storage = storage.New(db.config.TableSize) } - part.m.Store(name, nm) return nm, nil } func (db *Olric) getOrCreateDMap(part *partition, name string) (*dmap, error) { + part.Lock() + defer part.Unlock() dm, ok := part.m.Load(name) if ok { return dm.(*dmap), nil @@ -102,9 +91,5 @@ func (db *Olric) getDMap(name string, hkey uint64) (*dmap, error) { func (db *Olric) getBackupDMap(name string, hkey uint64) (*dmap, error) { part := db.getBackupPartition(hkey) - dm, ok := part.m.Load(name) - if ok { - return dm.(*dmap), nil - } - return db.createDMap(part, name, nil) + return db.getOrCreateDMap(part, name) } diff --git a/dmap_atomic.go b/dmap_atomic.go index 8df538e6..a9f601f0 100644 --- a/dmap_atomic.go +++ b/dmap_atomic.go @@ -28,7 +28,7 @@ func (db *Olric) atomicIncrDecr(opr string, w *writeop, delta int) (int, error) defer func() { err := db.locker.Unlock(atomicKey) if err != nil { - db.log.V(3).Printf("[ERROR] Failed to release the fine grained lock for key: %s on dmap: %s: %v", w.key, w.dmap, err) + db.log.V(3).Printf("[ERROR] Failed to release the fine grained lock for key: %s on DMap: %s: %v", w.key, w.dmap, err) } }() @@ -105,7 +105,7 @@ func (db *Olric) getPut(w *writeop) ([]byte, error) { defer func() { err := db.locker.Unlock(atomicKey) if err != nil { - db.log.V(3).Printf("[ERROR] Failed to release the lock for key: %s on dmap: %s: %v", w.key, w.dmap, err) + db.log.V(3).Printf("[ERROR] Failed to release the lock for key: %s on DMap: %s: %v", w.key, w.dmap, err) } }() diff --git a/dmap_destroy.go b/dmap_destroy.go index 85999070..7f6dc7ff 100644 --- a/dmap_destroy.go +++ b/dmap_destroy.go @@ -43,7 +43,7 @@ func (db *Olric) destroyDMap(name string) error { db.log.V(6).Printf("[DEBUG] Calling Destroy command on %s for %s", addr, name) _, err := db.requestTo(addr, req) if err != nil { - db.log.V(3).Printf("[ERROR] Failed to destroy dmap: %s on %s", name, addr) + db.log.V(3).Printf("[ERROR] Failed to destroy DMap: %s on %s", name, addr) } return err }) diff --git a/dmap_eviction.go b/dmap_eviction.go index 6b7cb438..3cdbc769 100644 --- a/dmap_eviction.go +++ b/dmap_eviction.go @@ -103,7 +103,7 @@ func (db *Olric) scanDMapForEviction(partID uint64, name string, dm *dmap) { err := db.delKeyVal(dm, hkey, name, vdata.Key) if err != nil { // It will be tried again. - db.log.V(3).Printf("[ERROR] Failed to delete expired hkey: %d on dmap: %s: %v", + db.log.V(3).Printf("[ERROR] Failed to delete expired hkey: %d on DMap: %s: %v", hkey, name, err) return true // this means 'continue' } @@ -212,7 +212,7 @@ func (db *Olric) evictKeyWithLRU(dm *dmap, name string) error { return err } if db.log.V(6).Ok() { - db.log.V(6).Printf("[DEBUG] Evicted item on dmap: %s, key: %s with LRU", name, key) + db.log.V(6).Printf("[DEBUG] Evicted item on DMap: %s, key: %s with LRU", name, key) } return db.delKeyVal(dm, item.HKey, name, key) } diff --git a/dmap_expire.go b/dmap_expire.go index f330360d..9c4075bf 100644 --- a/dmap_expire.go +++ b/dmap_expire.go @@ -70,7 +70,7 @@ func (db *Olric) syncExpireOnCluster(hkey uint64, dm *dmap, w *writeop) error { _, err := db.requestTo(owner.String(), req) if err != nil { if db.log.V(3).Ok() { - db.log.V(3).Printf("[ERROR] Failed to call expire command on %s for dmap: %s: %v", + db.log.V(3).Printf("[ERROR] Failed to call expire command on %s for DMap: %s: %v", owner, w.dmap, err) } continue @@ -80,7 +80,7 @@ func (db *Olric) syncExpireOnCluster(hkey uint64, dm *dmap, w *writeop) error { err := db.localExpire(hkey, dm, w) if err != nil { if db.log.V(3).Ok() { - db.log.V(3).Printf("[ERROR] Failed to call expire command on %s for dmap: %s: %v", + db.log.V(3).Printf("[ERROR] Failed to call expire command on %s for DMap: %s: %v", db.this, w.dmap, err) } } else { diff --git a/dmap_get.go b/dmap_get.go index 89011774..46817d18 100644 --- a/dmap_get.go +++ b/dmap_get.go @@ -58,7 +58,7 @@ func (db *Olric) lookupOnOwners(dm *dmap, hkey uint64, name, key string) []*vers // the requested key can be found on a replica or a previous partition owner. if db.log.V(5).Ok() { db.log.V(5).Printf( - "[DEBUG] key: %s, HKey: %d on dmap: %s could not be found on the local storage: %v", + "[DEBUG] key: %s, HKey: %d on DMap: %s could not be found on the local storage: %v", key, hkey, name, err) } } else { diff --git a/dmap_lock.go b/dmap_lock.go index ca56fc9d..f288b8c7 100644 --- a/dmap_lock.go +++ b/dmap_lock.go @@ -50,7 +50,7 @@ func (db *Olric) unlockKey(name, key string, token []byte) error { defer func() { err := db.locker.Unlock(lkey) if err != nil { - db.log.V(3).Printf("[ERROR] Failed to release the fine grained lock for key: %s on dmap: %s: %v", key, name, err) + db.log.V(3).Printf("[ERROR] Failed to release the fine grained lock for key: %s on DMap: %s: %v", key, name, err) } }() diff --git a/dmap_put.go b/dmap_put.go index ea30cfb7..e02e648a 100644 --- a/dmap_put.go +++ b/dmap_put.go @@ -179,7 +179,7 @@ func (db *Olric) syncPutOnCluster(hkey uint64, dm *dmap, w *writeop) error { _, err := db.requestTo(owner.String(), req) if err != nil { if db.log.V(3).Ok() { - db.log.V(3).Printf("[ERROR] Failed to call put command on %s for dmap: %s: %v", owner, w.dmap, err) + db.log.V(3).Printf("[ERROR] Failed to call put command on %s for DMap: %s: %v", owner, w.dmap, err) } continue } @@ -188,7 +188,7 @@ func (db *Olric) syncPutOnCluster(hkey uint64, dm *dmap, w *writeop) error { err := db.localPut(hkey, dm, w) if err != nil { if db.log.V(3).Ok() { - db.log.V(3).Printf("[ERROR] Failed to call put command on %s for dmap: %s: %v", db.this, w.dmap, err) + db.log.V(3).Printf("[ERROR] Failed to call put command on %s for DMap: %s: %v", db.this, w.dmap, err) } } else { successful++ diff --git a/rebalancer.go b/rebalancer.go index d328c96c..0402516d 100644 --- a/rebalancer.go +++ b/rebalancer.go @@ -88,24 +88,20 @@ func (db *Olric) selectVersionForMerge(dm *dmap, hkey uint64, vdata *storage.VDa } func (db *Olric) mergeDMaps(part *partition, data *dmapbox) error { - str, err := storage.Import(data.Payload) + dm, err := db.getOrCreateDMap(part, data.Name) if err != nil { return err } - tmp, exist := part.m.Load(data.Name) - if !exist { - // create a new dmap if it doesn't exist. - tmp, err = db.createDMap(part, data.Name, str) - if err != nil { - return err - } - } - // Acquire dmap's lock. No one should work on it. - dm := tmp.(*dmap) dm.Lock() defer dm.Unlock() + defer part.m.Store(data.Name, dm) + + str, err := storage.Import(data.Payload) + if err != nil { + return err + } // Merge accessLog. if dm.cache != nil && dm.cache.accessLog != nil { @@ -118,11 +114,14 @@ func (db *Olric) mergeDMaps(part *partition, data *dmapbox) error { dm.cache.Unlock() } - // We do not need the following loop if the dmap is created here. - if !exist { + if dm.storage.Len() == 0 { + // DMap has no keys. Set the imported storage instance. + // The old one will be garbage collected. + dm.storage = str return nil } + // DMap has some keys. Merge with the new one. var mergeErr error str.Range(func(hkey uint64, vdata *storage.VData) bool { winner, err := db.selectVersionForMerge(dm, hkey, vdata) @@ -171,11 +170,11 @@ func (db *Olric) rebalancePrimaryPartitions() { } // This is a previous owner. Move the keys. part.m.Range(func(name, dm interface{}) bool { - db.log.V(2).Printf("[INFO] Moving dmap: %s (backup: %v) on PartID: %d to %s", + db.log.V(2).Printf("[INFO] Moving DMap: %s (backup: %v) on PartID: %d to %s", name, part.backup, partID, owner) err := db.moveDMap(part, name.(string), dm.(*dmap), owner) if err != nil { - db.log.V(2).Printf("[ERROR] Failed to move dmap: %s on PartID: %d to %s: %v", + db.log.V(2).Printf("[ERROR] Failed to move DMap: %s on PartID: %d to %s: %v", name, partID, owner, err) } // if this returns true, the iteration continues @@ -232,11 +231,11 @@ func (db *Olric) rebalanceBackupPartitions() { } part.m.Range(func(name, dm interface{}) bool { - db.log.V(2).Printf("[INFO] Moving dmap: %s (backup: %v) on PartID: %d to %s", + db.log.V(2).Printf("[INFO] Moving DMap: %s (backup: %v) on PartID: %d to %s", name, part.backup, partID, owner) err := db.moveDMap(part, name.(string), dm.(*dmap), owner) if err != nil { - db.log.V(2).Printf("[ERROR] Failed to move backup dmap: %s on PartID: %d to %s: %v", + db.log.V(2).Printf("[ERROR] Failed to move backup DMap: %s on PartID: %d to %s: %v", name, partID, owner, err) } // if this returns true, the iteration continues @@ -294,20 +293,19 @@ func (db *Olric) moveDMapOperation(w, r protocol.EncodeDecoder) { } // Check ownership before merging. This is useful to prevent data corruption in network partitioning case. if !db.checkOwnership(part) { - db.log.V(2).Printf("[ERROR] Received dmap: %s on PartID: %d (backup: %v) doesn't belong to me", + db.log.V(2).Printf("[ERROR] Received DMap: %s on PartID: %d (backup: %v) doesn't belong to me", box.Name, box.PartID, box.Backup) - err := fmt.Errorf("partID: %d (backup: %v) doesn't belong to %s: %w", box.PartID, box.Backup, db.this, ErrInvalidArgument) db.errorResponse(w, err) return } - db.log.V(2).Printf("[INFO] Received dmap (backup:%v): %s on PartID: %d", + db.log.V(2).Printf("[INFO] Received DMap (backup:%v): %s on PartID: %d", box.Backup, box.Name, box.PartID) err = db.mergeDMaps(part, box) if err != nil { - db.log.V(2).Printf("[ERROR] Failed to merge dmap: %v", err) + db.log.V(2).Printf("[ERROR] Failed to merge DMap: %v", err) db.errorResponse(w, err) return } diff --git a/routing.go b/routing.go index 46fbb909..43a38cb9 100644 --- a/routing.go +++ b/routing.go @@ -125,7 +125,6 @@ func (db *Olric) distributeBackups(partID uint64) []discovery.Member { } if count == 0 { // Delete it. - db.log.V(5).Printf("[DEBUG] Empty backup partition found. PartID: %d on %s", partID, backup) owners = append(owners[:i], owners[i+1:]...) i-- } @@ -206,7 +205,6 @@ func (db *Olric) distributePrimaryCopies(partID uint64) []discovery.Member { continue } if count == 0 { - db.log.V(6).Printf("[DEBUG] PartID: %d on %s is empty", partID, owner) // Empty partition. Delete it from ownership list. owners = append(owners[:i], owners[i+1:]...) i-- @@ -283,7 +281,10 @@ func (db *Olric) updateRoutingTableOnCluster(table routingTable) (map[discovery. return nil }) } - return ownershipReports, g.Wait() + if err := g.Wait(); err != nil { + return nil, err + } + return ownershipReports, nil } func (db *Olric) updateRouting() {