diff --git a/audinterface/core/process.py b/audinterface/core/process.py index d10b5e0..c7e9977 100644 --- a/audinterface/core/process.py +++ b/audinterface/core/process.py @@ -248,51 +248,84 @@ def _process_file( ) -> typing.Tuple[ typing.List[typing.Any], typing.List[str], - typing.List[pd.Timedelta], - typing.List[pd.Timedelta], + typing.Optional[typing.List[pd.Timedelta]], + typing.Optional[typing.List[pd.Timedelta]], ]: + r"""Process a file. + + Args: + file: file path + idx: index value + root: optional root path of file + start: start time to read media file + end: end time to read media file + process_func_args: arguments to pass to process function + + Returns: + result of processing function, files, starts, ends + + """ if start is not None: start = utils.to_timedelta(start, self.sampling_rate) if end is not None: end = utils.to_timedelta(end, self.sampling_rate) - signal, sampling_rate = utils.read_audio( - file, - start=start, - end=end, - root=root, - ) - - y, files, starts, ends = self._process_signal( - signal, - sampling_rate, - idx=idx, - root=root, - file=file, - process_func_args=process_func_args, - ) + ext = audeer.file_extension(file).lower() - def precision_offset(duration, sampling_rate): - # Ensure we get the same precision - # by storing what is lost due to rounding - # when reading the file - duration_at_sample = utils.to_timedelta( - audmath.samples(duration.total_seconds(), sampling_rate) / sampling_rate + # Text files + if ext in ["json", "txt"]: + data = utils.read_text(file, root=root) + y, file = self._process_data( + data, + idx=idx, + root=root, + file=file, + process_func_args=process_func_args, ) - return duration - duration_at_sample + files = [file] + starts = None + ends = None - if self.win_dur is not None: - if start is not None: - starts = starts + start - ends = ends + start + # Audio/video files else: - if start is not None and not pd.isna(start): - starts[0] += start - ends[0] += start - precision_offset(start, 
sampling_rate) - if self.keep_nat and (end is None or pd.isna(end)): - ends[0] = pd.NaT - if end is not None and not pd.isna(end): - ends[-1] += precision_offset(end, sampling_rate) + signal, sampling_rate = utils.read_audio( + file, + start=start, + end=end, + root=root, + ) + + y, files, starts, ends = self._process_signal( + signal, + sampling_rate, + idx=idx, + root=root, + file=file, + process_func_args=process_func_args, + ) + + def precision_offset(duration, sampling_rate): + # Ensure we get the same precision + # by storing what is lost due to rounding + # when reading the file + duration_at_sample = utils.to_timedelta( + audmath.samples(duration.total_seconds(), sampling_rate) + / sampling_rate + ) + return duration - duration_at_sample + + if self.win_dur is not None: + if start is not None: + starts = starts + start + ends = ends + start + else: + if start is not None and not pd.isna(start): + starts[0] += start + ends[0] += start - precision_offset(start, sampling_rate) + if self.keep_nat and (end is None or pd.isna(end)): + ends[0] = pd.NaT + if end is not None and not pd.isna(end): + ends[-1] += precision_offset(end, sampling_rate) return y, files, starts, ends @@ -307,6 +340,11 @@ def process_file( ) -> pd.Series: r"""Process the content of an audio file. + The results of processed audio/video files + are returned with a segmented index, + all other processed files + with a filewise index. + Args: file: file path start: start processing at this position. @@ -348,8 +386,10 @@ def process_file( end=end, process_func_args=process_func_args, ) - - index = audformat.segmented_index(files, starts, ends) + if starts is None and ends is None: + index = audformat.filewise_index(files) + else: + index = audformat.segmented_index(files, starts, ends) if len(y) == 0: return pd.Series([], index, dtype=object) @@ -367,6 +407,12 @@ def process_files( ) -> pd.Series: r"""Process a list of files. 
+ The index of the returned series + is a segmented index, + if any of the processed files + are audio/video files. + Otherwise it is a filewise index. + Args: files: list of file paths starts: segment start positions. @@ -448,7 +494,15 @@ def process_files( starts = list(itertools.chain.from_iterable([x[2] for x in xs])) ends = list(itertools.chain.from_iterable([x[3] for x in xs])) - index = audformat.segmented_index(files, starts, ends) + if ( + len(audeer.unique(starts)) == 1 + and audeer.unique(starts)[0] is None + and len(audeer.unique(ends)) == 1 + and audeer.unique(ends)[0] is None + ): + index = audformat.filewise_index(files) + else: + index = audformat.segmented_index(files, starts, ends) y = pd.Series(y, index) return y @@ -463,6 +517,12 @@ def process_folder( ) -> pd.Series: r"""Process files in a folder. + The index of the returned series + is a segmented index, + if any of the processed files + are audio/video files. + Otherwise it is a filewise index. + .. note:: At the moment does not scan in sub-folders! Args: @@ -546,7 +606,16 @@ def _process_index_wo_segment( starts = list(itertools.chain.from_iterable([x[2] for x in xs])) ends = list(itertools.chain.from_iterable([x[3] for x in xs])) - index = audformat.segmented_index(files, starts, ends) + if ( + len(audeer.unique(starts)) == 1 + and audeer.unique(starts)[0] is None + and len(audeer.unique(ends)) == 1 + and audeer.unique(ends)[0] is None + ): + index = audformat.filewise_index(files) + else: + index = audformat.segmented_index(files, starts, ends) + y = pd.Series(y, index) return y @@ -576,8 +645,10 @@ def process_index( and :attr:`audinterface.Process.segment` is ``None`` the returned index will be of same type - as the original one, - otherwise always a segmented index is returned + as the original one. 
def _process_data(
    self,
    data: typing.Any,
    *,
    idx: int = 0,
    root: str = None,
    file: str = None,
    process_func_args: typing.Dict[str, typing.Any] = None,
) -> typing.Tuple[typing.Any, str]:
    r"""Process data and handle special processing function arguments.

    Args:
        data: data object passed on to the processing function
        idx: index value
        root: optional root path of file
        file: file path
        process_func_args: arguments to pass to process function

    Returns:
        result of processing function, file

    """
    y = self._call_data(
        data,
        idx=idx,
        root=root,
        file=file,
        process_func_args=process_func_args,
    )
    return y, file


def process_data(
    self,
    data: typing.Any,
    file: str = None,
    process_func_args: typing.Dict[str, typing.Any] = None,
) -> pd.Series:
    r"""Process data and return result.

    If file is given,
    the returned series contains a filewise index.
    Otherwise, an integer index is returned.

    The series always holds exactly one entry,
    the result of the processing function,
    no matter if that result is a scalar,
    a container, or empty.

    Args:
        data: data to process
        file: file path
        process_func_args: (keyword) arguments passed on
            to the processing function.
            They will temporarily overwrite
            the ones stored in
            :attr:`audinterface.Process.process_func_args`

    Returns:
        Series with processed data

    """
    y, file = self._process_data(
        data,
        file=file,
        process_func_args=process_func_args,
    )
    if file is not None:
        index = audformat.filewise_index([file])
    else:
        index = pd.Index([0], dtype="int")
    # There is one result per call, so it is always wrapped in a list.
    # Inspecting ``len(y)`` would fail for scalar results (e.g. an int),
    # and an empty series cannot be combined with the one-entry index.
    return pd.Series([y], index)
def _call_data(
    self,
    data: typing.Any,
    *,
    idx: int = 0,
    root: str = None,
    file: str = None,
    process_func_args: typing.Dict[str, typing.Any] = None,
) -> typing.Any:
    r"""Call processing function on general data.

    It does not make any assumptions about ``data``.

    Special arguments are extracted,
    and passed to the processing function.

    Args:
        data: data object passed to processing function
        idx: index
        root: root path
        file: file path
        process_func_args: processing function arguments.
            If ``None`` or empty,
            the arguments stored in ``self.process_func_args`` are used

    Returns:
        result of processing function

    """
    process_func_args = process_func_args or self.process_func_args
    special_args = self._special_args(idx, root, file, process_func_args)
    return self.process_func(data, **special_args, **process_func_args)


def _special_args(
    self,
    idx: int,
    root: typing.Optional[str],
    file: typing.Optional[str],
    process_func_args: typing.Dict[str, typing.Any] = None,
) -> typing.Dict[str, typing.Union[int, str]]:
    r"""Identify special arguments in processing function.

    If one of the arguments of the processing function is named
    ``"idx"``, ``"root"``, or ``"file"``,
    and not provided in ``process_func_args``,
    it is identified as a special argument.

    Args:
        idx: index
        root: root path
        file: file path
        process_func_args: processing function arguments.
            ``None`` is treated like an empty dictionary

    Returns:
        special arguments dictionary

    """
    # The signature advertises ``None`` as default,
    # so it must not break the membership test below
    process_func_args = process_func_args or {}
    candidates = {"idx": idx, "root": root, "file": file}
    return {
        key: value
        for key, value in candidates.items()
        if key in self._process_func_signature and key not in process_func_args
    }
def read_text(
    file: str,
    *,
    root: str = None,
) -> typing.Union[dict, str]:
    """Read text file.

    Args:
        file: path to text file
        root: root folder,
            prepended to ``file`` if ``file`` is a relative path

    Returns:
        deserialized content (usually a dictionary),
        if ``file`` is a json file,
        else content of file as string

    """
    if root is not None and not os.path.isabs(file):
        file = os.path.join(root, file)

    # Extension without leading dot, compared case-insensitively
    ext = os.path.splitext(file)[-1][1:].lower()
    if ext == "json":
        with open(file) as json_file:
            return json.load(json_file)

    # Every other extension is read as plain text, as documented above.
    # Restricting this branch to "txt" left the result unbound
    # (``UnboundLocalError``) for any other extension.
    with open(file) as txt_file:
        return txt_file.read()
import audeer
import audformat
import audobject

import audinterface


def identity(data):
    r"""Return ``data`` unchanged."""
    return data


def length(data):
    r"""Return the length of ``data``."""
    return len(data)


class DataObject(audobject.Object):
    r"""Callable object returning the first element of its input."""

    def __call__(self, data):
        return data[0]


def repeat(data, number=2):
    r"""Concatenate ``number`` copies of the string ``data``."""
    return "".join([data for _ in range(number)])


def write_text_file(file: str, data: typing.Union[dict, str]):
    r"""Store data in text file.

    Depending on the file extension,
    the data is stored in a json file
    or a txt file.

    Args:
        file: file path
        data: data to be written to ``file``

    """
    ext = audeer.file_extension(file).lower()
    with open(file, "w") as fp:
        if ext == "json":
            json.dump(data, fp)
        else:
            fp.write(data)


@pytest.mark.parametrize(
    "process_func, data, file_format, expected_data",
    [
        (identity, "abc", "txt", "abc"),
        (identity, {"a": 0}, "json", {"a": 0}),
    ],
)
def test_process_file(
    tmpdir,
    process_func,
    data,
    file_format,
    expected_data,
):
    r"""Test processing of a single text file.

    Args:
        tmpdir: tmpdir fixture
        process_func: processing function
        data: data to write into the text file
        file_format: file format of the text file,
            ``"json"`` or ``"txt"``
        expected_data: expected result of processing function

    """
    process = audinterface.Process(process_func=process_func, verbose=False)

    # create test file
    root = audeer.mkdir(tmpdir, "test")
    file = f"file.{file_format}"
    path = os.path.join(root, file)
    write_text_file(path, data)

    # test absolute path
    y = process.process_file(path)
    expected_series = pd.Series(
        [expected_data],
        index=audformat.filewise_index(path),
    )
    pd.testing.assert_series_equal(y, expected_series)

    # test relative path
    y = process.process_file(file, root=root)
    expected_series = pd.Series(
        [expected_data],
        index=audformat.filewise_index(file),
    )
    pd.testing.assert_series_equal(y, expected_series)
processing of multiple text files. + + Args: + tmpdir: tmpdir fixture + process_func: processing function + num_files: number of files to create from ``data`` + data: data to write into text files + file_format: file format of text files, + ``"json"`` or ``"txt"`` + expected_output: expected result of processing function + + """ + process = audinterface.Process(process_func=process_func, verbose=False) + + # create files + files = [] + paths = [] + root = tmpdir + for idx in range(num_files): + file = f"file{idx}.{file_format}" + path = os.path.join(root, file) + write_text_file(path, data) + files.append(file) + paths.append(path) + + # test absolute paths + y = process.process_files(paths) + expected_y = pd.Series( + expected_output, + index=audformat.filewise_index(paths), + ) + pd.testing.assert_series_equal(y, expected_y) + + # test relative paths + y = process.process_files(files, root=root) + expected_y = pd.Series( + expected_output, + index=audformat.filewise_index(files), + ) + pd.testing.assert_series_equal(y, expected_y) + + +@pytest.mark.parametrize("num_files", [3]) +@pytest.mark.parametrize("file_format", ["json", "txt"]) +@pytest.mark.parametrize("num_workers", [1, 2, None]) +@pytest.mark.parametrize("multiprocessing", [False, True]) +def test_process_folder( + tmpdir, + num_files, + file_format, + num_workers, + multiprocessing, +): + process = audinterface.Process( + process_func=None, + num_workers=num_workers, + multiprocessing=multiprocessing, + verbose=False, + ) + + if file_format == "json": + data = {"a": 0} + else: + data = "abc" + + # Create test files + root = audeer.mkdir(tmpdir, "text") + files = [os.path.join(root, f"file{n}.{file_format}") for n in range(num_files)] + for file in files: + write_text_file(file, data) + y = process.process_folder(root) + pd.testing.assert_series_equal( + y, + process.process_files(files), + ) + + # non-existing folder + with pytest.raises(FileNotFoundError): + process.process_folder("bad-folder") + + # 
empty folder + root = str(tmpdir.mkdir("empty")) + y = process.process_folder(root) + pd.testing.assert_series_equal(y, pd.Series(dtype=object)) + + +@pytest.mark.parametrize("num_workers", [1, 2, None]) +@pytest.mark.parametrize("file_format", ["json", "txt"]) +@pytest.mark.parametrize("multiprocessing", [False, True]) +@pytest.mark.parametrize("preserve_index", [False, True]) +def test_process_index( + tmpdir, + num_workers, + file_format, + multiprocessing, + preserve_index, +): + cache_root = os.path.join(tmpdir, "cache") + + process = audinterface.Process( + process_func=None, + num_workers=num_workers, + multiprocessing=multiprocessing, + verbose=False, + ) + + if file_format == "json": + data = {"a": 0} + else: + data = "abc" + + # Create file + root = audeer.mkdir(tmpdir, "text") + file = f"file.{file_format}" + path = os.path.join(root, file) + write_text_file(path, data) + + # Empty index + index = audformat.filewise_index() + y = process.process_index(index, preserve_index=preserve_index) + assert y.empty + + # Segmented index with absolute paths + index = audformat.segmented_index( + [path] * 4, + starts=[0, 0, 1, 2], + ends=[None, 1, 2, 3], + ) + y = process.process_index( + index, + preserve_index=preserve_index, + ) + if preserve_index: + pd.testing.assert_index_equal(y.index, index) + for (path, _, _), value in y.items(): + assert audinterface.utils.read_text(path) == data + assert value == data + + # Segmented index with relative paths + index = audformat.segmented_index( + [file] * 4, + starts=[0, 0, 1, 2], + ends=[None, 1, 2, 3], + ) + y = process.process_index( + index, + preserve_index=preserve_index, + root=root, + ) + if preserve_index: + pd.testing.assert_index_equal(y.index, index) + for (file, _, _), value in y.items(): + assert audinterface.utils.read_text(file, root=root) == data + assert value == data + + # Filewise index with absolute paths + index = audformat.filewise_index(path) + y = process.process_index( + index, + 
@pytest.mark.parametrize(
    "process_func, process_func_args, data, file, expected_signal",
    [
        (identity, None, "abc", None, "abc"),
        (identity, None, "abc", "file.txt", "abc"),
    ],
)
def test_process_data(
    process_func,
    process_func_args,
    data,
    file,
    expected_signal,
):
    r"""Test processing of data objects.

    Args:
        process_func: processing function
        process_func_args: arguments passed on to ``process_func``
        data: data to process
        file: file path used for the index of the result,
            or ``None`` for an integer index
        expected_signal: expected result of processing function

    """
    process = audinterface.Process(
        process_func=process_func,
        process_func_args=process_func_args,
        verbose=False,
    )
    # General data has to go through ``process_data()``;
    # ``process_signal()`` expects a numpy array and a sampling rate
    y = process.process_data(data, file=file)

    if file is None:
        # Mirror the integer index created by ``process_data()``
        expected_index = pd.Index([0], dtype="int")
    else:
        expected_index = audformat.filewise_index(file)
    expected_y = pd.Series([expected_signal], index=expected_index)
    pd.testing.assert_series_equal(y, expected_y)
@pytest.mark.parametrize("data", ["abc"])
def test_read_data(tmpdir, data):
    r"""Test reading the content of a plain text file.

    Args:
        tmpdir: tmpdir fixture
        data: text written to the file and expected to be read back

    """
    file = audeer.path(tmpdir, "media.txt")
    with open(file, "w") as fp:
        fp.write(data)
    # The reader added to ``audinterface.utils`` is named ``read_text``
    assert audinterface.utils.read_text(file) == data