Skip to content

Commit 974c8b6

Browse files
committed
lz4: start read from offset
1 parent 8de5174 commit 974c8b6

File tree

2 files changed

+81
-25
lines changed

2 files changed

+81
-25
lines changed

plugin/input/file/provider.go

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,15 @@ type jobProvider struct {
6060
}
6161

6262
type Job struct {
63-
file *os.File
64-
inode inodeID
65-
sourceID pipeline.SourceID // some value to distinguish jobs with same inode
66-
filename string
67-
symlink string
68-
curOffset int64 // offset to not call Seek() everytime
69-
tail []byte // some data of a new line read by worker, to not seek backwards to read from line start
63+
file *os.File
64+
mimeType string
65+
isCompressed bool
66+
inode inodeID
67+
sourceID pipeline.SourceID // some value to distinguish jobs with same inode
68+
filename string
69+
symlink string
70+
curOffset int64 // offset to not call Seek() everytime
71+
tail []byte // some data of a new line read by worker, to not seek backwards to read from line start
7072

7173
ignoreEventsLE uint64 // events with seq id less or equal than this should be ignored in terms offset commitment
7274
lastEventSeq uint64
@@ -83,10 +85,15 @@ type Job struct {
8385
mu *sync.Mutex
8486
}
8587

86-
func (j *Job) seek(offset int64, whence int, hint string) int64 {
87-
n, err := j.file.Seek(offset, whence)
88-
if err != nil {
89-
logger.Infof("file seek error hint=%s, name=%s, err=%s", hint, j.filename, err.Error())
88+
func (j *Job) seek(offset int64, whence int, hint string) (n int64) {
89+
var err error
90+
if !j.isCompressed {
91+
n, err = j.file.Seek(offset, whence)
92+
if err != nil {
93+
logger.Infof("file seek error hint=%s, name=%s, err=%s", hint, j.filename, err.Error())
94+
}
95+
} else {
96+
n = 0
9097
}
9198
j.curOffset = n
9299

@@ -354,6 +361,10 @@ func (jp *jobProvider) checkFileWasTruncated(job *Job, size int64) {
354361
}
355362
}
356363

364+
func isCompressed(mimeType string) bool {
365+
return mimeType == "application/x-lz4"
366+
}
367+
357368
func (jp *jobProvider) addJob(file *os.File, stat os.FileInfo, filename string, symlink string) {
358369
sourceID := sourceIDByStat(stat, symlink)
359370

@@ -370,12 +381,16 @@ func (jp *jobProvider) addJob(file *os.File, stat os.FileInfo, filename string,
370381
}
371382

372383
inode := getInode(stat)
384+
mimeType := getMimeType(filename)
385+
373386
job := &Job{
374-
file: file,
375-
inode: inode,
376-
filename: filename,
377-
symlink: symlink,
378-
sourceID: sourceID,
387+
file: file,
388+
isCompressed: isCompressed(mimeType),
389+
mimeType: mimeType,
390+
inode: inode,
391+
filename: filename,
392+
symlink: symlink,
393+
sourceID: sourceID,
379394

380395
isVirgin: true,
381396
isDone: true,

plugin/input/file/worker.go

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"io"
66
"mime"
77
"os"
8+
"os/exec"
89
"path/filepath"
910
"strings"
1011

@@ -93,11 +94,24 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi
9394
}
9495
}
9596

96-
mimeType := getMimeType(file.Name())
9797
var reader io.Reader
98-
99-
if mimeType == "application/x-lz4" {
98+
if job.mimeType == "application/x-lz4" {
99+
if isNotFileBeingWritten(file.Name()) {
100+
logger.Error("cannot lock file", zap.String("filename", file.Name()))
101+
break
102+
}
100103
lz4Reader := lz4.NewReader(file)
104+
if len(offsets) > 0 {
105+
for lastOffset+int64(readBufferSize) < offsets[0].Offset {
106+
n, err := lz4Reader.Read(readBuf)
107+
if err != nil {
108+
if err == io.EOF {
109+
break // End of file reached
110+
}
111+
}
112+
lastOffset += int64(n)
113+
}
114+
}
101115
reader = lz4Reader
102116
} else {
103117
reader = file
@@ -200,22 +214,49 @@ func getMimeType(filename string) string {
200214
return mimeType
201215
}
202216

217+
func isNotFileBeingWritten(filePath string) bool {
218+
// Run the lsof command to check open file descriptors
219+
cmd := exec.Command("lsof", filePath)
220+
output, err := cmd.Output()
221+
if err != nil {
222+
return false // Error running lsof
223+
}
224+
225+
// Check the output for write access
226+
lines := strings.Split(string(output), "\n")
227+
for _, line := range lines {
228+
// Check if the line contains 'w' indicating write access
229+
if strings.Contains(line, "w") {
230+
return true // File is being written to
231+
}
232+
}
233+
234+
return false // File is not being written to
235+
}
236+
203237
func (w *worker) processEOF(file *os.File, job *Job, jobProvider *jobProvider, totalOffset int64) error {
204238
stat, err := file.Stat()
205239
if err != nil {
206240
return err
207241
}
208242

209-
// files truncated from time to time, after logs from file was processed.
210-
// Position > stat.Size() means that data was truncated and
211-
// caret pointer must be moved to start of file.
212-
if totalOffset > stat.Size() {
213-
jobProvider.truncateJob(job)
243+
if !job.isCompressed {
244+
// files truncated from time to time, after logs from file was processed.
245+
// Position > stat.Size() means that data was truncated and
246+
// caret pointer must be moved to start of file.
247+
if totalOffset > stat.Size() {
248+
jobProvider.truncateJob(job)
249+
}
214250
}
215-
216251
// Mark job as done till new lines has appeared.
217252
jobProvider.doneJob(job)
218253

254+
if job.isCompressed {
255+
job.mu.Lock()
256+
file.Close()
257+
jobProvider.deleteJobAndUnlock(job)
258+
}
259+
219260
return nil
220261
}
221262

0 commit comments

Comments
 (0)