Skip to content

Commit 10e65b7

Browse files
committed
lz4 decompress in file worker
1 parent 2dc50e6 commit 10e65b7

File tree

2 files changed

+25
-2
lines changed

2 files changed

+25
-2
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ require (
3131
github.com/klauspost/compress v1.17.8
3232
github.com/minio/minio-go v6.0.14+incompatible
3333
github.com/ozontech/insane-json v0.1.9
34+
github.com/pierrec/lz4/v4 v4.1.21
3435
github.com/prometheus/client_golang v1.16.0
3536
github.com/prometheus/procfs v0.10.1
3637
github.com/rjeczalik/notify v0.9.3
@@ -118,7 +119,6 @@ require (
118119
github.com/onsi/ginkgo v1.16.5 // indirect
119120
github.com/opencontainers/runtime-spec v1.0.2 // indirect
120121
github.com/pascaldekloe/name v1.0.1 // indirect
121-
github.com/pierrec/lz4/v4 v4.1.21 // indirect
122122
github.com/pmezard/go-difflib v1.0.0 // indirect
123123
github.com/prometheus/client_model v0.3.0 // indirect
124124
github.com/prometheus/common v0.42.0 // indirect

plugin/input/file/worker.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@ package file
33
import (
44
"bytes"
55
"io"
6+
"mime"
67
"os"
8+
"path/filepath"
79
"strings"
810

911
"github.com/ozontech/file.d/pipeline"
1012
"github.com/ozontech/file.d/pipeline/metadata"
1113
k8s_meta "github.com/ozontech/file.d/plugin/input/k8s/meta"
14+
"github.com/pierrec/lz4/v4"
1215

1316
"go.uber.org/zap"
1417
)
@@ -86,12 +89,22 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi
8689
}
8790
}
8891

92+
mimeType := getMimeType(file.Name())
93+
var reader io.Reader
94+
95+
if mimeType == "application/x-lz4" {
96+
lz4Reader := lz4.NewReader(file)
97+
reader = lz4Reader
98+
} else {
99+
reader = file
100+
}
101+
89102
// append the data of the old work, this happens when the event was not completely written to the file
90103
// for example: {"level": "info", "message": "some...
91104
// the end of the message can be added later and will be read in this iteration
92105
accumBuf = append(accumBuf[:0], job.tail...)
93106
for {
94-
n, err := file.Read(readBuf)
107+
n, err := reader.Read(readBuf)
95108
controller.IncReadOps()
96109
// if we read to end of file it's time to check truncation etc and process next job
97110
if err == io.EOF || n == 0 {
@@ -173,6 +186,16 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi
173186
}
174187
}
175188

189+
func getMimeType(filename string) string {
190+
ext := filepath.Ext(filename)
191+
mimeType := mime.TypeByExtension(ext)
192+
if mimeType == "" {
193+
mimeType = "application/octet-stream"
194+
}
195+
196+
return mimeType
197+
}
198+
176199
func (w *worker) processEOF(file *os.File, job *Job, jobProvider *jobProvider, totalOffset int64) error {
177200
stat, err := file.Stat()
178201
if err != nil {

0 commit comments

Comments
 (0)