Skip to content

Commit 7b2a409

Browse files
author
Alexandre Lissy
committed
Converting importers from multiprocessing.dummy to multiprocessing
Fixes #2817
1 parent ce59228 commit 7b2a409

9 files changed

+342
-342
lines changed

bin/import_cv.py

+45-48
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@
1515

1616
from glob import glob
1717
from os import path
18-
from threading import RLock
19-
from multiprocessing.dummy import Pool
20-
from multiprocessing import cpu_count
21-
from util.importers import validate_label_eng as validate_label
18+
from multiprocessing import Pool
19+
from util.importers import validate_label_eng as validate_label, get_counter, get_imported_samples, print_import_report
2220
from util.downloader import maybe_download, SIMPLE_BAR
2321

2422
FIELDNAMES = ['wav_filename', 'wav_filesize', 'transcript']
@@ -53,6 +51,38 @@ def _maybe_convert_sets(target_dir, extracted_data):
5351
for source_csv in glob(path.join(extracted_dir, '*.csv')):
5452
_maybe_convert_set(extracted_dir, source_csv, path.join(target_dir, os.path.split(source_csv)[-1]))
5553

54+
def one_sample(sample):
55+
mp3_filename = sample[0]
56+
# Storing wav files next to the mp3 ones - just with a different suffix
57+
wav_filename = path.splitext(mp3_filename)[0] + ".wav"
58+
_maybe_convert_wav(mp3_filename, wav_filename)
59+
frames = int(subprocess.check_output(['soxi', '-s', wav_filename], stderr=subprocess.STDOUT))
60+
file_size = -1
61+
if path.exists(wav_filename):
62+
file_size = path.getsize(wav_filename)
63+
frames = int(subprocess.check_output(['soxi', '-s', wav_filename], stderr=subprocess.STDOUT))
64+
label = validate_label(sample[1])
65+
rows = []
66+
counter = get_counter()
67+
if file_size == -1:
68+
# Excluding samples that failed upon conversion
69+
counter['failed'] += 1
70+
elif label is None:
71+
# Excluding samples that failed on label validation
72+
counter['invalid_label'] += 1
73+
elif int(frames/SAMPLE_RATE*1000/10/2) < len(str(label)):
74+
# Excluding samples that are too short to fit the transcript
75+
counter['too_short'] += 1
76+
elif frames/SAMPLE_RATE > MAX_SECS:
77+
# Excluding very long samples to keep a reasonable batch-size
78+
counter['too_long'] += 1
79+
else:
80+
# This one is good - keep it for the target CSV
81+
rows.append((wav_filename, file_size, label))
82+
counter['all'] += 1
83+
counter['total_time'] += frames
84+
return (counter, rows)
85+
5686
def _maybe_convert_set(extracted_dir, source_csv, target_csv):
5787
print()
5888
if path.exists(target_csv):
@@ -63,48 +93,19 @@ def _maybe_convert_set(extracted_dir, source_csv, target_csv):
6393
with open(source_csv) as source_csv_file:
6494
reader = csv.DictReader(source_csv_file)
6595
for row in reader:
66-
samples.append((row['filename'], row['text']))
96+
samples.append((os.path.join(extracted_dir, row['filename']), row['text']))
6797

6898
# Mutable counters for the concurrent embedded routine
69-
counter = { 'all': 0, 'failed': 0, 'invalid_label': 0, 'too_short': 0, 'too_long': 0 }
70-
lock = RLock()
99+
counter = get_counter()
71100
num_samples = len(samples)
72101
rows = []
73102

74-
def one_sample(sample):
75-
mp3_filename = path.join(*(sample[0].split('/')))
76-
mp3_filename = path.join(extracted_dir, mp3_filename)
77-
# Storing wav files next to the mp3 ones - just with a different suffix
78-
wav_filename = path.splitext(mp3_filename)[0] + ".wav"
79-
_maybe_convert_wav(mp3_filename, wav_filename)
80-
frames = int(subprocess.check_output(['soxi', '-s', wav_filename], stderr=subprocess.STDOUT))
81-
file_size = -1
82-
if path.exists(wav_filename):
83-
file_size = path.getsize(wav_filename)
84-
frames = int(subprocess.check_output(['soxi', '-s', wav_filename], stderr=subprocess.STDOUT))
85-
label = validate_label(sample[1])
86-
with lock:
87-
if file_size == -1:
88-
# Excluding samples that failed upon conversion
89-
counter['failed'] += 1
90-
elif label is None:
91-
# Excluding samples that failed on label validation
92-
counter['invalid_label'] += 1
93-
elif int(frames/SAMPLE_RATE*1000/10/2) < len(str(label)):
94-
# Excluding samples that are too short to fit the transcript
95-
counter['too_short'] += 1
96-
elif frames/SAMPLE_RATE > MAX_SECS:
97-
# Excluding very long samples to keep a reasonable batch-size
98-
counter['too_long'] += 1
99-
else:
100-
# This one is good - keep it for the target CSV
101-
rows.append((wav_filename, file_size, label))
102-
counter['all'] += 1
103-
104103
print('Importing mp3 files...')
105-
pool = Pool(cpu_count())
104+
pool = Pool()
106105
bar = progressbar.ProgressBar(max_value=num_samples, widgets=SIMPLE_BAR)
107-
for i, _ in enumerate(pool.imap_unordered(one_sample, samples), start=1):
106+
for i, processed in enumerate(pool.imap_unordered(one_sample, samples), start=1):
107+
counter += processed[0]
108+
rows += processed[1]
108109
bar.update(i)
109110
bar.update(num_samples)
110111
pool.close()
@@ -118,15 +119,11 @@ def one_sample(sample):
118119
for filename, file_size, transcript in bar(rows):
119120
writer.writerow({ 'wav_filename': filename, 'wav_filesize': file_size, 'transcript': transcript })
120121

