diff --git a/README.md b/README.md index 6bd4950..96c0fc1 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,11 @@ [![PyPI-Server](https://img.shields.io/pypi/v/compressed-lists.svg)](https://pypi.org/project/compressed-lists/) -![Unit tests](https://github.com/BiocPy/compressed-lists/actions/workflows/pypi-test.yml/badge.svg) +![Unit tests](https://github.com/BiocPy/compressed-lists/actions/workflows/run-tests.yml/badge.svg) -# compressed-lists +# CompressedList Implementation in Python -> Add a short description here! +A Python implementation of the `CompressedList` class from R/Bioconductor for memory-efficient list-like objects. -A longer description of your project goes here... +`CompressedList` is a memory-efficient container for list-like objects. Instead of storing each list element separately, it concatenates all elements into a single vector-like object and maintains information about where each original element begins and ends. This approach is significantly more memory-efficient than standard lists, especially when dealing with many list elements. 
## Install @@ -15,6 +15,54 @@ To get started, install the package from [PyPI](https://pypi.org/project/compres pip install compressed-lists ``` +## Usage + + +```py +from compressed_lists import CompressedIntegerList, CompressedStringList + +# Create a CompressedIntegerList +int_data = [[1, 2, 3], [4, 5], [6, 7, 8, 9]] +names = ["A", "B", "C"] +int_list = CompressedIntegerList.from_list(int_data, names) + +# Access elements +print(int_list[0]) # [1, 2, 3] +print(int_list["B"]) # [4, 5] +print(int_list[1:3]) # Slice of elements + +# Apply a function to each element +squared = int_list.lapply(lambda x: [i**2 for i in x]) +print(squared[0]) # [1, 4, 9] + +# Convert to a regular Python list +regular_list = int_list.to_list() + +# Create a CompressedStringList +char_data = [["apple", "banana"], ["cherry", "date", "elderberry"], ["fig"]] +char_list = CompressedStringList.from_list(char_data) +``` + +### Partitioning + +The `Partitioning` class handles the information about where each element begins and ends in the concatenated data. It allows for efficient extraction of elements without storing each element separately. + +```python +from compressed_lists import Partitioning + +# Create partitioning from end positions +ends = [3, 5, 10] +names = ["A", "B", "C"] +part = Partitioning(ends, names) + +# Get partition range for an element +start, end = part[1] # Returns (3, 5) +``` + +> [!NOTE] +> +> Check out the [documentation](https://biocpy.github.io/compressed-lists) for extending CompressedLists to custom data types. 
+ ## Note diff --git a/docs/conf.py b/docs/conf.py index c90a827..a486e0a 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -299,6 +299,7 @@ "scipy": ("https://docs.scipy.org/doc/scipy/reference", None), "setuptools": ("https://setuptools.pypa.io/en/stable/", None), "pyscaffold": ("https://pyscaffold.org/en/stable", None), + "biocutils": ("https://biocpy.github.io/BiocUtils", None), } print(f"loading configurations for {project} {version} ...", file=sys.stderr) diff --git a/docs/index.md b/docs/index.md index a05c38f..f45fcb8 100644 --- a/docs/index.md +++ b/docs/index.md @@ -1,17 +1,16 @@ # compressed-lists -Add a short description here! +A Python implementation of the `CompressedList` class from R/Bioconductor for memory-efficient list-like objects. +`CompressedList` is a memory-efficient container for list-like objects. Instead of storing each list element separately, it concatenates all elements into a single vector-like object and maintains information about where each original element begins and ends. This approach is significantly more memory-efficient than standard lists, especially when dealing with many list elements. -## Note +## Install -> This is the main page of your project's [Sphinx] documentation. It is -> formatted in [Markdown]. Add additional pages by creating md-files in -> `docs` or rst-files (formatted in [reStructuredText]) and adding links to -> them in the `Contents` section below. -> -> Please check [Sphinx] and [MyST] for more information -> about how to document your project and how to configure your preferences. +To get started, install the package from [PyPI](https://pypi.org/project/compressed-lists/) + +```bash +pip install compressed-lists +``` ## Contents @@ -20,6 +19,7 @@ Add a short description here! 
:maxdepth: 2 Overview +Tutorial Contributions & Help License Authors diff --git a/docs/tutorial.md b/docs/tutorial.md new file mode 100644 index 0000000..36bd989 --- /dev/null +++ b/docs/tutorial.md @@ -0,0 +1,189 @@ +--- +file_format: mystnb +kernelspec: + name: python +--- + +# Basic Usage + +```{code-cell} +from compressed_lists import CompressedIntegerList, CompressedStringList + +# Create a CompressedIntegerList +int_data = [[1, 2, 3], [4, 5], [6, 7, 8, 9]] +names = ["A", "B", "C"] +int_list = CompressedIntegerList.from_list(int_data, names) + +# Access elements +print(int_list[0]) # [1, 2, 3] +print(int_list["B"]) # [4, 5] +print(int_list[1:3]) # Slice of elements + +# Apply a function to each element +squared = int_list.lapply(lambda x: [i**2 for i in x]) +print(squared[0]) # [1, 4, 9] + +# Convert to a regular Python list +regular_list = int_list.to_list() + +# Create a CompressedStringList +char_data = [["apple", "banana"], ["cherry", "date", "elderberry"], ["fig"]] +char_list = CompressedStringList.from_list(char_data) +``` + +## Partitioning + +The `Partitioning` class handles the information about where each element begins and ends in the concatenated data. It allows for efficient extraction of elements without storing each element separately. + +```{code-cell} +from compressed_lists import Partitioning + +# Create partitioning from end positions +ends = [3, 5, 10] +names = ["A", "B", "C"] +part = Partitioning(ends, names) + +# Get partition range for an element +start, end = part[1] +print(start, end) +``` + +# Creating Custom CompressedList Subclasses + +`CompressedList` can be easily extended to support custom data types. Here's a step-by-step guide to creating your own `CompressedList` subclass: + +## 1. 
Subclass CompressedList + +Create a new class that inherits from `CompressedList` with appropriate type annotations: + +```python +from typing import List, TypeVar, Generic +from compressed_lists import CompressedList, Partitioning +import numpy as np + +class CustomCompressedList(CompressedList): + """A custom CompressedList for your data type.""" + pass +``` + +## 2. Implement the Constructor + +The constructor should initialize the superclass with the appropriate data: + +```python +def __init__(self, + unlist_data: Any, # Replace with your data type + partitioning: Partitioning, + element_metadata: dict = None, + metadata: dict = None): + super().__init__(unlist_data, partitioning, + element_type="custom_type", # Set your element type + element_metadata=element_metadata, + metadata=metadata) +``` + +## 3. Implement _extract_range Method + +This method defines how to extract a range of elements from your unlisted data: + +```python +def _extract_range(self, start: int, end: int) -> List[T]: + """Extract a range from unlisted data.""" + # For example, with numpy arrays: + return self.unlist_data[start:end].tolist() + + # Or for other data types: + # return self.unlist_data[start:end] +``` + +## 4. 
Implement from_list Class Method + +This factory method creates a new instance from a list: + +```python +@classmethod +def from_list(cls, lst: List[List[T]], names: list = None, + metadata: dict = None) -> 'CustomCompressedList': + """Create a new CustomCompressedList from a list.""" + # Flatten the list + flat_data = [] + for sublist in lst: + flat_data.extend(sublist) + + # Create partitioning + partitioning = Partitioning.from_list(lst, names) + + # Create unlisted data in your preferred format + # For example, with numpy: + unlist_data = np.array(flat_data, dtype=np.float64) + + return cls(unlist_data, partitioning, metadata=metadata) +``` + +## Complete Example: CompressedFloatList + +Here's a complete example of a custom CompressedList for floating-point numbers: + +```{code-cell} +import numpy as np +from compressed_lists import CompressedList, Partitioning +from typing import List + +class CompressedFloatList(CompressedList): + def __init__(self, + unlist_data: np.ndarray, + partitioning: Partitioning, + element_metadata: dict = None, + metadata: dict = None): + super().__init__(unlist_data, partitioning, + element_type="float", + element_metadata=element_metadata, + metadata=metadata) + + def _extract_range(self, start: int, end: int) -> List[float]: + return self.unlist_data[start:end].tolist() + + @classmethod + def from_list(cls, lst: List[List[float]], names: list = None, + metadata: dict = None) -> 'CompressedFloatList': + # Flatten the list + flat_data = [] + for sublist in lst: + flat_data.extend(sublist) + + # Create partitioning + partitioning = Partitioning.from_list(lst, names) + + # Create unlist_data + unlist_data = np.array(flat_data, dtype=np.float64) + + return cls(unlist_data, partitioning, metadata=metadata) + +# Usage +float_data = [[1.1, 2.2, 3.3], [4.4, 5.5], [6.6, 7.7, 8.8, 9.9]] +float_list = CompressedFloatList.from_list(float_data, names=["X", "Y", "Z"]) +print(float_list["Y"]) +``` + +## For More Complex Data Types + +For more 
complex data types, you would follow the same pattern but customize the storage and extraction methods to suit your data. + +For example, with a custom object: + +```python +class MyObject: + def __init__(self, value): + self.value = value + +class CompressedMyObjectList(CompressedList[List[MyObject]]): + # Implementation details... + + def _extract_range(self, start: int, end: int) -> List[MyObject]: + return self.unlist_data[start:end] + + @classmethod + def from_list(cls, lst: List[List[MyObject]], ...): + # Custom flattening and storage logic + # ... +``` diff --git a/pyproject.toml b/pyproject.toml index 874febe..086f90c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,14 +12,14 @@ version_scheme = "no-guess-dev" line-length = 120 src = ["src"] exclude = ["tests"] -extend-ignore = ["F821"] +lint.extend-ignore = ["F821"] -[tool.ruff.pydocstyle] +[tool.ruff.lint.pydocstyle] convention = "google" [tool.ruff.format] docstring-code-format = true docstring-code-line-length = 20 -[tool.ruff.per-file-ignores] +[tool.ruff.lint.per-file-ignores] "__init__.py" = ["E402", "F401"] diff --git a/setup.cfg b/setup.cfg index 362233b..ed1d09e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -12,10 +12,10 @@ license = MIT license_files = LICENSE.txt long_description = file: README.md long_description_content_type = text/markdown; charset=UTF-8; variant=GFM -url = https://github.com/pyscaffold/pyscaffold/ +url = https://github.com/biocpy/compressed-lists # Add here related links, for example: project_urls = - Documentation = https://pyscaffold.org/ + Documentation = https://github.com/biocpy/compressed-lists # Source = https://github.com/pyscaffold/pyscaffold/ # Changelog = https://pyscaffold.org/en/latest/changelog.html # Tracker = https://github.com/pyscaffold/pyscaffold/issues @@ -41,7 +41,7 @@ package_dir = =src # Require a min/specific Python version (comma-separated conditions) -# python_requires = >=3.8 +python_requires = >=3.9 # Add here dependencies of your project 
def _extract_range(self, start: int, end: int) -> np.ndarray:
    """Slice one list element out of the concatenated integer data.

    Args:
        start:
            Position of the first value to include.

        end:
            Position one past the last value to include.

    Returns:
        Whatever slicing ``unlist_data`` with ``start:end`` yields
        (same type as ``unlist_data``).
    """
    window = slice(start, end)
    return self._unlist_data[window]
def _validate_data_and_partitions(unlist_data, partition):
    """Check that the concatenated data and the partitioning agree in size.

    Args:
        unlist_data:
            Vector-like object; only its ``len()`` is inspected.

        partition:
            Object exposing ``nobj()``, the total number of partitioned objects.

    Raises:
        ValueError:
            If ``len(unlist_data)`` differs from ``partition.nobj()``.
    """
    n_data = len(unlist_data)
    n_expected = partition.nobj()

    if n_data == n_expected:
        return

    raise ValueError(
        f"Length of 'unlist_data' ({n_data}) "
        f"doesn't match 'partitioning' total length ({n_expected})."
    )
def __deepcopy__(self, memo=None, _nil=[]):
    """Return a deep copy of this ``CompressedList``.

    Args:
        memo:
            Memo dictionary supplied by :py:func:`copy.deepcopy`. It is
            forwarded to every nested ``deepcopy`` call so that an object
            referenced from several attributes is copied only once and stays
            shared in the result. A fresh dictionary is used when ``None``.

        _nil:
            Unused; retained only so the signature stays unchanged.

    Returns:
        A new object of the same class as ``self`` whose data, partitioning,
        element type, element metadata and metadata are deep copies.
    """
    from copy import deepcopy

    # Direct calls (``obj.__deepcopy__()``) arrive without a memo.
    if memo is None:
        memo = {}

    current_class_const = type(self)
    return current_class_const(
        unlist_data=deepcopy(self._unlist_data, memo),
        partitioning=deepcopy(self._partitioning, memo),
        element_type=deepcopy(self._element_type, memo),
        element_metadata=deepcopy(self._element_metadata, memo),
        metadata=deepcopy(self._metadata, memo),
    )
def __repr__(self) -> str:
    """Build a one-line, constructor-like description of this object.

    The element and general metadata are only mentioned when non-empty.

    Returns:
        A string representation.
    """
    fields = [
        f"number_of_elements={len(self)}",
        "unlist_data=" + ut.print_truncated_list(self._unlist_data),
        "partitioning=" + repr(self._partitioning),
        # ``element_type`` defaults to ``None`` in the constructor; plain string
        # concatenation would raise ``TypeError`` in that case, so go via ``str``.
        "element_type=" + str(self._element_type),
    ]

    if len(self._element_metadata) > 0:
        fields.append("element_metadata=" + ut.print_truncated_dict(self._element_metadata))

    if len(self._metadata) > 0:
        fields.append("metadata=" + ut.print_truncated_dict(self._metadata))

    return f"{type(self).__name__}(" + ", ".join(fields) + ")"
@property
def paritioning(self) -> "Partitioning":
    """Alias for :py:meth:`~get_partitioning`.

    The name is misspelled; it is kept so existing callers keep working.
    """
    return self.get_partitioning()

@property
def partitioning(self) -> "Partitioning":
    """Alias for :py:meth:`~get_partitioning` (correctly spelled accessor)."""
    return self.get_partitioning()
def set_unlist_data(self, unlist_data: Any, in_place: bool = False) -> "CompressedList":
    """Replace the concatenated data backing this list.

    Args:
        unlist_data:
            Replacement vector-like object holding all elements back to back.
            Its length is checked against the current partitioning.

        in_place:
            ``True`` to mutate this object, ``False`` to work on a copy.

    Returns:
        The object that received the new data: ``self`` when ``in_place`` is
        ``True``, otherwise a copy.
    """
    target = self._define_output(in_place)

    # Refuse data whose length disagrees with the existing partitioning.
    _validate_data_and_partitions(partition=self._partitioning, unlist_data=unlist_data)

    target._unlist_data = unlist_data
    return target
def set_element_metadata(self, element_metadata: dict, in_place: bool = False) -> "CompressedList":
    """Replace the per-element metadata dictionary.

    Args:
        element_metadata:
            Dictionary to store as the element metadata.

        in_place:
            ``True`` to mutate this object, ``False`` to work on a copy.

    Returns:
        The object that received the new metadata: ``self`` when ``in_place``
        is ``True``, otherwise a copy.

    Raises:
        TypeError:
            If ``element_metadata`` is not a ``dict``.
    """
    if isinstance(element_metadata, dict):
        target = self._define_output(in_place)
        target._element_metadata = element_metadata
        return target

    raise TypeError(f"`element_metadata` must be a dictionary, provided {type(element_metadata)}.")
def __getitem__(self, key: Union[int, str, slice]) -> Any:
    """Get an element or slice of elements from the list.

    Args:
        key:
            Integer index (negative values count from the end), string name,
            or slice.

    Returns:
        For an integer or string key, the single element produced by
        ``_extract_range``. For a slice, a new object built with the class's
        ``from_list`` from the selected elements; names are carried over only
        when this list has names.

    Raises:
        KeyError:
            If ``key`` is a string that is not one of the names, including
            when the list has no names at all.

        IndexError:
            If an integer ``key`` is out of range.

        TypeError:
            If ``key`` is not an int, str, or slice.
    """
    # string keys (names)
    if isinstance(key, str):
        names = self.names
        # An unnamed list has ``names`` of None; a lookup by name must then
        # fail as a missing key instead of a TypeError from ``in None``.
        if names is None or key not in names:
            raise KeyError(f"No element named '{key}'.")
        key = list(names).index(key)

    # integer indices
    if isinstance(key, int):
        size = len(self)
        if key < 0:
            key += size
        if key < 0 or key >= size:
            raise IndexError(f"List index '{key}' out of range.")

        start, end = self._partitioning.get_partition_range(key)
        return self._extract_range(start, end)

    # slices
    if isinstance(key, slice):
        indices = range(*key.indices(len(self)))
        result = [self._extract_range(*self._partitioning.get_partition_range(i)) for i in indices]

        # Names may be absent (None) or empty; probing ``names[0]`` blindly
        # would raise for unnamed or empty lists.
        names = self.names
        subset_names = None
        if names is not None and len(names) > 0 and names[0] is not None:
            subset_names = [names[i] for i in indices]

        # Create a new CompressedList from the result
        return self.__class__.from_list(result, names=subset_names)

    raise TypeError("Index must be int, str, or slice.")
def extract_subset(self, indices: Sequence[int]) -> "CompressedList[Any]":
    """Extract a subset of elements by indices.

    Args:
        indices:
            Indices of the elements to keep, in the order they should appear.
            Each must satisfy ``0 <= i < len(self)``; negative indices are
            rejected. An empty selection yields an empty list.

    Returns:
        A new object of the same class holding only the selected elements.
        Names are carried over only when this list has names. Element
        metadata entries are kept when their key is one of ``indices``; the
        general metadata is shallow-copied.

    Raises:
        IndexError:
            If any index is out of range.
    """
    # Materialise once: the indices are traversed several times below.
    indices = list(indices)

    # Validate indices
    size = len(self)
    for i in indices:
        if i < 0 or i >= size:
            raise IndexError(f"Index {i} out of range")

    # Element lengths are computed once instead of once per index.
    lengths = self.get_element_lengths()
    new_lengths = [lengths[i] for i in indices]

    # Names may be absent (None) or empty; probing ``names[0]`` blindly would
    # raise for unnamed or empty lists.
    names = self.names
    new_names = None
    if names is not None and len(names) > 0 and names[0] is not None:
        new_names = [names[i] for i in indices]

    # Create new partitioning
    new_partitioning = Partitioning.from_lengths(new_lengths, new_names)

    # Extract data
    ranges = [self._partitioning.get_partition_range(i) for i in indices]
    if isinstance(self._unlist_data, np.ndarray):
        if ranges:
            new_data = np.concatenate([self._unlist_data[start:end] for start, end in ranges])
        else:
            # np.concatenate refuses an empty sequence; an empty slice keeps the dtype.
            new_data = self._unlist_data[:0].copy()
    else:
        new_data = []
        for start, end in ranges:
            new_data.extend(self._unlist_data[start:end])

    # Create new compressed list
    return self.__class__(
        new_data,
        new_partitioning,
        element_type=self._element_type,
        element_metadata={k: v for k, v in self._element_metadata.items() if k in indices},
        metadata=self._metadata.copy(),
    )
def _extract_range(self, start: int, end: int) -> List[str]:
    """Slice one list element out of the concatenated string data.

    Args:
        start:
            Position of the first string to include.

        end:
            Position one past the last string to include.

    Returns:
        List of strings.
    """
    window = slice(start, end)
    return self._unlist_data[window]
def _validate_names(names, expected_len):
    """Check that ``names``, when given, has one entry per partition.

    Args:
        names:
            Names to check, or ``None`` to skip the check.

        expected_len:
            Required number of names.

    Raises:
        ValueError:
            If ``names`` is not ``None`` and its length differs from
            ``expected_len``.
    """
    if names is not None and len(names) != expected_len:
        raise ValueError("Length of names must match length of ends.")
+ """ + self._ends = np.array(ends, dtype=np.int64) + + # Calculate starts from ends + self._starts = np.zeros_like(self._ends) + if len(self._ends) > 0: + self._starts[1:] = self._ends[:-1] + + self._names = None + if names is not None: + self._names = ut.NamedList(names) + + if validate: + _validate_names(names, len(ends)) + + @classmethod + def from_lengths(cls, lengths: Sequence[int], names: Optional[Sequence[str]] = None) -> "Partitioning": + """Create a Partitioning from a sequence of lengths. + + Args: + lengths: + Sequence of partition lengths. + + names: + Optional names for each partition. + + Returns: + A new Partitioning object. + """ + ends = np.cumsum(lengths) + return cls(ends, names) + + @classmethod + def from_list(cls, lst: List, names: Optional[Sequence[str]] = None) -> "Partitioning": + """Create a Partitioning from a list by using the lengths of each element. + + Args: + lst: + A list to create partitioning from. + + names: + Optional names for each partition. + + Returns: + A new Partitioning object. + """ + lengths = [len(item) if hasattr(item, "__len__") else 1 for item in lst] + return cls.from_lengths(lengths, names) + + def _define_output(self, in_place: bool = False) -> "Partitioning": + if in_place is True: + return self + else: + return self.__copy__() + + ######################### + ######>> Copying <<###### + ######################### + + def __deepcopy__(self, memo=None, _nil=[]): + """ + Returns: + A deep copy of the current ``Partitioning``. + """ + from copy import deepcopy + + _ends_copy = deepcopy(self._ends) + _names_copy = deepcopy(self._names) + + current_class_const = type(self) + return current_class_const( + ends=_ends_copy, + names=_names_copy, + ) + + def __copy__(self): + """ + Returns: + A shallow copy of the current ``Partitioning``. 
+ """ + current_class_const = type(self) + return current_class_const( + ends=self._ends, + names=self._names, + ) + + def copy(self): + """Alias for :py:meth:`~__copy__`.""" + return self.__copy__() + + ###################################### + ######>> length and iterators <<###### + ###################################### + + def __len__(self) -> int: + """Return the number of partitions.""" + return len(self._ends) + + def get_nobj(self) -> int: + """Return the total number of objects across all partitions.""" + return self._ends[-1] if len(self._ends) > 0 else 0 + + def nobj(self) -> int: + """Alias for :py:attr:`~get_nobj`.""" + return self.get_nobj() + + def get_element_lengths(self) -> np.ndarray: + """Return the lengths of each partition.""" + return self._ends - self._starts + + def element_lengths(self) -> int: + """Alias for :py:attr:`~get_element_lengths`.""" + return self.get_element_lengths() + + ########################## + ######>> Printing <<###### + ########################## + + def __repr__(self) -> str: + """ + Returns: + A string representation. + """ + output = f"{type(self).__name__}(number_of_elements={len(self)}" + + if self._names is not None: + output += ", names=" + ut.print_truncated_list(self._names) + + output += ")" + return output + + def __str__(self) -> str: + """ + Returns: + A pretty-printed string containing the contents of this object. 
+ """ + output = f"class: {type(self).__name__}\n" + + output += f"num of elements: ({len(self)})\n" + + output += f"names({0 if self._names is None else len(self._names)}): {' ' if self._names is None else ut.print_truncated_list(self._names)}\n" + + return output + + ########################## + ######>> accessors <<##### + ########################## + + def get_partition_range(self, i: int) -> tuple: + """Get the start and end indices for partition ``i``.""" + if i < 0 or i >= len(self): + raise IndexError(f"Partition index {i} out of range.") + return (self._starts[i], self._ends[i]) + + def __getitem__(self, key: Union[int, slice]) -> Union[tuple, List[tuple]]: + """Get partition range(s) by index or slice. + + Args: + key: + Integer index or slice. + + Returns: + Tuple of (start, end) or list of such tuples. + """ + if isinstance(key, int): + return self.get_partition_range(key) + elif isinstance(key, slice): + indices = range(*key.indices(len(self))) + return [self.get_partition_range(i) for i in indices] + else: + raise TypeError("Index must be 'int' or 'slice'.") + + ###################### + ######>> names <<##### + ###################### + + def get_names(self) -> Optional[ut.NamedList]: + """Return the names of each partition.""" + return self._names + + def set_names(self, names: Optional[List[str]], in_place: bool = False) -> "Partitioning": + """Set the names of list elements. + + Args: + names: + New names, same as the number of elements. + + May be `None` to remove row names. + + in_place: + Whether to modify the ``Partitioning`` in place. + + Returns: + A modified ``Partitioning`` object, either as a copy of the original + or as a reference to the (in-place-modified) original. 
+ """ + if names is not None and not isinstance(names, ut.Names): + names = ut.Names(names) + + _validate_names(names, len(self._ends)) + + output = self._define_output(in_place) + output._names = names + return output + + @property + def names(self) -> Optional[ut.Names]: + """Alias for :py:attr:`~get_names`, provided for back-compatibility.""" + return self.get_names() + + @names.setter + def names(self, names: Optional[List[str]]): + """Alias for :py:meth:`~set_names` with ``in_place = True``. + + As this mutates the original object, a warning is raised. + """ + warn( + "Setting property 'row_names' is an in-place operation, use 'set_names' instead", + UserWarning, + ) + self.set_names(names, in_place=True) + + ##################### + ######>> ends <<##### + ##################### + + def get_ends(self) -> Optional[ut.NamedList]: + """Return the names of each partition.""" + return self._ends + + @property + def ends(self) -> Optional[ut.Names]: + """Alias for :py:attr:`~get_ends`, provided for back-compatibility.""" + return self.get_ends() + + ####################### + ######>> starts <<##### + ####################### + + def get_starts(self) -> Optional[ut.NamedList]: + """Return the starts of each partition.""" + return self._starts + + @property + def starts(self) -> Optional[ut.Names]: + """Alias for :py:attr:`~get_starts`, provided for back-compatibility.""" + return self.get_starts() diff --git a/src/compressed_lists/skeleton.py b/src/compressed_lists/skeleton.py deleted file mode 100644 index f139b22..0000000 --- a/src/compressed_lists/skeleton.py +++ /dev/null @@ -1,149 +0,0 @@ -""" -This is a skeleton file that can serve as a starting point for a Python -console script. 
To run this script uncomment the following lines in the -``[options.entry_points]`` section in ``setup.cfg``:: - - console_scripts = - fibonacci = compressed_lists.skeleton:run - -Then run ``pip install .`` (or ``pip install -e .`` for editable mode) -which will install the command ``fibonacci`` inside your current environment. - -Besides console scripts, the header (i.e. until ``_logger``...) of this file can -also be used as template for Python modules. - -Note: - This file can be renamed depending on your needs or safely removed if not needed. - -References: - - https://setuptools.pypa.io/en/latest/userguide/entry_point.html - - https://pip.pypa.io/en/stable/reference/pip_install -""" - -import argparse -import logging -import sys - -from compressed_lists import __version__ - -__author__ = "Jayaram Kancherla" -__copyright__ = "Jayaram Kancherla" -__license__ = "MIT" - -_logger = logging.getLogger(__name__) - - -# ---- Python API ---- -# The functions defined in this section can be imported by users in their -# Python scripts/interactive interpreter, e.g. via -# `from compressed_lists.skeleton import fib`, -# when using this Python module as a library. - - -def fib(n): - """Fibonacci example function - - Args: - n (int): integer - - Returns: - int: n-th Fibonacci number - """ - assert n > 0 - a, b = 1, 1 - for _i in range(n - 1): - a, b = b, a + b - return a - - -# ---- CLI ---- -# The functions defined in this section are wrappers around the main Python -# API allowing them to be called directly from the terminal as a CLI -# executable/script. - - -def parse_args(args): - """Parse command line parameters - - Args: - args (List[str]): command line parameters as list of strings - (for example ``["--help"]``). 
- - Returns: - :obj:`argparse.Namespace`: command line parameters namespace - """ - parser = argparse.ArgumentParser(description="Just a Fibonacci demonstration") - parser.add_argument( - "--version", - action="version", - version=f"compressed-lists {__version__}", - ) - parser.add_argument(dest="n", help="n-th Fibonacci number", type=int, metavar="INT") - parser.add_argument( - "-v", - "--verbose", - dest="loglevel", - help="set loglevel to INFO", - action="store_const", - const=logging.INFO, - ) - parser.add_argument( - "-vv", - "--very-verbose", - dest="loglevel", - help="set loglevel to DEBUG", - action="store_const", - const=logging.DEBUG, - ) - return parser.parse_args(args) - - -def setup_logging(loglevel): - """Setup basic logging - - Args: - loglevel (int): minimum loglevel for emitting messages - """ - logformat = "[%(asctime)s] %(levelname)s:%(name)s:%(message)s" - logging.basicConfig( - level=loglevel, stream=sys.stdout, format=logformat, datefmt="%Y-%m-%d %H:%M:%S" - ) - - -def main(args): - """Wrapper allowing :func:`fib` to be called with string arguments in a CLI fashion - - Instead of returning the value from :func:`fib`, it prints the result to the - ``stdout`` in a nicely formatted message. - - Args: - args (List[str]): command line parameters as list of strings - (for example ``["--verbose", "42"]``). - """ - args = parse_args(args) - setup_logging(args.loglevel) - _logger.debug("Starting crazy calculations...") - print(f"The {args.n}-th Fibonacci number is {fib(args.n)}") - _logger.info("Script ends here") - - -def run(): - """Calls :func:`main` passing the CLI arguments extracted from :obj:`sys.argv` - - This function can be used as entry point to create console scripts with setuptools. - """ - main(sys.argv[1:]) - - -if __name__ == "__main__": - # ^ This is a guard statement that will prevent the following code from - # being executed in the case someone imports this file instead of - # executing it as a script. 
- # https://docs.python.org/3/library/__main__.html - - # After installing your project with pip, users can also run your Python - # modules as scripts via the ``-m`` flag, as defined in PEP 338:: - # - # python -m compressed_lists.skeleton 42 - # - run() diff --git a/tests/test_comp_custom.py b/tests/test_comp_custom.py new file mode 100644 index 0000000..1476109 --- /dev/null +++ b/tests/test_comp_custom.py @@ -0,0 +1,57 @@ +from typing import List + +import numpy as np +import pytest + +from compressed_lists import CompressedList, Partitioning + +__author__ = "Jayaram Kancherla" +__copyright__ = "Jayaram Kancherla" +__license__ = "MIT" + + +@pytest.fixture +def CompressedFloatList(): + class CompressedFloatList(CompressedList): + def __init__( + self, + unlist_data: np.ndarray, + partitioning: Partitioning, + element_metadata: dict = None, + metadata: dict = None, + ): + super().__init__( + unlist_data, partitioning, element_type="float", element_metadata=element_metadata, metadata=metadata + ) + + def _extract_range(self, start: int, end: int) -> List[float]: + return self._unlist_data[start:end].tolist() + + @classmethod + def from_list(cls, lst: List[List[float]], names: list = None, metadata: dict = None) -> "CompressedFloatList": + flat_data = [] + for sublist in lst: + flat_data.extend(sublist) + + partitioning = Partitioning.from_list(lst, names) + unlist_data = np.array(flat_data, dtype=np.float64) + return cls(unlist_data, partitioning, metadata=metadata) + + return CompressedFloatList + + +def test_custom_class(CompressedFloatList): + float_data = [[1.1, 2.2, 3.3], [4.4, 5.5], [6.6, 7.7, 8.8, 9.9]] + names = ["X", "Y", "Z"] + float_list = CompressedFloatList.from_list(float_data, names) + + assert len(float_list) == 3 + assert float_list._element_type == "float" + assert list(float_list.names) == names + assert float_list["Y"] == [4.4, 5.5] + + # Test lapply + rounded = float_list.lapply(lambda x: [round(f, 0) for f in x]) + assert rounded[0] == [1.0, 
2.0, 3.0] + assert rounded[1] == [4.0, 6.0] + assert rounded[2] == [7.0, 8.0, 9.0, 10.0] diff --git a/tests/test_comp_int.py b/tests/test_comp_int.py new file mode 100644 index 0000000..90d1299 --- /dev/null +++ b/tests/test_comp_int.py @@ -0,0 +1,130 @@ +import numpy as np +import pytest + +from compressed_lists import CompressedIntegerList, Partitioning + +__author__ = "Jayaram Kancherla" +__copyright__ = "Jayaram Kancherla" +__license__ = "MIT" + + +@pytest.fixture +def int_data(): + return [[1, 2, 3], [4, 5], [6, 7, 8, 9]] + + +@pytest.fixture +def int_list(int_data): + names = ["A", "B", "C"] + return CompressedIntegerList.from_list(int_data, names) + + +def test_creation(int_data): + int_list = CompressedIntegerList.from_list(int_data) + + assert len(int_list) == 3 + assert isinstance(int_list.unlist_data, np.ndarray) + assert list(int_list.get_unlist_data()) == [1, 2, 3, 4, 5, 6, 7, 8, 9] + assert list(int_list.get_element_lengths()) == [3, 2, 4] + + +def test_creation_with_names(int_data): + names = ["A", "B", "C"] + int_list = CompressedIntegerList.from_list(int_data, names) + + assert list(int_list.names) == names + + +def test_validation(): + data = np.array([1, 2, 3, 4, 5]) + partitioning = Partitioning([2, 4, 7]) + + with pytest.raises(ValueError): + CompressedIntegerList(data, partitioning) + + +def test_getitem_by_index(int_list): + assert np.allclose(int_list[0], [1, 2, 3]) + assert np.allclose(int_list[1], [4, 5]) + assert np.allclose(int_list[2], [6, 7, 8, 9]) + + assert np.allclose(int_list[-1], [6, 7, 8, 9]) + + with pytest.raises(IndexError): + int_list[3] + + +def test_getitem_by_name(int_list): + assert np.allclose(int_list["A"], [1, 2, 3]) + assert np.allclose(int_list["B"], [4, 5]) + assert np.allclose(int_list["C"], [6, 7, 8, 9]) + + with pytest.raises(KeyError): + int_list["D"] + + +def test_getitem_by_slice(int_list): + sliced = int_list[1:3] + + assert len(sliced) == 2 + assert np.allclose(sliced[0], [4, 5]) + assert 
np.allclose(sliced[1], [6, 7, 8, 9]) + assert list(sliced.names) == ["B", "C"] + + # Empty slice + empty = int_list[3:4] + assert len(empty) == 0 + + +def test_iteration(int_list, int_data): + items = list(int_list) + print(items, int_data) + for i, lst in enumerate(items): + assert np.allclose(lst, int_data[i]) + + +def test_to_list(int_list, int_data): + regular_list = int_list.to_list() + for i, lst in enumerate(regular_list): + assert np.allclose(lst, int_data[i]) + + +def test_unlist(int_list): + unlisted = int_list.unlist() + assert isinstance(unlisted, np.ndarray) + assert np.allclose(list(unlisted), [1, 2, 3, 4, 5, 6, 7, 8, 9]) + + +def test_relist(int_list): + new_data = np.array([10, 20, 30, 40, 50, 60, 70, 80, 90], dtype=np.int64) + relisted = int_list.relist(new_data) + + assert len(relisted) == len(int_list) + assert list(relisted.get_names()) == list(int_list.names) + assert np.allclose(relisted[0], [10, 20, 30]) + assert np.allclose(relisted[1], [40, 50]) + assert np.allclose(relisted[2], [60, 70, 80, 90]) + + with pytest.raises(ValueError): + int_list.relist(np.array([1, 2, 3])) + + +def test_extract_subset(int_list): + subset = int_list.extract_subset([0, 2]) + + assert len(subset) == 2 + assert np.allclose(subset[0], [1, 2, 3]) + assert np.allclose(subset[1], [6, 7, 8, 9]) + assert list(subset.names) == ["A", "C"] + + with pytest.raises(IndexError): + int_list.extract_subset([0, 3]) + + +def test_lapply(int_list): + squared = int_list.lapply(lambda x: [i**2 for i in x]) + + assert len(squared) == len(int_list) + assert np.allclose(squared[0], [1, 4, 9]) + assert np.allclose(squared[1], [16, 25]) + assert np.allclose(squared[2], [36, 49, 64, 81]) diff --git a/tests/test_comp_str.py b/tests/test_comp_str.py new file mode 100644 index 0000000..ef54f8c --- /dev/null +++ b/tests/test_comp_str.py @@ -0,0 +1,41 @@ +import pytest + +from compressed_lists import CompressedStringList + +__author__ = "Jayaram Kancherla" +__copyright__ = "Jayaram Kancherla" 
+__license__ = "MIT" + + +@pytest.fixture +def char_data(): + return [["apple", "banana"], ["cherry", "date", "elderberry"], ["fig"]] + + +@pytest.fixture +def char_list(char_data): + names = ["fruits1", "fruits2", "fruits3"] + return CompressedStringList.from_list(char_data, names) + + +def test_creation(char_data): + char_list = CompressedStringList.from_list(char_data) + + assert len(char_list) == 3 + assert isinstance(char_list.unlist_data, list) + assert char_list.get_unlist_data() == ["apple", "banana", "cherry", "date", "elderberry", "fig"] + assert list(char_list.get_element_lengths()) == [2, 3, 1] + + +def test_getitem(char_list): + assert char_list[0] == ["apple", "banana"] + assert char_list["fruits2"] == ["cherry", "date", "elderberry"] + + +def test_lapply(char_list): + uppercased = char_list.lapply(lambda x: [s.upper() for s in x]) + + assert len(uppercased) == len(char_list) + assert uppercased[0] == ["APPLE", "BANANA"] + assert uppercased[1] == ["CHERRY", "DATE", "ELDERBERRY"] + assert uppercased[2] == ["FIG"] diff --git a/tests/test_partitioning.py b/tests/test_partitioning.py new file mode 100644 index 0000000..01a7363 --- /dev/null +++ b/tests/test_partitioning.py @@ -0,0 +1,80 @@ +import pytest + +from compressed_lists import ( + Partitioning, +) + +__author__ = "Jayaram Kancherla" +__copyright__ = "Jayaram Kancherla" +__license__ = "MIT" + + +def test_init_with_ends(): + ends = [3, 5, 10] + names = ["A", "B", "C"] + part = Partitioning(ends, names) + + assert list(part.get_ends()) == ends + assert list(part.starts) == [0, 3, 5] + assert list(part.get_names()) == names + assert len(part) == 3 + + +def test_init_with_invalid_names(): + ends = [3, 5, 10] + names = ["A", "B"] + + with pytest.raises(ValueError): + Partitioning(ends, names) + + +def test_from_lengths(): + lengths = [2, 3, 5] + part = Partitioning.from_lengths(lengths) + + assert list(part.ends) == [2, 5, 10] + assert list(part.get_starts()) == [0, 2, 5] + assert len(part) == 3 + + 
+def test_from_list(): + lst = [[1, 2], [3, 4, 5], ["a", "b", "c", "d", "e"]] + part = Partitioning.from_list(lst) + + assert list(part.ends) == [2, 5, 10] + assert list(part.get_element_lengths()) == [2, 3, 5] + + +def test_nobj(): + ends = [3, 5, 10] + part = Partitioning(ends) + + assert part.nobj() == 10 + + empty_part = Partitioning([]) + assert empty_part.nobj() == 0 + + +def test_get_partition_range(): + ends = [3, 5, 10] + part = Partitioning(ends) + + assert part.get_partition_range(0) == (0, 3) + assert part.get_partition_range(1) == (3, 5) + assert part.get_partition_range(2) == (5, 10) + + with pytest.raises(IndexError): + part.get_partition_range(3) + + +def test_getitem(): + ends = [3, 5, 10] + part = Partitioning(ends) + + assert part[0] == (0, 3) + assert part[1] == (3, 5) + + assert part[0:2] == [(0, 3), (3, 5)] + + with pytest.raises(TypeError): + part["invalid"] diff --git a/tests/test_skeleton.py b/tests/test_skeleton.py deleted file mode 100644 index bd96f84..0000000 --- a/tests/test_skeleton.py +++ /dev/null @@ -1,25 +0,0 @@ -import pytest - -from compressed_lists.skeleton import fib, main - -__author__ = "Jayaram Kancherla" -__copyright__ = "Jayaram Kancherla" -__license__ = "MIT" - - -def test_fib(): - """API Tests""" - assert fib(1) == 1 - assert fib(2) == 1 - assert fib(7) == 13 - with pytest.raises(AssertionError): - fib(-10) - - -def test_main(capsys): - """CLI Tests""" - # capsys is a pytest fixture that allows asserts against stdout/stderr - # https://docs.pytest.org/en/stable/capture.html - main(["7"]) - captured = capsys.readouterr() - assert "The 7-th Fibonacci number is 13" in captured.out