@@ -60,13 +60,15 @@ type jobProvider struct {
6060}
6161
6262type Job struct {
63- file * os.File
64- inode inodeID
65- sourceID pipeline.SourceID // some value to distinguish jobs with same inode
66- filename string
67- symlink string
68- curOffset int64 // offset to not call Seek() everytime
69- tail []byte // some data of a new line read by worker, to not seek backwards to read from line start
63+ file * os.File
64+ mimeType string
65+ isCompressed bool
66+ inode inodeID
67+ sourceID pipeline.SourceID // some value to distinguish jobs with same inode
68+ filename string
69+ symlink string
70+ curOffset int64 // offset to not call Seek() everytime
71+ tail []byte // some data of a new line read by worker, to not seek backwards to read from line start
7072
7173 ignoreEventsLE uint64 // events with seq id less or equal than this should be ignored in terms offset commitment
7274 lastEventSeq uint64
@@ -83,10 +85,15 @@ type Job struct {
8385 mu * sync.Mutex
8486}
8587
86- func (j * Job ) seek (offset int64 , whence int , hint string ) int64 {
87- n , err := j .file .Seek (offset , whence )
88- if err != nil {
89- logger .Infof ("file seek error hint=%s, name=%s, err=%s" , hint , j .filename , err .Error ())
88+ func (j * Job ) seek (offset int64 , whence int , hint string ) (n int64 ) {
89+ var err error
90+ if ! j .isCompressed {
91+ n , err = j .file .Seek (offset , whence )
92+ if err != nil {
93+ logger .Infof ("file seek error hint=%s, name=%s, err=%s" , hint , j .filename , err .Error ())
94+ }
95+ } else {
96+ n = 0
9097 }
9198 j .curOffset = n
9299
@@ -354,6 +361,10 @@ func (jp *jobProvider) checkFileWasTruncated(job *Job, size int64) {
354361 }
355362}
356363
364+ func isCompressed (mimeType string ) bool {
365+ return mimeType == "application/x-lz4"
366+ }
367+
357368func (jp * jobProvider ) addJob (file * os.File , stat os.FileInfo , filename string , symlink string ) {
358369 sourceID := sourceIDByStat (stat , symlink )
359370
@@ -370,12 +381,16 @@ func (jp *jobProvider) addJob(file *os.File, stat os.FileInfo, filename string,
370381 }
371382
372383 inode := getInode (stat )
384+ mimeType := getMimeType (filename )
385+
373386 job := & Job {
374- file : file ,
375- inode : inode ,
376- filename : filename ,
377- symlink : symlink ,
378- sourceID : sourceID ,
387+ file : file ,
388+ isCompressed : isCompressed (mimeType ),
389+ mimeType : mimeType ,
390+ inode : inode ,
391+ filename : filename ,
392+ symlink : symlink ,
393+ sourceID : sourceID ,
379394
380395 isVirgin : true ,
381396 isDone : true ,
@@ -746,6 +761,24 @@ func (jp *jobProvider) deleteJobAndUnlock(job *Job) {
746761 }
747762}
748763
764+ func (jp * jobProvider ) deleteJob (job * Job ) {
765+ if ! job .isDone {
766+ jp .logger .Panicf ("can't delete job, it isn't done: %d:%s" , job .sourceID , job .filename )
767+ }
768+ sourceID := job .sourceID
769+ filename := job .filename
770+
771+ jp .jobsMu .Lock ()
772+ delete (jp .jobs , sourceID )
773+ c := jp .jobsDone .Dec ()
774+ jp .jobsMu .Unlock ()
775+
776+ jp .logger .Infof ("job %d:%s deleted" , job .sourceID , filename )
777+ if c < 0 {
778+ jp .logger .Panicf ("done jobs counter less than zero" )
779+ }
780+ }
781+
749782func getInode (stat os.FileInfo ) inodeID {
750783 return inodeID (stat .Sys ().(* syscall.Stat_t ).Ino )
751784}
0 commit comments