Skip to content

Commit bb62a4c

Browse files
authored
Threading feature (#2)
* Added threaded scanning support. * Added thread limit checking and thread id. * Added locking around progress bar updates. * Added support for triggering scanning while fingerprinting.
1 parent 659afa5 commit bb62a4c

File tree

4 files changed

+136
-105
lines changed

4 files changed

+136
-105
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99
### Added
1010
- Upcoming changes...
1111

12+
## [0.6.5] - 2021-07-15
13+
### Added
14+
- Added support to start scanning while fingerprinting to further increase scan performance
15+
### Fixed
16+
- Ignoring broken symlink files
17+
1218
## [0.6.0] - 2021-07-14
1319
### Added
1420
- Added threading to speed up fingerprint scanning
@@ -34,3 +40,4 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
3440
[0.5.5]: https://github.com/scanoss/scanoss.py/compare/v0.5.4...v0.5.5
3541
[0.5.6]: https://github.com/scanoss/scanoss.py/compare/v0.5.5...v0.5.6
3642
[0.6.0]: https://github.com/scanoss/scanoss.py/compare/v0.5.6...v0.6.0
43+
[0.6.5]: https://github.com/scanoss/scanoss.py/compare/v0.6.0...v0.6.5

src/scanoss/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,4 @@
1717
along with this program. If not, see <https://www.gnu.org/licenses/>.
1818
"""
1919

20-
__version__ = '0.6.0'
20+
__version__ = '0.6.5'

src/scanoss/scanner.py

Lines changed: 90 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -254,122 +254,91 @@ def scan_folder(self, scan_dir: str) -> bool:
254254
raise Exception(f"ERROR: Please specify a folder to scan")
255255
if not os.path.exists(scan_dir) or not os.path.isdir(scan_dir):
256256
raise Exception(f"ERROR: Specified folder does not exist or is not a folder: {scan_dir}")
257-
wfps = ''
257+
258258
scan_dir_len = len(scan_dir) if scan_dir.endswith(os.path.sep) else len(scan_dir)+1
259259
self.print_msg(f'Searching {scan_dir} for files to fingerprint...')
260260
spinner = None
261261
if not self.quiet and self.isatty:
262262
spinner = Spinner('Fingerprinting ')
263+
wfp_list = []
264+
scan_block = ''
265+
scan_size = 0
266+
queue_size = 0
267+
file_count = 0
268+
scan_started = False
263269
for root, dirs, files in os.walk(scan_dir):
264-
self.print_debug(f'U Root: {root}, Dirs: {dirs}, Files {files}')
265-
dirs[:] = Scanner.__filter_dirs(dirs) # Strip out unwanted directories
266-
filtered_files = Scanner.__filter_files(files) # Strip out unwanted files
270+
self.print_trace(f'U Root: {root}, Dirs: {dirs}, Files {files}')
271+
dirs[:] = Scanner.__filter_dirs(dirs) # Strip out unwanted directories
272+
filtered_files = Scanner.__filter_files(files) # Strip out unwanted files
267273
self.print_debug(f'F Root: {root}, Dirs: {dirs}, Files {filtered_files}')
268-
for file in filtered_files:
274+
for file in filtered_files: # Cycle through each filtered file
269275
path = os.path.join(root, file)
270-
file_stat = os.stat(path)
271-
if file_stat.st_size > 0: # Ignore empty files
272-
self.print_debug(f'Fingerprinting {path}...')
276+
f_size = 0
277+
try:
278+
f_size = os.stat(path).st_size
279+
except:
280+
self.print_trace(f'Ignoring missing symlink file: {file}') # Can fail if there is a broken symlink
281+
if f_size > 0: # Ignore broken links and empty files
282+
self.print_trace(f'Fingerprinting {path}...')
273283
if spinner:
274284
spinner.next()
275-
wfps += self.winnowing.wfp_for_file(path, Scanner.__strip_dir(scan_dir, scan_dir_len, path))
285+
wfp = self.winnowing.wfp_for_file(path, Scanner.__strip_dir(scan_dir, scan_dir_len, path))
286+
wfp_list.append(wfp)
287+
file_count += 1
288+
if self.threaded_scan:
289+
wfp_size = len(wfp.encode("utf-8"))
290+
if (wfp_size + scan_size) >= MAX_POST_SIZE:
291+
self.threaded_scan.queue_add(scan_block)
292+
queue_size += 1
293+
scan_block = ''
294+
scan_block += wfp
295+
scan_size = len(scan_block.encode("utf-8"))
296+
if scan_size >= MAX_POST_SIZE:
297+
self.threaded_scan.queue_add(scan_block)
298+
queue_size += 1
299+
scan_block = ''
300+
if queue_size > self.nb_threads and not scan_started: # Start scanning if we have something to do
301+
scan_started = True
302+
if not self.threaded_scan.run(wait=False):
303+
self.print_stderr(
304+
f'Warning: Some errors encounted while scanning. Results might be incomplete.')
305+
success = False
306+
# End for loop
307+
if self.threaded_scan and scan_block:
308+
self.threaded_scan.queue_add(scan_block) # Make sure all files have been submitted
276309
if spinner:
277310
spinner.finish()
278-
if wfps:
311+
312+
if wfp_list:
279313
self.print_debug(f'Writing fingerprints to {self.wfp}')
280314
with open(self.wfp, 'w') as f:
281-
f.write(wfps)
282-
self.print_msg(f'Scanning fingerprints...')
315+
f.write(''.join(wfp_list))
283316
if self.scan_output:
284317
self.print_msg(f'Writing results to {self.scan_output}...')
285318
if self.threaded_scan:
286-
success = self.scan_wfp_file_threaded()
319+
success = self.__finish_scan_threaded(scan_started, file_count)
287320
else:
288321
success = self.scan_wfp_file()
289322
else:
290323
Scanner.print_stderr(f'Warning: No files found to scan in folder: {scan_dir}')
291324
return success
292325

293-
def scan_file(self, file: str) -> bool:
326+
def __finish_scan_threaded(self, scan_started: bool, file_count: int) -> bool:
294327
"""
295-
Scan the specified file and produce a result
296-
Parameters
297-
----------
298-
file: str
299-
File to fingerprint and scan/identify
300-
:return True if successful, False otherwise
328+
Finish scanning the filtered files and wait for the threads to complete
329+
:param scan_started: If the scan has already started or not
330+
:param file_count: Number of total files to be scanned
331+
:return: True if successful, False otherwise
301332
"""
302333
success = True
303-
if not file:
304-
raise Exception(f"ERROR: Please specify a file to scan")
305-
if not os.path.exists(file) or not os.path.isfile(file):
306-
raise Exception(f"ERROR: Specified files does not exist or is not a file: {file}")
307-
self.print_debug(f'Fingerprinting {file}...')
308-
wfps = self.winnowing.wfp_for_file(file, file)
309-
if wfps:
310-
self.print_debug(f'Scanning {file}...')
311-
if self.scan_output:
312-
self.print_msg(f'Writing results to {self.scan_output}...')
313-
success = self.scan_wfp(wfps)
334+
self.threaded_scan.update_bar(create=True, file_count=file_count)
335+
if not scan_started:
336+
if not self.threaded_scan.run(): # Run the scan and wait for it to complete
337+
self.print_stderr(f'Warning: Some errors encounted while scanning. Results might be incomplete.')
338+
success = False
314339
else:
315-
success = False
316-
317-
return success
318-
319-
def scan_wfp_file_threaded(self, file: str = None) -> bool:
320-
"""
321-
Scan the supplied WFP file in multiple threads
322-
:param file: WFP file to scan
323-
:return True if successful, False otherwise
324-
"""
325-
success = True
326-
wfp_file = file if file else self.wfp # If a WFP file is specified, use it, otherwise use the default
327-
if not os.path.exists(wfp_file) or not os.path.isfile(wfp_file):
328-
raise Exception(f"ERROR: Specified WFP file does not exist or is not a file: {wfp_file}")
329-
file_count = Scanner.__count_files_in_wfp_file(wfp_file)
330-
cur_files = 0
331-
cur_size = 0
332-
batch_files = 0
333-
wfp = ''
334-
self.print_debug(f'Found {file_count} files to process.')
335-
file_print = ''
336-
bar = None
337-
if not self.quiet and self.isatty:
338-
bar = Bar('Scanning', max=file_count)
339-
bar.next(0)
340-
self.threaded_scan.set_bar(bar)
341-
342-
with open(wfp_file) as f:
343-
for line in f:
344-
if line.startswith(WFP_FILE_START):
345-
if file_print:
346-
wfp += file_print # Store the WFP for the current file
347-
cur_size = len(wfp.encode("utf-8"))
348-
file_print = line # Start storing the next file
349-
cur_files += 1
350-
batch_files += 1
351-
else:
352-
file_print += line # Store the rest of the WFP for this file
353-
l_size = cur_size + len(file_print.encode('utf-8'))
354-
# Hit the max post size, so sending the current batch and continue processing
355-
if l_size >= MAX_POST_SIZE and wfp:
356-
self.print_debug(f'Added {batch_files} ({cur_files}) of'
357-
f' {file_count} ({len(wfp.encode("utf-8"))} bytes) files to the pending queue.')
358-
if cur_size > MAX_POST_SIZE:
359-
Scanner.print_stderr(f'Warning: Post size {cur_size} greater than limit {MAX_POST_SIZE}')
360-
self.threaded_scan.queue_add(wfp)
361-
batch_files = 0
362-
wfp = ''
363-
if file_print:
364-
wfp += file_print # Store the WFP for the current file
365-
if wfp:
366-
self.print_debug(f'Adding {batch_files} ({cur_files}) of'
367-
f' {file_count} ({len(wfp.encode("utf-8"))} bytes) files to the pending queue.')
368-
self.threaded_scan.queue_add(wfp)
369-
370-
if not self.threaded_scan.run():
371-
self.print_stderr(f'Warning: Some errors encounted while scanning. Result might not be complete.')
372-
success = False
340+
self.threaded_scan.complete() # Wait for the scans to complete
341+
self.threaded_scan.complete_bar()
373342
responses = self.threaded_scan.responses
374343
raw_output = "{\n"
375344
if responses:
@@ -382,11 +351,10 @@ def scan_wfp_file_threaded(self, file: str = None) -> bool:
382351
first = False
383352
else:
384353
raw_output += ",\n \"%s\":%s" % (key, json.dumps(value, indent=2))
354+
# End for loop
385355
else:
386356
success = False
387357
raw_output += "\n}"
388-
if bar:
389-
bar.finish()
390358
parsed_json = None
391359
try:
392360
parsed_json = json.loads(raw_output)
@@ -404,6 +372,32 @@ def scan_wfp_file_threaded(self, file: str = None) -> bool:
404372
success = cdx.produce_from_json(parsed_json)
405373
else:
406374
success = cdx.produce_from_str(raw_output)
375+
return success
376+
377+
378+
def scan_file(self, file: str) -> bool:
379+
"""
380+
Scan the specified file and produce a result
381+
Parameters
382+
----------
383+
file: str
384+
File to fingerprint and scan/identify
385+
:return True if successful, False otherwise
386+
"""
387+
success = True
388+
if not file:
389+
raise Exception(f"ERROR: Please specify a file to scan")
390+
if not os.path.exists(file) or not os.path.isfile(file):
391+
raise Exception(f"ERROR: Specified files does not exist or is not a file: {file}")
392+
self.print_debug(f'Fingerprinting {file}...')
393+
wfp = self.winnowing.wfp_for_file(file, file)
394+
if wfp:
395+
self.print_debug(f'Scanning {file}...')
396+
if self.scan_output:
397+
self.print_msg(f'Writing results to {self.scan_output}...')
398+
success = self.scan_wfp(wfp)
399+
else:
400+
success = False
407401

408402
return success
409403

@@ -538,14 +532,14 @@ def wfp_file(self, scan_file: str, wfp_file: str = None):
538532
raise Exception(f"ERROR: Specified file does not exist or is not a file: {scan_file}")
539533

540534
self.print_debug(f'Fingerprinting {scan_file}...')
541-
wfps = self.winnowing.wfp_for_file(scan_file, scan_file)
542-
if wfps:
535+
wfp = self.winnowing.wfp_for_file(scan_file, scan_file)
536+
if wfp:
543537
if wfp_file:
544538
self.print_stderr(f'Writing fingerprints to {wfp_file}')
545539
with open(wfp_file, 'w') as f:
546-
f.write(wfps)
540+
f.write(wfp)
547541
else:
548-
print(wfps)
542+
print(wfp)
549543
else:
550544
Scanner.print_stderr(f'Warning: No fingerprints generated for: {scan_file}')
551545

@@ -561,7 +555,7 @@ def wfp_folder(self, scan_dir: str, wfp_file: str = None):
561555
scan_dir_len = len(scan_dir) if scan_dir.endswith(os.path.sep) else len(scan_dir)+1
562556
self.print_msg(f'Searching {scan_dir} for files to fingerprint...')
563557
for root, dirs, files in os.walk(scan_dir):
564-
dirs = Scanner.__filter_dirs(dirs) # Strip out unwanted directories
558+
dirs[:] = Scanner.__filter_dirs(dirs) # Strip out unwanted directories
565559
filtered_files = Scanner.__filter_files(files) # Strip out unwanted files
566560
self.print_trace(f'Root: {root}, Dirs: {dirs}, Files {filtered_files}')
567561
for file in filtered_files:

src/scanoss/threadedscanning.py

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
from typing import Dict, List
2626
from dataclasses import dataclass
2727
from progress.bar import Bar
28-
from progress.spinner import Spinner
2928

3029
from .scanossapi import ScanossApi
3130

@@ -41,6 +40,7 @@ class ThreadedScanning(object):
4140
"""
4241
inputs: queue.Queue = queue.Queue()
4342
output: queue.Queue = queue.Queue()
43+
bar: Bar = None
4444

4545
def __init__(self, scanapi :ScanossApi, debug: bool = False, trace: bool = False, quiet: bool = False,
4646
nb_threads: int = 5
@@ -57,9 +57,11 @@ def __init__(self, scanapi :ScanossApi, debug: bool = False, trace: bool = False
5757
self.debug = debug
5858
self.trace = trace
5959
self.quiet = quiet
60+
self.isatty = sys.stderr.isatty()
6061
self.nb_threads = nb_threads
61-
self.bar = None
62+
self.bar_count = 0
6263
self.errors = False
64+
self.lock = threading.Lock()
6365
if nb_threads > MAX_ALLOWED_THREADS:
6466
self.print_msg(f'Warning: Requested threads too large: {nb_threads}. Reducing to {MAX_ALLOWED_THREADS}')
6567
self.nb_threads = MAX_ALLOWED_THREADS
@@ -106,20 +108,39 @@ def print_trace(self, *args, **kwargs):
106108
if self.trace:
107109
self.print_stderr(*args, **kwargs)
108110

111+
def create_bar(self, file_count: int):
112+
if not self.quiet and self.isatty and not self.bar:
113+
self.bar = Bar('Scanning', max=file_count)
114+
self.bar.next(self.bar_count)
115+
116+
def complete_bar(self):
117+
if self.bar:
118+
self.bar.finish()
119+
109120
def set_bar(self, bar: Bar) -> None:
110121
"""
111122
Set the Progress Bar to display progress while scanning
112123
:param bar: Progress Bar object
113124
"""
114125
self.bar = bar
115126

116-
def update_bar(self, amount: int) -> None:
127+
def update_bar(self, amount: int = 0, create: bool = False, file_count: int = 0) -> None:
117128
"""
118129
Update the Progress Bar progress
119130
:param amount: amount of progress to update
120131
"""
121-
if self.bar:
122-
self.bar.next(amount)
132+
try:
133+
self.lock.acquire()
134+
try:
135+
if create and not self.bar:
136+
self.create_bar(file_count)
137+
elif self.bar:
138+
self.bar.next(amount)
139+
self.bar_count += amount
140+
finally:
141+
self.lock.release()
142+
except Exception as e:
143+
self.print_debug(f'Warning: Update status bar lock failed: {e}. Ignoring.')
123144

124145
def queue_add(self, wfp: str) -> None:
125146
"""
@@ -128,6 +149,9 @@ def queue_add(self, wfp: str) -> None:
128149
"""
129150
self.inputs.put(wfp)
130151

152+
def get_queue_size(self) -> int:
153+
return self.inputs.qsize()
154+
131155
@property
132156
def responses(self) -> List[Dict]:
133157
"""
@@ -136,7 +160,7 @@ def responses(self) -> List[Dict]:
136160
"""
137161
return list(self.output.queue)
138162

139-
def run(self) -> bool:
163+
def run(self, wait: bool = True) -> bool:
140164
"""
141165
Initiate the threads and process all pending requests
142166
:return: True if successful, False if error encountered
@@ -154,10 +178,16 @@ def run(self) -> bool:
154178
except Exception as e:
155179
self.print_stderr(f'ERROR: Problem running threaded scanning: {e}')
156180
self.errors = True
157-
self.inputs.join()
158-
181+
if wait: # Wait for all inputs to complete
182+
self.inputs.join()
159183
return False if self.errors else True
160184

185+
def complete(self) -> None:
186+
"""
187+
Wait for input queue to complete processing
188+
"""
189+
self.inputs.join()
190+
161191
def worker_post(self) -> None:
162192
"""
163193
Take each request and process it

0 commit comments

Comments
 (0)