Skip to content

Commit 3c66950

Browse files
committed
Disable Metrics
1 parent b123dce commit 3c66950

9 files changed

Lines changed: 154 additions & 118 deletions

File tree

p2p/kademlia/dht.go

Lines changed: 46 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,10 @@ const (
4242
delKeysCountThreshold = 10
4343
lowSpaceThreshold = 50 // GB
4444
batchRetrieveSize = 1000
45-
storeSameSymbolsBatchConcurrency = 3
46-
fetchSymbolsBatchConcurrency = 6
47-
minimumDataStoreSuccessRate = 75.0
45+
46+
storeSameSymbolsBatchConcurrency = 3
47+
fetchSymbolsBatchConcurrency = 6
48+
minimumDataStoreSuccessRate = 75.0
4849

4950
maxIterations = 4
5051
macConcurrentNetworkStoreCalls = 16
@@ -124,7 +125,7 @@ func (s *DHT) ConnPoolSnapshot() map[string]int64 {
124125

125126
// Options contains configuration options for the queries node
126127
type Options struct {
127-
ID []byte
128+
ID []byte
128129

129130
// The queries IPv4 or IPv6 address
130131
IP string
@@ -139,8 +140,11 @@ type Options struct {
139140
// Lumera client for interacting with the blockchain
140141
LumeraClient lumera.Client
141142

142-
// Keyring for credentials
143-
Keyring keyring.Keyring
143+
// Keyring for credentials
144+
Keyring keyring.Keyring
145+
146+
// MetricsDisabled gates DHT-level metrics emission (p2pmetrics hooks and snapshots)
147+
MetricsDisabled bool
144148
}
145149

146150
// NewDHT returns a new DHT node
@@ -739,7 +743,9 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
739743
return nil, fmt.Errorf("fetch and add local keys: %v", err)
740744
}
741745
// Report how many were found locally, for event metrics
742-
p2pmetrics.ReportFoundLocal(p2pmetrics.TaskIDFromContext(ctx), int(foundLocalCount))
746+
if !s.options.MetricsDisabled {
747+
p2pmetrics.ReportFoundLocal(p2pmetrics.TaskIDFromContext(ctx), int(foundLocalCount))
748+
}
743749
if foundLocalCount >= required {
744750
return result, nil
745751
}
@@ -788,7 +794,9 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
788794
// Record batch retrieve stats for internal DHT snapshot window
789795
s.metrics.RecordBatchRetrieve(len(keys), int(required), int(foundLocalCount), netFound, time.Since(start))
790796
// Also feed retrieve counts into the per-task collector for stream events
791-
p2pmetrics.SetRetrieveBatchSummary(p2pmetrics.TaskIDFromContext(ctx), len(keys), int(required), int(foundLocalCount), netFound, time.Since(start).Milliseconds())
797+
if !s.options.MetricsDisabled {
798+
p2pmetrics.SetRetrieveBatchSummary(p2pmetrics.TaskIDFromContext(ctx), len(keys), int(required), int(foundLocalCount), netFound, time.Since(start).Milliseconds())
799+
}
792800

793801
return result, nil
794802
}
@@ -946,14 +954,16 @@ func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node,
946954
}
947955
mu.Unlock()
948956
// record failed RPC per-node
949-
p2pmetrics.RecordRetrieve(p2pmetrics.TaskIDFromContext(ctx), p2pmetrics.Call{
950-
IP: node.IP,
951-
Address: node.String(),
952-
Keys: 0,
953-
Success: false,
954-
Error: err.Error(),
955-
DurationMS: time.Since(callStart).Milliseconds(),
956-
})
957+
if !s.options.MetricsDisabled {
958+
p2pmetrics.RecordRetrieve(p2pmetrics.TaskIDFromContext(ctx), p2pmetrics.Call{
959+
IP: node.IP,
960+
Address: node.String(),
961+
Keys: 0,
962+
Success: false,
963+
Error: err.Error(),
964+
DurationMS: time.Since(callStart).Milliseconds(),
965+
})
966+
}
957967
return
958968
}
959969

