Skip to content

feat: NewDAGProvider to walk partial DAGs in offline mode #905

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Apr 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ The following emojis are used to highlight certain changes:

### Added

- `provider`: added support for walking partial DAGs in offline mode [#905](https://github.com/ipfs/boxo/pull/905)
- a `KeyChanFunc` that traverses DAGs from a given root (`NewDAGProvider`).
- a `KeyChanFunc` that buffers all the CIDs in memory from another `KeyChanFunc` (`NewBufferedProvider`).
- `fetcher/impl/blockservice`: new option `SkipNotFound` for the IPLD fetcher. It will skip not found nodes when traversing the DAG. This allows offline traversal of DAGs when using, for example, an offline blockservice.
- This enables use case of providing lazy-loaded, partially local DAGs (like `ipfs files` in Kubo's MFS implementation, see [kubo#10386](https://github.com/ipfs/kubo/issues/10386))
- `gateway`: generated HTML with UnixFS directory listings now include a button for copying CIDs of child entities [#899](https://github.com/ipfs/boxo/pull/899)

### Changed
Expand All @@ -35,7 +40,7 @@ The following emojis are used to highlight certain changes:
### Security


## [v0.29.0]
## [v0.29.1]

### Changed

Expand Down
13 changes: 11 additions & 2 deletions fetcher/impl/blockservice/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/boxo/fetcher"
format "github.com/ipfs/go-ipld-format"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
Expand All @@ -26,6 +27,10 @@ type FetcherConfig struct {
blockService blockservice.BlockService
NodeReifier ipld.NodeReifier
PrototypeChooser traversal.LinkTargetNodePrototypeChooser

// SkipNotFound skips "ipld: not found" errors when traversing a DAG,
// allowing to traverse incomplete DAGs or doing offline traversals.
SkipNotFound bool
}

// NewFetcherConfig creates a FetchConfig from which session may be created and nodes retrieved.
Expand All @@ -47,7 +52,7 @@ func (fc FetcherConfig) FetcherWithSession(ctx context.Context, s *blockservice.
// while we may be loading blocks remotely, they are already hash verified by the time they load
// into ipld-prime
ls.TrustedStorage = true
ls.StorageReadOpener = blockOpener(ctx, s)
ls.StorageReadOpener = blockOpener(ctx, s, fc.SkipNotFound)
ls.NodeReifier = fc.NodeReifier

protoChooser := fc.PrototypeChooser
Expand All @@ -61,6 +66,7 @@ func (fc FetcherConfig) WithReifier(nr ipld.NodeReifier) fetcher.Factory {
blockService: fc.blockService,
NodeReifier: nr,
PrototypeChooser: fc.PrototypeChooser,
SkipNotFound: fc.SkipNotFound,
}
}

Expand Down Expand Up @@ -131,7 +137,7 @@ var DefaultPrototypeChooser = func(lnk ipld.Link, lnkCtx ipld.LinkContext) (ipld
return basicnode.Prototype.Any, nil
}

func blockOpener(ctx context.Context, bs *blockservice.Session) ipld.BlockReadOpener {
func blockOpener(ctx context.Context, bs *blockservice.Session, offline bool) ipld.BlockReadOpener {
return func(_ ipld.LinkContext, lnk ipld.Link) (io.Reader, error) {
cidLink, ok := lnk.(cidlink.Link)
if !ok {
Expand All @@ -140,6 +146,9 @@ func blockOpener(ctx context.Context, bs *blockservice.Session) ipld.BlockReadOp

blk, err := bs.GetBlock(ctx, cidLink.Cid)
if err != nil {
if format.IsNotFound(err) && offline {
return nil, traversal.SkipMe{}
}
return nil, err
}

Expand Down
55 changes: 55 additions & 0 deletions provider/dagprovider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package provider

import (
"context"
"fmt"

"github.com/ipfs/boxo/fetcher"
fetcherhelpers "github.com/ipfs/boxo/fetcher/helpers"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/traversal"
)

func NewDAGProvider(root cid.Cid, fetchConfig fetcher.Factory) KeyChanFunc {
log.Debugw("DAG provider initialized", "root", root)
return func(ctx context.Context) (<-chan cid.Cid, error) {
if root == cid.Undef {
return nil, fmt.Errorf("root CID cannot be empty")
}

set := cidutil.NewStreamingSet()

go func() {
log.Debugw("DAG provider starting the walk via BlockAll", "root", root)
defer close(set.New)
session := fetchConfig.NewSession(ctx)
err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: root}, func(res fetcher.FetchResult) error {
clink, ok := res.LastBlockLink.(cidlink.Link)
if ok {
// if context is cancelled, nothing is written to new()
_ = set.Visitor(ctx)(clink.Cid)
}

select {
case <-ctx.Done():
// halts traversal
return ctx.Err()
default:
}
return nil
})
if err != nil {
if _, ok := err.(traversal.SkipMe); ok {
log.Warnw("dagprovider skipped further dag traversal", "root", root)
} else {
log.Errorf("dagprovider dag traversal error from root %s: %s", root, err)
}
return
}
}()

return set.New, nil
}
}
149 changes: 149 additions & 0 deletions provider/dagprovider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package provider

import (
"context"
"testing"
"time"

"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/exchange/offline"
"github.com/ipfs/boxo/fetcher"
bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice"
"github.com/ipfs/boxo/ipld/merkledag"
mdutils "github.com/ipfs/boxo/ipld/merkledag/test"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-unixfsnode"
dagpb "github.com/ipld/go-codec-dagpb"
"github.com/stretchr/testify/require"
)

type dagProviderHelper struct {
Datastore datastore.Datastore
Blockstore blockstore.Blockstore
BlockService blockservice.BlockService
FetcherFactory fetcher.Factory
DAGService format.DAGService
KeyChanF KeyChanFunc
Cids []cid.Cid
}

func makeDAGProvider(t *testing.T, ctx context.Context, fanout, depth uint) *dagProviderHelper {
t.Helper()

ds := dssync.MutexWrap(datastore.NewMapDatastore())
bs := blockstore.NewBlockstore(ds)
bserv := blockservice.New(bs, offline.Exchange(bs))
ipldFetcher := bsfetcher.NewFetcherConfig(bserv)
ipldFetcher.SkipNotFound = true
ipldFetcher.PrototypeChooser = dagpb.AddSupportToChooser(bsfetcher.DefaultPrototypeChooser)
unixFSFetcher := ipldFetcher.WithReifier(unixfsnode.Reify)

dserv := merkledag.NewDAGService(bserv)
daggen := mdutils.NewDAGGenerator()

root, allCids, err := daggen.MakeDagNode(dserv.Add, fanout, depth)
require.NoError(t, err)
t.Logf("Generated %d CIDs. Root: %s", len(allCids), root)

keyChanF := NewDAGProvider(root, unixFSFetcher)
return &dagProviderHelper{
Datastore: ds,
Blockstore: bs,
BlockService: bserv,
FetcherFactory: unixFSFetcher,
DAGService: dserv,
KeyChanF: keyChanF,
Cids: allCids,
}
}

func TestNewDAGProvider(t *testing.T) {
ctx := context.Background()
dph := makeDAGProvider(t, ctx, 5, 2)
keyChanF := dph.KeyChanF
allCids := dph.Cids

cidMap := make(map[cid.Cid]struct{})
cidCh, err := keyChanF(ctx)
require.NoError(t, err)

for c := range cidCh {
cidMap[c] = struct{}{}
}

t.Logf("Collected %d CIDs", len(cidMap))

for _, c := range allCids {
if _, ok := cidMap[c]; !ok {
t.Errorf("%s not traversed", c)
}
}
require.Equal(t, len(allCids), len(cidMap), "number of traversed CIDs does not match CIDs in DAG")
}

func TestNewDAGProviderCtxCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
dph := makeDAGProvider(t, ctx, 5, 2)
keyChanF := dph.KeyChanF
cidMap := make(map[cid.Cid]struct{})
cidCh, err := keyChanF(ctx)
require.NoError(t, err)

done := make(chan struct{})
go func() {
defer close(done)
for c := range cidCh {
cidMap[c] = struct{}{}
if len(cidMap) == 3 {
time.Sleep(time.Second)
}
}

}()

time.Sleep(500 * time.Millisecond)
cancel()
<-done

t.Logf("Collected %d CIDs", len(cidMap))
require.Equal(t, 3, len(cidMap), "number of traversed CIDs when cancelling the context should be 3")
}

func TestNewDAGProviderMissingBlocks(t *testing.T) {
ctx := context.Background()
dph := makeDAGProvider(t, ctx, 1, 10)
keyChanF := dph.KeyChanF
allCids := dph.Cids

// Remove a block from the blockstore.
// since we have a single deep branch of 10 items, we will only be able
// to visit 0, 1 and 2
err := dph.Blockstore.DeleteBlock(ctx, allCids[3])
require.NoError(t, err)

cidMap := make(map[cid.Cid]struct{})
cidCh, err := keyChanF(ctx)
require.NoError(t, err)

for c := range cidCh {
cidMap[c] = struct{}{}
}

t.Logf("Collected %d CIDs", len(cidMap))

for _, c := range allCids[0:3] {
if _, ok := cidMap[c]; !ok {
t.Errorf("%s should have been traversed", c)
}
}

for _, c := range allCids[3:] {
if _, ok := cidMap[c]; ok {
t.Errorf("%s should not have been traversed", c)
}
}
}