Skip to content
This repository was archived by the owner on Nov 22, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion exchange/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (e *Exchange) Tx(ctx context.Context, opts ...TxOption) *Tx {
ctx, cancel := context.WithCancel(ctx)
ms := e.opts.MultiStore
storeID := ms.Next()
store, err := ms.Get(storeID)
store, err := ms.Get(ctx, storeID)
tx := &Tx{
ctx: ctx,
cancelCtx: cancel,
Expand Down
6 changes: 3 additions & 3 deletions exchange/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func TestExchangeE2E(t *testing.T) {
ctx, cancel := context.WithTimeout(bgCtx, 10*time.Second)
defer cancel()

mn := mocknet.New(bgCtx)
mn := mocknet.New()

var client *Exchange
var cnode *testutil.TestNode
Expand Down Expand Up @@ -243,7 +243,7 @@ func TestExchangeE2E(t *testing.T) {
fname := cnode.CreateRandomFile(t, 256000)
link, storeID, origBytes := cnode.LoadFileToNewStore(ctx, t, fname)
rootCid := link.(cidlink.Link).Cid
bss, err := cnode.Ms.Get(storeID)
bss, err := cnode.Ms.Get(ctx, storeID)
require.NoError(t, err)
err = utils.MigrateBlocks(ctx, bss.Bstore, client.Index().bstore)
require.NoError(t, err)
Expand Down Expand Up @@ -341,7 +341,7 @@ func TestExchangeJoiningNetwork(t *testing.T) {
ctx, cancel := context.WithTimeout(bgCtx, 10*time.Second)
defer cancel()

mn := mocknet.New(bgCtx)
mn := mocknet.New()

newNode := func() (*Exchange, *testutil.TestNode) {
n := testutil.NewTestNode(mn, t)
Expand Down
18 changes: 9 additions & 9 deletions exchange/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func NewIndex(ds datastore.Batching, bstore blockstore.Blockstore, opts ...Index
o(idx)
}
// keep a reference of the blockstore for loading in graphsync
idx.store = cbor.NewCborStore(idx.bstore)
idx.store = cbor.NewCborStore(utils.NewBlockstoreWrapper(idx.bstore))
if err := idx.loadFromStore(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -188,7 +188,7 @@ func NewIndex(ds datastore.Batching, bstore blockstore.Blockstore, opts ...Index

func (idx *Index) loadFromStore() error {
// var err error
enc, err := idx.ds.Get(datastore.NewKey(KIndex))
enc, err := idx.ds.Get(context.TODO(), datastore.NewKey(KIndex))
if err != nil && errors.Is(err, datastore.ErrNotFound) {
nd, err := hamt.NewNode(idx.store, hamt.UseTreeBitWidth(5), utils.HAMTHashOption)
if err != nil {
Expand Down Expand Up @@ -248,7 +248,7 @@ func (idx *Index) Flush() error {
return err
}
idx.rootCID = r
return idx.ds.Put(datastore.NewKey(KIndex), r.Bytes())
return idx.ds.Put(context.TODO(), datastore.NewKey(KIndex), r.Bytes())
}

// DropRef removes all content linked to a root CID and associated Refs
Expand Down Expand Up @@ -513,11 +513,11 @@ func (idx *Index) GC() error {
return errors.New("blockstore is not a GCBlockstore")
}

unlock := gcbs.GCLock()
defer unlock.Unlock()
unlock := gcbs.GCLock(context.TODO())
defer unlock.Unlock(context.TODO())

err := idx.gcSet.ForEach(func(c cid.Cid) error {
return idx.bstore.DeleteBlock(c)
return idx.bstore.DeleteBlock(context.TODO(), c)
})
if err != nil {
return fmt.Errorf("failed to run garbage collector: %v", err)
Expand All @@ -530,7 +530,7 @@ func (idx *Index) GC() error {
if !ok {
return errors.New("datastore is not a GCDatastore")
}
err = gcds.CollectGarbage()
err = gcds.CollectGarbage(context.TODO())
if err != nil {
return err
}
Expand Down Expand Up @@ -587,7 +587,7 @@ func (idx *Index) CleanBlockStore(ctx context.Context) error {
if cidSet.Has(k) {
continue
}
err = idx.Bstore().DeleteBlock(k)
err = idx.Bstore().DeleteBlock(ctx, k)
if err != nil {
return err
}
Expand All @@ -598,7 +598,7 @@ func (idx *Index) CleanBlockStore(ctx context.Context) error {
if !ok {
return errors.New("datastore is not a GCDatastore")
}
err = gcds.CollectGarbage()
err = gcds.CollectGarbage(ctx)
if err != nil {
return err
}
Expand Down
38 changes: 21 additions & 17 deletions exchange/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ func BenchmarkFlush(b *testing.B) {

// This selector should query a HAMT without following the links
func TestIndexSelector(t *testing.T) {
t.Skip()
ds := dss.MutexWrap(datastore.NewMapDatastore())
bs := blockstore.NewGCBlockstore(blockstore.NewBlockstore(ds), blockstore.NewGCLocker())

Expand Down Expand Up @@ -396,7 +397,7 @@ func TestIndexSelector(t *testing.T) {
require.NoError(t, err)
blk, err := blocks.NewBlockWithCid(buffer.Bytes(), lnk.(cidlink.Link).Cid)
require.NoError(t, err)
require.NoError(t, idx.Bstore().Put(blk))
require.NoError(t, idx.Bstore().Put(context.TODO(), blk))

require.NoError(t, idx.SetRef(&DataRef{
PayloadCID: blk.Cid(),
Expand All @@ -422,7 +423,7 @@ func TestIndexSelector(t *testing.T) {
continue
}
key := l.Cid
blk, err := idx.Bstore().Get(key)
blk, err := idx.Bstore().Get(context.TODO(), key)
require.NoError(t, err)
err = traverser.Advance(bytes.NewBuffer(blk.RawData()))
require.NoError(t, err)
Expand Down Expand Up @@ -589,6 +590,7 @@ func TestLoadInterest(t *testing.T) {
}

func TestUnitGC(t *testing.T) {
ctx := context.Background()
ds := dss.MutexWrap(datastore.NewMapDatastore())
bs := blockstore.NewGCBlockstore(blockstore.NewBlockstore(ds), blockstore.NewGCLocker())

Expand All @@ -597,11 +599,11 @@ func TestUnitGC(t *testing.T) {

// generate random block1
blk1 := testutil.CreateRandomBlock(t, idx.Bstore())
require.NoError(t, idx.Bstore().Put(blk1))
require.NoError(t, idx.Bstore().Put(ctx, blk1))

// generate random block2
blk2 := testutil.CreateRandomBlock(t, idx.Bstore())
require.NoError(t, idx.Bstore().Put(blk2))
require.NoError(t, idx.Bstore().Put(ctx, blk2))

// set blk1-ref1 in index
require.NoError(t, idx.SetRef(&DataRef{
Expand All @@ -625,11 +627,11 @@ func TestUnitGC(t *testing.T) {
require.Equal(t, ref2.PayloadCID, blk2.Cid())

// check if bstore has blocks blk1 & blk2
has, err := idx.Bstore().Has(blk1.Cid())
has, err := idx.Bstore().Has(ctx, blk1.Cid())
require.NoError(t, err)
require.Equal(t, true, has)

has, err = idx.Bstore().Has(blk2.Cid())
has, err = idx.Bstore().Has(ctx, blk2.Cid())
require.NoError(t, err)
require.Equal(t, true, has)

Expand All @@ -642,17 +644,18 @@ func TestUnitGC(t *testing.T) {
require.NoError(t, err)

// check if GC did remove tagged block1 ...
has, err = idx.Bstore().Has(blk1.Cid())
has, err = idx.Bstore().Has(ctx, blk1.Cid())
require.NoError(t, err)
require.Equal(t, false, has)

// ... but not block2
has, err = idx.Bstore().Has(blk2.Cid())
has, err = idx.Bstore().Has(ctx, blk2.Cid())
require.NoError(t, err)
require.Equal(t, true, has)
}

func TestCleanBlockStore(t *testing.T) {
ctx := context.Background()
ds := dss.MutexWrap(datastore.NewMapDatastore())
bs := blockstore.NewGCBlockstore(blockstore.NewBlockstore(ds), blockstore.NewGCLocker())

Expand All @@ -661,11 +664,11 @@ func TestCleanBlockStore(t *testing.T) {

// generate random block1 (codec: dagcbor)
blk1 := testutil.CreateRandomBlock(t, idx.Bstore())
require.NoError(t, idx.Bstore().Put(blk1))
require.NoError(t, idx.Bstore().Put(ctx, blk1))

// generate random block2 (codec: raw)
blk2 := gstestutil.GenerateBlocksOfSize(1, 10000)[0]
require.NoError(t, idx.Bstore().Put(blk2))
require.NoError(t, idx.Bstore().Put(ctx, blk2))

// set blk1-ref1 in index
require.NoError(t, idx.SetRef(&DataRef{
Expand All @@ -689,11 +692,11 @@ func TestCleanBlockStore(t *testing.T) {
require.Equal(t, ref2.PayloadCID, blk2.Cid())

// check if bstore has blocks
has, err := idx.Bstore().Has(blk1.Cid())
has, err := idx.Bstore().Has(ctx, blk1.Cid())
require.NoError(t, err)
require.Equal(t, true, has)

has, err = idx.Bstore().Has(blk2.Cid())
has, err = idx.Bstore().Has(ctx, blk2.Cid())
require.NoError(t, err)
require.Equal(t, true, has)

Expand All @@ -706,12 +709,12 @@ func TestCleanBlockStore(t *testing.T) {
require.NoError(t, err)

// check if CleanBlockStore did remove tagged block1 ...
has, err = idx.Bstore().Has(blk1.Cid())
has, err = idx.Bstore().Has(ctx, blk1.Cid())
require.NoError(t, err)
require.Equal(t, false, has)

// ... but not block2
has, err = idx.Bstore().Has(blk2.Cid())
has, err = idx.Bstore().Has(ctx, blk2.Cid())
require.NoError(t, err)
require.Equal(t, true, has)

Expand All @@ -721,6 +724,7 @@ func TestCleanBlockStore(t *testing.T) {
}

func TestCleanBlockStoreRecover(t *testing.T) {
ctx := context.Background()
ds := dss.MutexWrap(datastore.NewMapDatastore())
bs := blockstore.NewGCBlockstore(blockstore.NewBlockstore(ds), blockstore.NewGCLocker())

Expand All @@ -729,11 +733,11 @@ func TestCleanBlockStoreRecover(t *testing.T) {

// generate random block1 (codec: dagcbor)
blk1 := testutil.CreateRandomBlock(t, idx.Bstore())
require.NoError(t, idx.Bstore().Put(blk1))
require.NoError(t, idx.Bstore().Put(ctx, blk1))

// generate random block2 (codec: raw)
blk2 := gstestutil.GenerateBlocksOfSize(1, 10000)[0]
require.NoError(t, idx.Bstore().Put(blk2))
require.NoError(t, idx.Bstore().Put(ctx, blk2))

// set blk1-ref1 in index
require.NoError(t, idx.SetRef(&DataRef{
Expand All @@ -747,7 +751,7 @@ func TestCleanBlockStoreRecover(t *testing.T) {
PayloadSize: int64(len(blk2.RawData())),
}))

require.NoError(t, bs.DeleteBlock(blk1.Cid()))
require.NoError(t, bs.DeleteBlock(ctx, blk1.Cid()))

// should not return any error and clean it up from the blockstore
require.NoError(t, idx.CleanBlockStore(context.TODO()))
Expand Down
11 changes: 3 additions & 8 deletions exchange/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (opts Options) fillDefaults(ctx context.Context, h host.Host, ds datastore.
opts.Blockstore = blockstore.NewGCBlockstore(opts.Blockstore, blockstore.NewGCLocker())
}
if opts.MultiStore == nil {
opts.MultiStore, err = multistore.NewMultiDstore(ds)
opts.MultiStore, err = multistore.NewMultiDstore(ctx, ds)
if err != nil {
return opts, err
}
Expand Down Expand Up @@ -145,24 +145,19 @@ func (opts Options) fillDefaults(ctx context.Context, h host.Host, ds datastore.
opts.PPB = big.NewInt(0)
}


return opts, nil
}

// NewDataTransfer packages together all the things needed for a new manager to work
func NewDataTransfer(ctx context.Context, h host.Host, gs graphsync.GraphExchange, ds datastore.Batching, dsprefix string, dir string) (datatransfer.Manager, error) {
cidDir, err := mkCidListDir(dir)
if err != nil {
return nil, err
}
// Create a special key for persisting the datatransfer manager state
dtDs := namespace.Wrap(ds, datastore.NewKey(dsprefix+"-datatransfer"))
// Setup datatransfer network
dtNet := dtnet.NewFromLibp2pHost(h)
// Setup graphsync transport
tp := gstransport.NewTransport(h.ID(), gs, dtNet)
tp := gstransport.NewTransport(h.ID(), gs)
// Build the manager
dt, err := dtfimpl.NewDataTransfer(dtDs, cidDir, dtNet, tp)
dt, err := dtfimpl.NewDataTransfer(dtDs, dtNet, tp)
if err != nil {
return nil, err
}
Expand Down
7 changes: 2 additions & 5 deletions exchange/peermgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestHeyEvtPeerMgr(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
defer cancel()

mn := mocknet.New(ctx)
mn := mocknet.New()
n1 := testutil.NewTestNode(mn, t)
n2 := testutil.NewTestNode(mn, t)

Expand Down Expand Up @@ -48,10 +48,7 @@ func TestHeyEvtPeerMgr(t *testing.T) {
}

func TestRecordLatency(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
defer cancel()

mn := mocknet.New(ctx)
mn := mocknet.New()
n1 := testutil.NewTestNode(mn, t)
n2 := testutil.NewTestNode(mn, t)
idx, err := NewIndex(n1.Ds, n1.Bs)
Expand Down
6 changes: 3 additions & 3 deletions exchange/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (r *Replication) pumpIndexes(ctx context.Context, sub event.Subscription) {
if res.err == nil {
go func(rt cid.Cid) {
store := r.GetStore(rt)
err := r.idx.LoadInterest(rt, cbor.NewCborStore(store.Bstore))
err := r.idx.LoadInterest(rt, cbor.NewCborStore(utils.NewBlockstoreWrapper(store.Bstore)))
if err != nil {
log.Error().Err(err).Msg("failed to load interest")
return
Expand Down Expand Up @@ -326,7 +326,7 @@ func (r *Replication) fetchIndex(ctx context.Context, hvt HeyEvt) error {

// AddStore assigns a store for a given root cid and store ID
func (r *Replication) AddStore(k cid.Cid, sid multistore.StoreID) error {
store, err := r.ms.Get(sid)
store, err := r.ms.Get(context.TODO(), sid)
if err != nil {
return err
}
Expand Down Expand Up @@ -456,7 +456,7 @@ func (r *Replication) handleRequest(s network.Stream) {
log.Error().Err(err).Msg("setting ref")
}

if err := r.ms.Delete(sid); err != nil {
if err := r.ms.Delete(context.TODO(), sid); err != nil {
log.Error().Err(err).Msg("deleting store")
}
return
Expand Down
Loading