@@ -976,14 +986,16 @@ func (s *DHT) iterateBatchGetValues(ctx context.Context, nodes map[string]*Node,
976986
}
977987

978988
// record successful RPC per-node (returned may be 0). Success is true when no error.
979-
p2pmetrics.RecordRetrieve(p2pmetrics.TaskIDFromContext(ctx), p2pmetrics.Call{
980-
IP: node.IP,
981-
Address: node.String(),
982-
Keys: returned,
983-
Success: true,
984-
Error: "",
985-
DurationMS: time.Since(callStart).Milliseconds(),
986-
})
989+
if !s.options.MetricsDisabled {
990+
p2pmetrics.RecordRetrieve(p2pmetrics.TaskIDFromContext(ctx), p2pmetrics.Call{
991+
IP: node.IP,
992+
Address: node.String(),
993+
Keys: returned,
994+
Success: true,
995+
Error: "",
996+
DurationMS: time.Since(callStart).Milliseconds(),
997+
})
998+
}
987999
}(node, nodeID)
9881000
}
9891001

@@ -1713,14 +1725,16 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i
17131725
}
17141726

17151727
// Emit per-node store RPC call via metrics bridge (no P2P API coupling)
1716-
p2pmetrics.RecordStore(p2pmetrics.TaskIDFromContext(ctx), p2pmetrics.Call{
1717-
IP: nodeIP,
1718-
Address: nodeAddr,
1719-
Keys: response.KeysCount,
1720-
Success: errMsg == "" && response.Error == nil,
1721-
Error: errMsg,
1722-
DurationMS: response.DurationMS,
1723-
})
1728+
if !s.options.MetricsDisabled {
1729+
p2pmetrics.RecordStore(p2pmetrics.TaskIDFromContext(ctx), p2pmetrics.Call{
1730+
IP: nodeIP,
1731+
Address: nodeAddr,
1732+
Keys: response.KeysCount,
1733+
Success: errMsg == "" && response.Error == nil,
1734+
Error: errMsg,
1735+
DurationMS: response.DurationMS,
1736+
})
1737+
}
17241738

17251739
}
17261740