121-
print('Imported %d samples.' % (counter['all'] - counter['failed'] - counter['too_short'] - counter['too_long']))
122-
if counter['failed'] > 0:
123-
print('Skipped %d samples that failed upon conversion.' % counter['failed'])
124-
if counter['invalid_label'] > 0:
125-
print('Skipped %d samples that failed on transcript validation.' % counter['invalid_label'])
126-
if counter['too_short'] > 0:
127-
print('Skipped %d samples that were too short to match the transcript.' % counter['too_short'])
128-
if counter['too_long'] > 0:
129-
print('Skipped %d samples that were longer than %d seconds.' % (counter['too_long'], MAX_SECS))
122+
imported_samples = get_imported_samples(counter)
123+
assert counter['all'] == num_samples
124+
assert len(rows) == imported_samples
125+
126+
print_import_report(counter, SAMPLE_RATE, MAX_SECS)
130127

131128
def _maybe_convert_wav(mp3_filename, wav_filename):
132129
if not path.exists(wav_filename):

bin/import_cv2.py

+54-60
Original file line numberDiff line numberDiff line change
@@ -21,29 +21,61 @@
2121
import unicodedata
2222

2323
from os import path
24-
from threading import RLock
25-
from multiprocessing.dummy import Pool
26-
from multiprocessing import cpu_count
24+
from multiprocessing import Pool
2725
from util.downloader import SIMPLE_BAR
2826
from util.text import Alphabet
29-
from util.importers import get_importers_parser, get_validate_label
30-
from util.helpers import secs_to_hours
27+
from util.importers import get_importers_parser, get_validate_label, get_counter, get_imported_samples, print_import_report
3128

3229

3330
FIELDNAMES = ['wav_filename', 'wav_filesize', 'transcript']
3431
SAMPLE_RATE = 16000
3532
MAX_SECS = 10
3633

3734

38-
def _preprocess_data(tsv_dir, audio_dir, label_filter, space_after_every_character=False):
35+
def _preprocess_data(tsv_dir, audio_dir, space_after_every_character=False):
3936
for dataset in ['train', 'test', 'dev', 'validated', 'other']:
4037
input_tsv = path.join(path.abspath(tsv_dir), dataset+".tsv")
4138
if os.path.isfile(input_tsv):
4239
print("Loading TSV file: ", input_tsv)
43-
_maybe_convert_set(input_tsv, audio_dir, label_filter, space_after_every_character)
44-
45-
46-
def _maybe_convert_set(input_tsv, audio_dir, label_filter, space_after_every_character=None):
40+
_maybe_convert_set(input_tsv, audio_dir, space_after_every_character)
41+
42+
def one_sample(sample):
43+
""" Take a audio file, and optionally convert it to 16kHz WAV """
44+
mp3_filename = sample[0]
45+
if not path.splitext(mp3_filename.lower())[1] == '.mp3':
46+
mp3_filename += ".mp3"
47+
# Storing wav files next to the mp3 ones - just with a different suffix
48+
wav_filename = path.splitext(mp3_filename)[0] + ".wav"
49+
_maybe_convert_wav(mp3_filename, wav_filename)
50+
file_size = -1
51+
frames = 0
52+
if path.exists(wav_filename):
53+
file_size = path.getsize(wav_filename)
54+
frames = int(subprocess.check_output(['soxi', '-s', wav_filename], stderr=subprocess.STDOUT))
55+
label = label_filter_fun(sample[1])
56+
rows = []
57+
counter = get_counter()
58+
if file_size == -1:
59+
# Excluding samples that failed upon conversion
60+
counter['failed'] += 1
61+
elif label is None:
62+
# Excluding samples that failed on label validation
63+
counter['invalid_label'] += 1
64+
elif int(frames/SAMPLE_RATE*1000/10/2) < len(str(label)):
65+
# Excluding samples that are too short to fit the transcript
66+
counter['too_short'] += 1
67+
elif frames/SAMPLE_RATE > MAX_SECS:
68+
# Excluding very long samples to keep a reasonable batch-size
69+
counter['too_long'] += 1
70+
else:
71+
# This one is good - keep it for the target CSV
72+
rows.append((os.path.split(wav_filename)[-1], file_size, label))
73+
counter['all'] += 1
74+
counter['total_time'] += frames
75+
76+
return (counter, rows)
77+
78+
def _maybe_convert_set(input_tsv, audio_dir, space_after_every_character=None):
4779
output_csv = path.join(audio_dir, os.path.split(input_tsv)[-1].replace('tsv', 'csv'))
4880
print("Saving new DeepSpeech-formatted CSV file to: ", output_csv)
4981

