@@ -96,17 +96,14 @@ class LogStash::Inputs::AzureBlobStorage < LogStash::Inputs::Base
9696
9797config :path_filters , :validate => :array , :default => [ '**/*' ] , :required => false
9898
99- # TODO: Other feature requests
100- # show file path in logger
101- # add filepath as part of log message
102- # option to keep registry on local disk
10399
104100
105101public
106102def register
107103 @pipe_id = Thread . current [ :name ] . split ( "[" ) . last . split ( "]" ) . first
108104 @logger . info ( "=== #{ config_name } #{ Gem . loaded_specs [ "logstash-input-" +config_name ] . version . to_s } / #{ @pipe_id } / #{ @id [ 0 , 6 ] } / ruby #{ RUBY_VERSION } p#{ RUBY_PATCHLEVEL } ===" )
109105 @logger . info ( "If this plugin doesn't work, please raise an issue in https://github.com/janmg/logstash-input-azure_blob_storage" )
106+ @busy_writing_registry = Mutex . new
110107 # TODO: consider multiple readers, so add pipeline @id or use logstash-to-logstash communication?
111108 # TODO: Implement retry ... Error: Connection refused - Failed to open TCP connection to
112109end
@@ -152,7 +149,7 @@ def run(queue)
152149 # read filelist and set offsets to file length to mark all the old files as done
153150 if registry_create_policy == "start_fresh"
154151 @registry = list_blobs ( true )
155- save_registry ( @registry )
152+ save_registry ( )
156153 @logger . info ( "starting fresh, writing a clean registry to contain #{ @registry . size } blobs/files" )
157154 end
158155
@@ -178,7 +175,6 @@ def run(queue)
178175 @logger . info ( "head will be: #{ @head } and tail is set to #{ @tail } " )
179176 end
180177
181- newreg = Hash . new
182178 filelist = Hash . new
183179 worklist = Hash . new
184180 @last = start = Time . now . to_i
@@ -197,7 +193,6 @@ def run(queue)
197193 #filelist.sort_by(|k,v|resource(k)[:date])
198194 worklist . clear
199195 filelist . clear
200- newreg . clear
201196
202197 # Listing all the files
203198 filelist = list_blobs ( false )
@@ -208,16 +203,24 @@ def run(queue)
208203 rescue
209204 off = 0
210205 end
211- newreg . store ( name , { :offset => off , :length => file [ :length ] } )
206+ @registry . store ( name , { :offset => off , :length => file [ :length ] } )
212207 if ( @debug_until > @processed ) then @logger . info ( "2: adding offsets: #{ name } #{ off } #{ file [ :length ] } " ) end
213208 end
214209 # size nilClass when the list doesn't grow?!
210+
211+ # clean registry of files that are not in the filelist
212+ @registry . each do |name , file |
213+ unless filelist . include? ( name )
214+ @registry . delete ( name )
215+ if ( @debug_until > @processed ) then @logger . info ( "purging #{ name } " ) end
216+ end
217+ end
218+
215219 # Worklist is the subset of files where the already read offset is smaller than the file size
216- @registry = newreg
217220 worklist . clear
218221 chunk = nil
219222
220- worklist = newreg . select { |name , file | file [ :offset ] < file [ :length ] }
223+ worklist = @registry . select { |name , file | file [ :offset ] < file [ :length ] }
221224 if ( worklist . size > 4 ) then @logger . info ( "worklist contains #{ worklist . size } blobs" ) end
222225
223226 # Start of processing
@@ -236,7 +239,8 @@ def run(queue)
236239 chunk = full_read ( name )
237240 size = chunk . size
238241 rescue Exception => e
239- @logger . error ( "Failed to read #{ name } because of: #{ e . message } .. will continue, set file as read and pretend this never happened" )
242+ # Azure::Core::Http::HTTPError / undefined method `message='
243+ @logger . error ( "Failed to read #{ name } ... will continue, set file as read and pretend this never happened" )
240244 @logger . error ( "#{ size } size and #{ file [ :length ] } file length" )
241245 size = file [ :length ]
242246 end
@@ -275,12 +279,12 @@ def run(queue)
275279 decorate ( event )
276280 queue << event
277281 end
282+ @processed += counter
278283 rescue Exception => e
279284 @logger . error ( "codec exception: #{ e . message } .. will continue and pretend this never happened" )
280285 @registry . store ( name , { :offset => file [ :length ] , :length => file [ :length ] } )
281286 @logger . debug ( "#{ chunk } " )
282287 end
283- @processed += counter
284288 end
285289 @registry . store ( name , { :offset => size , :length => file [ :length ] } )
286290 # TODO add input plugin option to prevent connection cache
@@ -291,14 +295,14 @@ def run(queue)
291295 return
292296 end
293297 if ( ( Time . now . to_i - @last ) > @interval )
294- save_registry ( @registry )
298+ save_registry ( )
295299 end
296300 end
297301 end
298302 # The files that got processed after the last registry save need to be saved too, in case the worklist is empty for some intervals.
299303 now = Time . now . to_i
300304 if ( ( now - @last ) > @interval )
301- save_registry ( @registry )
305+ save_registry ( )
302306 end
303307 sleeptime = interval - ( ( now - start ) % interval )
304308 if @debug_timer
@@ -309,10 +313,10 @@ def run(queue)
309313end
310314
311315def stop
312- save_registry ( @registry )
316+ save_registry ( )
313317end
314318def close
315- save_registry ( @registry )
319+ save_registry ( )
316320end
317321
318322
@@ -490,30 +494,35 @@ def try_list_blobs(fill)
490494end
491495
492496# When events were processed after the last registry save, start a thread to update the registry file.
493- def save_registry ( filelist )
494- # Because of threading, processed values and regsaved are not thread safe, they can change as instance variable @! Most of the time this is fine because the registry is the last resort, but be careful about corner cases!
497+ def save_registry ( )
495498 unless @processed == @regsaved
496- @regsaved = @processed
497- unless ( @busy_writing_registry )
498- Thread . new {
499- begin
500- @busy_writing_registry = true
501- unless ( @registry_local_path )
502- @blob_client . create_block_blob ( container , registry_path , Marshal . dump ( filelist ) )
503- @logger . info ( "processed #{ @processed } events, saving #{ filelist . size } blobs and offsets to remote registry #{ registry_path } " )
504- else
505- File . open ( @registry_local_path +"/" +@pipe_id , 'w' ) { |file | file . write ( Marshal . dump ( filelist ) ) }
506- @logger . info ( "processed #{ @processed } events, saving #{ filelist . size } blobs and offsets to local registry #{ registry_local_path +"/" +@pipe_id } " )
507- end
508- @busy_writing_registry = false
509- @last = Time . now . to_i
510- rescue
511- @logger . error ( "Oh my, registry write failed, do you have write access?" )
512- end
499+ unless ( @busy_writing_registry . locked? )
500+ # deep_copy hash, to save the registry independant from the variable for thread safety
501+ # if deep_clone uses Marshall to do a copy,
502+ regdump = Marshal . dump ( @registry )
503+ regsize = @registry . size
504+ Thread . new {
505+ begin
506+ @busy_writing_registry . lock
507+ unless ( @registry_local_path )
508+ @blob_client . create_block_blob ( container , registry_path , regdump )
509+ @logger . info ( "processed #{ @processed } events, saving #{ regsize } blobs and offsets to remote registry #{ registry_path } " )
510+ else
511+ File . open ( @registry_local_path +"/" +@pipe_id , 'w' ) { |file | file . write ( regdump ) }
512+ @logger . info ( "processed #{ @processed } events, saving #{ regsize } blobs and offsets to local registry #{ registry_local_path +"/" +@pipe_id } " )
513+ end
514+ @last = Time . now . to_i
515+ @regsaved = @processed
516+ rescue Exception => e
517+ @logger . error ( "Oh my, registry write failed" )
518+ @logger . error ( "#{ e . message } " )
519+ ensure
520+ @busy_writing_registry . unlock
521+ end
513522 }
514- else
515- @logger . info ( "Skipped writing the registry because previous write still in progress, it just takes long or may be hanging!" )
516- end
523+ else
524+ @logger . info ( "Skipped writing the registry because previous write still in progress, it just takes long or may be hanging!" )
525+ end
517526 end
518527end
519528
0 commit comments