Skip to content

Commit ce7fc7b

Browse files
wswsmaoilyee
andcommittedDec 13, 2024·
Add fuse-manager
Signed-off-by: abushwang <[email protected]> Co-authored-by: Zuti He <[email protected]>
1 parent ff392c1 commit ce7fc7b

File tree

17 files changed

+1828
-124
lines changed

17 files changed

+1828
-124
lines changed
 

‎Makefile

+4-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ REVISION=$(shell git rev-parse HEAD)$(shell if ! git diff --no-ext-diff --quiet
2424
GO_BUILD_LDFLAGS ?= -s -w
2525
GO_LD_FLAGS=-ldflags '$(GO_BUILD_LDFLAGS) -X $(PKG)/version.Version=$(VERSION) -X $(PKG)/version.Revision=$(REVISION) $(GO_EXTRA_LDFLAGS)'
2626

27-
CMD=containerd-stargz-grpc ctr-remote stargz-store
27+
CMD=containerd-stargz-grpc ctr-remote stargz-store stargz-fuse-manager
2828

2929
CMD_BINARIES=$(addprefix $(PREFIX),$(CMD))
3030

@@ -48,6 +48,9 @@ stargz-store: FORCE
4848
stargz-store-helper: FORCE
4949
cd cmd/ ; GO111MODULE=$(GO111MODULE_VALUE) go build -o $(PREFIX)$@ $(GO_BUILD_FLAGS) $(GO_LD_FLAGS) -v ./stargz-store/helper
5050

51+
stargz-fuse-manager: FORCE
52+
cd cmd/ ; GO111MODULE=$(GO111MODULE_VALUE) go build -o $(PREFIX)$@ $(GO_BUILD_FLAGS) $(GO_LD_FLAGS) -v ./stargz-fuse-manager
53+
5154
check:
5255
@echo "$@"
5356
@GO111MODULE=$(GO111MODULE_VALUE) $(shell go env GOPATH)/bin/golangci-lint run
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package fsopts
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"io"
23+
"path/filepath"
24+
25+
"github.com/containerd/log"
26+
dbmetadata "github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/db"
27+
ipfs "github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/ipfs"
28+
"github.com/containerd/stargz-snapshotter/fs"
29+
"github.com/containerd/stargz-snapshotter/metadata"
30+
memorymetadata "github.com/containerd/stargz-snapshotter/metadata/memory"
31+
bolt "go.etcd.io/bbolt"
32+
)
33+
34+
type Config struct {
35+
EnableIpfs bool
36+
MetadataStore string
37+
}
38+
39+
const (
40+
memoryMetadataType = "memory"
41+
dbMetadataType = "db"
42+
)
43+
44+
func ConfigFsOpts(ctx context.Context, rootDir string, config *Config) ([]fs.Option, error) {
45+
fsOpts := []fs.Option{fs.WithMetricsLogLevel(log.InfoLevel)}
46+
47+
if config.EnableIpfs {
48+
fsOpts = append(fsOpts, fs.WithResolveHandler("ipfs", new(ipfs.ResolveHandler)))
49+
}
50+
51+
mt, err := getMetadataStore(rootDir, config)
52+
if err != nil {
53+
return nil, fmt.Errorf("failed to configure metadata store: %w", err)
54+
}
55+
fsOpts = append(fsOpts, fs.WithMetadataStore(mt))
56+
57+
return fsOpts, nil
58+
}
59+
60+
func getMetadataStore(rootDir string, config *Config) (metadata.Store, error) {
61+
switch config.MetadataStore {
62+
case "", memoryMetadataType:
63+
return memorymetadata.NewReader, nil
64+
case dbMetadataType:
65+
bOpts := bolt.Options{
66+
NoFreelistSync: true,
67+
InitialMmapSize: 64 * 1024 * 1024,
68+
FreelistType: bolt.FreelistMapType,
69+
}
70+
db, err := bolt.Open(filepath.Join(rootDir, "metadata.db"), 0600, &bOpts)
71+
if err != nil {
72+
return nil, err
73+
}
74+
return func(sr *io.SectionReader, opts ...metadata.Option) (metadata.Reader, error) {
75+
return dbmetadata.NewReader(db, sr, opts...)
76+
}, nil
77+
default:
78+
return nil, fmt.Errorf("unknown metadata store type: %v; must be %v or %v",
79+
config.MetadataStore, memoryMetadataType, dbMetadataType)
80+
}
81+
}

‎cmd/containerd-stargz-grpc/main.go

+87-100
Original file line numberDiff line numberDiff line change
@@ -20,43 +20,32 @@ import (
2020
"context"
2121
"flag"
2222
"fmt"
23-
"io"
2423
golog "log"
2524
"math/rand"
2625
"net"
2726
"net/http"
2827
"os"
28+
"os/exec"
2929
"os/signal"
3030
"path/filepath"
3131
"time"
3232

3333
snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1"
3434
"github.com/containerd/containerd/v2/contrib/snapshotservice"
3535
"github.com/containerd/containerd/v2/core/snapshots"
36-
"github.com/containerd/containerd/v2/defaults"
37-
"github.com/containerd/containerd/v2/pkg/dialer"
3836
"github.com/containerd/containerd/v2/pkg/sys"
3937
"github.com/containerd/log"
40-
dbmetadata "github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/db"
41-
ipfs "github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/ipfs"
42-
"github.com/containerd/stargz-snapshotter/fs"
43-
"github.com/containerd/stargz-snapshotter/metadata"
44-
memorymetadata "github.com/containerd/stargz-snapshotter/metadata/memory"
38+
"github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/fsopts"
39+
"github.com/containerd/stargz-snapshotter/fusemanager"
4540
"github.com/containerd/stargz-snapshotter/service"
46-
"github.com/containerd/stargz-snapshotter/service/keychain/cri"
47-
"github.com/containerd/stargz-snapshotter/service/keychain/dockerconfig"
48-
"github.com/containerd/stargz-snapshotter/service/keychain/kubeconfig"
49-
"github.com/containerd/stargz-snapshotter/service/resolver"
41+
"github.com/containerd/stargz-snapshotter/service/keychain/keychainconfig"
42+
snbase "github.com/containerd/stargz-snapshotter/snapshot"
5043
"github.com/containerd/stargz-snapshotter/version"
5144
sddaemon "github.com/coreos/go-systemd/v22/daemon"
5245
metrics "github.com/docker/go-metrics"
5346
"github.com/pelletier/go-toml"
54-
bolt "go.etcd.io/bbolt"
5547
"golang.org/x/sys/unix"
5648
"google.golang.org/grpc"
57-
"google.golang.org/grpc/backoff"
58-
"google.golang.org/grpc/credentials/insecure"
59-
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
6049
)
6150