@@ -52,51 +84,18 @@ def _maybe_convert_set(input_tsv, audio_dir, label_filter, space_after_every_cha
5284
with open(input_tsv, encoding='utf-8') as input_tsv_file:
5385
reader = csv.DictReader(input_tsv_file, delimiter='\t')
5486
for row in reader:
55-
samples.append((row['path'], row['sentence']))
87+
samples.append((path.join(audio_dir, row['path']), row['sentence']))
5688

57-
# Keep track of how many samples are good vs. problematic
58-
counter = {'all': 0, 'failed': 0, 'invalid_label': 0, 'too_short': 0, 'too_long': 0, 'total_time': 0}
59-
lock = RLock()
89+
counter = get_counter()
6090
num_samples = len(samples)
6191
rows = []
6292

63-
def one_sample(sample):
64-
""" Take a audio file, and optionally convert it to 16kHz WAV """
65-
mp3_filename = path.join(audio_dir, sample[0])
66-
if not path.splitext(mp3_filename.lower())[1] == '.mp3':
67-
mp3_filename += ".mp3"
68-
# Storing wav files next to the mp3 ones - just with a different suffix
69-
wav_filename = path.splitext(mp3_filename)[0] + ".wav"
70-
_maybe_convert_wav(mp3_filename, wav_filename)
71-
file_size = -1
72-
frames = 0
73-
if path.exists(wav_filename):
74-
file_size = path.getsize(wav_filename)
75-
frames = int(subprocess.check_output(['soxi', '-s', wav_filename], stderr=subprocess.STDOUT))
76-
label = label_filter(sample[1])
77-
with lock:
78-
if file_size == -1:
79-
# Excluding samples that failed upon conversion
80-
counter['failed'] += 1
81-
elif label is None:
82-
# Excluding samples that failed on label validation
83-
counter['invalid_label'] += 1
84-
elif int(frames/SAMPLE_RATE*1000/10/2) < len(str(label)):
85-
# Excluding samples that are too short to fit the transcript
86-
counter['too_short'] += 1
87-
elif frames/SAMPLE_RATE > MAX_SECS:
88-
# Excluding very long samples to keep a reasonable batch-size
89-
counter['too_long'] += 1
90-
else:
91-
# This one is good - keep it for the target CSV
92-
rows.append((os.path.split(wav_filename)[-1], file_size, label))
93-
counter['all'] += 1
94-
counter['total_time'] += frames
95-
9693
print("Importing mp3 files...")
97-
pool = Pool(cpu_count())
94+
pool = Pool()
9895
bar = progressbar.ProgressBar(max_value=num_samples, widgets=SIMPLE_BAR)
99-
for i, _ in enumerate(pool.imap_unordered(one_sample, samples), start=1):
96+
for i, processed in enumerate(pool.imap_unordered(one_sample, samples), start=1):
97+
counter += processed[0]
98+
rows += processed[1]
10099
bar.update(i)
101100
bar.update(num_samples)
102101
pool.close()
@@ -113,16 +112,11 @@ def one_sample(sample):
113112
else:
114113
writer.writerow({'wav_filename': filename, 'wav_filesize': file_size, 'transcript': transcript})
115114

116-
print('Imported %d samples.' % (counter['all'] - counter['failed'] - counter['too_short'] - counter['too_long']))
117-
if counter['failed'] > 0:
118-
print('Skipped %d samples that failed upon conversion.' % counter['failed'])
119-
if counter['invalid_label'] > 0:
120-
print('Skipped %d samples that failed on transcript validation.' % counter['invalid_label'])
121-
if counter['too_short'] > 0:
122-
print('Skipped %d samples that were too short to match the transcript.' % counter['too_short'])
123-
if counter['too_long'] > 0:
124-
print('Skipped %d samples that were longer than %d seconds.' % (counter['too_long'], MAX_SECS))
125-
print('Final amount of imported audio: %s.' % secs_to_hours(counter['total_time'] / SAMPLE_RATE))
115+
imported_samples = get_imported_samples(counter)
116+
assert counter['all'] == num_samples
117+
assert len(rows) == imported_samples
118+
119+
print_import_report(counter, SAMPLE_RATE, MAX_SECS)
126120

127121

128122
def _maybe_convert_wav(mp3_filename, wav_filename):
@@ -162,4 +156,4 @@ def label_filter_fun(label):
162156
label = None
163157
return label
164158

165-
_preprocess_data(PARAMS.tsv_dir, AUDIO_DIR, label_filter_fun, PARAMS.space_after_every_character)
159+
_preprocess_data(PARAMS.tsv_dir, AUDIO_DIR, PARAMS.space_after_every_character)

0 commit comments

Comments
 (0)