Skip to content

Commit

Permalink
EC: cleanup manager: remove RWmutex and unused map
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Markelov <[email protected]>
  • Loading branch information
VladimirMarkelov committed Dec 13, 2023
1 parent d274b0a commit 7ed0f02
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 68 deletions.
42 changes: 6 additions & 36 deletions ec/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"errors"
"fmt"
"io"
"sync"
ratomic "sync/atomic"

"github.com/NVIDIA/aistore/api/apc"
Expand All @@ -28,8 +27,6 @@ import (
type Manager struct {
bmd *meta.BMD

xacts map[string]*BckXacts // bckName -> xctn map, only ais buckets allowed, no naming collisions

netReq string // network used to send object request
netResp string // network used to send/receive slices

Expand All @@ -38,8 +35,6 @@ type Manager struct {
respBundle ratomic.Pointer[bundle.Streams]

bundleEnabled atomic.Bool // to disable and enable on the fly

mu sync.RWMutex
}

var (
Expand All @@ -52,7 +47,6 @@ func initManager() (err error) {
netReq: cmn.NetIntraControl,
netResp: cmn.NetIntraData,
bmd: g.t.Bowner().Get(),
xacts: make(map[string]*BckXacts),
}
if ECM.bmd.IsECUsed() {
err = ECM.initECBundles()
Expand Down Expand Up @@ -118,28 +112,22 @@ func (mgr *Manager) NewGetXact(bck *cmn.Bck) *XactGet { return newGetXac
func (mgr *Manager) NewPutXact(bck *cmn.Bck) *XactPut { return newPutXact(bck, mgr) }
func (mgr *Manager) NewRespondXact(bck *cmn.Bck) *XactRespond { return newRespondXact(bck, mgr) }

func (mgr *Manager) RestoreBckGetXact(bck *meta.Bck) (xget *XactGet) {
func (*Manager) RestoreBckGetXact(bck *meta.Bck) *XactGet {
xctn, err := _renewXact(bck, apc.ActECGet)
debug.AssertNoErr(err) // TODO: handle, here and elsewhere
xget = xctn.(*XactGet)
mgr.getBckXacts(bck.Name).SetGet(xget)
return
return xctn.(*XactGet)
}

func (mgr *Manager) RestoreBckPutXact(bck *meta.Bck) (xput *XactPut) {
func (*Manager) RestoreBckPutXact(bck *meta.Bck) *XactPut {
xctn, err := _renewXact(bck, apc.ActECPut)
debug.AssertNoErr(err)
xput = xctn.(*XactPut)
mgr.getBckXacts(bck.Name).SetPut(xput)
return
return xctn.(*XactPut)
}

func (mgr *Manager) RestoreBckRespXact(bck *meta.Bck) (xrsp *XactRespond) {
func (*Manager) RestoreBckRespXact(bck *meta.Bck) *XactRespond {
xctn, err := _renewXact(bck, apc.ActECRespond)
debug.AssertNoErr(err)
xrsp = xctn.(*XactRespond)
mgr.getBckXacts(bck.Name).SetReq(xrsp)
return
return xctn.(*XactRespond)
}

func _renewXact(bck *meta.Bck, kind string) (cluster.Xact, error) {
Expand All @@ -150,21 +138,6 @@ func _renewXact(bck *meta.Bck, kind string) (cluster.Xact, error) {
return rns.Entry.Get(), nil
}

func (mgr *Manager) getBckXacts(bckName string) *BckXacts {
mgr.mu.Lock()
defer mgr.mu.Unlock()
return mgr.getBckXactsUnlocked(bckName)
}

func (mgr *Manager) getBckXactsUnlocked(bckName string) *BckXacts {
xacts, ok := mgr.xacts[bckName]
if !ok {
xacts = &BckXacts{}
mgr.xacts[bckName] = xacts
}
return xacts
}

// A function to process command requests from other targets
func (mgr *Manager) recvRequest(hdr *transport.ObjHdr, objReader io.Reader, err error) error {
defer transport.FreeRecv(objReader)
Expand Down Expand Up @@ -317,15 +290,12 @@ func (mgr *Manager) enableBck(bck *meta.Bck) {
}

func (mgr *Manager) BMDChanged() error {
mgr.mu.Lock()
newBMD := g.t.Bowner().Get()
oldBMD := mgr.bmd
if newBMD.Version <= mgr.bmd.Version {
mgr.mu.Unlock()
return nil
}
mgr.bmd = newBMD
mgr.mu.Unlock()

// globally
if newBMD.IsECUsed() && !oldBMD.IsECUsed() {
Expand Down
32 changes: 0 additions & 32 deletions ec/xaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"io"
"os"
"sync"
ratomic "sync/atomic"

"github.com/NVIDIA/aistore/cluster"
"github.com/NVIDIA/aistore/cluster/meta"
Expand Down Expand Up @@ -55,12 +54,6 @@ type (
mtx sync.Mutex
slices map[string]*slice
}

BckXacts struct {
get ratomic.Pointer[XactGet]
put ratomic.Pointer[XactPut]
req ratomic.Pointer[XactRespond]
}
)

func (r *xactECBase) init(config *cmn.Config, bck *cmn.Bck, mgr *Manager) {
Expand Down Expand Up @@ -401,28 +394,3 @@ func (r *xactECBase) baseSnap() (snap *cluster.Snap) {
snap.IdleX = r.IsIdle()
return
}

//////////////
// BckXacts //
//////////////

func (xacts *BckXacts) Get() *XactGet { return xacts.get.Load() }
func (xacts *BckXacts) Put() *XactPut { return xacts.put.Load() }
func (xacts *BckXacts) Req() *XactRespond { return xacts.req.Load() }
func (xacts *BckXacts) SetGet(xctn *XactGet) { xacts.get.Store(xctn) }
func (xacts *BckXacts) SetPut(xctn *XactPut) { xacts.put.Store(xctn) }
func (xacts *BckXacts) SetReq(xctn *XactRespond) { xacts.req.Store(xctn) }

func (xacts *BckXacts) AbortGet() { // TODO: caller must provide the error (reason) - here and elsewhere
xctn := xacts.get.Load()
if xctn != nil {
xctn.Abort(nil)
}
}

func (xacts *BckXacts) AbortPut() {
xctn := xacts.put.Load()
if xctn != nil {
xctn.Abort(nil)
}
}

0 comments on commit 7ed0f02

Please sign in to comment.