diff --git a/docs/cookbook/streaming.md b/docs/cookbook/streaming.md new file mode 100644 index 00000000..7d7b279a --- /dev/null +++ b/docs/cookbook/streaming.md @@ -0,0 +1,28 @@ +# Streaming updates + +`ndv` can be used to visualize data that is continuously updated, such as +images from a camera or a live data stream. The following document shows some +examples of such implementation. + +## Basic streaming, with no history + +To visualize a live data stream, simply create an `ndv.ArrayViewer` controller +with an empty buffer matching your data shape. Then, when new data is available, +update the buffer in place with the new data. Calling `update()` on the +[`ArrayDisplayModel.current_index`][ndv.models.ArrayDisplayModel] +will force the display to fetch your new data: + +````python title="examples/streaming.py" +--8<-- "examples/streaming.py" +```` + +## Streaming, remembering the last N frames + +To visualize a live data stream while keeping the last N frames in memory, +you can use the [`ndv.models.RingBuffer`][] class. It offers a convenient +`append()` method to add new data, and takes care of updating the "apparent" +shape of the data (as far as the viewer is concerned): + +````python title="examples/streaming_with_history.py" +--8<-- "examples/streaming_with_history.py" +```` diff --git a/examples/streaming.py b/examples/streaming.py new file mode 100644 index 00000000..6a79f3d5 --- /dev/null +++ b/examples/streaming.py @@ -0,0 +1,33 @@ +# /// script +# dependencies = [ +# "ndv[pyqt,pygfx]", +# "imageio[tifffile]", +# ] +# /// +"""Example of streaming data.""" + +import numpy as np + +import ndv + +# some data we're going to stream (as if it was coming from a camera) +data = ndv.data.cells3d()[:, 1] + +# a buffer to hold the current frame in the viewer +buffer = np.zeros_like(data[0]) +viewer = ndv.ArrayViewer(buffer) +viewer.show() + + +# function that will be called after the app is running +def stream(nframes: int = len(data) * 4) -> None: + for i in range(nframes): + # iterate over the data, update the buffer *in place*, + buffer[:] = data[i % len(data)] + # and update the viewer index to redraw + viewer.display_model.current_index.update() + ndv.process_events() # force viewer updates for this example + + +ndv.call_later(200, stream) +ndv.run_app() diff --git a/examples/streaming_with_history.py b/examples/streaming_with_history.py new file mode 100644 index 00000000..b6169f20 --- /dev/null +++ b/examples/streaming_with_history.py @@ -0,0 +1,37 @@ +# /// script +# dependencies = [ +# "ndv[pyqt,pygfx]", +# "imageio[tifffile]", +# ] +# /// +"""Example of streaming data, retaining the last N frames.""" + +import ndv +from ndv.models import RingBuffer + +# some data we're going to stream (as if it was coming from a camera) +data = ndv.data.cells3d()[:, 1] + +# a ring buffer to hold the data as it comes in +# the ring buffer is a circular buffer that holds the last N frames +N = 50 +rb = RingBuffer(max_capacity=N, dtype=(data.dtype, data.shape[-2:])) + +# pass the ring buffer to the viewer +viewer = ndv.ArrayViewer(rb) +viewer.show() + + +# function that will be called after the app is running +def stream() -> None: + # iterate over the data, add it to the ring buffer + for n, plane in enumerate(data): + rb.append(plane) + # and update the viewer index to redraw (and possibly move the slider) + viewer.display_model.current_index.update({0: max(n, N - 1)}) + + ndv.process_events() # force viewer updates for this example + + +ndv.call_later(200, stream) +ndv.run_app() diff --git a/mkdocs.yml b/mkdocs.yml index c434dbd4..0fc459b2 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -21,6 +21,7 @@ nav: - env_var.md - Cookbook: - cookbook/embedding.md + - cookbook/streaming.md # This is populated by api-autonav plugin # - API reference: reference/ diff --git a/src/ndv/__init__.py b/src/ndv/__init__.py index 6d71a6d2..8d411fce 100644 --- a/src/ndv/__init__.py +++ b/src/ndv/__init__.py @@ -11,13 +11,21 @@ from .controllers import ArrayViewer from .models import DataWrapper from .util import imshow -from .views import run_app, set_canvas_backend, set_gui_backend +from .views import ( + call_later, + process_events, + run_app, + set_canvas_backend, + set_gui_backend, +) __all__ = [ "ArrayViewer", "DataWrapper", + "call_later", "data", "imshow", + "process_events", "run_app", "set_canvas_backend", "set_gui_backend", diff --git a/src/ndv/controllers/_array_viewer.py b/src/ndv/controllers/_array_viewer.py index 4e06fd4e..a24c8c54 100644 --- a/src/ndv/controllers/_array_viewer.py +++ b/src/ndv/controllers/_array_viewer.py @@ -557,6 +557,9 @@ def _on_data_response_ready(self, future: Future[DataResponse]) -> None: display_model = self._data_model.display for key, data in response.data.items(): + if data.size == 0: + # no data for this channel + continue if (lut_ctrl := self._lut_controllers.get(key)) is None: if key is None: model = display_model.default_lut diff --git a/src/ndv/models/__init__.py b/src/ndv/models/__init__.py index 18fc4ddf..70467e87 100644 --- a/src/ndv/models/__init__.py +++ b/src/ndv/models/__init__.py @@ -2,7 +2,7 @@ from ._array_display_model import ArrayDisplayModel, ChannelMode from ._base_model import NDVModel -from ._data_wrapper import DataWrapper +from ._data_wrapper import DataWrapper, RingBufferWrapper from ._lut_model import ( ClimPolicy, ClimsManual, @@ -11,6 +11,7 @@ ClimsStdDev, LUTModel, ) +from ._ring_buffer import RingBuffer from ._roi_model import RectangularROIModel __all__ = [ @@ -25,4 +26,6 @@ "LUTModel", "NDVModel", "RectangularROIModel", + "RingBuffer", + "RingBufferWrapper", ] diff --git a/src/ndv/models/_data_wrapper.py b/src/ndv/models/_data_wrapper.py index 640c1cfa..299dcb19 100644 --- a/src/ndv/models/_data_wrapper.py +++ b/src/ndv/models/_data_wrapper.py @@ -15,6 +15,8 @@ import numpy.typing as npt from psygnal import Signal +from ._ring_buffer import RingBuffer + if TYPE_CHECKING: from collections.abc import Container, Iterator from typing import Any, TypeAlias, TypeGuard @@ -498,3 +500,62 @@ def supports(cls, obj: Any) -> TypeGuard[torch.Tensor]: if (torch := sys.modules.get("torch")) and isinstance(obj, torch.Tensor): return True return False + + +class RingBufferWrapper(DataWrapper[RingBuffer]): + """Wrapper for ring buffer objects.""" + + def __init__( + self, + max_capacity: int | RingBuffer, + dtype: npt.DTypeLike = None, + *, + allow_overwrite: bool = True, + ): + if isinstance(max_capacity, RingBuffer): + if dtype is not None: # pragma: no cover + raise ValueError( + "Cannot specify dtype when passing an existing RingBuffer." + ) + self._ring = max_capacity + else: + if dtype is None: + dtype = float + self._ring = RingBuffer( + max_capacity=max_capacity, dtype=dtype, allow_overwrite=allow_overwrite + ) + self._ring.resized.connect(self.dims_changed) + super().__init__(self._ring) + + @property + def dims(self) -> tuple[int, ...]: + """Return the dimensions of the data.""" + return tuple(range(len(self._ring.shape))) + + @property + def coords(self) -> Mapping: + """Return the coordinates for the data.""" + shape = self._ring.shape + return {i: range(s) for i, s in enumerate(shape)} + + @classmethod + def supports(cls, obj: Any) -> TypeGuard[np.ndarray]: + if isinstance(obj, RingBuffer): + return True + return False + + def append(self, value: npt.ArrayLike) -> None: + """Append a value to the right end of the buffer.""" + self._ring.append(value) + + def appendleft(self, value: npt.ArrayLike) -> None: + """Append a value to the left end of the buffer.""" + self._ring.appendleft(value) + + def pop(self) -> np.ndarray: + """Pop a value from the right end of the buffer.""" + return self._ring.pop() + + def popleft(self) -> np.ndarray: + """Pop a value from the left end of the buffer.""" + return self._ring.popleft() diff --git a/src/ndv/models/_ring_buffer.py b/src/ndv/models/_ring_buffer.py new file mode 100644 index 00000000..a3276abe --- /dev/null +++ b/src/ndv/models/_ring_buffer.py @@ -0,0 +1,293 @@ +"""Numpy Ring Buffer. + +Vendored from https://github.com/eric-wieser/numpy_ringbuffer +April 27, 2025 +(and subsequently updated). + +MIT License + +Copyright (c) 2016 Eric Wieser + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +""" + +from __future__ import annotations + +import warnings +from collections.abc import Iterator, Sequence +from typing import TYPE_CHECKING, cast, overload + +import numpy as np +from psygnal import Signal + +if TYPE_CHECKING: + from typing import Any, Callable, SupportsIndex + + import numpy.typing as npt + + +class RingBuffer(Sequence): + """Ring buffer structure with a given capacity and element type. + + Parameters + ---------- + max_capacity: int + The maximum capacity of the ring buffer. + dtype: npt.DTypeLike + Desired type (and shape) of individual buffer elements. + This is passed to `np.empty`, so it can be any + [dtype-like object](https://numpy.org/doc/stable/reference/arrays.dtypes.html). + Common scenarios will be: + - a fixed dtype (e.g. `int`, `np.uint8`, `'u2'`, `np.dtype('f4')`) + - a `(fixed_dtype, shape)` tuple (e.g. `('uint16', (512, 512))`) + allow_overwrite: bool + If false, throw an IndexError when trying to append to an already full + buffer. + create_buffer: Callable[[int, npt.DTypeLike], npt.NDArray] + A callable that creates the underlying array. + May be used to customize the initialization of the array. Defaults to + `np.empty`. + + Notes + ----- + Vendored from [numpy-ringbuffer](https://github.com/eric-wieser/numpy_ringbuffer), + by Eric Wieser (MIT License). And updated with typing and signals. + """ + + resized = Signal(int) + + def __init__( + self, + max_capacity: int, + dtype: npt.DTypeLike = float, + *, + allow_overwrite: bool = True, + create_buffer: Callable[[int, npt.DTypeLike], npt.NDArray] = np.empty, + ) -> None: + self._arr = create_buffer(max_capacity, dtype) + self._left_index = 0 + self._right_index = 0 + self._capacity = max_capacity + self._allow_overwrite = allow_overwrite + + # -------------------- Properties -------------------- + + @property + def is_full(self) -> bool: + """True if there is no more space in the buffer.""" + return len(self) == self._capacity + + @property + def dtype(self) -> np.dtype: + """Return the dtype of the buffer.""" + return self._arr.dtype + + @property + def shape(self) -> tuple[int, ...]: + """Return the shape of the valid buffer (excluding unused space).""" + return (len(self),) + self._arr.shape[1:] + + # these mirror methods from deque + @property + def maxlen(self) -> int: + """Return the maximum capacity of the buffer.""" + return self._capacity + + # -------------------- Methods -------------------- + + def append(self, value: npt.ArrayLike) -> None: + """Append a value to the right end of the buffer.""" + if was_full := self.is_full: + if not self._allow_overwrite: + raise IndexError("append to a full RingBuffer with overwrite disabled") + elif not len(self): + return + else: + self._left_index += 1 + + self._arr[self._right_index % self._capacity] = value + self._right_index += 1 + self._fix_indices() + if not was_full: + self.resized.emit(len(self)) + + def appendleft(self, value: npt.ArrayLike) -> None: + """Append a value to the left end of the buffer.""" + if was_full := self.is_full: + if not self._allow_overwrite: + raise IndexError("append to a full RingBuffer with overwrite disabled") + elif not len(self): + return + else: + self._right_index -= 1 + + self._left_index -= 1 + self._fix_indices() + self._arr[self._left_index] = value + if not was_full: + self.resized.emit(len(self)) + + def pop(self) -> np.ndarray: + """Pop a value from the right end of the buffer.""" + if len(self) == 0: + raise IndexError("pop from an empty RingBuffer") + self._right_index -= 1 + self._fix_indices() + res = cast("np.ndarray", self._arr[self._right_index % self._capacity]) + self.resized.emit(len(self)) + return res + + def popleft(self) -> np.ndarray: + """Pop a value from the left end of the buffer.""" + if len(self) == 0: + raise IndexError("pop from an empty RingBuffer") + res = cast("np.ndarray", self._arr[self._left_index]) + self._left_index += 1 + self._fix_indices() + self.resized.emit(len(self)) + return res + + def extend(self, values: npt.ArrayLike) -> None: + """Extend the buffer with the given values.""" + values = np.asarray(values) + lv = len(values) + if len(self) + lv > self._capacity: + if not self._allow_overwrite: + raise IndexError( + "Extending a RingBuffer such that it would overflow, " + "with overwrite disabled." + ) + elif not len(self): + return + if lv >= self._capacity: + # wipe the entire array! - this may not be threadsafe + self._arr[...] = values[-self._capacity :] + self._right_index = self._capacity + self._left_index = 0 + return + + was_full = self.is_full + ri = self._right_index % self._capacity + sl1 = np.s_[ri : min(ri + lv, self._capacity)] + sl2 = np.s_[: max(ri + lv - self._capacity, 0)] + self._arr[sl1] = values[: sl1.stop - sl1.start] + self._arr[sl2] = values[sl1.stop - sl1.start :] + self._right_index += lv + + self._left_index = max(self._left_index, self._right_index - self._capacity) + self._fix_indices() + if not was_full: + self.resized.emit(len(self)) + + def extendleft(self, values: npt.ArrayLike) -> None: + """Prepend the buffer with the given values.""" + values = np.asarray(values) + lv = len(values) + if len(self) + lv > self._capacity: + if not self._allow_overwrite: + raise IndexError( + "Extending a RingBuffer such that it would overflow, " + "with overwrite disabled" + ) + elif not len(self): + return + if lv >= self._capacity: + # wipe the entire array! - this may not be threadsafe + self._arr[...] = values[: self._capacity] + self._right_index = self._capacity + self._left_index = 0 + return + + was_full = self.is_full + self._left_index -= lv + self._fix_indices() + li = self._left_index + sl1 = np.s_[li : min(li + lv, self._capacity)] + sl2 = np.s_[: max(li + lv - self._capacity, 0)] + self._arr[sl1] = values[: sl1.stop - sl1.start] + self._arr[sl2] = values[sl1.stop - sl1.start :] + + self._right_index = min(self._right_index, self._left_index + self._capacity) + if not was_full: + self.resized.emit(len(self)) + + # numpy compatibility + def __array__( + self, dtype: npt.DTypeLike = None, copy: bool | None = None + ) -> np.ndarray: + if copy is False: + warnings.warn( + "`copy=False` isn't supported. A copy is always created.", + RuntimeWarning, + stacklevel=2, + ) + return np.asarray(self._unwrap(), dtype=dtype) + + # implement Sequence methods + def __len__(self) -> int: + """Return the number of valid elements in the buffer.""" + return self._right_index - self._left_index + + @overload # type: ignore [override] + def __getitem__(self, key: SupportsIndex) -> Any: ... + @overload + def __getitem__(self, key: Any, /) -> np.ndarray: ... + def __getitem__(self, key: Any) -> np.ndarray | Any: + """Index into the buffer. + + This supports both simple and fancy indexing. + """ + # handle simple (b[1]) and basic (b[np.array([1, 2, 3])]) fancy indexing quickly + if not isinstance(key, tuple): + item_arr = np.asarray(key) + if issubclass(item_arr.dtype.type, np.integer): + # Map negative indices to positive ones + item_arr = np.where(item_arr < 0, item_arr + len(self), item_arr) + # Map indices to the range of the buffer + item_arr = (item_arr + self._left_index) % self._capacity + return self._arr[item_arr] + + # for everything else, get it right at the expense of efficiency + return self._unwrap()[key] + + def __iter__(self) -> Iterator[Any]: + # this is comparable in speed to using itertools.chain + return iter(self._unwrap()) + + def __repr__(self) -> str: + """Return a string representation of the buffer.""" + return f"<{self.__class__.__name__} of {np.asarray(self)!r}>" + + def _unwrap(self) -> np.ndarray: + """Copy the data from this buffer into unwrapped form.""" + return np.concatenate( + ( + self._arr[self._left_index : min(self._right_index, self._capacity)], + self._arr[: max(self._right_index - self._capacity, 0)], + ) + ) + + def _fix_indices(self) -> None: + """Enforce our invariant that 0 <= self._left_index < self._capacity.""" + if self._left_index >= self._capacity: + self._left_index -= self._capacity + self._right_index -= self._capacity + elif self._left_index < 0: + self._left_index += self._capacity + self._right_index += self._capacity diff --git a/src/ndv/views/__init__.py b/src/ndv/views/__init__.py index aa7e8ba6..89f9ee5f 100644 --- a/src/ndv/views/__init__.py +++ b/src/ndv/views/__init__.py @@ -12,6 +12,7 @@ get_array_view_class, get_histogram_canvas_class, gui_frontend, + process_events, run_app, set_canvas_backend, set_gui_backend, @@ -25,6 +26,7 @@ "get_array_view_class", "get_histogram_canvas_class", "gui_frontend", + "process_events", "run_app", "set_canvas_backend", "set_gui_backend", diff --git a/src/ndv/views/_app.py b/src/ndv/views/_app.py index 71dd720c..f72e5743 100644 --- a/src/ndv/views/_app.py +++ b/src/ndv/views/_app.py @@ -339,6 +339,11 @@ def call_later(msec: int, func: Callable[[], None]) -> None: ndv_app().call_later(msec, func) +def process_events() -> None: + """Force processing of events for the application.""" + ndv_app().process_events() + + def run_app() -> None: """Start the active GUI application event loop.""" ndv_app().run() diff --git a/tests/test_ring_buffer.py b/tests/test_ring_buffer.py new file mode 100644 index 00000000..d00b9078 --- /dev/null +++ b/tests/test_ring_buffer.py @@ -0,0 +1,227 @@ +import numpy as np +import pytest + +from ndv.models._ring_buffer import RingBuffer + + +def test_dtype() -> None: + r = RingBuffer(5) + assert r.dtype == np.dtype(np.float64) + + r = RingBuffer(5, dtype=bool) + assert r.dtype == np.dtype(bool) + + +def test_sizes() -> None: + rb = RingBuffer(5, dtype=(int, 2)) + assert rb.maxlen == 5 + assert len(rb) == 0 + assert rb.shape == (0, 2) + + rb.append([0, 0]) + assert rb.maxlen == 5 + assert len(rb) == 1 + assert rb.shape == (1, 2) + + +def test_append() -> None: + rb = RingBuffer(5) + + rb.append(1) + np.testing.assert_equal(rb, np.array([1])) + assert len(rb) == 1 + + rb.append(2) + np.testing.assert_equal(rb, np.array([1, 2])) + assert len(rb) == 2 + + rb.append(3) + rb.append(4) + rb.append(5) + np.testing.assert_equal(rb, np.array([1, 2, 3, 4, 5])) + assert len(rb) == 5 + + rb.append(6) + np.testing.assert_equal(rb, np.array([2, 3, 4, 5, 6])) + assert len(rb) == 5 + + assert rb[4] == 6 + assert rb[-1] == 6 + + +def test_getitem() -> None: + rb = RingBuffer(5) + rb.extend([1, 2, 3]) + rb.extendleft([4, 5]) + expected = np.array([4, 5, 1, 2, 3]) + np.testing.assert_equal(rb, expected) + + for i in range(rb.maxlen): + assert expected[i] == rb[i] + + ii = [0, 4, 3, 1, 2] + np.testing.assert_equal(rb[ii], expected[ii]) + + +def test_getitem_negative_index() -> None: + rb = RingBuffer(5) + rb.extend([1, 2, 3]) + assert rb[-1] == 3 + + +def test_appendleft() -> None: + rb = RingBuffer(5) + + rb.appendleft(1) + np.testing.assert_equal(rb, np.array([1])) + assert len(rb) == 1 + + rb.appendleft(2) + np.testing.assert_equal(rb, np.array([2, 1])) + assert len(rb) == 2 + + rb.appendleft(3) + rb.appendleft(4) + rb.appendleft(5) + np.testing.assert_equal(rb, np.array([5, 4, 3, 2, 1])) + assert len(rb) == 5 + + rb.appendleft(6) + np.testing.assert_equal(rb, np.array([6, 5, 4, 3, 2])) + assert len(rb) == 5 + + +def test_extend() -> None: + rb = RingBuffer(5) + rb.extend([1, 2, 3]) + np.testing.assert_equal(rb, np.array([1, 2, 3])) + rb.popleft() + rb.extend([4, 5, 6]) + np.testing.assert_equal(rb, np.array([2, 3, 4, 5, 6])) + rb.extendleft([0, 1]) + np.testing.assert_equal(rb, np.array([0, 1, 2, 3, 4])) + + rb.extendleft([1, 2, 3, 4, 5, 6, 7]) + np.testing.assert_equal(rb, np.array([1, 2, 3, 4, 5])) + + rb.extend([1, 2, 3, 4, 5, 6, 7]) + np.testing.assert_equal(rb, np.array([3, 4, 5, 6, 7])) + + +def test_pops() -> None: + rb = RingBuffer(3) + rb.append(1) + rb.appendleft(2) + rb.append(3) + np.testing.assert_equal(rb, np.array([2, 1, 3])) + + assert rb.pop() == 3 + np.testing.assert_equal(rb, np.array([2, 1])) + + assert rb.popleft() == 2 + np.testing.assert_equal(rb, np.array([1])) + + # test empty pops + empty = RingBuffer(1) + with pytest.raises(IndexError, match="pop from an empty RingBuffer"): + empty.pop() + with pytest.raises(IndexError, match="pop from an empty RingBuffer"): + empty.popleft() + + +def test_2d() -> None: + rb = RingBuffer(5, dtype=(float, 2)) + + rb.append([1, 2]) + np.testing.assert_equal(rb, np.array([[1, 2]])) + assert len(rb) == 1 + assert np.shape(rb) == (1, 2) + + rb.append([3, 4]) + np.testing.assert_equal(rb, np.array([[1, 2], [3, 4]])) + assert len(rb) == 2 + assert np.shape(rb) == (2, 2) + + rb.appendleft([5, 6]) + np.testing.assert_equal(rb, np.array([[5, 6], [1, 2], [3, 4]])) + assert len(rb) == 3 + assert np.shape(rb) == (3, 2) + + np.testing.assert_equal(rb[0], [5, 6]) + np.testing.assert_equal(rb[0, :], [5, 6]) + np.testing.assert_equal(rb[:, 0], [5, 1, 3]) + + +def test_3d() -> None: + np.random.seed(0) + frame_shape = (32, 32) + + rb = RingBuffer(5, dtype=("u2", frame_shape)) + frame = np.random.randint(0, 65535, frame_shape, dtype="u2") + rb.append(frame) + np.testing.assert_equal(rb, frame[None]) + frame2 = np.random.randint(0, 65535, frame_shape, dtype="u2") + rb.append(frame2) + np.testing.assert_equal(rb[-1], frame2) + np.testing.assert_equal(rb, np.array([frame, frame2])) + + # fill buffer + for _ in range(5): + rb.append(np.random.randint(0, 65535, frame_shape, dtype="u2")) + + # add known frame + frame3 = np.random.randint(0, 65535, frame_shape, dtype="u2") + rb.append(frame3) + np.testing.assert_equal(rb[-1], frame3) + + +def test_iter() -> None: + rb = RingBuffer(5) + for i in range(5): + rb.append(i) + for i, j in zip(rb, range(5)): + assert i == j + + +def test_repr() -> None: + rb = RingBuffer(5, dtype=int) + for i in range(5): + rb.append(i) + + assert repr(rb) == "" + + +def test_no_overwrite() -> None: + rb = RingBuffer(3, allow_overwrite=False) + rb.append(1) + rb.append(2) + rb.appendleft(3) + with pytest.raises(IndexError, match="overwrite"): + rb.appendleft(4) + with pytest.raises(IndexError, match="overwrite"): + rb.extendleft([4]) + rb.extendleft([]) + + np.testing.assert_equal(rb, np.array([3, 1, 2])) + with pytest.raises(IndexError, match="overwrite"): + rb.append(4) + with pytest.raises(IndexError, match="overwrite"): + rb.extend([4]) + rb.extend([]) + + # works fine if we pop the surplus + rb.pop() + rb.append(4) + np.testing.assert_equal(rb, np.array([3, 1, 4])) + + +def test_degenerate() -> None: + r = RingBuffer(0) + np.testing.assert_equal(r, np.array([])) + + # this does not error with deque(maxlen=0), so should not error here + try: + r.append(0) + r.appendleft(0) + except IndexError: + pytest.fail("IndexError raised when appending to a degenerate RingBuffer")