diff --git a/client/director_test.go b/client/director_test.go index 6d8ebabe6..ec30aa62e 100644 --- a/client/director_test.go +++ b/client/director_test.go @@ -227,6 +227,58 @@ func TestQueryDirector(t *testing.T) { } } +// TestQueryDirectorCacheMode verifies that the cacheMode flag controls which +// director endpoint a request is routed through. An embedded cache fetching +// from origins uses cacheMode=true and must hit /api/v1.0/director/origin/..., +// while a site-local cache (which appears to the federation as a client and +// fetches from other caches) uses cacheMode=false and must hit the director's +// default shortcut endpoint at the bare object path. +func TestQueryDirectorCacheMode(t *testing.T) { + t.Cleanup(test_utils.SetupTestLogging(t)) + server_utils.ResetTestState() + defer server_utils.ResetTestState() + require.NoError(t, param.Client_DirectorRetries.Set(1)) + + testCases := []struct { + name string + cacheMode bool + expectedPath string + }{ + { + name: "embedded cache mode routes to origin endpoint", + cacheMode: true, + expectedPath: "/api/v1.0/director/origin/foo/bar", + }, + { + name: "client (site-local) mode routes to shortcut endpoint", + cacheMode: false, + expectedPath: "/foo/bar", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var requestedPath string + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requestedPath = r.URL.Path + w.WriteHeader(http.StatusTemporaryRedirect) + })) + defer server.Close() + + pUrl := pelican_url.PelicanURL{ + FedInfo: pelican_url.FederationDiscovery{ + DirectorEndpoint: server.URL, + }, + Path: "/foo/bar", + } + + _, _, err := queryDirector(context.Background(), "GET", &pUrl, "", tc.cacheMode) + require.NoError(t, err) + assert.Equal(t, tc.expectedPath, requestedPath) + }) + } +} + func TestGetDirectorInfoForPath(t *testing.T) { t.Cleanup(test_utils.SetupTestLogging(t)) server_utils.ResetTestState() diff --git a/client/handle_http.go b/client/handle_http.go index 9f4db00a2..b634bc410 100644 --- a/client/handle_http.go +++ b/client/handle_http.go @@ -1132,19 +1132,21 @@ func WithMetadataChannel(ch chan<- TransferMetadata) TransferOption { return option.New(identTransferOptionMetadataChannel{}, ch) } -// WithCacheEmbeddedClientMode sets the client into "cache-embedded" -// mode. In this mode, the client queries the director's origin -// endpoint (/api/v1.0/director/origin/…) instead of the default -// shortcut endpoint. This causes the director to redirect to origins -// rather than to caches, which is the correct behaviour when the -// transfer client is itself embedded inside a cache process. +// WithCacheEmbeddedClientMode controls whether the client runs in +// "cache-embedded" mode. When enabled, the client queries the +// director's origin endpoint (/api/v1.0/director/origin/…) instead of +// the default shortcut endpoint. This causes the director to redirect +// to origins rather than to caches, which is the correct behaviour when +// the transfer client is itself embedded inside a cache process. // -// Without this option, a GET for /test/file.txt is routed through the -// director's shortcut middleware, which redirects to a cache. With -// this option, the same GET is explicitly routed to the origin -// endpoint so the cache can fetch from the origin. -func WithCacheEmbeddedClientMode() TransferOption { - return option.New(identTransferOptionCacheEmbeddedClientMode{}, true) +// With it enabled, a GET for /test/file.txt is explicitly routed to the +// origin endpoint so the cache can fetch from the origin. When disabled +// (enabled=false), the same GET is routed through the director's shortcut +// middleware, which redirects to a cache — the correct behaviour for a +// site-local cache, which appears to the federation as a client and +// fetches from other caches rather than directly from origins. +func WithCacheEmbeddedClientMode(enabled bool) TransferOption { + return option.New(identTransferOptionCacheEmbeddedClientMode{}, enabled) } // WithRequestId sets a caller-supplied request ID that is propagated as diff --git a/cmd/cache_chaos.go b/cmd/cache_chaos.go new file mode 100644 index 000000000..6528ad6fd --- /dev/null +++ b/cmd/cache_chaos.go @@ -0,0 +1,235 @@ +//go:build server + +/*************************************************************** + * + * Copyright (C) 2026, Pelican Project, Morgridge Institute for Research + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ***************************************************************/ + +package main + +import ( + "encoding/json" + "fmt" + "net/url" + "strconv" + + "github.com/pkg/errors" + "github.com/spf13/cobra" + + "github.com/pelicanplatform/pelican/local_cache" +) + +var ( + chaosEtag string + chaosInstance string + chaosJSON bool + chaosBlock uint32 + chaosBytes int + chaosChunk int + chaosDropBytes int64 + + cacheChaosCmd = &cobra.Command{ + Use: "chaos", + Short: "Inject corruption into cached objects (fault-injection testing)", + Long: `Deliberately corrupt cached object data to exercise the cache's +integrity-detection paths (the read-time AES-GCM check and the periodic +data-integrity scan). + +This is a destructive testing/"chaos engineering" tool. It operates against a +running cache server through its admin API, so the corruption is applied +in-process (BadgerDB is single-process, so a CLI cannot touch the cache's +database directly while the server holds it open). + +The endpoint is only available when the cache server is started with +Cache.EnableChaosAPI set to true.`, + SilenceUsage: true, + } + + cacheChaosCorruptCmd = &cobra.Command{ + Use: "corrupt ", + Short: "Flip bytes in a cached object's block", + Long: `Flip bytes in the on-disk (encrypted) representation of a block so that +its authentication tag no longer validates. The cache detects this on the next +cold read of the block or during the periodic data scan, which invalidates and +re-fetches the object. + +By default the latest cached version is targeted; use --etag or --instance to +select a specific version. + +Examples: + pelican cache chaos corrupt pelican://my-federation/data/file.dat + pelican cache chaos corrupt /data/file.dat --block 3 --bytes 32 + pelican cache chaos corrupt --instance --block 0`, + Args: cobra.MaximumNArgs(1), + RunE: runCacheChaosCorrupt, + SilenceUsage: true, + } + + cacheChaosTruncateCmd = &cobra.Command{ + Use: "truncate ", + Short: "Truncate a cached object's on-disk data", + Long: `Remove bytes from the end of one of a cached object's on-disk chunk files, +dropping trailing block(s). The cache detects the missing/short data on a cold +read or during the data scan. + +By default the last chunk is truncated by one block; use --chunk and +--drop-bytes to control which chunk and how much is removed. + +Examples: + pelican cache chaos truncate pelican://my-federation/data/file.dat + pelican cache chaos truncate /data/file.dat --drop-bytes 65536 + pelican cache chaos truncate --instance --chunk 0`, + Args: cobra.MaximumNArgs(1), + RunE: runCacheChaosTruncate, + SilenceUsage: true, + } +) + +func init() { + cacheCmd.AddCommand(cacheChaosCmd) + cacheChaosCmd.AddCommand(cacheChaosCorruptCmd) + cacheChaosCmd.AddCommand(cacheChaosTruncateCmd) + + cacheChaosCmd.PersistentFlags().StringVar(&chaosEtag, "etag", "", "Select the object version by ETag (default: latest)") + cacheChaosCmd.PersistentFlags().StringVar(&chaosInstance, "instance", "", "Select the object version by instance hash") + cacheChaosCmd.PersistentFlags().BoolVar(&chaosJSON, "json", false, "Output in JSON format") + cacheChaosCmd.PersistentFlags().StringVarP(&introspectToken, "token", "t", "", "Path to admin token file (auto-generated if not provided)") + + cacheChaosCorruptCmd.Flags().Uint32Var(&chaosBlock, "block", 0, "Zero-based block number to corrupt") + cacheChaosCorruptCmd.Flags().IntVar(&chaosBytes, "bytes", 0, "Number of bytes to flip (default: the authentication-tag size)") + + cacheChaosTruncateCmd.Flags().IntVar(&chaosChunk, "chunk", -1, "Chunk index to truncate (default: the last chunk)") + cacheChaosTruncateCmd.Flags().Int64Var(&chaosDropBytes, "drop-bytes", 0, "Bytes to remove from the end of the chunk file (default: one block)") +} + +// chaosServerURL returns the running cache server's URL, or an error if the +// cache is not running. The chaos API operates exclusively against a live +// cache server (there is no offline mode: BadgerDB is single-process). +func chaosServerURL() (string, error) { + if err := initIntrospectConfig(); err != nil { + return "", errors.Wrap(err, "failed to initialize cache server config") + } + serverURL := discoverServerURL() + if serverURL == "" { + return "", errors.New("could not find a running cache server; the chaos API operates against a running cache (ensure the cache is up and started with Cache.EnableChaosAPI=true)") + } + return serverURL, nil +} + +// chaosObjectQuery builds the object-selection query parameters shared by the +// corrupt and truncate subcommands. +func chaosObjectQuery(objectURL string) (url.Values, error) { + q := url.Values{} + if chaosInstance != "" { + q.Set("instance", chaosInstance) + } else if objectURL != "" { + q.Set("url", objectURL) + if chaosEtag != "" { + q.Set("etag", chaosEtag) + } + } else { + return nil, errors.New("either or --instance is required") + } + return q, nil +} + +func postChaos(query url.Values) (*local_cache.ChaosResult, error) { + serverURL, err := chaosServerURL() + if err != nil { + return nil, err + } + body, err := introspectHTTPPost(serverURL, "/api/v1.0/cache/introspect/chaos", query) + if err != nil { + return nil, err + } + var result local_cache.ChaosResult + if err := json.Unmarshal(body, &result); err != nil { + return nil, errors.Wrap(err, "failed to parse response") + } + return &result, nil +} + +func printChaosResult(result *local_cache.ChaosResult) error { + if chaosJSON { + data, err := json.MarshalIndent(result, "", " ") + if err != nil { + return errors.Wrap(err, "failed to marshal result") + } + fmt.Println(string(data)) + return nil + } + + fmt.Printf("Injected %s into cached object:\n", result.Operation) + fmt.Printf(" Instance: %s\n", result.InstanceHash) + if result.SourceURL != "" { + fmt.Printf(" Source URL: %s\n", result.SourceURL) + } + if result.ETag != "" { + fmt.Printf(" ETag: %s\n", result.ETag) + } + fmt.Printf(" Chunk file: %s (chunk %d)\n", result.FilePath, result.ChunkIndex) + switch result.Operation { + case "corrupt-block": + fmt.Printf(" Block: %d (disk offset %d, flipped %d byte(s))\n", result.BlockNum, result.DiskOffset, result.BytesChanged) + case "truncate": + fmt.Printf(" Truncated: %d -> %d bytes\n", result.OldFileSize, result.NewFileSize) + } + return nil +} + +func runCacheChaosCorrupt(cmd *cobra.Command, args []string) error { + objectURL := "" + if len(args) > 0 { + objectURL = args[0] + } + query, err := chaosObjectQuery(objectURL) + if err != nil { + return err + } + query.Set("op", "corrupt") + query.Set("block", strconv.FormatUint(uint64(chaosBlock), 10)) + if chaosBytes > 0 { + query.Set("bytes", strconv.Itoa(chaosBytes)) + } + + result, err := postChaos(query) + if err != nil { + return errors.Wrap(err, "failed to corrupt block") + } + return printChaosResult(result) +} + +func runCacheChaosTruncate(cmd *cobra.Command, args []string) error { + objectURL := "" + if len(args) > 0 { + objectURL = args[0] + } + query, err := chaosObjectQuery(objectURL) + if err != nil { + return err + } + query.Set("op", "truncate") + query.Set("chunk", strconv.Itoa(chaosChunk)) + if chaosDropBytes > 0 { + query.Set("drop-bytes", strconv.FormatInt(chaosDropBytes, 10)) + } + + result, err := postChaos(query) + if err != nil { + return errors.Wrap(err, "failed to truncate object") + } + return printChaosResult(result) +} diff --git a/config/parameter_defaults.go b/config/parameter_defaults.go index e974e833f..bedc0835a 100644 --- a/config/parameter_defaults.go +++ b/config/parameter_defaults.go @@ -43,6 +43,10 @@ func SetParameterDefaults(v *viper.Viper, isRoot bool, isOSDF bool) { v.SetDefault(param.Cache_BlocksToPrefetch.GetName(), 0) // Cache.ConcurrencyDegradedThreshold v.SetDefault(param.Cache_ConcurrencyDegradedThreshold.GetName(), 90) + // Cache.DataScanMode + v.SetDefault(param.Cache_DataScanMode.GetName(), "all") + // Cache.DataScanResampleInterval + v.SetDefault(param.Cache_DataScanResampleInterval.GetName(), 100) // Cache.DefaultCacheTimeout v.SetDefault(param.Cache_DefaultCacheTimeout.GetName(), "9.5s") // Cache.DirectorTest @@ -51,6 +55,8 @@ func SetParameterDefaults(v *viper.Viper, isRoot bool, isOSDF bool) { v.SetDefault(param.Cache_DisableClientX509.GetName(), true) // Cache.EnableBroker v.SetDefault(param.Cache_EnableBroker.GetName(), true) + // Cache.EnableChaosAPI + v.SetDefault(param.Cache_EnableChaosAPI.GetName(), false) // Cache.EnableEvictionMonitoring v.SetDefault(param.Cache_EnableEvictionMonitoring.GetName(), true) // Cache.EnableLotman diff --git a/docs/parameters.yaml b/docs/parameters.yaml index b70d6e8e5..b2f81768c 100644 --- a/docs/parameters.yaml +++ b/docs/parameters.yaml @@ -2660,6 +2660,61 @@ type: bool default: true components: ["cache"] --- +name: Cache.EnableChaosAPI +description: |+ + Enable the V2 cache's chaos/fault-injection admin API. + + When true, the cache server registers an admin-authenticated endpoint + (POST /api/v1.0/cache/introspect/chaos) that deliberately corrupts or truncates + the on-disk data of a cached object. This is used by `pelican cache chaos` to + exercise the cache's integrity-detection paths. + + This is a destructive testing feature and is disabled by default; it should + only be enabled in test/staging environments. +type: bool +default: false +components: ["cache"] +hidden: true +--- +name: Cache.DataScanMode +description: |+ + Controls how the V2 (persistent) cache's periodic data-integrity scan re-verifies on-disk object data. + + Valid values: + - "all" (default): every scan cycle re-reads and re-checksums all complete objects. Recommended when the cache + relies on Pelican for at-rest integrity. + - "once": each object's on-disk data is read back and checksummed exactly once (recording the checksum in the cache + database, and comparing it against the origin's reported checksum when one is available). Objects that have already + been verified are skipped on subsequent scan cycles. This is intended for deployments whose underlying storage + already guarantees at-rest integrity (for example ZFS with scrubbing), where repeatedly re-reading every object is + wasteful but recording an initial baseline checksum is still desired. + + Important: "once" mode trusts the underlying storage to detect bitrot after the initial check. The cache cannot verify + that the storage actually scrubs, so it emits a loud startup warning and exports the `pelican_cache_data_scan_mode_once` + metric (set to 1) whenever this mode is active. To retain a floor of at-rest detection even in this mode, the scan still + re-verifies a small fraction of already-checked objects each cycle (see ${Cache.DataScanResampleInterval}). + + Note that "once" mode cannot detect corruption that was present at ingest (before the first scan): the first scan records + whatever is on disk as the baseline. Detecting bad-at-ingest data requires a full-file or origin checksum on the + download/completion path, which is independent of this setting. + + Unrecognized values are treated as "all". +type: string +default: all +components: ["cache"] +--- +name: Cache.DataScanResampleInterval +description: |+ + When ${Cache.DataScanMode} is "once", the data-integrity scan still re-verifies roughly 1 in N already-checked objects + on each cycle, so there is a nonzero floor of at-rest corruption detection rather than none. This is the value of N. + + The default of 100 re-checks about 1% of already-verified objects per scan cycle. Set to 0 to disable re-sampling + entirely (pure "once": each object is verified exactly once and never re-checked). Has no effect when + ${Cache.DataScanMode} is "all". +type: int +default: 100 +components: ["cache"] +--- name: Cache.EvictionMonitoringInterval description: |+ The interval at which the eviction monitoring will be reported. diff --git a/e2e_fed_tests/persistent_cache_site_local_test.go b/e2e_fed_tests/persistent_cache_site_local_test.go new file mode 100644 index 000000000..383df11f9 --- /dev/null +++ b/e2e_fed_tests/persistent_cache_site_local_test.go @@ -0,0 +1,339 @@ +//go:build !windows + +/*************************************************************** + * + * Copyright (C) 2026, Pelican Project, Morgridge Institute for Research + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ***************************************************************/ + +package fed_tests + +import ( + "bytes" + "context" + "fmt" + "net/http" + "net/url" + "os" + "os/exec" + "path/filepath" + "regexp" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v3" + + "github.com/pelicanplatform/pelican/client" + "github.com/pelicanplatform/pelican/config" + "github.com/pelicanplatform/pelican/fed_test_utils" + "github.com/pelicanplatform/pelican/param" + "github.com/pelicanplatform/pelican/server_utils" + "github.com/pelicanplatform/pelican/test_utils" + "github.com/pelicanplatform/pelican/token" + "github.com/pelicanplatform/pelican/token_scopes" +) + +// webEngineAddrRe matches the address the web engine binds to, e.g. +// "Starting web engine at address my-host:55309". The child is told to bind +// port 0 (any free port), and pelican rewrites Server.WebPort to the chosen +// value before logging this line — so reading it back avoids racily +// pre-selecting a port that another process could grab before the child binds. +var webEngineAddrRe = regexp.MustCompile(`web engine at address \S*?:(\d+)`) + +// upstreamCacheHasObject probes a cache via HTTP HEAD with +// Cache-Control: only-if-cached (RFC 7234 §5.2.1.7): the cache returns 200 if +// it already holds the object and 504 if it does not, without ever contacting +// the origin. This lets the test inspect cache contents through the public +// cache API rather than poking at on-disk storage. +// +// cacheDataBase is the cache's /api/v1.0/cache/data/ endpoint. +// It returns (true, nil) when the cache holds the object, (false, nil) when it +// does not, and a non-nil error on transport failures or an unexpected status. +// Returning an error (rather than failing the test) keeps it safe to call from +// inside a require.Eventually closure, which runs on a separate goroutine. +func upstreamCacheHasObject(ctx context.Context, httpClient *http.Client, cacheDataBase, objectPath, token string) (bool, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodHead, cacheDataBase+objectPath, nil) + if err != nil { + return false, err + } + req.Header.Set("Cache-Control", "only-if-cached") + req.Header.Set("Authorization", "Bearer "+token) + resp, err := httpClient.Do(req) + if err != nil { + return false, err + } + defer resp.Body.Close() + switch resp.StatusCode { + case http.StatusOK: + return true, nil + case http.StatusGatewayTimeout: + return false, nil + default: + return false, fmt.Errorf("unexpected only-if-cached status %d for %s", resp.StatusCode, objectPath) + } +} + +// TestPersistentCacheSiteLocalFetchesFromCache verifies the site-local-mode +// behaviour of the V2 (persistent) cache: a site-local cache must appear to the +// federation as a client and fetch objects from other caches rather than +// directly from origins (matching the V1 XRD_PELICANDIRECTORYQUERYMODE=cache +// behaviour). +// +// Topology: +// - An in-process federation (director + origin + an advertised V2 cache). +// This advertised cache is the "upstream" cache. +// - A separate `pelican cache serve` child process running a V2 cache with +// Cache.EnableSiteLocalMode=true. It is NOT advertised to the director. +// +// The test downloads a public object through the site-local cache and then +// asserts (via Cache-Control: only-if-cached) that the *upstream* cache also +// ended up holding the object. That can only happen if the site-local cache +// fetched the object through the upstream cache (client mode); if it had +// fetched directly from the origin (the pre-fix embedded-cache behaviour), the +// upstream cache would never have seen the object. +func TestPersistentCacheSiteLocalFetchesFromCache(t *testing.T) { + t.Cleanup(test_utils.SetupTestLogging(t)) + server_utils.ResetTestState() + defer server_utils.ResetTestState() + + // Build the pelican binary used for the site-local cache child process. + cliPath := getPelicanBinary(t) + + // Enable the persistent cache for the in-process (upstream) cache. + require.NoError(t, param.Cache_EnableV2.Set(true)) + + // Start the federation: director + origin + advertised V2 cache. + ft := fed_test_utils.NewFedTest(t, persistentCacheConfig) + require.NotNil(t, ft) + require.Greater(t, len(ft.Exports), 0, "Federation should have at least one export") + + // Capture the configuration the child process needs to inherit from the + // running federation. By reusing the federation's CA and host certificate + // (the host certificate is for the hostname and is not port-specific) the + // child cache is mutually trusted with the test process and the rest of the + // federation. + hostname := param.Server_Hostname.GetString() + discoveryUrl := param.Federation_DiscoveryUrl.GetString() + caCert := param.Server_TLSCACertificateFile.GetString() + caKey := param.Server_TLSCAKey.GetString() + tlsCert := param.Server_TLSCertificateChain.GetString() + tlsKey := param.Server_TLSKey.GetString() + issuerKeysDir := param.IssuerKeysDirectory.GetString() + + // Directories for the site-local cache child process. The child binds an + // arbitrary free port (Server.WebPort: 0); its actual URL is discovered + // from its output once it starts. + childDir := t.TempDir() + childStorage := filepath.Join(childDir, "storage") + + // Build a config file for the child cache. It inherits federation + // discovery and TLS material from the parent and overrides ports, + // storage locations and the cache flags under test. + childConfig := map[string]any{ + "Federation": map[string]any{ + "DiscoveryUrl": discoveryUrl, + }, + "Logging": map[string]any{ + "Level": "debug", + }, + "TLSSkipVerify": false, + "ConfigDir": childDir, + "RuntimeDir": childDir, + "IssuerKeysDirectory": issuerKeysDir, + "Server": map[string]any{ + "Hostname": hostname, + // Bind any free port; pelican rewrites WebPort/ExternalWebUrl to + // the chosen value, which we read back from the child's output. + "WebPort": 0, + "ExternalWebUrl": "https://" + hostname, + "EnableUI": false, + "TLSCACertificateFile": caCert, + "TLSCAKey": caKey, + "TLSCertificateChain": tlsCert, + "TLSKey": tlsKey, + }, + "Cache": map[string]any{ + "EnableV2": true, + "EnableSiteLocalMode": true, + "StorageLocation": childStorage, + "DbLocation": filepath.Join(childDir, "cache.sqlite"), + "RunLocation": filepath.Join(childDir, "run"), + "Port": 0, + "EnableLotman": false, + "EnableEvictionMonitoring": false, + "SelfTest": false, + }, + } + + childConfigBytes, err := yaml.Marshal(childConfig) + require.NoError(t, err) + childConfigPath := filepath.Join(childDir, "site-local-cache.yaml") + require.NoError(t, os.WriteFile(childConfigPath, childConfigBytes, 0644)) + + // Launch the site-local cache child process. It is tied to the federation + // context so it is torn down when the test's context is cancelled. + var childOutput bytes.Buffer + var outputMu sync.Mutex + cmd := exec.CommandContext(ft.Ctx, cliPath, "cache", "serve", "--config", childConfigPath) + cmd.Env = os.Environ() + cmd.Stdout = &lockedWriter{w: &childOutput, mu: &outputMu} + cmd.Stderr = cmd.Stdout + require.NoError(t, cmd.Start(), "failed to start site-local cache process") + t.Cleanup(func() { + if cmd.Process != nil { + _ = cmd.Process.Kill() + _, _ = cmd.Process.Wait() + } + outputMu.Lock() + defer outputMu.Unlock() + if t.Failed() { + t.Logf("site-local cache process output:\n%s", childOutput.String()) + } + }) + childOutputSnapshot := func() string { + outputMu.Lock() + defer outputMu.Unlock() + return childOutput.String() + } + + // Discover the port the child bound to by reading it back from its output, + // then build its URL. This avoids pre-selecting a port (which would race + // with other processes between selection and the child's bind). + var childCacheUrl string + require.Eventually(t, func() bool { + m := webEngineAddrRe.FindStringSubmatch(childOutputSnapshot()) + if m == nil { + return false + } + childCacheUrl = fmt.Sprintf("https://%s:%s", hostname, m[1]) + return true + }, 60*time.Second, 200*time.Millisecond, "site-local cache never reported its web port") + + // Wait for the site-local cache's object-serving handlers to be registered. + // The /api/v1.0/cache/stats endpoint is registered by RegisterCacheHandlers, + // so a 200 there means the cache (not just the bare web engine, which serves + // /api/v1.0/health much earlier) is ready to serve objects. + httpClient := &http.Client{Transport: config.GetTransport()} + readyUrl := childCacheUrl + "/api/v1.0/cache/stats" + require.Eventually(t, func() bool { + req, reqErr := http.NewRequestWithContext(ft.Ctx, http.MethodGet, readyUrl, nil) + if reqErr != nil { + return false + } + resp, doErr := httpClient.Do(req) + if doErr != nil { + return false + } + defer resp.Body.Close() + return resp.StatusCode == http.StatusOK + }, 90*time.Second, 500*time.Millisecond, "site-local cache did not become ready") + + // Mint a read token for the object (signed by the federation's issuer key, + // which both caches validate via the issuer's published JWKS). + readToken := mintReadToken(t) + + // The upstream cache's public object endpoint. We probe it with + // only-if-cached to inspect its contents without triggering an origin fetch. + // The cache keys its federation off the director/web URL host (which, in + // this harness, differs from the separate discovery server's host), so that + // is the value used in the /cache/data/ path segment. + discoveryHost := hostnameFromDiscovery(t, discoveryUrl) + webUrl := param.Server_ExternalWebUrl.GetString() + webHost := hostnameFromDiscovery(t, webUrl) + upstreamCacheData := webUrl + "/api/v1.0/cache/data/" + url.PathEscape(webHost) + const objectPath = "/test/hello_world.txt" + + // Precondition: the upstream cache must not already hold the object. + cached, err := upstreamCacheHasObject(ft.Ctx, httpClient, upstreamCacheData, objectPath, readToken) + require.NoError(t, err) + require.False(t, cached, "upstream cache should not hold the object before the download") + + // Download the public object through the site-local cache by forcing the + // client to use it as the (only) cache. Client.PreferredCaches takes a bare + // cache host:port and the client fills in the object path, so the override + // URL is just the site-local cache's host:port. + objectURL := fmt.Sprintf("pelican://%s%s", discoveryHost, objectPath) + childCacheParsed, err := url.Parse(childCacheUrl) + require.NoError(t, err) + + downloadDir := t.TempDir() + downloadFile := filepath.Join(downloadDir, "hello_world.txt") + results, err := client.DoGet(ft.Ctx, objectURL, downloadFile, false, + client.WithCaches(childCacheParsed), client.WithToken(readToken)) + require.NoError(t, err, "download through site-local cache failed") + require.NotEmpty(t, results) + + content, err := os.ReadFile(downloadFile) + require.NoError(t, err) + assert.Equal(t, "Hello, World!", string(content), "downloaded content should match origin") + + // The decisive assertion: the upstream cache must now hold the object, + // proving the site-local cache fetched it through the upstream cache rather + // than directly from the origin (the latter being the pre-fix embedded-cache + // behaviour). Caching upstream is asynchronous, so poll via only-if-cached. + require.Eventually(t, func() bool { + has, probeErr := upstreamCacheHasObject(ft.Ctx, httpClient, upstreamCacheData, objectPath, readToken) + return probeErr == nil && has + }, 30*time.Second, 500*time.Millisecond, + "upstream cache never received the object; site-local cache appears to have fetched directly from the origin") +} + +// mintReadToken creates a short-lived WLCG read token signed by the running +// federation's issuer key. Unlike getTempTokenForTest it does NOT reset +// IssuerKeysDirectory, so the token is signed by the key the federation +// actually published (and which the child cache can therefore verify). +func mintReadToken(t testing.TB) string { + t.Helper() + issuer, err := config.GetServerIssuerURL() + require.NoError(t, err) + + tokConf := token.NewWLCGToken() + tokConf.Lifetime = 5 * time.Minute + tokConf.Issuer = issuer + tokConf.Subject = "test" + tokConf.AddAudienceAny() + readScope, err := token_scopes.Wlcg_Storage_Read.Path("/") + require.NoError(t, err) + tokConf.AddScopes(readScope) + + tkn, err := tokConf.CreateToken() + require.NoError(t, err) + return tkn +} + +// hostnameFromDiscovery extracts the host:port of the federation from its +// discovery URL so object URLs can be addressed against it. +func hostnameFromDiscovery(t testing.TB, discoveryUrl string) string { + t.Helper() + u, err := url.Parse(discoveryUrl) + require.NoError(t, err) + return u.Host +} + +// lockedWriter serialises writes to an underlying buffer so the child +// process's stdout and stderr can share one buffer safely. +type lockedWriter struct { + w *bytes.Buffer + mu *sync.Mutex +} + +func (lw *lockedWriter) Write(p []byte) (int, error) { + lw.mu.Lock() + defer lw.mu.Unlock() + return lw.w.Write(p) +} diff --git a/launchers/cache_serve.go b/launchers/cache_serve.go index 935974a04..ec9e7e34c 100644 --- a/launchers/cache_serve.go +++ b/launchers/cache_serve.go @@ -92,6 +92,16 @@ func cacheServeWithPersistentCache(ctx context.Context, engine *gin.Engine, egrp return nil, err } + // The persistent cache has no XRootD process to launch the monitoring + // shoveler (which the XRootD-based cache does via xrootd config), so start + // it here when enabled. This lets the in-process XRootD-style monitoring + // packets emitted on each served request reach the configured collectors. + if param.Shoveler_Enable.GetBool() { + if _, err := metrics.LaunchShoveler(ctx, egrp); err != nil { + return nil, errors.Wrap(err, "failed to launch monitoring shoveler for persistent cache") + } + } + cache.RegisterCacheAPI(engine, ctx, egrp) cacheServer := &cache.CacheServer{} diff --git a/local_cache/block_fetcher.go b/local_cache/block_fetcher.go index 7eee82272..07c994c4f 100644 --- a/local_cache/block_fetcher.go +++ b/local_cache/block_fetcher.go @@ -175,7 +175,7 @@ func NewBlockFetcherV2( // Create a dedicated TransferClient so this fetcher's doFetch goroutines // have their own Results() channel and cannot steal results intended for // other callers sharing the same TransferEngine. - tc, err := te.NewClient(client.WithAcquireToken(false), client.WithCacheEmbeddedClientMode()) + tc, err := te.NewClient(client.WithAcquireToken(false), client.WithCacheEmbeddedClientMode(useEmbeddedCacheMode())) if err != nil { return nil, errors.Wrap(err, "failed to create transfer client for block fetcher") } @@ -531,9 +531,11 @@ func (bf *BlockFetcherV2) doFetch(ctx context.Context, op *fetchOperation, key f return } sourceURL.Scheme = "pelican" - // The client's cache mode (set on the transfer client) causes - // queryDirector to route through the director's origin endpoint, - // so origins that disable direct clients are reachable. + // The client's cache mode (set on the transfer client, see + // useEmbeddedCacheMode) causes queryDirector to route through the + // director's origin endpoint, so origins that disable direct clients are + // reachable. In site-local mode embedded mode is disabled and the + // director redirects us to other caches instead. // Build transfer options with a byte range so we only download the // blocks we actually need instead of the entire object. diff --git a/local_cache/cache_internal_test.go b/local_cache/cache_internal_test.go index 6ee73a30d..92c4998d8 100644 --- a/local_cache/cache_internal_test.go +++ b/local_cache/cache_internal_test.go @@ -24,10 +24,31 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/pelicanplatform/pelican/param" + "github.com/pelicanplatform/pelican/server_utils" "github.com/pelicanplatform/pelican/token_scopes" ) +// TestUseEmbeddedCacheMode verifies that the embedded transfer client only runs +// in cache-embedded mode (fetching directly from origins) when the cache is a +// normal federation member. In site-local mode the cache appears to the +// federation as a client and must fetch from other caches, so embedded mode is +// disabled. +func TestUseEmbeddedCacheMode(t *testing.T) { + server_utils.ResetTestState() + defer server_utils.ResetTestState() + + // Default (federation member): embedded mode is on. + assert.True(t, useEmbeddedCacheMode()) + + // Site-local mode: embedded mode is off so the director redirects us to + // other caches rather than origins. + require.NoError(t, param.Cache_EnableSiteLocalMode.Set(true)) + assert.False(t, useEmbeddedCacheMode()) +} + func TestCalcResources(t *testing.T) { tests := []struct { scopes token_scopes.ResourceScope diff --git a/local_cache/chaos.go b/local_cache/chaos.go new file mode 100644 index 000000000..a9ac3d343 --- /dev/null +++ b/local_cache/chaos.go @@ -0,0 +1,312 @@ +//go:build !windows + +/*************************************************************** + * + * Copyright (C) 2026, Pelican Project, Morgridge Institute for Research + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ***************************************************************/ + +package local_cache + +import ( + "os" + "path/filepath" + "strings" + + "github.com/pkg/errors" +) + +// isHexHash reports whether h consists solely of hexadecimal characters. +// Instance/object hashes are hex-encoded HMAC-SHA256 digests, so validating +// this before using a hash to build a filesystem path prevents path traversal +// from a caller-supplied --instance value. +func isHexHash(h InstanceHash) bool { + if len(h) == 0 { + return false + } + for _, c := range h { + switch { + case c >= '0' && c <= '9', c >= 'a' && c <= 'f', c >= 'A' && c <= 'F': + default: + return false + } + } + return true +} + +// ChaosInjector injects corruption into a running cache's already-open +// database and storage, for fault-injection ("chaos") testing of the cache's +// integrity-detection paths. +// +// BadgerDB is single-process, so corruption must be performed in-process by the +// cache server itself (a CLI cannot open the database while the server holds +// it). The injector therefore wraps the live handles and does not own them — +// there is nothing to close. +type ChaosInjector struct { + db *CacheDB + storage *StorageManager +} + +// NewChaosInjector wraps a live cache's database and storage manager. +func NewChaosInjector(db *CacheDB, storage *StorageManager) *ChaosInjector { + return &ChaosInjector{db: db, storage: storage} +} + +// ChaosResult describes a corruption injected into a cached object by the +// chaos-testing helpers. It is intended for fault-injection testing of the +// cache's integrity scan and read-time corruption detection. +type ChaosResult struct { + InstanceHash string `json:"instance_hash"` + SourceURL string `json:"source_url,omitempty"` + ETag string `json:"etag,omitempty"` + Operation string `json:"operation"` // "corrupt-block" or "truncate" + ChunkIndex int `json:"chunk_index"` + FilePath string `json:"file_path"` + BlockNum int64 `json:"block_num,omitempty"` // global block number (corrupt-block) + DiskOffset int64 `json:"disk_offset"` + BytesChanged int `json:"bytes_changed,omitempty"` + OldFileSize int64 `json:"old_file_size"` + NewFileSize int64 `json:"new_file_size"` +} + +// resolveInstanceHash resolves an object instance from either an explicit +// instance hash, or an object URL plus optional ETag (defaulting to the latest +// cached version). It returns the instance hash and its metadata. +func (ci *ChaosInjector) resolveInstanceHash(objectURL, etag, instanceHash string) (InstanceHash, *CacheMetadata, error) { + var hash InstanceHash + if instanceHash != "" { + hash = InstanceHash(instanceHash) + } else { + normalized := NormalizePelicanURL(objectURL) + if normalized == "" { + return "", nil, errors.New("either an object URL or an instance hash is required") + } + objectHash := ci.db.ObjectHash(normalized) + if etag == "" { + var found bool + var err error + etag, found, err = ci.db.GetLatestETag(objectHash) + if err != nil { + return "", nil, errors.Wrap(err, "failed to get latest ETag") + } + if !found { + return "", nil, errors.New("no cached version found for this object") + } + } + hash = ci.db.InstanceHash(etag, objectHash) + } + + // Guard against path traversal from a caller-supplied instance hash before + // the hash is ever used to construct a filesystem path. + if !isHexHash(hash) { + return "", nil, errors.Errorf("invalid instance hash %q: must be hexadecimal", hash) + } + + meta, err := ci.storage.GetMetadata(hash) + if err != nil { + return "", nil, errors.Wrap(err, "failed to read object metadata") + } + if meta == nil { + return "", nil, errors.Errorf("no cached object found for instance %s", hash) + } + return hash, meta, nil +} + +// safeChunkPath resolves the on-disk chunk file path and verifies it stays +// within its storage directory, defending against path traversal. +func (ci *ChaosInjector) safeChunkPath(storageID StorageID, hash InstanceHash, chunkIndex int) (string, error) { + root, ok := ci.storage.GetDirs()[storageID] + if !ok { + return "", errors.Errorf("unknown storage id %d", storageID) + } + cleanRoot := filepath.Clean(root) + cleanPath := filepath.Clean(ci.storage.getChunkPath(storageID, hash, chunkIndex)) + if cleanPath != cleanRoot && !strings.HasPrefix(cleanPath, cleanRoot+string(os.PathSeparator)) { + return "", errors.Errorf("resolved chunk path %q escapes storage directory %q", cleanPath, cleanRoot) + } + return cleanPath, nil +} + +// chunkFileForBlock maps a global block number to the on-disk chunk file that +// stores it and the byte offset of the (encrypted) block within that file. +func (ci *ChaosInjector) chunkFileForBlock(hash InstanceHash, meta *CacheMetadata, blockNum uint32) (chunkPath string, chunkIndex int, diskOffset int64, err error) { + contentOffset := int64(blockNum) * BlockDataSize + if contentOffset >= meta.ContentLength { + return "", 0, 0, errors.Errorf("block %d is past the end of the object (%d block(s), %d bytes)", + blockNum, CalculateBlockCount(meta.ContentLength), meta.ContentLength) + } + + chunkIndex = ContentOffsetToChunk(contentOffset, meta.ChunkSizeCode) + storageID := meta.GetChunkStorageID(chunkIndex) + if storageID == StorageIDInline { + return "", 0, 0, errors.Errorf("chunk %d is not yet allocated on disk", chunkIndex) + } + chunkPath, err = ci.safeChunkPath(storageID, hash, chunkIndex) + if err != nil { + return "", 0, 0, err + } + + // The on-disk offset is the (zero-based) block index within this chunk file + // times the encrypted block size. + localBlock := uint32(OffsetInChunk(contentOffset, meta.ChunkSizeCode) / BlockDataSize) + diskOffset = BlockOffset(localBlock) + return chunkPath, chunkIndex, diskOffset, nil +} + +// CorruptBlock flips the first numBytes bytes of the on-disk (encrypted) +// representation of the given block. This makes the block's AES-GCM +// authentication tag fail to validate, which the cache detects on the next +// read of that block or during the periodic data-integrity scan. +// +// blockNum is a global, zero-based block number. numBytes <= 0 defaults to the +// authentication-tag size (the minimum needed to guarantee detection). +// +// Detection is not necessarily immediate: a block whose plaintext is still in +// the cache's in-memory caches will continue to read successfully until those +// entries are evicted; the corruption is caught on the next cold read of the +// block, the periodic data-integrity scan, or after a restart. +func (ci *ChaosInjector) CorruptBlock(objectURL, etag, instanceHash string, blockNum uint32, numBytes int) (*ChaosResult, error) { + hash, meta, err := ci.resolveInstanceHash(objectURL, etag, instanceHash) + if err != nil { + return nil, err + } + if meta.IsInline() { + return nil, errors.New("object is stored inline in the database; chaos injection only supports disk-backed objects") + } + + chunkPath, chunkIndex, diskOffset, err := ci.chunkFileForBlock(hash, meta, blockNum) + if err != nil { + return nil, err + } + + if numBytes <= 0 { + numBytes = AuthTagSize + } + if numBytes > BlockTotalSize { + numBytes = BlockTotalSize + } + + f, err := os.OpenFile(chunkPath, os.O_RDWR, 0) + if err != nil { + return nil, errors.Wrapf(err, "failed to open chunk file %s", chunkPath) + } + defer f.Close() + + info, err := f.Stat() + if err != nil { + return nil, errors.Wrap(err, "failed to stat chunk file") + } + if diskOffset >= info.Size() { + return nil, errors.Errorf("block %d is not present on disk (chunk file is %d bytes)", blockNum, info.Size()) + } + // Do not read past EOF for a short final block. + if diskOffset+int64(numBytes) > info.Size() { + numBytes = int(info.Size() - diskOffset) + } + + // numBytes is bounded by BlockTotalSize above, so a fixed-size stack buffer + // suffices and avoids a caller-influenced dynamic allocation. + var blockBuf [BlockTotalSize]byte + buf := blockBuf[:numBytes] + if _, err := f.ReadAt(buf, diskOffset); err != nil { + return nil, errors.Wrap(err, "failed to read block bytes") + } + for i := range buf { + buf[i] ^= 0xFF + } + if _, err := f.WriteAt(buf, diskOffset); err != nil { + return nil, errors.Wrap(err, "failed to write corrupted bytes") + } + + return &ChaosResult{ + InstanceHash: string(hash), + SourceURL: meta.SourceURL, + ETag: meta.ETag, + Operation: "corrupt-block", + ChunkIndex: chunkIndex, + FilePath: chunkPath, + BlockNum: int64(blockNum), + DiskOffset: diskOffset, + BytesChanged: numBytes, + OldFileSize: info.Size(), + NewFileSize: info.Size(), + }, nil +} + +// TruncateObject removes dropBytes bytes from the end of one of the object's +// on-disk chunk files (the last chunk by default, when chunkIndex < 0). This +// drops trailing block(s), which the cache detects on a cold read or during +// the data scan. dropBytes <= 0 defaults to a single encrypted block +// (BlockTotalSize). +func (ci *ChaosInjector) TruncateObject(objectURL, etag, instanceHash string, chunkIndex int, dropBytes int64) (*ChaosResult, error) { + hash, meta, err := ci.resolveInstanceHash(objectURL, etag, instanceHash) + if err != nil { + return nil, err + } + if meta.IsInline() { + return nil, errors.New("object is stored inline in the database; chaos injection only supports disk-backed objects") + } + + chunkCount := meta.ChunkCount() + if chunkIndex < 0 { + chunkIndex = chunkCount - 1 + } + if chunkIndex >= chunkCount { + return nil, errors.Errorf("chunk %d is out of range (object has %d chunk(s))", chunkIndex, chunkCount) + } + storageID := meta.GetChunkStorageID(chunkIndex) + if storageID == StorageIDInline { + return nil, errors.Errorf("chunk %d is not yet allocated on disk", chunkIndex) + } + chunkPath, err := ci.safeChunkPath(storageID, hash, chunkIndex) + if err != nil { + return nil, err + } + + if dropBytes <= 0 { + dropBytes = BlockTotalSize + } + + f, err := os.OpenFile(chunkPath, os.O_RDWR, 0) + if err != nil { + return nil, errors.Wrapf(err, "failed to open chunk file %s", chunkPath) + } + defer f.Close() + + info, err := f.Stat() + if err != nil { + return nil, errors.Wrap(err, "failed to stat chunk file") + } + oldSize := info.Size() + newSize := oldSize - dropBytes + if newSize < 0 { + newSize = 0 + } + if err := f.Truncate(newSize); err != nil { + return nil, errors.Wrap(err, "failed to truncate chunk file") + } + + return &ChaosResult{ + InstanceHash: string(hash), + SourceURL: meta.SourceURL, + ETag: meta.ETag, + Operation: "truncate", + ChunkIndex: chunkIndex, + FilePath: chunkPath, + DiskOffset: newSize, + OldFileSize: oldSize, + NewFileSize: newSize, + }, nil +} diff --git a/local_cache/chaos_test.go b/local_cache/chaos_test.go new file mode 100644 index 000000000..de336d893 --- /dev/null +++ b/local_cache/chaos_test.go @@ -0,0 +1,125 @@ +//go:build !windows + +/*************************************************************** + * + * Copyright (C) 2026, Pelican Project, Morgridge Institute for Research + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ***************************************************************/ + +package local_cache + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +// newChaosTestCache opens a read-write cache (database + storage) and returns +// it along with a disk storage ID. The chaos injector wraps these same live +// handles, mirroring how the cache server uses it in-process. +func newChaosTestCache(t *testing.T) (*CacheDB, *StorageManager, StorageID) { + t.Helper() + InitIssuerKeyForTests(t) + tmpDir := t.TempDir() + ctx := context.Background() + + db, err := NewCacheDB(ctx, tmpDir) + require.NoError(t, err) + t.Cleanup(func() { _ = db.Close() }) + + egrp, _ := errgroup.WithContext(ctx) + storage, err := NewStorageManager(db, []string{tmpDir}, 0, egrp) + require.NoError(t, err) + t.Cleanup(storage.Close) + + var diskID StorageID + for id := range storage.GetDirs() { + diskID = id + } + return db, storage, diskID +} + +// TestChaosCorruptBlock verifies that CorruptBlock flips on-disk bytes such +// that the targeted block can no longer be read back (its authentication tag +// fails). Block 1 is corrupted and read cold (it is never read before +// corruption, so it is not served from the in-memory plaintext cache). +func TestChaosCorruptBlock(t *testing.T) { + db, storage, diskID := newChaosTestCache(t) + ctx := context.Background() + + data := make([]byte, 2*BlockDataSize) + for i := range data { + data[i] = byte(i % 251) + } + hash := InstanceHash("abab000000000000000000000000000000000000000000000000000000000001") + storeTestObject(t, ctx, storage, hash, data, diskID, NamespaceID(1)) + + ci := NewChaosInjector(db, storage) + res, err := ci.CorruptBlock("", "", string(hash), 1, 0) + require.NoError(t, err) + assert.Equal(t, "corrupt-block", res.Operation) + assert.Equal(t, int64(1), res.BlockNum) + assert.Equal(t, AuthTagSize, res.BytesChanged) + assert.Equal(t, res.OldFileSize, res.NewFileSize) + + // Block 1 (never read before corruption) now fails to decrypt. + _, err = storage.ReadBlocks(hash, BlockDataSize, BlockDataSize) + require.Error(t, err, "reading a corrupted block should fail the authentication check") +} + +// TestChaosTruncateObject verifies that TruncateObject drops trailing blocks so +// that a removed block can no longer be read back. +func TestChaosTruncateObject(t *testing.T) { + db, storage, diskID := newChaosTestCache(t) + ctx := context.Background() + + data := make([]byte, 2*BlockDataSize) + for i := range data { + data[i] = byte(i % 251) + } + hash := InstanceHash("baba000000000000000000000000000000000000000000000000000000000001") + storeTestObject(t, ctx, storage, hash, data, diskID, NamespaceID(1)) + + ci := NewChaosInjector(db, storage) + res, err := ci.TruncateObject("", "", string(hash), -1, 0) + require.NoError(t, err) + assert.Equal(t, "truncate", res.Operation) + assert.Equal(t, res.OldFileSize-BlockTotalSize, res.NewFileSize, + "default truncation drops one encrypted block") + + // The dropped (last) block can no longer be read. + _, err = storage.ReadBlocks(hash, BlockDataSize, BlockDataSize) + require.Error(t, err, "reading a truncated-away block should fail") +} + +// TestChaosInlineRejected verifies that chaos injection refuses inline objects, +// which live in the database rather than on disk. +func TestChaosInlineRejected(t *testing.T) { + db, storage, _ := newChaosTestCache(t) + ctx := context.Background() + + hash := InstanceHash("acdc000000000000000000000000000000000000000000000000000000000001") + small := []byte("small inline payload") + meta := &CacheMetadata{ContentLength: int64(len(small)), StorageID: StorageIDInline, NamespaceID: NamespaceID(1)} + require.NoError(t, storage.StoreInline(ctx, hash, meta, small)) + + ci := NewChaosInjector(db, storage) + _, err := ci.CorruptBlock("", "", string(hash), 0, 0) + require.Error(t, err) + assert.Contains(t, err.Error(), "inline") +} diff --git a/local_cache/consistency.go b/local_cache/consistency.go index 00b78cbec..4701d99db 100644 --- a/local_cache/consistency.go +++ b/local_cache/consistency.go @@ -54,6 +54,10 @@ var ( errChannelFull = errors.New("channel_full") errScanDone = errors.New("scan_done") errChecksumSkipped = errors.New("checksum_skipped") + // errChecksumAlreadyVerified is returned (in "once" data-scan mode) for an + // object whose on-disk data has already been verified, so it is skipped + // without being re-read. + errChecksumAlreadyVerified = errors.New("checksum_already_verified") metadataScanInconsistentObjects = promauto.NewCounter(prometheus.CounterOpts{ Name: "pelican_cache_metadata_scan_inconsistent_objects_total", @@ -103,6 +107,19 @@ var ( Name: "pelican_cache_data_scan_bytes_processed_total", Help: "Total bytes processed during data scans", }) + // dataScanModeOnce is 1 when the data scan is in "once" mode (verify each + // object once then skip), which relies on the underlying storage for + // ongoing at-rest integrity. Exported so a misconfigured cache is visible. + dataScanModeOnce = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "pelican_cache_data_scan_mode_once", + Help: "1 when the data-integrity scan is in \"once\" mode (Cache.DataScanMode=once); 0 for full re-verification each cycle", + }) + // chaosAPIEnabled is 1 when the destructive chaos/fault-injection admin API + // is registered (Cache.EnableChaosAPI). It should be 0 in production. + chaosAPIEnabled = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "pelican_cache_chaos_api_enabled", + Help: "1 when the cache chaos/fault-injection admin API is enabled (Cache.EnableChaosAPI); should be 0 in production", + }) ) // ConsistencyChecker verifies cache consistency between database and disk. @@ -117,6 +134,8 @@ type ConsistencyChecker struct { dataScanBytesPerSec int64 // Max bytes per second for data scanning minAgeForCleanup time.Duration // Minimum age before cleanup to avoid races checksumTypes []ChecksumType // Checksum algorithms to calculate/verify + skipVerifiedData bool // When true, the data scan verifies each object once then skips it + resampleInterval int // In skip mode, re-verify ~1/N already-verified objects (0 disables) // Statistics stats ConsistencyStats @@ -142,6 +161,18 @@ type ConsistencyConfig struct { // ChecksumTypes specifies which checksums to calculate and verify. // When empty, defaults to []ChecksumType{ChecksumSHA256}. ChecksumTypes []ChecksumType + // SkipVerifiedData, when true, makes the data scan verify each object's + // on-disk data exactly once (recording the checksum and comparing it + // against the origin's reported value when available) and then skip that + // object on subsequent scans. Intended for storage backends that already + // guarantee at-rest integrity (e.g. ZFS). When false (default) every scan + // re-verifies all objects. + SkipVerifiedData bool + // ResampleInterval, when SkipVerifiedData is true, makes the scan still + // re-verify roughly 1 in N already-verified objects each cycle, so there is + // a nonzero floor of at-rest corruption detection even in "once" mode. + // 0 disables re-sampling (pure once). + ResampleInterval int } // ConsistencyStats holds statistics from consistency checks @@ -185,15 +216,34 @@ func NewConsistencyChecker(db *CacheDB, storage *StorageManager, config Consiste metadataScanLastStartTime.Set(now) dataScanLastStartTime.Set(now) - return &ConsistencyChecker{ + cc := &ConsistencyChecker{ db: db, storage: storage, metadataScanLimiter: limiter, dataScanBytesPerSec: config.DataScanBytesPerSec, minAgeForCleanup: config.MinAgeForCleanup, checksumTypes: checksumTypes, + skipVerifiedData: config.SkipVerifiedData, + resampleInterval: config.ResampleInterval, stopCh: make(chan struct{}), } + + // Surface the data-scan mode so a misconfigured cache is visible: emit a + // loud warning and export a metric when running in "once" mode, which + // relies on the underlying storage (e.g. ZFS scrubbing) for ongoing at-rest + // integrity rather than re-reading every object. + if config.SkipVerifiedData { + dataScanModeOnce.Set(1) + if config.ResampleInterval > 0 { + log.Warnf("Cache data-integrity scan is in \"once\" mode: each object's on-disk data is verified once and then skipped (re-sampling ~1/%d per cycle). This relies on the underlying storage to detect at-rest corruption; ensure it scrubs (e.g. ZFS).", config.ResampleInterval) + } else { + log.Warn("Cache data-integrity scan is in \"once\" mode with re-sampling disabled: each object's on-disk data is verified once and then never re-checked. This relies entirely on the underlying storage to detect at-rest corruption; ensure it scrubs (e.g. ZFS).") + } + } else { + dataScanModeOnce.Set(0) + } + + return cc } // Start begins the background consistency checking goroutines @@ -1235,8 +1285,8 @@ func (cc *ConsistencyChecker) processBatchForDataScan( prevObjects := *objectsVerified if err := cc.verifyObjectChecksum(ctx, item.instanceHash, item.meta, bytesLimiter, checksumMismatches, inconsistentBytes, bytesVerified, objectsVerified); err != nil { - if errors.Is(err, errChecksumSkipped) { - continue // Incomplete object — don't count as verified + if errors.Is(err, errChecksumSkipped) || errors.Is(err, errChecksumAlreadyVerified) { + continue // Incomplete or already-verified object — don't count as verified } sl.WithError(err).WithField("instanceHash", item.instanceHash).Warn("Error verifying object") } @@ -1283,11 +1333,23 @@ func (cc *ConsistencyChecker) verifyObjectChecksum( } } + // In "verify once" mode, skip objects whose on-disk data has already been + // read back and verified. The expensive read+hash is avoided entirely. + // A small fraction (~1/resampleInterval) is still re-verified each cycle so + // there is a nonzero floor of at-rest corruption detection rather than zero. + if cc.skipVerifiedData && !meta.DataVerified.IsZero() { + if cc.resampleInterval <= 0 || rand.Intn(cc.resampleInterval) != 0 { + return errChecksumAlreadyVerified + } + // Otherwise fall through and re-verify this object (re-sampled). + } + // If no checksums available, calculate and store them if len(meta.Checksums) == 0 { if err := cc.calculateAndStoreChecksums(ctx, instanceHash, meta, bytesLimiter); err != nil { return err } + cc.markDataVerified(instanceHash) *objectsVerified++ return nil } @@ -1323,10 +1385,25 @@ func (cc *ConsistencyChecker) verifyObjectChecksum( } *bytesVerified += verified + cc.markDataVerified(instanceHash) *objectsVerified++ return nil } +// markDataVerified records that an object's on-disk data has just been read +// back and checksum-verified. It is only persisted in "verify once" mode, +// where the recorded timestamp causes subsequent scans to skip the object; +// in the default mode it is a no-op to avoid a metadata write per object per +// scan cycle. +func (cc *ConsistencyChecker) markDataVerified(instanceHash InstanceHash) { + if !cc.skipVerifiedData { + return + } + if err := cc.db.MergeMetadata(instanceHash, &CacheMetadata{DataVerified: time.Now()}); err != nil { + log.Warnf("Failed to record data-verified timestamp for %s: %v", instanceHash, err) + } +} + // hashObjectData reads all object data through the storage manager and // writes it to every hasher. Returns the number of bytes hashed. // This is shared between verifyObjectChecksum and calculateAndStoreChecksums diff --git a/local_cache/database.go b/local_cache/database.go index 00481e36d..2449ecd20 100644 --- a/local_cache/database.go +++ b/local_cache/database.go @@ -417,6 +417,9 @@ func mergeMetadataFields(existing, incoming *CacheMetadata) error { if incoming.Completed.After(existing.Completed) { existing.Completed = incoming.Completed } + if incoming.DataVerified.After(existing.DataVerified) { + existing.DataVerified = incoming.DataVerified + } // --- Additive: Checksums --- existing.Checksums = mergeChecksums(existing.Checksums, incoming.Checksums) diff --git a/local_cache/monitoring_test.go b/local_cache/monitoring_test.go new file mode 100644 index 000000000..d1522933e --- /dev/null +++ b/local_cache/monitoring_test.go @@ -0,0 +1,117 @@ +//go:build !windows + +/*************************************************************** + * + * Copyright (C) 2026, Pelican Project, Morgridge Institute for Research + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ***************************************************************/ + +package local_cache + +import ( + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/pelicanplatform/pelican/metrics" + "github.com/pelicanplatform/pelican/param" +) + +func drainMonitorChan(ch chan []byte) { + for { + select { + case <-ch: + default: + return + } + } +} + +// TestEmitTransferMonitoring verifies that serving a GET emits XRootD-style +// monitoring packets to the shoveler's internal channel when monitoring is +// enabled. +func TestEmitTransferMonitoring(t *testing.T) { + require.NoError(t, param.Shoveler_Enable.Set(true)) + defer func() { require.NoError(t, param.Shoveler_Enable.Set(false)) }() + + ch := metrics.GetInternalMonitorChan() + drainMonitorChan(ch) + + pc := &PersistentCache{} + req := httptest.NewRequest("GET", "http://cache.example/test/foo.dat", nil) + req.Header.Set("User-Agent", "pelican-client/7.0 project/myproj") + req.RemoteAddr = "192.0.2.10:54321" + + pc.emitTransferMonitoring(req, "/test/foo.dat", 4096, time.Now().Add(-time.Second), "") + + select { + case pkt := <-ch: + assert.NotEmpty(t, pkt, "expected a non-empty monitoring packet") + case <-time.After(2 * time.Second): + t.Fatal("expected a monitoring packet to be emitted") + } +} + +// TestEmitTransferMonitoring_NoOp verifies no packets are emitted when the +// shoveler is disabled or when nothing was served. +func TestEmitTransferMonitoring_NoOp(t *testing.T) { + ch := metrics.GetInternalMonitorChan() + + pc := &PersistentCache{} + req := httptest.NewRequest("GET", "http://cache.example/test/foo.dat", nil) + + // Shoveler disabled (default): no packets even with bytes served. + require.NoError(t, param.Shoveler_Enable.Set(false)) + drainMonitorChan(ch) + pc.emitTransferMonitoring(req, "/test/foo.dat", 4096, time.Now(), "") + + // Shoveler enabled but zero bytes served (e.g. a 304): no packets. + require.NoError(t, param.Shoveler_Enable.Set(true)) + defer func() { require.NoError(t, param.Shoveler_Enable.Set(false)) }() + pc.emitTransferMonitoring(req, "/test/foo.dat", 0, time.Now(), "") + + select { + case <-ch: + t.Fatal("did not expect a monitoring packet") + case <-time.After(200 * time.Millisecond): + } +} + +func TestCacheClientIP(t *testing.T) { + tests := []struct { + name string + remoteAddr string + headers map[string]string + expected string + }{ + {"remote addr", "192.0.2.5:1234", nil, "192.0.2.5"}, + {"x-forwarded-for single", "10.0.0.1:1", map[string]string{"X-Forwarded-For": "203.0.113.7"}, "203.0.113.7"}, + {"x-forwarded-for list", "10.0.0.1:1", map[string]string{"X-Forwarded-For": "203.0.113.7, 10.0.0.1"}, "203.0.113.7"}, + {"x-real-ip", "10.0.0.1:1", map[string]string{"X-Real-IP": "198.51.100.2"}, "198.51.100.2"}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + req := httptest.NewRequest("GET", "http://cache/test/x", nil) + req.RemoteAddr = tc.remoteAddr + for k, v := range tc.headers { + req.Header.Set(k, v) + } + assert.Equal(t, tc.expected, cacheClientIP(req)) + }) + } +} diff --git a/local_cache/monitoring_udp_test.go b/local_cache/monitoring_udp_test.go new file mode 100644 index 000000000..a9d95d0d7 --- /dev/null +++ b/local_cache/monitoring_udp_test.go @@ -0,0 +1,188 @@ +//go:build !windows + +/*************************************************************** + * + * Copyright (C) 2026, Pelican Project, Morgridge Institute for Research + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ***************************************************************/ + +package local_cache + +import ( + "bytes" + "context" + "encoding/base64" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + + "github.com/pelicanplatform/pelican/config" + "github.com/pelicanplatform/pelican/metrics" + "github.com/pelicanplatform/pelican/param" + "github.com/pelicanplatform/pelican/pelican_url" + "github.com/pelicanplatform/pelican/server_structs" + "github.com/pelicanplatform/pelican/server_utils" +) + +// TestCacheMonitoringUDPCapture is a full handler-level end-to-end test: it +// stands up a real persistent cache, serves an HTTP GET through serveObject, +// runs the real monitoring shoveler forwarding to a UDP collector, and asserts +// that the XRootD-style monitoring packets are captured off the wire. +// +// It deliberately avoids the (slow) e2e_fed_tests federation harness: the cache +// is built with DeferConfig (no director fetch) against a stub federation, and +// a public namespace is injected directly so a tokenless GET is served. +func TestCacheMonitoringUDPCapture(t *testing.T) { + server_utils.ResetTestState() + t.Cleanup(server_utils.ResetTestState) + InitIssuerKeyForTests(t) // must follow ResetTestState, which clears the issuer key dir + + ctx, cancel := context.WithCancel(context.Background()) + egrp, _ := errgroup.WithContext(ctx) + t.Cleanup(func() { + cancel() + _ = egrp.Wait() + }) + + // Stub federation so NewPersistentCache resolves offline (no discovery). + config.SetFederation(pelican_url.FederationDiscovery{ + DiscoveryEndpoint: "https://cache.example:8443", + DirectorEndpoint: "https://cache.example:8443", + }) + + // UDP collector that stands in for a monitoring aggregator. + collector, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1")}) + require.NoError(t, err) + t.Cleanup(func() { _ = collector.Close() }) + collectorPort := collector.LocalAddr().(*net.UDPAddr).Port + + // Configure and launch the real shoveler, forwarding to our collector. The + // message queue is required by config but we only use the UDP-forwarding + // path; the STOMP backend points at an unreachable URL and merely retries in + // the background. + require.NoError(t, param.Logging_Level.Set("error")) // configShoveler parses this + require.NoError(t, param.Shoveler_Enable.Set(true)) + require.NoError(t, param.Shoveler_MessageQueueProtocol.Set("stomp")) + require.NoError(t, param.Shoveler_URL.Set("stomp://127.0.0.1:1")) + require.NoError(t, param.Shoveler_OutputDestinations.Set([]string{fmt.Sprintf("127.0.0.1:%d", collectorPort)})) + // Let the shoveler's own UDP listener bind an arbitrary free port. + require.NoError(t, param.Shoveler_PortLower.Set(0)) + require.NoError(t, param.Shoveler_PortHigher.Set(1)) + _, err = metrics.LaunchShoveler(ctx, egrp) + require.NoError(t, err) + + // Build a real persistent cache offline. DeferConfig skips the initial + // director namespace fetch; everything else (db, storage, transfer engine, + // authorizer) is wired normally. + tmpDir := t.TempDir() + pc, err := NewPersistentCache(ctx, egrp, PersistentCacheConfig{ + Mode: CacheModeServer, + BaseDir: tmpDir, + StorageDirs: []StorageDirConfig{{Path: tmpDir}}, + DeferConfig: true, + }) + require.NoError(t, err) + t.Cleanup(func() { _ = pc.Close() }) + + // Inject a public namespace so the tokenless GET authorizes. + require.NoError(t, pc.ac.updateConfig([]server_structs.NamespaceAdV2{{ + Path: "/test", + Caps: server_structs.Capabilities{PublicReads: true, Reads: true}, + }})) + + // Pre-store a cached object under the instance hash that resolveObject will + // look up for this path (latest ETag is empty since none is recorded). + const objectPath = "/test/hello_world.txt" + const etag = "monitoring-test-etag" + normalized := pc.normalizePath(objectPath) + objectHash := pc.db.ObjectHash(normalized) + instanceHash := pc.db.InstanceHash(etag, objectHash) + var diskID StorageID + for id := range pc.storage.GetDirs() { + diskID = id + } + data := bytes.Repeat([]byte("monitoring-udp-capture-test\n"), 500) // ~14 KiB, multiple blocks + storeTestObject(t, ctx, pc.storage, instanceHash, data, diskID, NamespaceID(1)) + // Register the latest-ETag mapping so resolveObject treats this as a cache + // hit (it only loads metadata when a latest ETag is recorded). + require.NoError(t, pc.db.SetLatestETag(objectHash, etag, time.Now())) + + // Serve the cache's object handler over real HTTP. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + pc.serveObject(w, r) + })) + t.Cleanup(srv.Close) + + // Issue the GET that should produce monitoring packets. + resp, err := http.Get(srv.URL + objectPath) + require.NoError(t, err) + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + _ = resp.Body.Close() + require.Equal(t, http.StatusOK, resp.StatusCode, "GET failed: %s", string(body)) + require.Equal(t, data, body, "served body should match the cached object") + + // Capture forwarded datagrams from the collector and decode the raw XRootD + // monitoring packets out of the shoveler's JSON envelope. + var rawPackets [][]byte + deadline := time.Now().Add(5 * time.Second) + require.NoError(t, collector.SetReadDeadline(deadline)) + buf := make([]byte, 65536) + sawUser, sawFStream, sawPath := false, false, false + for { + n, _, rerr := collector.ReadFromUDP(buf) + if rerr != nil { + break // deadline reached + } + var env struct { + Data string `json:"data"` + } + if jsonErr := json.Unmarshal(buf[:n], &env); jsonErr != nil { + continue + } + pkt, decErr := base64.StdEncoding.DecodeString(env.Data) + if decErr != nil || len(pkt) == 0 { + continue + } + rawPackets = append(rawPackets, pkt) + switch pkt[0] { + case 'u': + sawUser = true + case 'f': + sawFStream = true + if bytes.Contains(pkt, []byte(objectPath)) { + sawPath = true + } + } + if sawUser && sawFStream && sawPath { + break + } + require.NoError(t, collector.SetReadDeadline(deadline)) + } + + require.NotEmpty(t, rawPackets, "expected to capture monitoring packets over UDP") + assert.True(t, sawUser, "expected a user-login ('u') packet") + assert.True(t, sawFStream, "expected an f-stream ('f') packet") + assert.True(t, sawPath, "expected the object path to appear in an f-stream packet") +} diff --git a/local_cache/persistent_cache.go b/local_cache/persistent_cache.go index ee91f9b28..2029d8fc1 100644 --- a/local_cache/persistent_cache.go +++ b/local_cache/persistent_cache.go @@ -349,6 +349,18 @@ const ( CacheModeServer ) +// useEmbeddedCacheMode reports whether the embedded transfer client should +// run in cache-embedded mode, i.e. route its upstream fetches through the +// director's origin endpoint and pull directly from origins. +// +// In site-local mode (Cache.EnableSiteLocalMode) the cache deliberately does +// not join the federation and instead appears to it as a client, fetching +// objects from other caches rather than from origins. In that case we +// disable embedded mode so the director redirects us to caches. +func useEmbeddedCacheMode() bool { + return !param.Cache_EnableSiteLocalMode.GetBool() +} + // PersistentCacheConfig holds configuration for the persistent cache type PersistentCacheConfig struct { // Mode selects which configuration namespace to use for defaults. @@ -569,9 +581,16 @@ func NewPersistentCache(ctx context.Context, egrp *errgroup.Group, cfg Persisten // Safe: this runs during single-threaded init, before any downloads. storage.chooseDir = eviction.ChooseDiskStorage - // Initialize consistency checker + // Initialize consistency checker. In "once" data-scan mode the periodic + // integrity scan verifies each object's on-disk data a single time and + // then skips it, for deployments whose storage already guarantees at-rest + // integrity (e.g. ZFS). Any value other than "once" keeps the default + // behaviour of re-verifying every object on each scan cycle. + scanOnce := strings.EqualFold(param.Cache_DataScanMode.GetString(), "once") consistency := NewConsistencyChecker(db, storage, ConsistencyConfig{ MinAgeForCleanup: -1, // Use default grace period + SkipVerifiedData: scanOnce, + ResampleInterval: param.Cache_DataScanResampleInterval.GetInt(), }) // Get federation info @@ -1273,7 +1292,7 @@ func (pc *PersistentCache) HeadObject(objectPath, token string) (*HeadResult, er opts := []client.TransferOption{ client.WithToken(token), - client.WithCacheEmbeddedClientMode(), + client.WithCacheEmbeddedClientMode(useEmbeddedCacheMode()), client.WithRequestChecksums(client.KnownChecksumTypes()), } if ft := pc.getFedToken(); ft != "" { @@ -1392,7 +1411,7 @@ func (pc *PersistentCache) stat(objectPath, token string, cachedOnly bool) (uint dUrl.Path = objectPath dUrl.Scheme = "pelican" - opts := []client.TransferOption{client.WithToken(token), client.WithCacheEmbeddedClientMode()} + opts := []client.TransferOption{client.WithToken(token), client.WithCacheEmbeddedClientMode(useEmbeddedCacheMode())} if ft := pc.getFedToken(); ft != "" { opts = append(opts, client.WithFedToken(pc.fedTokenAsProvider())) } @@ -1451,7 +1470,7 @@ func (pc *PersistentCache) doInitObjectFromStat( } dUrl.Scheme = "pelican" - opts := []client.TransferOption{client.WithToken(token), client.WithCacheEmbeddedClientMode()} + opts := []client.TransferOption{client.WithToken(token), client.WithCacheEmbeddedClientMode(useEmbeddedCacheMode())} if ft := pc.getFedToken(); ft != "" { opts = append(opts, client.WithFedToken(pc.fedTokenAsProvider())) } @@ -1718,10 +1737,12 @@ func (pc *PersistentCache) performDownload(ctx context.Context, dl *persistentDo return errors.Wrap(err, "invalid source URL") } - // Route the request through the director's origin endpoint so that the - // director redirects us to the origin. The client's cache mode causes - // queryDirector to use the /api/v1.0/director/origin/ prefix, avoiding - // the need for the origin to have the DirectReads capability. + // By default, route the request through the director's origin endpoint so + // that the director redirects us to the origin. The client's cache mode + // (see useEmbeddedCacheMode) causes queryDirector to use the + // /api/v1.0/director/origin/ prefix, avoiding the need for the origin to + // have the DirectReads capability. In site-local mode embedded mode is + // disabled and the director instead redirects us to other caches. sourceURL.Scheme = "pelican" // Pass the user token and federation token as separate options. @@ -1752,7 +1773,7 @@ func (pc *PersistentCache) performDownload(ctx context.Context, dl *persistentDo } // Create per-request transfer client - tc, err := pc.te.NewClient(client.WithAcquireToken(false), client.WithCallback(progressCallback), client.WithCacheEmbeddedClientMode()) + tc, err := pc.te.NewClient(client.WithAcquireToken(false), client.WithCallback(progressCallback), client.WithCacheEmbeddedClientMode(useEmbeddedCacheMode())) if err != nil { return errors.Wrap(err, "failed to create transfer client") } @@ -1958,7 +1979,7 @@ func (pc *PersistentCache) performDownload(ctx context.Context, dl *persistentDo // size instead of blocking for the entire download. statOpts := []client.TransferOption{ client.WithToken(userToken), - client.WithCacheEmbeddedClientMode(), + client.WithCacheEmbeddedClientMode(useEmbeddedCacheMode()), } if fedTP != nil { statOpts = append(statOpts, client.WithFedToken(fedTP)) diff --git a/local_cache/persistent_cache_api.go b/local_cache/persistent_cache_api.go index 344910312..54e894ad7 100644 --- a/local_cache/persistent_cache_api.go +++ b/local_cache/persistent_cache_api.go @@ -27,6 +27,7 @@ import ( "fmt" "io" "io/fs" + "math" "net" "net/http" "net/url" @@ -45,10 +46,12 @@ import ( "github.com/pelicanplatform/pelican/client" "github.com/pelicanplatform/pelican/config" "github.com/pelicanplatform/pelican/error_codes" + "github.com/pelicanplatform/pelican/metrics" "github.com/pelicanplatform/pelican/param" "github.com/pelicanplatform/pelican/server_structs" "github.com/pelicanplatform/pelican/token" "github.com/pelicanplatform/pelican/token_scopes" + "github.com/pelicanplatform/pelican/utils" "github.com/pelicanplatform/pelican/web_ui" ) @@ -123,8 +126,9 @@ func isConnectionError(err error) bool { // chunked transfer-encoding, which is required for HTTP/1.1 trailers. type trailerWriter struct { http.ResponseWriter - writeErr *error - sendTrailer bool + writeErr *error + sendTrailer bool + bytesWritten int64 } func (tw *trailerWriter) Header() http.Header { @@ -142,6 +146,7 @@ func (tw *trailerWriter) WriteHeader(code int) { func (tw *trailerWriter) Write(p []byte) (int, error) { n, err := tw.ResponseWriter.Write(p) + tw.bytesWritten += int64(n) if err != nil && *tw.writeErr == nil { *tw.writeErr = err } @@ -539,7 +544,8 @@ func (pc *PersistentCache) serveObject(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) var writeErr error - if _, err := io.Copy(w, reader); err != nil { + nServed, err := io.Copy(w, reader) + if err != nil { writeErr = err } if sendTrailer { @@ -549,6 +555,7 @@ func (pc *PersistentCache) serveObject(w http.ResponseWriter, r *http.Request) { } w.Header().Set("X-Transfer-Status", trailerVal) } + pc.emitTransferMonitoring(r, objectPath, nServed, startTime, bearerToken) reqLog.WithFields(log.Fields{ "status": 200, "cache": "pass-through", @@ -616,12 +623,66 @@ func (pc *PersistentCache) serveObject(w http.ResponseWriter, r *http.Request) { if meta != nil && !meta.Completed.IsZero() && meta.Completed.Before(startTime) { cacheStatus = "hit" } + pc.emitTransferMonitoring(r, objectPath, wrappedWriter.bytesWritten, startTime, bearerToken) reqLog.WithFields(log.Fields{ "cache": cacheStatus, "duration": time.Since(startTime).Round(time.Millisecond).String(), }).Info("Request complete") } +// emitTransferMonitoring emits an XRootD-style monitoring record for a served +// GET, mirroring what the POSIXv2 origin emits. It is a no-op when the +// monitoring shoveler is disabled or when no bytes were served (e.g. a 304). +// The packets flow to the shoveler's internal channel and on to the configured +// monitoring collectors. +func (pc *PersistentCache) emitTransferMonitoring(r *http.Request, objectPath string, bytesServed int64, start time.Time, bearerToken string) { + if bytesServed <= 0 || !param.Shoveler_Enable.GetBool() { + return + } + + event := metrics.TransferEvent{ + Path: objectPath, + ReadBytes: bytesServed, + ReadOps: 1, + ClientIP: cacheClientIP(r), + AuthProtocol: "https", + UserAgent: r.UserAgent(), + Project: utils.ExtractProjectFromUserAgent(r.Header.Values("User-Agent")), + StartTime: start, + EndTime: time.Now(), + } + + // Best-effort user attribution from the (already-authorized) token. The + // token signature is not re-verified here; it is used only for monitoring. + if bearerToken != "" { + if tok, err := token.UnsafeParseClaims(bearerToken); err == nil { + event.UserDN = tok.Subject() + event.Issuer = tok.Issuer() + } + } + + metrics.EmitTransferEvent(event) +} + +// cacheClientIP extracts the client IP from a request, preferring the +// X-Forwarded-For / X-Real-IP headers set by a fronting proxy and falling back +// to the connection's remote address. +func cacheClientIP(r *http.Request) string { + if xff := r.Header.Get("X-Forwarded-For"); xff != "" { + if idx := strings.IndexByte(xff, ','); idx >= 0 { + return strings.TrimSpace(xff[:idx]) + } + return strings.TrimSpace(xff) + } + if xrip := r.Header.Get("X-Real-IP"); xrip != "" { + return strings.TrimSpace(xrip) + } + if host, _, err := net.SplitHostPort(r.RemoteAddr); err == nil { + return host + } + return r.RemoteAddr +} + // servePropfindFromCache synthesizes a WebDAV multistatus response from // cached metadata. It returns true if the response was served, false if // the object is not in the cache and the caller should fall through to @@ -1441,6 +1502,17 @@ func (pc *PersistentCache) RegisterCacheHandlers(engine *gin.Engine, directorEna adminIntrospect.POST("/consistency", pc.introspectConsistencyHandler) log.Info("Cache introspection API registered at /api/v1.0/cache/introspect/") + // Register the destructive chaos/fault-injection endpoint only when + // explicitly enabled (Cache.EnableChaosAPI), since it deliberately corrupts + // cached data. It is admin-authenticated like the rest of introspect. + if param.Cache_EnableChaosAPI.GetBool() { + adminIntrospect.POST("/chaos", pc.introspectChaosHandler) + chaosAPIEnabled.Set(1) + log.Warn("Cache chaos/fault-injection API is ENABLED at /api/v1.0/cache/introspect/chaos (Cache.EnableChaosAPI); this can corrupt cached data and should only be used for testing") + } else { + chaosAPIEnabled.Set(0) + } + return nil } @@ -1701,6 +1773,81 @@ func (pc *PersistentCache) introspectVerifyHandler(c *gin.Context) { c.JSON(http.StatusOK, result) } +// introspectChaosHandler injects corruption into a cached object for +// fault-injection testing. It is only registered when Cache.EnableChaosAPI is +// true and is admin-authenticated. +// +// POST /api/v1.0/cache/introspect/chaos?op=corrupt|truncate&url=...&etag=...&instance=... +// +// corrupt: &block=&bytes= +// truncate: &chunk=&drop-bytes= +func (pc *PersistentCache) introspectChaosHandler(c *gin.Context) { + op := c.Query("op") + objectURL := c.Query("url") + etag := c.Query("etag") + instance := c.Query("instance") + + if objectURL == "" && instance == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "url or instance query parameter is required"}) + return + } + + // atoiQuery parses a query parameter as an integer and enforces an explicit + // [min, max] range, so the value can be safely narrowed below. + atoiQuery := func(name string, def, min, max int64) (int64, error) { + v := c.Query(name) + if v == "" { + return def, nil + } + n, perr := strconv.ParseInt(v, 10, 64) + if perr != nil { + return 0, perr + } + if n < min || n > max { + return 0, errors.Errorf("%s=%d is out of range [%d, %d]", name, n, min, max) + } + return n, nil + } + + injector := NewChaosInjector(pc.db, pc.storage) + + var result *ChaosResult + var err error + switch op { + case "corrupt": + var block, nbytes int64 + if block, err = atoiQuery("block", 0, 0, math.MaxUint32); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + if nbytes, err = atoiQuery("bytes", 0, 0, BlockTotalSize); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + result, err = injector.CorruptBlock(objectURL, etag, instance, uint32(block), int(nbytes)) + case "truncate": + var chunk, drop int64 + if chunk, err = atoiQuery("chunk", -1, -1, math.MaxInt32); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + if drop, err = atoiQuery("drop-bytes", 0, 0, math.MaxInt64); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + result, err = injector.TruncateObject(objectURL, etag, instance, int(chunk), drop) + default: + c.JSON(http.StatusBadRequest, gin.H{"error": "op must be 'corrupt' or 'truncate'"}) + return + } + + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + c.JSON(http.StatusOK, result) +} + // getBlockSummaryLive computes block status using the live database. func (pc *PersistentCache) getBlockSummaryLive(instanceHash InstanceHash, contentLength int64) (*BlockSummary, error) { if contentLength <= 0 { diff --git a/local_cache/persistent_cache_test.go b/local_cache/persistent_cache_test.go index 3b8095a31..c804ef918 100644 --- a/local_cache/persistent_cache_test.go +++ b/local_cache/persistent_cache_test.go @@ -1856,6 +1856,164 @@ func TestDataScan_MissingChecksum(t *testing.T) { "stored checksum should match SHA-256 of original data") } +// TestDataScan_SkipVerifiedData verifies the "verify once" data-scan mode +// (SkipVerifiedData): the first scan reads back and checksums the object, +// records a DataVerified timestamp, and counts it as verified; subsequent +// scans skip the already-verified object instead of re-reading it. +func TestDataScan_SkipVerifiedData(t *testing.T) { + InitIssuerKeyForTests(t) + tmpDir := t.TempDir() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + db, err := NewCacheDB(ctx, tmpDir) + require.NoError(t, err) + defer db.Close() + + egrp, _ := errgroup.WithContext(ctx) + storage, err := NewStorageManager(db, []string{tmpDir}, 0, egrp) + require.NoError(t, err) + defer storage.Close() + + var diskID StorageID + for id := range storage.GetDirs() { + diskID = id + } + + data := make([]byte, BlockDataSize+500) + for i := range data { + data[i] = byte(i % 211) + } + hash := InstanceHash("dddd000000000000000000000000000000000000000000000000000000000001") + storeTestObject(t, ctx, storage, hash, data, diskID, NamespaceID(1)) + + checker := NewConsistencyChecker(db, storage, ConsistencyConfig{ + MetadataScanActiveMs: 1000, + DataScanBytesPerSec: 1 << 30, + MinAgeForCleanup: 0, + ChecksumTypes: []ChecksumType{ChecksumSHA256}, + SkipVerifiedData: true, + }) + + // First scan: the object has no checksum, so it is read back, checksummed, + // marked verified, and counted. + require.NoError(t, checker.RunDataScan(ctx, nil)) + assert.Equal(t, int64(1), checker.GetStats().ObjectsVerified, "first scan should verify the object once") + + meta, err := storage.GetMetadata(hash) + require.NoError(t, err) + require.Len(t, meta.Checksums, 1, "first scan should record a checksum") + require.False(t, meta.DataVerified.IsZero(), "first scan should record a DataVerified timestamp") + firstVerified := meta.DataVerified + + // Second scan: the object is already verified, so it must be skipped — the + // verified counter does not advance and DataVerified is unchanged. + require.NoError(t, checker.RunDataScan(ctx, nil)) + assert.Equal(t, int64(1), checker.GetStats().ObjectsVerified, "second scan should skip the already-verified object") + + meta, err = storage.GetMetadata(hash) + require.NoError(t, err) + assert.Equal(t, firstVerified, meta.DataVerified, "DataVerified should not change on a skipped scan") +} + +// TestDataScan_SkipVerifiedResample confirms that "once" mode still re-verifies +// already-checked objects when ResampleInterval is set, providing a floor of +// at-rest corruption detection rather than zero. With ResampleInterval=1 every +// already-verified object is re-checked, so a second scan re-verifies it. +func TestDataScan_SkipVerifiedResample(t *testing.T) { + InitIssuerKeyForTests(t) + tmpDir := t.TempDir() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + db, err := NewCacheDB(ctx, tmpDir) + require.NoError(t, err) + defer db.Close() + + egrp, _ := errgroup.WithContext(ctx) + storage, err := NewStorageManager(db, []string{tmpDir}, 0, egrp) + require.NoError(t, err) + defer storage.Close() + + var diskID StorageID + for id := range storage.GetDirs() { + diskID = id + } + + data := make([]byte, BlockDataSize+500) + for i := range data { + data[i] = byte(i % 211) + } + hash := InstanceHash("eded000000000000000000000000000000000000000000000000000000000001") + storeTestObject(t, ctx, storage, hash, data, diskID, NamespaceID(1)) + + checker := NewConsistencyChecker(db, storage, ConsistencyConfig{ + MetadataScanActiveMs: 1000, + DataScanBytesPerSec: 1 << 30, + MinAgeForCleanup: 0, + ChecksumTypes: []ChecksumType{ChecksumSHA256}, + SkipVerifiedData: true, + ResampleInterval: 1, // re-verify every already-checked object + }) + + require.NoError(t, checker.RunDataScan(ctx, nil)) + require.NoError(t, checker.RunDataScan(ctx, nil)) + // With ResampleInterval=1 the object is re-verified on both scans rather + // than being permanently skipped after the first. + assert.Equal(t, int64(2), checker.GetStats().ObjectsVerified, + "ResampleInterval=1 should re-verify the object every cycle") +} + +// TestDataScan_AllModeReverifies confirms that the default (non-skip) mode +// re-verifies an object on every scan, in contrast to SkipVerifiedData. +func TestDataScan_AllModeReverifies(t *testing.T) { + InitIssuerKeyForTests(t) + tmpDir := t.TempDir() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + db, err := NewCacheDB(ctx, tmpDir) + require.NoError(t, err) + defer db.Close() + + egrp, _ := errgroup.WithContext(ctx) + storage, err := NewStorageManager(db, []string{tmpDir}, 0, egrp) + require.NoError(t, err) + defer storage.Close() + + var diskID StorageID + for id := range storage.GetDirs() { + diskID = id + } + + data := make([]byte, BlockDataSize+500) + for i := range data { + data[i] = byte(i % 211) + } + hash := InstanceHash("cccc000000000000000000000000000000000000000000000000000000000001") + storeTestObject(t, ctx, storage, hash, data, diskID, NamespaceID(1)) + + checker := NewConsistencyChecker(db, storage, ConsistencyConfig{ + MetadataScanActiveMs: 1000, + DataScanBytesPerSec: 1 << 30, + MinAgeForCleanup: 0, + ChecksumTypes: []ChecksumType{ChecksumSHA256}, + // SkipVerifiedData defaults to false ("all" mode). + }) + + require.NoError(t, checker.RunDataScan(ctx, nil)) + require.NoError(t, checker.RunDataScan(ctx, nil)) + assert.Equal(t, int64(2), checker.GetStats().ObjectsVerified, "all mode should re-verify on every scan") + + // In all mode the DataVerified timestamp is never recorded. + meta, err := storage.GetMetadata(hash) + require.NoError(t, err) + assert.True(t, meta.DataVerified.IsZero(), "all mode should not record DataVerified") +} + // TestMultiDirStoragePlacement verifies that multi-directory storage works // end-to-end: objects land in different directories, per-directory size stats // are correct, and storage IDs are persisted in metadata. diff --git a/local_cache/schema.go b/local_cache/schema.go index 2a88354c7..2455f8f2b 100644 --- a/local_cache/schema.go +++ b/local_cache/schema.go @@ -322,7 +322,8 @@ type Checksum struct { // are reconciled: // // - Max-time: LastModified, LastValidated, LastAccessTime, Expires, -// Completed — only advance forward (keep the later timestamp). +// Completed, DataVerified — only advance forward (keep the later +// timestamp). // - Additive: Checksums — union by algorithm; prefer OriginVerified. // - Last-writer-wins: ContentType, ContentLength, VaryHeaders, // CCFlags, CCMaxAge — the incoming value always replaces the old one. @@ -341,6 +342,7 @@ type CacheMetadata struct { LastValidated time.Time `msgpack:"lv"` // When we last validated with origin Completed time.Time `msgpack:"c"` // When download was completed Checksums []Checksum `msgpack:"ck,omitempty"` // Object checksums + DataVerified time.Time `msgpack:"dv,omitempty"` // When the on-disk data was last read back and checksum-verified // Identification fields ContentType string `msgpack:"ct"` // MIME type diff --git a/param/parameters.go b/param/parameters.go index 17bc25c29..136f1c9c7 100644 --- a/param/parameters.go +++ b/param/parameters.go @@ -104,11 +104,14 @@ var runtimeConfigurableMap = map[string]bool{ "Cache.ConcurrencyDegradedThreshold": false, "Cache.DataLocation": false, "Cache.DataLocations": false, + "Cache.DataScanMode": false, + "Cache.DataScanResampleInterval": false, "Cache.DbLocation": false, "Cache.DefaultCacheTimeout": false, "Cache.DirectorTest": false, "Cache.DisableClientX509": false, "Cache.EnableBroker": false, + "Cache.EnableChaosAPI": false, "Cache.EnableEvictionMonitoring": false, "Cache.EnableLotman": false, "Cache.EnableOIDC": false, @@ -559,6 +562,7 @@ func paramNameToEnvVar(paramName string) string { var stringAccessors = map[string]func(*Config) string{ "Cache.ClientStatisticsLocation": func(c *Config) string { return c.Cache.ClientStatisticsLocation }, "Cache.DataLocation": func(c *Config) string { return c.Cache.DataLocation }, + "Cache.DataScanMode": func(c *Config) string { return c.Cache.DataScanMode }, "Cache.DbLocation": func(c *Config) string { return c.Cache.DbLocation }, "Cache.ExportLocation": func(c *Config) string { return c.Cache.ExportLocation }, "Cache.FedTokenLocation": func(c *Config) string { return c.Cache.FedTokenLocation }, @@ -834,6 +838,7 @@ var intAccessors = map[string]func(*Config) int{ "Cache.BlocksToPrefetch": func(c *Config) int { return c.Cache.BlocksToPrefetch }, "Cache.Concurrency": func(c *Config) int { return c.Cache.Concurrency }, "Cache.ConcurrencyDegradedThreshold": func(c *Config) int { return c.Cache.ConcurrencyDegradedThreshold }, + "Cache.DataScanResampleInterval": func(c *Config) int { return c.Cache.DataScanResampleInterval }, "Cache.EvictionMonitoringMaxDepth": func(c *Config) int { return c.Cache.EvictionMonitoringMaxDepth }, "Cache.Port": func(c *Config) int { return c.Cache.Port }, "ClientAgent.HistoryRetentionDays": func(c *Config) int { return c.ClientAgent.HistoryRetentionDays }, @@ -958,6 +963,7 @@ var boolAccessors = map[string]func(*Config) bool{ "Cache.DirectorTest": func(c *Config) bool { return c.Cache.DirectorTest }, "Cache.DisableClientX509": func(c *Config) bool { return c.Cache.DisableClientX509 }, "Cache.EnableBroker": func(c *Config) bool { return c.Cache.EnableBroker }, + "Cache.EnableChaosAPI": func(c *Config) bool { return c.Cache.EnableChaosAPI }, "Cache.EnableEvictionMonitoring": func(c *Config) bool { return c.Cache.EnableEvictionMonitoring }, "Cache.EnableLotman": func(c *Config) bool { return c.Cache.EnableLotman }, "Cache.EnableOIDC": func(c *Config) bool { return c.Cache.EnableOIDC }, @@ -1235,11 +1241,14 @@ var allParameterNames = []string{ "Cache.ConcurrencyDegradedThreshold", "Cache.DataLocation", "Cache.DataLocations", + "Cache.DataScanMode", + "Cache.DataScanResampleInterval", "Cache.DbLocation", "Cache.DefaultCacheTimeout", "Cache.DirectorTest", "Cache.DisableClientX509", "Cache.EnableBroker", + "Cache.EnableChaosAPI", "Cache.EnableEvictionMonitoring", "Cache.EnableLotman", "Cache.EnableOIDC", @@ -1663,6 +1672,7 @@ var allParameterNames = []string{ var ( Cache_ClientStatisticsLocation = StringParam{"Cache.ClientStatisticsLocation"} Cache_DataLocation = StringParam{"Cache.DataLocation"} + Cache_DataScanMode = StringParam{"Cache.DataScanMode"} Cache_DbLocation = StringParam{"Cache.DbLocation"} Cache_ExportLocation = StringParam{"Cache.ExportLocation"} Cache_FedTokenLocation = StringParam{"Cache.FedTokenLocation"} @@ -1882,6 +1892,7 @@ var ( Cache_BlocksToPrefetch = IntParam{"Cache.BlocksToPrefetch"} Cache_Concurrency = IntParam{"Cache.Concurrency"} Cache_ConcurrencyDegradedThreshold = IntParam{"Cache.ConcurrencyDegradedThreshold"} + Cache_DataScanResampleInterval = IntParam{"Cache.DataScanResampleInterval"} Cache_EvictionMonitoringMaxDepth = IntParam{"Cache.EvictionMonitoringMaxDepth"} Cache_Port = IntParam{"Cache.Port"} ClientAgent_HistoryRetentionDays = IntParam{"ClientAgent.HistoryRetentionDays"} @@ -1941,6 +1952,7 @@ var ( Cache_DirectorTest = BoolParam{"Cache.DirectorTest"} Cache_DisableClientX509 = BoolParam{"Cache.DisableClientX509"} Cache_EnableBroker = BoolParam{"Cache.EnableBroker"} + Cache_EnableChaosAPI = BoolParam{"Cache.EnableChaosAPI"} Cache_EnableEvictionMonitoring = BoolParam{"Cache.EnableEvictionMonitoring"} Cache_EnableLotman = BoolParam{"Cache.EnableLotman"} Cache_EnableOIDC = BoolParam{"Cache.EnableOIDC"} @@ -2127,6 +2139,7 @@ func init() { paramByName = map[string]Param{ "Cache.ClientStatisticsLocation": Cache_ClientStatisticsLocation, "Cache.DataLocation": Cache_DataLocation, + "Cache.DataScanMode": Cache_DataScanMode, "Cache.DbLocation": Cache_DbLocation, "Cache.ExportLocation": Cache_ExportLocation, "Cache.FedTokenLocation": Cache_FedTokenLocation, @@ -2340,6 +2353,7 @@ func init() { "Cache.BlocksToPrefetch": Cache_BlocksToPrefetch, "Cache.Concurrency": Cache_Concurrency, "Cache.ConcurrencyDegradedThreshold": Cache_ConcurrencyDegradedThreshold, + "Cache.DataScanResampleInterval": Cache_DataScanResampleInterval, "Cache.EvictionMonitoringMaxDepth": Cache_EvictionMonitoringMaxDepth, "Cache.Port": Cache_Port, "ClientAgent.HistoryRetentionDays": ClientAgent_HistoryRetentionDays, @@ -2393,6 +2407,7 @@ func init() { "Cache.DirectorTest": Cache_DirectorTest, "Cache.DisableClientX509": Cache_DisableClientX509, "Cache.EnableBroker": Cache_EnableBroker, + "Cache.EnableChaosAPI": Cache_EnableChaosAPI, "Cache.EnableEvictionMonitoring": Cache_EnableEvictionMonitoring, "Cache.EnableLotman": Cache_EnableLotman, "Cache.EnableOIDC": Cache_EnableOIDC, diff --git a/param/parameters_struct.go b/param/parameters_struct.go index 03e8a087a..98ce7996c 100644 --- a/param/parameters_struct.go +++ b/param/parameters_struct.go @@ -34,11 +34,14 @@ type Config struct { ConcurrencyDegradedThreshold int `mapstructure:"concurrencydegradedthreshold" yaml:"ConcurrencyDegradedThreshold"` DataLocation string `mapstructure:"datalocation" yaml:"DataLocation"` DataLocations []string `mapstructure:"datalocations" yaml:"DataLocations"` + DataScanMode string `mapstructure:"datascanmode" yaml:"DataScanMode"` + DataScanResampleInterval int `mapstructure:"datascanresampleinterval" yaml:"DataScanResampleInterval"` DbLocation string `mapstructure:"dblocation" yaml:"DbLocation"` DefaultCacheTimeout time.Duration `mapstructure:"defaultcachetimeout" yaml:"DefaultCacheTimeout"` DirectorTest bool `mapstructure:"directortest" yaml:"DirectorTest"` DisableClientX509 bool `mapstructure:"disableclientx509" yaml:"DisableClientX509"` EnableBroker bool `mapstructure:"enablebroker" yaml:"EnableBroker"` + EnableChaosAPI bool `mapstructure:"enablechaosapi" yaml:"EnableChaosAPI"` EnableEvictionMonitoring bool `mapstructure:"enableevictionmonitoring" yaml:"EnableEvictionMonitoring"` EnableLotman bool `mapstructure:"enablelotman" yaml:"EnableLotman"` EnableOIDC bool `mapstructure:"enableoidc" yaml:"EnableOIDC"` @@ -520,11 +523,14 @@ type configWithType struct { ConcurrencyDegradedThreshold struct { Type string; Value int } DataLocation struct { Type string; Value string } DataLocations struct { Type string; Value []string } + DataScanMode struct { Type string; Value string } + DataScanResampleInterval struct { Type string; Value int } DbLocation struct { Type string; Value string } DefaultCacheTimeout struct { Type string; Value time.Duration } DirectorTest struct { Type string; Value bool } DisableClientX509 struct { Type string; Value bool } EnableBroker struct { Type string; Value bool } + EnableChaosAPI struct { Type string; Value bool } EnableEvictionMonitoring struct { Type string; Value bool } EnableLotman struct { Type string; Value bool } EnableOIDC struct { Type string; Value bool }