Skip to content

Commit 8e9fa7d

Browse files
cursoragent and 0xJacky committed
Refactor: Improve incremental log indexing logic
Co-authored-by: jacky-943572677 <[email protected]>
1 parent f95836f commit 8e9fa7d

File tree

2 files changed: 151 additions and 10 deletions

internal/cron/incremental_indexing.go

Lines changed: 43 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -7,10 +7,16 @@ import (
77

88
"github.com/0xJacky/Nginx-UI/internal/nginx_log"
99
"github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
10+
"github.com/0xJacky/Nginx-UI/model"
1011
"github.com/go-co-op/gocron/v2"
1112
"github.com/uozi-tech/cosy/logger"
1213
)
1314

15+
// logIndexProvider provides access to stored per-file index metadata.
16+
type logIndexProvider interface {
17+
GetLogIndex(path string) (*model.NginxLogIndex, error)
18+
}
19+
1420
// setupIncrementalIndexingJob sets up the periodic incremental log indexing job
1521
func setupIncrementalIndexingJob(s gocron.Scheduler) (gocron.Job, error) {
1622
logger.Info("Setting up incremental log indexing job")
@@ -42,6 +48,12 @@ func performIncrementalIndexing() {
4248
return
4349
}
4450

51+
persistence := logFileManager.GetPersistence()
52+
if persistence == nil {
53+
logger.Warn("Persistence manager not available for incremental indexing")
54+
return
55+
}
56+
4557
// Get modern indexer
4658
modernIndexer := nginx_log.GetIndexer()
4759
if modernIndexer == nil {
@@ -64,7 +76,7 @@ func performIncrementalIndexing() {
6476
changedCount := 0
6577
for _, log := range allLogs {
6678
// Check if file needs incremental indexing
67-
if needsIncrementalIndexing(log) {
79+
if needsIncrementalIndexing(log, persistence) {
6880
if err := queueIncrementalIndexing(log.Path, modernIndexer, logFileManager); err != nil {
6981
logger.Errorf("Failed to queue incremental indexing for %s: %v", log.Path, err)
7082
} else {
@@ -81,7 +93,7 @@ func performIncrementalIndexing() {
8193
}
8294

8395
// needsIncrementalIndexing checks if a log file needs incremental indexing
84-
func needsIncrementalIndexing(log *nginx_log.NginxLogWithIndex) bool {
96+
func needsIncrementalIndexing(log *nginx_log.NginxLogWithIndex, persistence logIndexProvider) bool {
8597
// Skip if already indexing or queued
8698
if log.IndexStatus == string(indexer.IndexStatusIndexing) ||
8799
log.IndexStatus == string(indexer.IndexStatusQueued) {
@@ -99,22 +111,43 @@ func needsIncrementalIndexing(log *nginx_log.NginxLogWithIndex) bool {
99111
return false
100112
}
101113

102-
// Check if file has been modified since last index
103114
fileModTime := fileInfo.ModTime()
104115
fileSize := fileInfo.Size()
116+
117+
if persistence != nil {
118+
if logIndex, err := persistence.GetLogIndex(log.Path); err == nil {
119+
if logIndex.NeedsIndexing(fileModTime, fileSize) {
120+
logger.Debugf("File %s needs incremental indexing based on persisted metadata", log.Path)
121+
return true
122+
}
123+
return false
124+
} else {
125+
logger.Debugf("Could not load persisted metadata for %s: %v", log.Path, err)
126+
}
127+
}
128+
129+
// Fallback: use aggregated data cautiously by clamping the stored size so grouped entries
130+
// do not trigger false positives when rotation files are aggregated together.
105131
lastModified := time.Unix(log.LastModified, 0)
132+
lastSize := log.LastSize
133+
if lastSize == 0 || lastSize > fileSize {
134+
lastSize = fileSize
135+
}
136+
137+
// If the file was never indexed, queue it once.
138+
if log.LastIndexed == 0 {
139+
return true
140+
}
106141

107-
// File was modified after last index and size increased
108-
if fileModTime.After(lastModified) && fileSize > log.LastSize {
109-
logger.Debugf("File %s needs incremental indexing: mod_time=%s, size=%d",
142+
if fileModTime.After(lastModified) && fileSize > lastSize {
143+
logger.Debugf("File %s needs incremental indexing (fallback path): mod_time=%s, size=%d",
110144
log.Path, fileModTime.Format("2006-01-02 15:04:05"), fileSize)
111145
return true
112146
}
113147

114-
// File size decreased - might be file rotation
115-
if fileSize < log.LastSize {
116-
logger.Debugf("File %s needs full re-indexing due to size decrease: old_size=%d, new_size=%d",
117-
log.Path, log.LastSize, fileSize)
148+
if fileSize < lastSize {
149+
logger.Debugf("File %s needs full re-indexing (fallback path) due to size decrease: old_size=%d, new_size=%d",
150+
log.Path, lastSize, fileSize)
118151
return true
119152
}
120153

Lines changed: 108 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,108 @@
1+
package cron
2+
3+
import (
4+
"os"
5+
"path/filepath"
6+
"testing"
7+
"time"
8+
9+
"github.com/0xJacky/Nginx-UI/internal/nginx_log"
10+
"github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
11+
"github.com/0xJacky/Nginx-UI/model"
12+
)
13+
14+
type stubLogIndexProvider struct {
15+
idx *model.NginxLogIndex
16+
err error
17+
}
18+
19+
func (s stubLogIndexProvider) GetLogIndex(path string) (*model.NginxLogIndex, error) {
20+
if s.err != nil {
21+
return nil, s.err
22+
}
23+
if s.idx != nil {
24+
s.idx.Path = path
25+
}
26+
return s.idx, nil
27+
}
28+
29+
func TestNeedsIncrementalIndexingSkipsWhenUnchanged(t *testing.T) {
30+
dir := t.TempDir()
31+
logPath := filepath.Join(dir, "access.log")
32+
if err := os.WriteFile(logPath, []byte("initial\n"), 0o644); err != nil {
33+
t.Fatalf("write temp log: %v", err)
34+
}
35+
36+
info, err := os.Stat(logPath)
37+
if err != nil {
38+
t.Fatalf("stat temp log: %v", err)
39+
}
40+
41+
persisted := &model.NginxLogIndex{
42+
Path: logPath,
43+
LastModified: info.ModTime(),
44+
LastSize: info.Size(),
45+
LastIndexed: time.Now(),
46+
}
47+
48+
logData := &nginx_log.NginxLogWithIndex{
49+
Path: logPath,
50+
Type: "access",
51+
IndexStatus: string(indexer.IndexStatusIndexed),
52+
LastModified: info.ModTime().Unix(),
53+
LastSize: info.Size() * 10, // simulate grouped size inflation
54+
LastIndexed: time.Now().Unix(),
55+
}
56+
57+
if needsIncrementalIndexing(logData, stubLogIndexProvider{idx: persisted}) {
58+
t.Fatalf("expected no incremental indexing when file metadata is unchanged")
59+
}
60+
}
61+
62+
func TestNeedsIncrementalIndexingDetectsGrowth(t *testing.T) {
63+
dir := t.TempDir()
64+
logPath := filepath.Join(dir, "access.log")
65+
if err := os.WriteFile(logPath, []byte("initial\n"), 0o644); err != nil {
66+
t.Fatalf("write temp log: %v", err)
67+
}
68+
69+
initialInfo, err := os.Stat(logPath)
70+
if err != nil {
71+
t.Fatalf("stat temp log: %v", err)
72+
}
73+
74+
persisted := &model.NginxLogIndex{
75+
Path: logPath,
76+
LastModified: initialInfo.ModTime().Add(-time.Minute),
77+
LastSize: initialInfo.Size(),
78+
LastIndexed: time.Now().Add(-time.Minute),
79+
}
80+
81+
f, err := os.OpenFile(logPath, os.O_APPEND|os.O_WRONLY, 0)
82+
if err != nil {
83+
t.Fatalf("open temp log: %v", err)
84+
}
85+
if _, err := f.WriteString("more data\n"); err != nil {
86+
f.Close()
87+
t.Fatalf("append temp log: %v", err)
88+
}
89+
_ = f.Close()
90+
91+
finalInfo, err := os.Stat(logPath)
92+
if err != nil {
93+
t.Fatalf("restat temp log: %v", err)
94+
}
95+
96+
logData := &nginx_log.NginxLogWithIndex{
97+
Path: logPath,
98+
Type: "access",
99+
IndexStatus: string(indexer.IndexStatusIndexed),
100+
LastModified: finalInfo.ModTime().Unix(),
101+
LastSize: initialInfo.Size(),
102+
LastIndexed: time.Now().Unix(),
103+
}
104+
105+
if !needsIncrementalIndexing(logData, stubLogIndexProvider{idx: persisted}) {
106+
t.Fatalf("expected incremental indexing when file grew")
107+
}
108+
}

0 commit comments

Comments
 (0)