p2p/p2p.go

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,15 @@ type P2P interface {
4040

4141
// p2p structure to implements interface
4242
type p2p struct {
43-
store kademlia.Store // the store for kademlia network
44-
metaStore kademlia.MetaStore
45-
dht *kademlia.DHT // the kademlia network
46-
config *Config // the service configuration
47-
running bool // if the kademlia network is ready
48-
lumeraClient lumera.Client
49-
keyring keyring.Keyring // Add the keyring field
50-
rqstore rqstore.Store
43+
store kademlia.Store // the store for kademlia network
44+
metaStore kademlia.MetaStore
45+
dht *kademlia.DHT // the kademlia network
46+
config *Config // the service configuration
47+
running bool // if the kademlia network is ready
48+
lumeraClient lumera.Client
49+
keyring keyring.Keyring // Add the keyring field
50+
rqstore rqstore.Store
51+
metricsDisabled bool
5152
}
5253

5354
// Run the kademlia network
@@ -226,14 +227,15 @@ func (s *p2p) NClosestNodesWithIncludingNodeList(ctx context.Context, n int, key
226227
// configure the distributed hash table for p2p service
227228
func (s *p2p) configure(ctx context.Context) error {
228229
// new the queries storage
229-
kadOpts := &kademlia.Options{
230-
LumeraClient: s.lumeraClient,
231-
Keyring: s.keyring, // Pass the keyring
232-
BootstrapNodes: []*kademlia.Node{},
233-
IP: s.config.ListenAddress,
234-
Port: s.config.Port,
235-
ID: []byte(s.config.ID),
236-
}
230+
kadOpts := &kademlia.Options{
231+
LumeraClient: s.lumeraClient,
232+
Keyring: s.keyring, // Pass the keyring
233+
BootstrapNodes: []*kademlia.Node{},
234+
IP: s.config.ListenAddress,
235+
Port: s.config.Port,
236+
ID: []byte(s.config.ID),
237+
MetricsDisabled: s.metricsDisabled,
238+
}
237239

238240
if len(kadOpts.ID) == 0 {
239241
errors.Errorf("node id is empty")
@@ -251,25 +253,26 @@ func (s *p2p) configure(ctx context.Context) error {
251253
}
252254

253255
// New returns a new p2p instance.
254-
func New(ctx context.Context, config *Config, lumeraClient lumera.Client, kr keyring.Keyring, rqstore rqstore.Store, cloud cloud.Storage, mst *sqlite.MigrationMetaStore) (P2P, error) {
255-
store, err := sqlite.NewStore(ctx, config.DataDir, cloud, mst)
256-
if err != nil {
257-
return nil, errors.Errorf("new kademlia store: %w", err)
258-
}
256+
func New(ctx context.Context, config *Config, lumeraClient lumera.Client, kr keyring.Keyring, rqstore rqstore.Store, cloud cloud.Storage, mst *sqlite.MigrationMetaStore, metricsDisabled bool) (P2P, error) {
257+
store, err := sqlite.NewStore(ctx, config.DataDir, cloud, mst)
258+
if err != nil {
259+
return nil, errors.Errorf("new kademlia store: %w", err)
260+
}
259261

260262
meta, err := meta.NewStore(ctx, config.DataDir)
261263
if err != nil {
262264
return nil, errors.Errorf("new kademlia meta store: %w", err)
263265
}
264266

265-
return &p2p{
266-
store: store,
267-
metaStore: meta,
268-
config: config,
269-
lumeraClient: lumeraClient,
270-
keyring: kr, // Store the keyring
271-
rqstore: rqstore,
272-
}, nil
267+
return &p2p{
268+
store: store,
269+
metaStore: meta,
270+
config: config,
271+
lumeraClient: lumeraClient,
272+
keyring: kr, // Store the keyring
273+
rqstore: rqstore,
274+
metricsDisabled: metricsDisabled,
275+
}, nil
273276
}
274277

275278
// LocalStore store data into the kademlia network

supernode/cmd/start.go

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -84,11 +84,14 @@ The supernode will connect to the Lumera network and begin participating in the
8484
logtrace.Fatal(ctx, "Failed to initialize RaptorQ store", logtrace.Fields{"error": err.Error()})
8585
}
8686

87-
// Initialize P2P service
88-
p2pService, err := initP2PService(ctx, appConfig, lumeraClient, kr, rqStore, nil, nil)
89-
if err != nil {
90-
logtrace.Fatal(ctx, "Failed to initialize P2P service", logtrace.Fields{"error": err.Error()})
91-
}
87+
// Manually set the disable flag at the highest level
88+
disableMetrics := true
89+
90+
// Initialize P2P service with explicit disable flag
91+
p2pService, err := initP2PService(ctx, appConfig, lumeraClient, kr, rqStore, nil, nil, disableMetrics)
92+
if err != nil {
93+
logtrace.Fatal(ctx, "Failed to initialize P2P service", logtrace.Fields{"error": err.Error()})
94+
}
9295

9396
// Initialize the supernode
9497
supernodeInstance, err := NewSupernode(ctx, appConfig, kr, p2pService, rqStore, lumeraClient)
@@ -97,18 +100,19 @@ The supernode will connect to the Lumera network and begin participating in the
97100
}
98101

99102
// Configure cascade service
100-
cService := cascadeService.NewCascadeService(
101-
&cascadeService.Config{
102-
Config: common.Config{
103-
SupernodeAccountAddress: appConfig.SupernodeConfig.Identity,
104-
},
105-
RqFilesDir: appConfig.GetRaptorQFilesDir(),
106-
},
107-
lumeraClient,
108-
*p2pService,
109-
codec.NewRaptorQCodec(appConfig.GetRaptorQFilesDir()),
110-
rqStore,
111-
)
103+
cService := cascadeService.NewCascadeService(
104+
&cascadeService.Config{
105+
Config: common.Config{
106+
SupernodeAccountAddress: appConfig.SupernodeConfig.Identity,
107+
},
108+
RqFilesDir: appConfig.GetRaptorQFilesDir(),
109+
MetricsDisabled: disableMetrics,
110+
},
111+
lumeraClient,
112+
*p2pService,
113+
codec.NewRaptorQCodec(appConfig.GetRaptorQFilesDir()),
114+
rqStore,
115+
)
112116

113117
// Create cascade action server
114118
cascadeActionServer := cascade.NewCascadeActionServer(cService)
@@ -190,7 +194,7 @@ func init() {
190194
}
191195

192196
// initP2PService initializes the P2P service
193-
func initP2PService(ctx context.Context, config *config.Config, lumeraClient lumera.Client, kr cKeyring.Keyring, rqStore rqstore.Store, cloud cloud.Storage, mst *sqlite.MigrationMetaStore) (*p2p.P2P, error) {
197+
func initP2PService(ctx context.Context, config *config.Config, lumeraClient lumera.Client, kr cKeyring.Keyring, rqStore rqstore.Store, cloud cloud.Storage, mst *sqlite.MigrationMetaStore, metricsDisabled bool) (*p2p.P2P, error) {
194198
// Get the supernode address from the keyring
195199
keyInfo, err := kr.Key(config.SupernodeConfig.KeyName)
196200
if err != nil {
@@ -206,7 +210,7 @@ func initP2PService(ctx context.Context, config *config.Config, lumeraClient lum
206210

207211
logtrace.Info(ctx, "Initializing P2P service", logtrace.Fields{"address": p2pConfig.ListenAddress, "port": p2pConfig.Port, "data_dir": p2pConfig.DataDir, "supernode_id": address.String()})
208212

209-
p2pService, err := p2p.New(ctx, p2pConfig, lumeraClient, kr, rqStore, cloud, mst)
213+
p2pService, err := p2p.New(ctx, p2pConfig, lumeraClient, kr, rqStore, cloud, mst, metricsDisabled)
210214
if err != nil {
211215
return nil, fmt.Errorf("failed to initialize p2p service: %w", err)
212216
}

supernode/services/cascade/adaptors/p2p.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,14 @@ type P2PService interface {
3939

4040
// p2pImpl is the default implementation of the P2PService interface.
4141
type p2pImpl struct {
42-
p2p p2p.Client
43-
rqStore rqstore.Store
42+
p2p p2p.Client
43+
rqStore rqstore.Store
44+
metricsDisabled bool
4445
}
4546

4647
// NewP2PService returns a concrete implementation of P2PService.
47-
func NewP2PService(client p2p.Client, store rqstore.Store) P2PService {
48-
return &p2pImpl{p2p: client, rqStore: store}
48+
func NewP2PService(client p2p.Client, store rqstore.Store, metricsDisabled bool) P2PService {
49+
return &p2pImpl{p2p: client, rqStore: store, metricsDisabled: metricsDisabled}
4950
}
5051

5152
type StoreArtefactsRequest struct {
@@ -58,9 +59,11 @@ type StoreArtefactsRequest struct {
5859
func (p *p2pImpl) StoreArtefacts(ctx context.Context, req StoreArtefactsRequest, f logtrace.Fields) error {
5960
logtrace.Info(ctx, "About to store artefacts (metadata + symbols)", logtrace.Fields{"taskID": req.TaskID, "id_files": len(req.IDFiles)})
6061

61-
// Enable per-node store RPC capture for this task
62-
cm.StartStoreCapture(req.TaskID)
63-
defer cm.StopStoreCapture(req.TaskID)
62+
// Optionally enable per-node store RPC capture for this task
63+
if !p.metricsDisabled {
64+
cm.StartStoreCapture(req.TaskID)
65+
defer cm.StopStoreCapture(req.TaskID)
66+
}
6467

6568
start := time.Now()
6669
firstPassSymbols, totalSymbols, err := p.storeCascadeSymbolsAndData(ctx, req.TaskID, req.ActionID, req.SymbolsDir, req.IDFiles)

supernode/services/cascade/config.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
type Config struct {
99
common.Config `mapstructure:",squash" json:"-"`
1010

11-
RaptorQServiceAddress string `mapstructure:"-" json:"-"`
12-
RqFilesDir string `mapstructure:"rq_files_dir" json:"rq_files_dir,omitempty"`
11+
RaptorQServiceAddress string `mapstructure:"-" json:"-"`
12+
RqFilesDir string `mapstructure:"rq_files_dir" json:"rq_files_dir,omitempty"`
13+
// MetricsDisabled toggles upload/download metrics for cascade service
14+
MetricsDisabled bool `mapstructure:"-" json:"-"`
1315
}

supernode/services/cascade/download.go

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,11 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout(
165165
}
166166
logtrace.Info(ctx, "Retrieving target-required symbols for decode", fields)
167167

168-
// Enable retrieve metrics capture for this action
169-
cm.StartRetrieveCapture(actionID)
170-
defer cm.StopRetrieveCapture(actionID)
168+
169+
if !task.config.MetricsDisabled {
170+
cm.StartRetrieveCapture(actionID)
171+
defer cm.StopRetrieveCapture(actionID)
172+
}
171173

172174
// Measure symbols batch retrieve duration
173175
retrieveStart := time.Now()
@@ -201,17 +203,22 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout(
201203
}
202204
decodeMS := time.Since(decodeStart).Milliseconds()
203205

204-
// Set minimal retrieve summary and emit event strictly from internal collector
205-
cm.SetRetrieveSummary(actionID, retrieveMS, decodeMS)
206-
payload := cm.BuildDownloadEventPayloadFromCollector(actionID)
207-
if retrieve, ok := payload["retrieve"].(map[string]any); ok {
208-
retrieve["target_required_percent"] = targetRequiredPercent
209-
retrieve["target_required_count"] = targetRequiredCount
210-
retrieve["total_symbols"] = totalSymbols
211-
}
212-
if b, err := json.MarshalIndent(payload, "", " "); err == nil {
213-
task.streamDownloadEvent(SupernodeEventTypeArtefactsDownloaded, string(b), "", "", send)
214-
}
206+
// Set minimal retrieve summary and emit event strictly from internal collector
207+
if !task.config.MetricsDisabled {
208+
cm.SetRetrieveSummary(actionID, retrieveMS, decodeMS)
209+
payload := cm.BuildDownloadEventPayloadFromCollector(actionID)
210+
if retrieve, ok := payload["retrieve"].(map[string]any); ok {
211+
retrieve["target_required_percent"] = targetRequiredPercent
212+
retrieve["target_required_count"] = targetRequiredCount
213+
retrieve["total_symbols"] = totalSymbols
214+
}
215+
if b, err := json.MarshalIndent(payload, "", " "); err == nil {
216+
task.streamDownloadEvent(SupernodeEventTypeArtefactsDownloaded, string(b), "", "", send)
217+
}
218+
} else {
219+
// Send minimal hardcoded event when metrics disabled
220+
task.streamDownloadEvent(SupernodeEventTypeArtefactsDownloaded, "Download completed (metrics disabled)", "", "", send)
221+
}
215222

216223
fileHash, err := crypto.HashFileIncrementally(decodeInfo.FilePath, 0)
217224
if err != nil {

0 commit comments

Comments (0)