Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions rbdeal/deal_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ func (r *ribs) dealTracker(ctx context.Context) {
}
defer closer()

fcdu, err := ributil.NewFullCDU(ctx, gw)
if err != nil {
log.Errorw("failed to create full CDU", "error", err)
return
}

for {
checkStart := time.Now()
select {
Expand All @@ -40,7 +46,7 @@ func (r *ribs) dealTracker(ctx context.Context) {
default:
}

err := r.runDealCheckLoop(ctx, gw)
err := r.runDealCheckLoop(ctx, gw, fcdu)
if err != nil {
log.Errorw("deal check loop failed", "error", err)
}
Expand All @@ -59,13 +65,7 @@ func (r *ribs) dealTracker(ctx context.Context) {
}
}

func (r *ribs) runDealCheckLoop(ctx context.Context, gw api.Gateway) error {
gw, closer, err := client.NewGatewayRPCV1(ctx, r.lotusRPCAddr, nil)
if err != nil {
return xerrors.Errorf("creating gateway rpc client: %w", err)
}
defer closer()

func (r *ribs) runDealCheckLoop(ctx context.Context, gw api.Gateway, fcdu *ributil.FullCDU) error {
/* PUBLISHED DEAL CHECKS */
/* Wait for published deals to become active (or expire) */

Expand Down Expand Up @@ -111,7 +111,7 @@ func (r *ribs) runDealCheckLoop(ctx context.Context, gw api.Gateway) error {
/* Wait for publish at "good-enough" finality */

{
cdm := ributil.CurrentDealInfoManager{CDAPI: gw}
cdm := ributil.CurrentDealInfoManager{CDAPI: fcdu}

toCheck, err := r.db.PublishingDeals()
if err != nil {
Expand Down Expand Up @@ -205,7 +205,7 @@ func (r *ribs) runDealCheckLoop(ctx context.Context, gw api.Gateway) error {

// check market deal state

// Inactive, expired deal cleanup
// Inactive, unpublished, expired deal cleanup
{
head, err := gw.ChainHead(ctx) // todo lookback
if err != nil {
Expand Down
124 changes: 124 additions & 0 deletions ributil/arcbs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package ributil

import (
"context"
"github.com/filecoin-project/lotus/blockstore"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
block "github.com/ipfs/go-libipfs/blocks"
"strings"
)

type ArcBlockstore struct {
write blockstore.Blockstore
cache *lru.ARCCache[cid.Cid, block.Block]
}

var CacheBstoreSize = (2048 << 20) / 16000 // 2G with average block size of 16KB

func ARCStore(base blockstore.Blockstore) *ArcBlockstore {
c, _ := lru.NewARC[cid.Cid, block.Block](CacheBstoreSize)

bs := &ArcBlockstore{
write: base,

cache: c,
}
return bs
}

var (
_ blockstore.Blockstore = (*ArcBlockstore)(nil)
_ blockstore.Viewer = (*ArcBlockstore)(nil)
)

func (bs *ArcBlockstore) Flush(ctx context.Context) error {
return bs.write.Flush(ctx)
}

func (bs *ArcBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return bs.write.AllKeysChan(ctx)
}

func (bs *ArcBlockstore) DeleteBlock(ctx context.Context, c cid.Cid) error {
bs.cache.Remove(c)
return bs.write.DeleteBlock(ctx, c)
}

func (bs *ArcBlockstore) DeleteMany(ctx context.Context, cids []cid.Cid) error {
for _, c := range cids {
bs.cache.Remove(c)
}
return bs.write.DeleteMany(ctx, cids)
}

func (bs *ArcBlockstore) View(ctx context.Context, c cid.Cid, callback func([]byte) error) error {
if blk, ok := bs.cache.Get(c); ok {
return callback(blk.RawData())
}

return bs.write.View(ctx, c, func(bytes []byte) error {
blk, err := block.NewBlockWithCid(bytes, c)
if err != nil {
return err
}
bs.cache.Add(c, blk)

return callback(bytes)
})
}

func (bs *ArcBlockstore) Get(ctx context.Context, c cid.Cid) (block.Block, error) {
if blk, ok := bs.cache.Get(c); ok {
return blk, nil
}

return bs.write.Get(ctx, c)
}

func (bs *ArcBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
if blk, ok := bs.cache.Get(c); ok {
return len(blk.RawData()), nil
}

b, err := bs.Get(ctx, c)
if err != nil {
if strings.Contains(err.Error(), "ipld: could not find") {
return 0, &ipld.ErrNotFound{Cid: c}
}
return 0, err
}

return len(b.RawData()), nil
}

func (bs *ArcBlockstore) Put(ctx context.Context, blk block.Block) error {
bs.cache.Add(blk.Cid(), blk)

return nil
}

func (bs *ArcBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) {
if bs.cache.Contains(c) {
return true, nil
}

return bs.write.Has(ctx, c)
}

func (bs *ArcBlockstore) HashOnRead(hor bool) {
bs.write.HashOnRead(hor)
}

func (bs *ArcBlockstore) PutMany(ctx context.Context, blks []block.Block) error {
for _, blk := range blks {
if bs.cache.Contains(blk.Cid()) {
continue
}

bs.cache.Add(blk.Cid(), blk)
}

return nil
}
7 changes: 7 additions & 0 deletions ributil/chainbootstrap/mainnet.pi
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN
/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa
/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb
/dnsaddr/bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt
/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ
/ip4/104.131.131.82/udp/4001/quic/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ
/dns4/elastic.dag.house/tcp/443/wss/p2p/QmQzqxhK82kAmKvARFZSkUVS6fo9sySaiogAnx5EnZ6ZmC
2 changes: 1 addition & 1 deletion ributil/currentdealinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (mgr *CurrentDealInfoManager) FindCloseMsgTipset(ctx context.Context, tsk t
return headTs.Key(), nil
}

// load 15 tipsets back to curTs
// load /step/ tipsets back to curTs
headTs = curTs
toLoad := curTs.Height() - step
curTs, err = mgr.CDAPI.ChainGetTipSetByHeight(ctx, toLoad, curTs.Key())
Expand Down
108 changes: 108 additions & 0 deletions ributil/fallbackstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package ributil

import (
"context"
"github.com/filecoin-project/lotus/blockstore"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
"golang.org/x/xerrors"
"strings"
"sync"
"time"
)

type FallbackStore struct {
blockstore.Blockstore

lk sync.RWMutex
// missFn is the function that will be invoked on a local miss to pull the
// block from elsewhere.
missFn func(context.Context, cid.Cid) (blocks.Block, error)
}

func (fbs *FallbackStore) SetFallback(missFn func(context.Context, cid.Cid) (blocks.Block, error)) {
fbs.lk.Lock()
defer fbs.lk.Unlock()

fbs.missFn = missFn
}

func (fbs *FallbackStore) getFallback(ctx context.Context, c cid.Cid) (blocks.Block, error) {
log.Warnf("fallbackstore: block not found locally, fetching from the network; cid: %s", c)
fbs.lk.RLock()
defer fbs.lk.RUnlock()

if fbs.missFn == nil {
// FallbackStore wasn't configured yet (chainstore/bitswap aren't up yet)
// Wait for a bit and retry
fbs.lk.RUnlock()
time.Sleep(5 * time.Second)
fbs.lk.RLock()

if fbs.missFn == nil {
log.Errorw("fallbackstore: missFn not configured yet")
return nil, ipld.ErrNotFound{Cid: c}
}
}

ctx, cancel := context.WithTimeout(ctx, 120*time.Second)
defer cancel()

b, err := fbs.missFn(ctx, c)
if err != nil {
return nil, err
}

// chain bitswap puts blocks in temp blockstore which is cleaned up
// every few min (to drop any messages we fetched but don't want)
// in this case we want to keep this block around
if err := fbs.Put(ctx, b); err != nil {
return nil, xerrors.Errorf("persisting fallback-fetched block: %w", err)
}
return b, nil
}

func (fbs *FallbackStore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
b, err := fbs.Blockstore.Get(ctx, c)
switch {
case err == nil:
return b, nil
case ipld.IsNotFound(err) || strings.Contains(err.Error(), "ipld: could not find"):
return fbs.getFallback(ctx, c)
default:
return b, xerrors.Errorf("fbs get: %w", err)
}
}

func (fbs *FallbackStore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
sz, err := fbs.Blockstore.GetSize(ctx, c)
switch {
case err == nil:
return sz, nil
case ipld.IsNotFound(err) || strings.Contains(err.Error(), "ipld: could not find"):
b, err := fbs.getFallback(ctx, c)
if err != nil {
return 0, err
}
return len(b.RawData()), nil
default:
return sz, xerrors.Errorf("fbs getsize: %w", err)
}
}

func (fbs *FallbackStore) View(ctx context.Context, c cid.Cid, callback func([]byte) error) error {
err := fbs.Blockstore.View(ctx, c, callback)
switch {
case err == nil:
return nil
case ipld.IsNotFound(err) || strings.Contains(err.Error(), "ipld: could not find"):
b, err := fbs.getFallback(ctx, c)
if err != nil {
return err
}
return callback(b.RawData())
default:
return xerrors.Errorf("fbs view: %w", err)
}
}
Loading