6251
const (
@@ -65,14 +54,19 @@ const (
6554
defaultLogLevel = log.InfoLevel
6655
defaultRootDir = "/var/lib/containerd-stargz-grpc"
6756
defaultImageServiceAddress = "/run/containerd/containerd.sock"
57+
defaultFuseManagerAddress = "/run/containerd-stargz-grpc/fuse-manager.sock"
58+
59+
fuseManagerBin = "stargz-fuse-manager"
60+
fuseManagerAddress = "fuse-manager.sock"
6861
)
6962

7063
var (
71-
address = flag.String("address", defaultAddress, "address for the snapshotter's GRPC server")
72-
configPath = flag.String("config", defaultConfigPath, "path to the configuration file")
73-
logLevel = flag.String("log-level", defaultLogLevel.String(), "set the logging level [trace, debug, info, warn, error, fatal, panic]")
74-
rootDir = flag.String("root", defaultRootDir, "path to the root directory for this snapshotter")
75-
printVersion = flag.Bool("version", false, "print the version")
64+
address = flag.String("address", defaultAddress, "address for the snapshotter's GRPC server")
65+
configPath = flag.String("config", defaultConfigPath, "path to the configuration file")
66+
logLevel = flag.String("log-level", defaultLogLevel.String(), "set the logging level [trace, debug, info, warn, error, fatal, panic]")
67+
rootDir = flag.String("root", defaultRootDir, "path to the root directory for this snapshotter")
68+
detachFuseManager = flag.Bool("detach-fuse-manager", false, "whether detach fusemanager or not")
69+
printVersion = flag.Bool("version", false, "print the version")
7670
)
7771

7872
type snapshotterConfig struct {
@@ -92,6 +86,11 @@ type snapshotterConfig struct {
9286

9387
// MetadataStore is the type of the metadata store to use.
9488
MetadataStore string `toml:"metadata_store" default:"memory"`
89+
// FuseManagerAddress is address for the fusemanager's GRPC server
90+
FuseManagerAddress string `toml:"fusemanager_address"`
91+
92+
// FuseManagerPath is path to the fusemanager's executable
93+
FuseManagerPath string `toml:"fusemanager_path"`
9594
}
9695

9796
func main() {
@@ -140,51 +139,84 @@ func main() {
140139
}
141140

142141
// Configure keychain
143-
credsFuncs := []resolver.Credential{dockerconfig.NewDockerconfigKeychain(ctx)}
144-
if config.Config.KubeconfigKeychainConfig.EnableKeychain {
145-
var opts []kubeconfig.Option
146-
if kcp := config.Config.KubeconfigKeychainConfig.KubeconfigPath; kcp != "" {
147-
opts = append(opts, kubeconfig.WithKubeconfigPath(kcp))
148-
}
149-
credsFuncs = append(credsFuncs, kubeconfig.NewKubeconfigKeychain(ctx, opts...))
142+
keyChainConfig := keychainconfig.Config{
143+
EnableKubeKeychain: config.Config.KubeconfigKeychainConfig.EnableKeychain,
144+
EnableCRIKeychain: config.Config.CRIKeychainConfig.EnableKeychain,
145+
KubeconfigPath: config.Config.KubeconfigKeychainConfig.KubeconfigPath,
146+
DefaultImageServiceAddress: defaultImageServiceAddress,
147+
ImageServicePath: config.CRIKeychainConfig.ImageServicePath,
150148
}
151-
if config.Config.CRIKeychainConfig.EnableKeychain {
152-
// connects to the backend CRI service (defaults to containerd socket)
153-
criAddr := defaultImageServiceAddress
154-
if cp := config.CRIKeychainConfig.ImageServicePath; cp != "" {
155-
criAddr = cp
156-
}
157-
connectCRI := func() (runtime.ImageServiceClient, error) {
158-
conn, err := newCRIConn(criAddr)
149+
150+
var rs snapshots.Snapshotter
151+
if *detachFuseManager {
152+
fmPath := config.FuseManagerPath
153+
if fmPath == "" {
154+
var err error
155+
fmPath, err = exec.LookPath(fuseManagerBin)
159156
if err != nil {
160-
return nil, err
157+
log.G(ctx).WithError(err).Fatalf("failed to find fusemanager bin")
161158
}
162-
return runtime.NewImageServiceClient(conn), nil
163159
}
164-
f, criServer := cri.NewCRIKeychain(ctx, connectCRI)
165-
runtime.RegisterImageServiceServer(rpc, criServer)
166-
credsFuncs = append(credsFuncs, f)
167-
}
168-
fsOpts := []fs.Option{fs.WithMetricsLogLevel(log.InfoLevel)}
169-
if config.IPFS {
170-
fsOpts = append(fsOpts, fs.WithResolveHandler("ipfs", new(ipfs.ResolveHandler)))
171-
}
172-
mt, err := getMetadataStore(*rootDir, config)
173-
if err != nil {
174-
log.G(ctx).WithError(err).Fatalf("failed to configure metadata store")
175-
}
176-
fsOpts = append(fsOpts, fs.WithMetadataStore(mt))
177-
rs, err := service.NewStargzSnapshotterService(ctx, *rootDir, &config.Config,
178-
service.WithCredsFuncs(credsFuncs...), service.WithFilesystemOptions(fsOpts...))
179-
if err != nil {
180-
log.G(ctx).WithError(err).Fatalf("failed to configure snapshotter")
160+
fmAddr := config.FuseManagerAddress
161+
if fmAddr == "" {
162+
fmAddr = defaultFuseManagerAddress
163+
}
164+
165+
if !filepath.IsAbs(fmAddr) {
166+
log.G(ctx).WithError(err).Fatalf("fuse manager address must be an absolute path: %s", fmAddr)
167+
}
168+
err := fusemanager.StartFuseManager(ctx, fmPath, fmAddr, filepath.Join(*rootDir, "fusestore.db"), *logLevel, filepath.Join(*rootDir, "stargz-fuse-manager.log"))
169+
if err != nil {
170+
log.G(ctx).WithError(err).Fatalf("failed to start fusemanager")
171+
}
172+
173+
fuseManagerConfig := fusemanager.Config{
174+
Config: &config.Config,
175+
IPFS: config.IPFS,
176+
MetadataStore: config.MetadataStore,
177+
DefaultImageServiceAddress: defaultImageServiceAddress,
178+
}
179+
180+
fs, err := fusemanager.NewManagerClient(ctx, *rootDir, fmAddr, &fuseManagerConfig)
181+
if err != nil {
182+
log.G(ctx).WithError(err).Fatalf("failed to configure fusemanager")
183+
}
184+
rs, err = snbase.NewSnapshotter(ctx, filepath.Join(*rootDir, "snapshotter"), fs, snbase.AsynchronousRemove)
185+
if err != nil {
186+
log.G(ctx).WithError(err).Fatalf("failed to configure snapshotter")
187+
}
188+
log.G(ctx).Infof("Start snapshotter with fusemanager mode")
189+
} else {
190+
credsFuncs, err := keychainconfig.ConfigKeychain(ctx, rpc, &keyChainConfig)
191+
if err != nil {
192+
log.G(ctx).WithError(err).Fatalf("failed to configure keychain")
193+
}
194+
195+
fsConfig := fsopts.Config{
196+
EnableIpfs: config.IPFS,
197+
MetadataStore: config.MetadataStore,
198+
}
199+
fsOpts, err := fsopts.ConfigFsOpts(ctx, *rootDir, &fsConfig)
200+
if err != nil {
201+
log.G(ctx).WithError(err).Fatalf("failed to configure fs config")
202+
}
203+
204+
rs, err = service.NewStargzSnapshotterService(ctx, *rootDir, &config.Config,
205+
service.WithCredsFuncs(credsFuncs...), service.WithFilesystemOptions(fsOpts...))
206+
if err != nil {
207+
log.G(ctx).WithError(err).Fatalf("failed to configure snapshotter")
208+
}
181209
}
182210

183211
cleanup, err := serve(ctx, rpc, *address, rs, config)
184212
if err != nil {
185213
log.G(ctx).WithError(err).Fatalf("failed to serve snapshotter")
186214
}
187215

216+
// TODO: In detach mode, rs is taken over by fusemanager,
217+
// but client will send unmount request to fusemanager,
218+
// and fusemanager need get mount info from local db to
219+
// determine its behavior
188220
if cleanup {
189221
log.G(ctx).Debug("Closing the snapshotter")
190222
rs.Close()
@@ -275,48 +307,3 @@ func serve(ctx context.Context, rpc *grpc.Server, addr string, rs snapshots.Snap
275307
}
276308
return false, nil
277309
}
278-
279-
const (
280-
memoryMetadataType = "memory"
281-
dbMetadataType = "db"
282-
)
283-
284-
func getMetadataStore(rootDir string, config snapshotterConfig) (metadata.Store, error) {
285-
switch config.MetadataStore {
286-
case "", memoryMetadataType:
287-
return memorymetadata.NewReader, nil
288-
case dbMetadataType:
289-
bOpts := bolt.Options{
290-
NoFreelistSync: true,
291-
InitialMmapSize: 64 * 1024 * 1024,
292-
FreelistType: bolt.FreelistMapType,
293-
}
294-
db, err := bolt.Open(filepath.Join(rootDir, "metadata.db"), 0600, &bOpts)
295-
if err != nil {
296-
return nil, err
297-
}
298-
return func(sr *io.SectionReader, opts ...metadata.Option) (metadata.Reader, error) {
299-
return dbmetadata.NewReader(db, sr, opts...)
300-
}, nil
301-
default:
302-
return nil, fmt.Errorf("unknown metadata store type: %v; must be %v or %v",
303-
config.MetadataStore, memoryMetadataType, dbMetadataType)
304-
}
305-
}
306-
307-
func newCRIConn(criAddr string) (*grpc.ClientConn, error) {
308-
// TODO: make gRPC options configurable from config.toml
309-
backoffConfig := backoff.DefaultConfig
310-
backoffConfig.MaxDelay = 3 * time.Second
311-
connParams := grpc.ConnectParams{
312-
Backoff: backoffConfig,
313-
}
314-
gopts := []grpc.DialOption{
315-
grpc.WithTransportCredentials(insecure.NewCredentials()),
316-
grpc.WithConnectParams(connParams),
317-
grpc.WithContextDialer(dialer.ContextDialer),
318-
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize)),
319-
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)),
320-
}
321-
return grpc.NewClient(dialer.DialAddress(criAddr), gopts...)
322-
}

‎cmd/go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ require (
2626
golang.org/x/sync v0.9.0
2727
golang.org/x/sys v0.26.0
2828
google.golang.org/grpc v1.68.0
29-
k8s.io/cri-api v0.32.0-alpha.0
3029
)
3130

3231
require (
@@ -141,6 +140,7 @@ require (
141140
k8s.io/api v0.31.2 // indirect
142141
k8s.io/apimachinery v0.31.2 // indirect
143142
k8s.io/client-go v0.31.2 // indirect
143+
k8s.io/cri-api v0.32.0-alpha.0 // indirect
144144
k8s.io/klog/v2 v2.130.1 // indirect
145145
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
146146
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect

‎cmd/stargz-fuse-manager/main.go

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/fsopts"
21+
fusemanager "github.com/containerd/stargz-snapshotter/fusemanager"
22+
"github.com/containerd/stargz-snapshotter/service"
23+
"github.com/containerd/stargz-snapshotter/service/keychain/keychainconfig"
24+
)
25+
26+
func init() {
27+
fusemanager.RegisterConfigFunc(func(cc *fusemanager.ConfigContext) ([]service.Option, error) {
28+
fsConfig := fsopts.Config{
29+
EnableIpfs: cc.Config.IPFS,
30+
MetadataStore: cc.Config.MetadataStore,
31+
}
32+
fsOpts, err := fsopts.ConfigFsOpts(cc.Ctx, cc.RootDir, &fsConfig)
33+
if err != nil {
34+
return nil, err
35+
}
36+
return []service.Option{service.WithFilesystemOptions(fsOpts...)}, nil
37+
})
38+
39+
fusemanager.RegisterConfigFunc(func(cc *fusemanager.ConfigContext) ([]service.Option, error) {
40+
keyChainConfig := keychainconfig.Config{
41+
EnableKubeKeychain: cc.Config.Config.KubeconfigKeychainConfig.EnableKeychain,
42+
EnableCRIKeychain: cc.Config.Config.CRIKeychainConfig.EnableKeychain,
43+
KubeconfigPath: cc.Config.Config.KubeconfigKeychainConfig.KubeconfigPath,
44+
DefaultImageServiceAddress: cc.Config.DefaultImageServiceAddress,
45+
ImageServicePath: cc.Config.Config.CRIKeychainConfig.ImageServicePath,
46+
}
47+
credsFuncs, err := keychainconfig.ConfigKeychain(cc.Ctx, cc.Server, &keyChainConfig)
48+
if err != nil {
49+
return nil, err
50+
}
51+
return []service.Option{service.WithCredsFuncs(credsFuncs...)}, nil
52+
})
53+
}
54+
55+
func main() {
56+
fusemanager.Run()
57+
}

‎docs/overview.md

+23
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,29 @@ root@1d43741b8d29:/go# cat /.stargz-snapshotter/*
9595
{"digest":"sha256:f077511be7d385c17ba88980379c5cd0aab7068844dffa7a1cefbf68cc3daea3","size":580,"fetchedSize":580,"fetchedPercent":100}
9696
```
9797

98+
## Fuse Manager
99+
100+
The fuse manager is designed to maintain the availability of running containers by managing the lifecycle of FUSE mountpoints independently from the stargz snapshotter.
101+
102+
### Fuse Manager Overview
103+
Remote snapshots are mounted using FUSE, and its filesystem processes are attached to the stargz snapshotter. If the stargz snapshotter restarts (due to configuration changes or crashes), all filesystem processes will be killed and restarted, which causes the remount of FUSE mountpoints, making running containers unavailable.
104+
105+
To avoid this, we use a fuse daemon called the fuse manager to handle filesystem processes. The fuse manager is responsible for mounting and unmounting remote snapshotters. Its process is detached from the stargz snapshotter main process to an independent one in a shim-like way during the snapshotter's startup. This design ensures that the restart of the snapshotter won't affect the filesystem processes it manages, keeping mountpoints and running containers available during the restart. However, it is important to note that the restart of the fuse manager itself triggers a remount, so it is recommended to keep the fuse manager running in a good state.
106+
107+
You can enable the fuse manager by adding the flag `--detach-fuse-manager=true` to the stargz snapshotter.
108+
109+
### Upgrading Fuse Manager
110+
111+
When upgrading the fuse manager, it's recommended to follow these steps:
112+
113+
1. Stop the containers using the stargz snapshotter.
114+
2. Stop the fuse manager and containerd-stargz-grpc process.
115+
3. Upgrade the your binary.
116+
4. Restart the containerd-stargz-grpc process.
117+
5. Restart the containers.
118+
119+
This ensures a clean upgrade without impacting running containers.
120+
98121
## Registry-related configuration
99122

100123
You can configure stargz snapshotter for accessing registries with custom configurations.

‎fs/fs.go

+14-6
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,12 @@ const (
7272
)
7373

7474
var fusermountBin = []string{"fusermount", "fusermount3"}
75+
var (
76+
nsLock = sync.Mutex{}
77+
78+
ns *metrics.Namespace
79+
metricsCtr *layermetrics.Controller
80+
)
7581

7682
type Option func(*options)
7783

@@ -160,18 +166,20 @@ func NewFilesystem(root string, cfg config.Config, opts ...Option) (_ snapshot.F
160166
return nil, fmt.Errorf("failed to setup resolver: %w", err)
161167
}
162168

163-
var ns *metrics.Namespace
164-
if !cfg.NoPrometheus {
169+
nsLock.Lock()
170+
defer nsLock.Unlock()
171+
172+
if !cfg.NoPrometheus && ns == nil {
165173
ns = metrics.NewNamespace("stargz", "fs", nil)
166174
logLevel := log.DebugLevel
167175
if fsOpts.metricsLogLevel != nil {
168176
logLevel = *fsOpts.metricsLogLevel
169177
}
170178
commonmetrics.Register(logLevel) // Register common metrics. This will happen only once.
179+
metrics.Register(ns) // Register layer metrics.
171180
}
172-
c := layermetrics.NewLayerMetrics(ns)
173-
if ns != nil {
174-
metrics.Register(ns) // Register layer metrics.
181+
if metricsCtr == nil {
182+
metricsCtr = layermetrics.NewLayerMetrics(ns)
175183
}
176184

177185
return &filesystem{
@@ -185,7 +193,7 @@ func NewFilesystem(root string, cfg config.Config, opts ...Option) (_ snapshot.F
185193
backgroundTaskManager: tm,
186194
allowNoVerification: cfg.AllowNoVerification,
187195
disableVerification: cfg.DisableVerification,
188-
metricsController: c,
196+
metricsController: metricsCtr,
189197
attrTimeout: attrTimeout,
190198
entryTimeout: entryTimeout,
191199
}, nil

‎fusemanager/api/api.pb.go

+566
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎fusemanager/api/api.proto

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
syntax = "proto3";
18+
19+
option go_package = "github.com/stargz-snapshotter/fusemanager/api";
20+
21+
package fusemanager;
22+
23+
service StargzFuseManagerService {
24+
rpc Status (StatusRequest) returns (StatusResponse);
25+
rpc Init (InitRequest) returns (Response);
26+
rpc Mount (MountRequest) returns (Response);
27+
rpc Check (CheckRequest) returns (Response);
28+
rpc Unmount (UnmountRequest) returns (Response);
29+
}
30+
31+
message StatusRequest {
32+
}
33+
34+
message InitRequest {
35+
string root = 1;
36+
bytes config = 2;
37+
}
38+
39+
message MountRequest {
40+
string mountpoint = 1;
41+
map<string, string> labels = 2;
42+
}
43+
44+
message CheckRequest {
45+
string mountpoint = 1;
46+
map<string, string> labels = 2;
47+
}
48+
49+
message UnmountRequest {
50+
string mountpoint = 1;
51+
}
52+
53+
message StatusResponse {
54+
int32 status = 1;
55+
}
56+
57+
message Response {
58+
}

‎fusemanager/api/generate.go

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package api
18+
19+
//go:generate protoc --gogo_out=paths=source_relative,plugins=grpc:. api.proto

‎fusemanager/client.go

+141
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package fusemanager
18+
19+
import (
20+
"context"
21+
"encoding/json"
22+
"fmt"
23+
24+
"github.com/containerd/containerd/v2/defaults"
25+
"github.com/containerd/containerd/v2/pkg/dialer"
26+
"github.com/containerd/log"
27+
"google.golang.org/grpc"
28+
"google.golang.org/grpc/backoff"
29+
"google.golang.org/grpc/credentials/insecure"
30+
31+
pb "github.com/containerd/stargz-snapshotter/fusemanager/api"
32+
"github.com/containerd/stargz-snapshotter/snapshot"
33+
)
34+
35+
type Client struct {
36+
client pb.StargzFuseManagerServiceClient
37+
}
38+
39+
func NewManagerClient(ctx context.Context, root, socket string, config *Config) (snapshot.FileSystem, error) {
40+
grpcCli, err := newClient(socket)
41+
if err != nil {
42+
return nil, err
43+
}
44+
45+
client := &Client{
46+
client: grpcCli,
47+
}
48+
49+
err = client.init(ctx, root, config)
50+
if err != nil {
51+
return nil, err
52+
}
53+
54+
return client, nil
55+
}
56+
57+
func newClient(socket string) (pb.StargzFuseManagerServiceClient, error) {
58+
connParams := grpc.ConnectParams{
59+
Backoff: backoff.DefaultConfig,
60+
}
61+
gopts := []grpc.DialOption{
62+
grpc.WithTransportCredentials(insecure.NewCredentials()),
63+
grpc.WithConnectParams(connParams),
64+
grpc.WithContextDialer(dialer.ContextDialer),
65+
grpc.WithDefaultCallOptions(
66+
grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize),
67+
grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize),
68+
),
69+
}
70+
71+
conn, err := grpc.NewClient(fmt.Sprintf("unix://%s", socket), gopts...)
72+
if err != nil {
73+
return nil, err
74+
}
75+
76+
return pb.NewStargzFuseManagerServiceClient(conn), nil
77+
}
78+
79+
func (cli *Client) init(ctx context.Context, root string, config *Config) error {
80+
configBytes, err := json.Marshal(config)
81+
if err != nil {
82+
return err
83+
}
84+
85+
req := &pb.InitRequest{
86+
Root: root,
87+
Config: configBytes,
88+
}
89+
90+
_, err = cli.client.Init(ctx, req)
91+
if err != nil {
92+
log.G(ctx).WithError(err).Errorf("failed to call Init")
93+
return err
94+
}
95+
96+
return nil
97+
}
98+
99+
func (cli *Client) Mount(ctx context.Context, mountpoint string, labels map[string]string) error {
100+
req := &pb.MountRequest{
101+
Mountpoint: mountpoint,
102+
Labels: labels,
103+
}
104+
105+
_, err := cli.client.Mount(ctx, req)
106+
if err != nil {
107+
log.G(ctx).WithError(err).Errorf("failed to call Mount")
108+
return err
109+
}
110+
111+
return nil
112+
}
113+
114+
func (cli *Client) Check(ctx context.Context, mountpoint string, labels map[string]string) error {
115+
req := &pb.CheckRequest{
116+
Mountpoint: mountpoint,
117+
Labels: labels,
118+
}
119+
120+
_, err := cli.client.Check(ctx, req)
121+
if err != nil {
122+
log.G(ctx).WithError(err).Errorf("failed to call Check")
123+
return err
124+
}
125+
126+
return nil
127+
}
128+
129+
func (cli *Client) Unmount(ctx context.Context, mountpoint string) error {
130+
req := &pb.UnmountRequest{
131+
Mountpoint: mountpoint,
132+
}
133+
134+
_, err := cli.client.Unmount(ctx, req)
135+
if err != nil {
136+
log.G(ctx).WithError(err).Errorf("failed to call Unmount")
137+
return err
138+
}
139+
140+
return nil
141+
}

‎fusemanager/fusemanager.go

+255
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package fusemanager
18+
19+
import (
20+
"context"
21+
"flag"
22+
"fmt"
23+
golog "log"
24+
"net"
25+
"os"
26+
"os/exec"
27+
"os/signal"
28+
"path/filepath"
29+
"syscall"
30+
31+
"github.com/containerd/log"
32+
"github.com/sirupsen/logrus"
33+
"golang.org/x/sys/unix"
34+
"google.golang.org/grpc"
35+
36+
pb "github.com/containerd/stargz-snapshotter/fusemanager/api"
37+
"github.com/containerd/stargz-snapshotter/version"
38+
)
39+
40+
var (
41+
debugFlag bool
42+
versionFlag bool
43+
fuseStoreAddr string
44+
address string
45+
logLevel string
46+
logPath string
47+
action string
48+
)
49+
50+
func parseFlags() {
51+
flag.BoolVar(&debugFlag, "debug", false, "enable debug output in logs")
52+
flag.BoolVar(&versionFlag, "v", false, "show the fusemanager version and exit")
53+
flag.StringVar(&action, "action", "", "action of fusemanager")
54+
flag.StringVar(&fuseStoreAddr, "fusestore-path", "/var/lib/containerd-stargz-grpc/fusestore.db", "address for the fusemanager's store")
55+
flag.StringVar(&address, "address", "/run/containerd-stargz-grpc/fuse-manager.sock", "address for the fusemanager's gRPC socket")
56+
flag.StringVar(&logLevel, "log-level", logrus.InfoLevel.String(), "set the logging level [trace, debug, info, warn, error, fatal, panic]")
57+
flag.StringVar(&logPath, "log-path", "", "path to fusemanager's logs, no log recorded if empty")
58+
59+
flag.Parse()
60+
}
61+
62+
func Run() {
63+
if err := run(); err != nil {
64+
fmt.Fprintf(os.Stderr, "failed to run fusemanager: %v", err)
65+
os.Exit(1)
66+
}
67+
}
68+
69+
func run() error {
70+
parseFlags()
71+
if versionFlag {
72+
fmt.Printf("%s:\n", os.Args[0])
73+
fmt.Println(" Version: ", version.Version)
74+
fmt.Println(" Revision:", version.Revision)
75+
fmt.Println("")
76+
return nil
77+
}
78+
79+
if fuseStoreAddr == "" || address == "" {
80+
return fmt.Errorf("fusemanager fusestore and socket path cannot be empty")
81+
}
82+
83+
ctx := log.WithLogger(context.Background(), log.L)
84+
85+
switch action {
86+
case "start":
87+
return startNew(ctx, logPath, address, fuseStoreAddr, logLevel)
88+
default:
89+
return runFuseManager(ctx)
90+
}
91+
}
92+
93+
func startNew(ctx context.Context, logPath, address, fusestore, logLevel string) error {
94+
self, err := os.Executable()
95+
if err != nil {
96+
return err
97+
}
98+
99+
cwd, err := os.Getwd()
100+
if err != nil {
101+
return err
102+
}
103+
104+
args := []string{
105+
"-address", address,
106+
"-fusestore-path", fusestore,
107+
"-log-level", logLevel,
108+
}
109+
110+
// we use shim-like approach to start new fusemanager process by self-invoking in the background
111+
// and detach it from parent
112+
cmd := exec.CommandContext(ctx, self, args...)
113+
cmd.Dir = cwd
114+
cmd.SysProcAttr = &syscall.SysProcAttr{
115+
Setpgid: true,
116+
}
117+
118+
if logPath != "" {
119+
err := os.Remove(logPath)
120+
if err != nil && !os.IsNotExist(err) {
121+
return err
122+
}
123+
file, err := os.Create(logPath)
124+
if err != nil {
125+
return err
126+
}
127+
cmd.Stdout = file
128+
cmd.Stderr = file
129+
}
130+
131+
if err := cmd.Start(); err != nil {
132+
return err
133+
}
134+
go cmd.Wait()
135+
136+
if ready, err := waitUntilReady(ctx); err != nil || !ready {
137+
if err != nil {
138+
return fmt.Errorf("failed to start new fusemanager: %w", err)
139+
}
140+
if !ready {
141+
return fmt.Errorf("failed to start new fusemanager, fusemanager not ready")
142+
}
143+
}
144+
145+
return nil
146+
}
147+
148+
// waitUntilReady waits until fusemanager is ready to accept requests
149+
func waitUntilReady(ctx context.Context) (bool, error) {
150+
grpcCli, err := newClient(address)
151+
if err != nil {
152+
return false, err
153+
}
154+
155+
resp, err := grpcCli.Status(ctx, &pb.StatusRequest{})
156+
if err != nil {
157+
log.G(ctx).WithError(err).Errorf("failed to call Status")
158+
return false, err
159+
}
160+
161+
if resp.Status == FuseManagerNotReady {
162+
return false, nil
163+
}
164+
165+
return true, nil
166+
}
167+
168+
func runFuseManager(ctx context.Context) error {
169+
lvl, err := logrus.ParseLevel(logLevel)
170+
if err != nil {
171+
return fmt.Errorf("failed to prepare logger: %w", err)
172+
}
173+
174+
logrus.SetLevel(lvl)
175+
logrus.SetFormatter(&logrus.JSONFormatter{
176+
TimestampFormat: log.RFC3339NanoFixed,
177+
})
178+
179+
golog.SetOutput(log.G(ctx).WriterLevel(logrus.DebugLevel))
180+
181+
// Prepare the directory for the socket
182+
if err := os.MkdirAll(filepath.Dir(address), 0700); err != nil {
183+
return fmt.Errorf("failed to create directory %s: %w", filepath.Dir(address), err)
184+
}
185+
186+
// Try to remove the socket file to avoid EADDRINUSE
187+
if err := os.Remove(address); err != nil && !os.IsNotExist(err) {
188+
return fmt.Errorf("failed to remove old socket file: %w", err)
189+
}
190+
191+
l, err := net.Listen("unix", address)
192+
if err != nil {
193+
return fmt.Errorf("failed to listen socket: %w", err)
194+
}
195+
196+
server := grpc.NewServer()
197+
fm, err := NewFuseManager(ctx, l, server, fuseStoreAddr)
198+
if err != nil {
199+
return fmt.Errorf("failed to configure manager server: %w", err)
200+
}
201+
202+
pb.RegisterStargzFuseManagerServiceServer(server, fm)
203+
204+
sigCh := make(chan os.Signal, 1)
205+
signal.Notify(sigCh, unix.SIGINT, unix.SIGTERM)
206+
go func() {
207+
sig := <-sigCh
208+
log.G(ctx).Infof("Got %v", sig)
209+
fm.server.Stop()
210+
os.Remove(address)
211+
}()
212+
213+
if err = server.Serve(l); err != nil {
214+
return fmt.Errorf("failed to serve fuse manager: %w", err)
215+
}
216+
217+
if err = fm.Close(ctx); err != nil {
218+
return fmt.Errorf("failed to close fuse manager: %w", err)
219+
}
220+
221+
return nil
222+
}
223+
224+
func StartFuseManager(ctx context.Context, executable, address, fusestore, logLevel, logPath string) error {
225+
// if socket exists, do not start it
226+
if _, err := os.Stat(address); err == nil {
227+
return nil
228+
} else if !os.IsNotExist(err) {
229+
return err
230+
}
231+
232+
if _, err := os.Stat(executable); err != nil {
233+
log.G(ctx).WithError(err).Errorf("failed to stat fusemanager binary: %s", executable)
234+
return err
235+
}
236+
237+
args := []string{
238+
"-action", "start",
239+
"-address", address,
240+
"-fusestore-path", fusestore,
241+
"-log-level", logLevel,
242+
"-log-path", logPath,
243+
}
244+
245+
cmd := exec.Command(executable, args...)
246+
if err := cmd.Start(); err != nil {
247+
return err
248+
}
249+
250+
if err := cmd.Wait(); err != nil {
251+
return err
252+
}
253+
254+
return nil
255+
}

‎fusemanager/fusestore.go

+99
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package fusemanager
18+
19+
import (
20+
"context"
21+
"encoding/json"
22+
23+
bolt "go.etcd.io/bbolt"
24+
25+
"github.com/containerd/stargz-snapshotter/service"
26+
)
27+
28+
var (
29+
fuseInfoBucket = []byte("fuse-info-bucket")
30+
)
31+
32+
type fuseInfo struct {
33+
Root string
34+
Mountpoint string
35+
Labels map[string]string
36+
Config service.Config
37+
}
38+
39+
func (fm *Server) storeFuseInfo(fuseInfo *fuseInfo) error {
40+
return fm.ms.Update(func(tx *bolt.Tx) error {
41+
bucket, err := tx.CreateBucketIfNotExists(fuseInfoBucket)
42+
if err != nil {
43+
return err
44+
}
45+
46+
key := []byte(fuseInfo.Mountpoint)
47+
48+
val, err := json.Marshal(fuseInfo)
49+
if err != nil {
50+
return err
51+
}
52+
53+
err = bucket.Put(key, val)
54+
if err != nil {
55+
return err
56+
}
57+
58+
return nil
59+
})
60+
}
61+
62+
func (fm *Server) removeFuseInfo(fuseInfo *fuseInfo) error {
63+
return fm.ms.Update(func(tx *bolt.Tx) error {
64+
bucket, err := tx.CreateBucketIfNotExists(fuseInfoBucket)
65+
if err != nil {
66+
return err
67+
}
68+
69+
key := []byte(fuseInfo.Mountpoint)
70+
71+
err = bucket.Delete(key)
72+
if err != nil {
73+
return err
74+
}
75+
76+
return nil
77+
})
78+
}
79+
80+
// restoreFuseInfo restores fuseInfo when Init is called, it will skip mounted
81+
// layers whose mountpoint can be found in fsMap
82+
func (fm *Server) restoreFuseInfo(ctx context.Context) error {
83+
return fm.ms.View(func(tx *bolt.Tx) error {
84+
bucket := tx.Bucket(fuseInfoBucket)
85+
if bucket == nil {
86+
return nil
87+
}
88+
89+
return bucket.ForEach(func(_, v []byte) error {
90+
mi := &fuseInfo{}
91+
err := json.Unmarshal(v, mi)
92+
if err != nil {
93+
return err
94+
}
95+
96+
return fm.mount(ctx, mi.Mountpoint, mi.Labels)
97+
})
98+
})
99+
}

‎fusemanager/service.go

+308
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,308 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package fusemanager
18+
19+
import (
20+
"context"
21+
"encoding/json"
22+
"fmt"
23+
"net"
24+
"os"
25+
"path/filepath"
26+
"sync"
27+
"time"
28+
29+
"github.com/containerd/log"
30+
"github.com/moby/sys/mountinfo"
31+
bolt "go.etcd.io/bbolt"
32+
"google.golang.org/grpc"
33+
34+
pb "github.com/containerd/stargz-snapshotter/fusemanager/api"
35+
"github.com/containerd/stargz-snapshotter/service"
36+
"github.com/containerd/stargz-snapshotter/snapshot"
37+
)
38+
39+
const (
40+
FuseManagerNotReady = iota
41+
FuseManagerWaitInit
42+
FuseManagerReady
43+
)
44+
45+
type Config struct {
46+
Config *service.Config
47+
IPFS bool `toml:"ipfs"`
48+
MetadataStore string `toml:"metadata_store" default:"memory"`
49+
DefaultImageServiceAddress string `json:"default_image_service_address"`
50+
}
51+
52+
type ConfigContext struct {
53+
Ctx context.Context
54+
Config *Config
55+
RootDir string
56+
Server *grpc.Server
57+
}
58+
59+
var (
60+
configFuncs []ConfigFunc
61+
configMu sync.Mutex
62+
)
63+
64+
type ConfigFunc func(cc *ConfigContext) ([]service.Option, error)
65+
66+
func RegisterConfigFunc(f ConfigFunc) {
67+
configMu.Lock()
68+
defer configMu.Unlock()
69+
configFuncs = append(configFuncs, f)
70+
}
71+
72+
type Server struct {
73+
pb.UnimplementedStargzFuseManagerServiceServer
74+
75+
lock sync.RWMutex
76+
status int32
77+
78+
listener net.Listener
79+
server *grpc.Server
80+
81+
// root is the latest root passed from containerd-stargz-grpc
82+
root string
83+
// config is the latest config passed from containerd-stargz-grpc
84+
config *Config
85+
// fsMap maps mountpoint to its filesystem instance to ensure Mount/Check/Unmount
86+
// call the proper filesystem
87+
fsMap sync.Map
88+
// curFs is filesystem created by latest config
89+
curFs snapshot.FileSystem
90+
ms *bolt.DB
91+
92+
fuseStoreAddr string
93+
}
94+
95+
func NewFuseManager(ctx context.Context, listener net.Listener, server *grpc.Server, fuseStoreAddr string) (*Server, error) {
96+
if err := os.MkdirAll(filepath.Dir(fuseStoreAddr), 0700); err != nil {
97+
return nil, fmt.Errorf("failed to create directory %q: %w", filepath.Dir(fuseStoreAddr), err)
98+
}
99+
100+
db, err := bolt.Open(fuseStoreAddr, 0666, &bolt.Options{Timeout: 10 * time.Second, ReadOnly: false})
101+
if err != nil {
102+
return nil, fmt.Errorf("failed to configure fusestore: %w", err)
103+
}
104+
105+
fm := &Server{
106+
status: FuseManagerWaitInit,
107+
lock: sync.RWMutex{},
108+
fsMap: sync.Map{},
109+
ms: db,
110+
listener: listener,
111+
server: server,
112+
fuseStoreAddr: fuseStoreAddr,
113+
}
114+
115+
return fm, nil
116+
}
117+
118+
func (fm *Server) Status(ctx context.Context, _ *pb.StatusRequest) (*pb.StatusResponse, error) {
119+
fm.lock.RLock()
120+
defer fm.lock.RUnlock()
121+
122+
return &pb.StatusResponse{
123+
Status: fm.status,
124+
}, nil
125+
}
126+
127+
func (fm *Server) Init(ctx context.Context, req *pb.InitRequest) (*pb.Response, error) {
128+
fm.lock.Lock()
129+
fm.status = FuseManagerWaitInit
130+
defer func() {
131+
fm.status = FuseManagerReady
132+
fm.lock.Unlock()
133+
}()
134+
135+
ctx = log.WithLogger(ctx, log.G(ctx))
136+
137+
config := &Config{}
138+
err := json.Unmarshal(req.Config, config)
139+
if err != nil {
140+
log.G(ctx).WithError(err).Errorf("failed to get config")
141+
return &pb.Response{}, err
142+
}
143+
fm.root = req.Root
144+
fm.config = config
145+
146+
cc := &ConfigContext{
147+
Ctx: ctx,
148+
Config: fm.config,
149+
RootDir: fm.root,
150+
Server: fm.server,
151+
}
152+
153+
var opts []service.Option
154+
for _, configFunc := range configFuncs {
155+
funcOpts, err := configFunc(cc)
156+
if err != nil {
157+
log.G(ctx).WithError(err).Errorf("failed to apply config function")
158+
return &pb.Response{}, err
159+
}
160+
opts = append(opts, funcOpts...)
161+
}
162+
163+
fs, err := service.NewFileSystem(ctx, fm.root, fm.config.Config, opts...)
164+
if err != nil {
165+
return &pb.Response{}, err
166+
}
167+
fm.curFs = fs
168+
169+
err = fm.restoreFuseInfo(ctx)
170+
if err != nil {
171+
log.G(ctx).WithError(err).Errorf("failed to restore fuse info")
172+
return &pb.Response{}, err
173+
}
174+
175+
return &pb.Response{}, nil
176+
}
177+
178+
func (fm *Server) Mount(ctx context.Context, req *pb.MountRequest) (*pb.Response, error) {
179+
fm.lock.RLock()
180+
defer fm.lock.RUnlock()
181+
if fm.status != FuseManagerReady {
182+
return &pb.Response{}, fmt.Errorf("fuse manager not ready")
183+
}
184+
185+
ctx = log.WithLogger(ctx, log.G(ctx).WithField("mountpoint", req.Mountpoint))
186+
187+
err := fm.mount(ctx, req.Mountpoint, req.Labels)
188+
if err != nil {
189+
log.G(ctx).WithError(err).Errorf("failed to mount stargz")
190+
return &pb.Response{}, err
191+
}
192+
193+
fm.storeFuseInfo(&fuseInfo{
194+
Root: fm.root,
195+
Mountpoint: req.Mountpoint,
196+
Labels: req.Labels,
197+
Config: *fm.config.Config,
198+
})
199+
200+
return &pb.Response{}, nil
201+
}
202+
203+
func (fm *Server) Check(ctx context.Context, req *pb.CheckRequest) (*pb.Response, error) {
204+
fm.lock.RLock()
205+
defer fm.lock.RUnlock()
206+
if fm.status != FuseManagerReady {
207+
return &pb.Response{}, fmt.Errorf("fuse manager not ready")
208+
}
209+
210+
ctx = log.WithLogger(ctx, log.G(ctx).WithField("mountpoint", req.Mountpoint))
211+
212+
obj, found := fm.fsMap.Load(req.Mountpoint)
213+
if !found {
214+
err := fmt.Errorf("failed to find filesystem of mountpoint %s", req.Mountpoint)
215+
log.G(ctx).WithError(err).Errorf("failed to check filesystem")
216+
return &pb.Response{}, err
217+
}
218+
219+
fs := obj.(snapshot.FileSystem)
220+
err := fs.Check(ctx, req.Mountpoint, req.Labels)
221+
if err != nil {
222+
log.G(ctx).WithError(err).Errorf("failed to check filesystem")
223+
return &pb.Response{}, err
224+
}
225+
226+
return &pb.Response{}, nil
227+
}
228+
229+
func (fm *Server) Unmount(ctx context.Context, req *pb.UnmountRequest) (*pb.Response, error) {
230+
fm.lock.RLock()
231+
defer fm.lock.RUnlock()
232+
if fm.status != FuseManagerReady {
233+
return &pb.Response{}, fmt.Errorf("fuse manager not ready")
234+
}
235+
236+
ctx = log.WithLogger(ctx, log.G(ctx).WithField("mountpoint", req.Mountpoint))
237+
238+
obj, found := fm.fsMap.Load(req.Mountpoint)
239+
if !found {
240+
// check whether already unmounted
241+
mounts, err := mountinfo.GetMounts(func(info *mountinfo.Info) (skip, stop bool) {
242+
if info.Mountpoint == req.Mountpoint {
243+
return false, true
244+
}
245+
return true, false
246+
})
247+
if err != nil {
248+
log.G(ctx).WithError(err).Errorf("failed to get mount info")
249+
return &pb.Response{}, err
250+
}
251+
252+
if len(mounts) <= 0 {
253+
return &pb.Response{}, nil
254+
}
255+
err = fmt.Errorf("failed to find filesystem of mountpoint %s", req.Mountpoint)
256+
log.G(ctx).WithError(err).Errorf("failed to unmount filesystem")
257+
return &pb.Response{}, err
258+
}
259+
260+
fs := obj.(snapshot.FileSystem)
261+
err := fs.Unmount(ctx, req.Mountpoint)
262+
if err != nil {
263+
log.G(ctx).WithError(err).Errorf("failed to unmount filesystem")
264+
return &pb.Response{}, err
265+
}
266+
267+
fm.fsMap.Delete(req.Mountpoint)
268+
fm.removeFuseInfo(&fuseInfo{
269+
Mountpoint: req.Mountpoint,
270+
})
271+
272+
return &pb.Response{}, nil
273+
}
274+
275+
func (fm *Server) Close(ctx context.Context) error {
276+
fm.lock.Lock()
277+
defer fm.lock.Unlock()
278+
fm.status = FuseManagerNotReady
279+
280+
err := fm.ms.Close()
281+
if err != nil {
282+
log.G(ctx).WithError(err).Errorf("failed to close fusestore")
283+
return err
284+
}
285+
286+
if err := os.Remove(fm.fuseStoreAddr); err != nil {
287+
log.G(ctx).WithError(err).Errorf("failed to remove fusestore file %s", fm.fuseStoreAddr)
288+
return err
289+
}
290+
291+
return nil
292+
}
293+
294+
func (fm *Server) mount(ctx context.Context, mountpoint string, labels map[string]string) error {
295+
// mountpoint in fsMap means layer is already mounted, skip it
296+
if _, found := fm.fsMap.Load(mountpoint); found {
297+
return nil
298+
}
299+
300+
err := fm.curFs.Mount(ctx, mountpoint, labels)
301+
if err != nil {
302+
log.G(ctx).WithError(err).Errorf("failed to mount stargz")
303+
return err
304+
}
305+
306+
fm.fsMap.Store(mountpoint, fm.curFs)
307+
return nil
308+
}

‎go.mod

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ require (
2626
github.com/opencontainers/runtime-spec v1.2.0
2727
github.com/prometheus/client_golang v1.20.5
2828
github.com/rs/xid v1.6.0
29+
github.com/sirupsen/logrus v1.9.3
30+
go.etcd.io/bbolt v1.3.11
2931
golang.org/x/sync v0.9.0
3032
golang.org/x/sys v0.26.0
3133
google.golang.org/grpc v1.68.0
@@ -90,15 +92,13 @@ require (
9092
github.com/prometheus/common v0.55.0 // indirect
9193
github.com/prometheus/procfs v0.15.1 // indirect
9294
github.com/russross/blackfriday/v2 v2.1.0 // indirect
93-
github.com/sirupsen/logrus v1.9.3 // indirect
9495
github.com/spf13/pflag v1.0.5 // indirect
9596
github.com/stretchr/testify v1.9.0 // indirect
9697
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635 // indirect
9798
github.com/urfave/cli/v2 v2.27.5 // indirect
9899
github.com/vbatts/tar-split v0.11.6 // indirect
99100
github.com/x448/float16 v0.8.4 // indirect
100101
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
101-
go.etcd.io/bbolt v1.3.11 // indirect
102102
go.opencensus.io v0.24.0 // indirect
103103
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect
104104
go.opentelemetry.io/otel v1.31.0 // indirect
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package keychainconfig
18+
19+
import (
20+
"context"
21+
"time"
22+
23+
"github.com/containerd/containerd/v2/defaults"
24+
"github.com/containerd/containerd/v2/pkg/dialer"
25+
"github.com/containerd/stargz-snapshotter/service/keychain/cri"
26+
"github.com/containerd/stargz-snapshotter/service/keychain/dockerconfig"
27+
"github.com/containerd/stargz-snapshotter/service/keychain/kubeconfig"
28+
"github.com/containerd/stargz-snapshotter/service/resolver"
29+
"google.golang.org/grpc"
30+
"google.golang.org/grpc/backoff"
31+
"google.golang.org/grpc/credentials/insecure"
32+
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
33+
)
34+
35+
type Config struct {
36+
EnableKubeKeychain bool
37+
EnableCRIKeychain bool
38+
KubeconfigPath string
39+
DefaultImageServiceAddress string
40+
ImageServicePath string
41+
}
42+
43+
func ConfigKeychain(ctx context.Context, rpc *grpc.Server, config *Config) ([]resolver.Credential, error) {
44+
credsFuncs := []resolver.Credential{dockerconfig.NewDockerconfigKeychain(ctx)}
45+
if config.EnableKubeKeychain {
46+
var opts []kubeconfig.Option
47+
if kcp := config.KubeconfigPath; kcp != "" {
48+
opts = append(opts, kubeconfig.WithKubeconfigPath(kcp))
49+
}
50+
credsFuncs = append(credsFuncs, kubeconfig.NewKubeconfigKeychain(ctx, opts...))
51+
}
52+
if config.EnableCRIKeychain {
53+
// connects to the backend CRI service (defaults to containerd socket)
54+
criAddr := config.DefaultImageServiceAddress
55+
if cp := config.ImageServicePath; cp != "" {
56+
criAddr = cp
57+
}
58+
connectCRI := func() (runtime.ImageServiceClient, error) {
59+
conn, err := newCRIConn(criAddr)
60+
if err != nil {
61+
return nil, err
62+
}
63+
return runtime.NewImageServiceClient(conn), nil
64+
}
65+
f, criServer := cri.NewCRIKeychain(ctx, connectCRI)
66+
runtime.RegisterImageServiceServer(rpc, criServer)
67+
credsFuncs = append(credsFuncs, f)
68+
}
69+
70+
return credsFuncs, nil
71+
}
72+
73+
func newCRIConn(criAddr string) (*grpc.ClientConn, error) {
74+
// TODO: make gRPC options configurable from config.toml
75+
backoffConfig := backoff.DefaultConfig
76+
backoffConfig.MaxDelay = 3 * time.Second
77+
connParams := grpc.ConnectParams{
78+
Backoff: backoffConfig,
79+
}
80+
gopts := []grpc.DialOption{
81+
grpc.WithTransportCredentials(insecure.NewCredentials()),
82+
grpc.WithConnectParams(connParams),
83+
grpc.WithContextDialer(dialer.ContextDialer),
84+
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize)),
85+
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)),
86+
}
87+
return grpc.NewClient(dialer.DialAddress(criAddr), gopts...)
88+
}

‎service/service.go

+25-14
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package service
1818

1919
import (
2020
"context"
21+
"fmt"
2122
"path/filepath"
2223

2324
"github.com/containerd/containerd/v2/core/snapshots"
@@ -30,6 +31,7 @@ import (
3031
"github.com/containerd/stargz-snapshotter/metadata"
3132
esgzexternaltoc "github.com/containerd/stargz-snapshotter/nativeconverter/estargz/externaltoc"
3233
"github.com/containerd/stargz-snapshotter/service/resolver"
34+
"github.com/containerd/stargz-snapshotter/snapshot"
3335
snbase "github.com/containerd/stargz-snapshotter/snapshot"
3436
"github.com/hashicorp/go-multierror"
3537
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
@@ -66,6 +68,27 @@ func WithFilesystemOptions(opts ...stargzfs.Option) Option {
6668

6769
// NewStargzSnapshotterService returns stargz snapshotter.
6870
func NewStargzSnapshotterService(ctx context.Context, root string, config *Config, opts ...Option) (snapshots.Snapshotter, error) {
71+
fs, err := NewFileSystem(ctx, root, config, opts...)
72+
if err != nil {
73+
return nil, fmt.Errorf("failed to configure filesystem: %w", err)
74+
}
75+
76+
var snapshotter snapshots.Snapshotter
77+
78+
snOpts := []snbase.Opt{snbase.AsynchronousRemove}
79+
if config.SnapshotterConfig.AllowInvalidMountsOnRestart {
80+
snOpts = append(snOpts, snbase.AllowInvalidMountsOnRestart)
81+
}
82+
83+
snapshotter, err = snbase.NewSnapshotter(ctx, snapshotterRoot(root), fs, snOpts...)
84+
if err != nil {
85+
return nil, fmt.Errorf("failed to create new snapshotter: %w", err)
86+
}
87+
88+
return snapshotter, nil
89+
}
90+
91+
func NewFileSystem(ctx context.Context, root string, config *Config, opts ...Option) (snapshot.FileSystem, error) {
6992
var sOpts options
7093
for _, o := range opts {
7194
o(&sOpts)
@@ -97,22 +120,10 @@ func NewStargzSnapshotterService(ctx context.Context, root string, config *Confi
97120
)
98121
fs, err := stargzfs.NewFilesystem(fsRoot(root), config.Config, fsOpts...)
99122
if err != nil {
100-
log.G(ctx).WithError(err).Fatalf("failed to configure filesystem")
101-
}
102-
103-
var snapshotter snapshots.Snapshotter
104-
105-
snOpts := []snbase.Opt{snbase.AsynchronousRemove}
106-
if config.SnapshotterConfig.AllowInvalidMountsOnRestart {
107-
snOpts = append(snOpts, snbase.AllowInvalidMountsOnRestart)
108-
}
109-
110-
snapshotter, err = snbase.NewSnapshotter(ctx, snapshotterRoot(root), fs, snOpts...)
111-
if err != nil {
112-
log.G(ctx).WithError(err).Fatalf("failed to create new snapshotter")
123+
return nil, err
113124
}
114125

115-
return snapshotter, err
126+
return fs, nil
116127
}
117128

118129
func snapshotterRoot(root string) string {

0 commit comments

Comments
 (0)
Please sign in to comment.