Skip to content

Add streaming decompression for ZSTD_CONTENTSIZE_UNKNOWN case #707

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from

Conversation

mkitti
Copy link
Contributor

@mkitti mkitti commented Feb 13, 2025

Zstandard can use a streaming compression scheme where the total size of the data is not known at the beginning of the process. In this case, the size of the data is unknown
and is not saved in the Zstandard frame header.

Before this pull request, numcodecs would refuse to decompress data if the size were unknown. This pull request adds a routine to decompress data if the size is unknown,
specifically when ZSTD_getFrameContentSize returns ZSTD_CONTENTSIZE_UNKNOWN.

This pull request is based on prior pull request I made to numcodecs.js:
manzt/numcodecs.js#47

Fixes zarr-developers/zarr-python#2056

xref:
zarr-developers/zarr-python#2056

TODO:

  • Unit tests and/or doctests in docstrings
  • Tests pass locally
  • Docstrings and API docs for any new/modified user-facing classes and functions
  • Changes documented in docs/release.rst
  • Docs build locally
  • GitHub Actions CI passes
  • Test coverage to 100% (Codecov passes)

@mkitti
Copy link
Contributor Author

mkitti commented Feb 24, 2025

@StephanPreibisch

Copy link

codecov bot commented Jun 27, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 99.96%. Comparing base (4fdb625) to head (a2c7fb0).

Additional details and impacted files
@@           Coverage Diff           @@
##             main     #707   +/-   ##
=======================================
  Coverage   99.96%   99.96%           
=======================================
  Files          63       63           
  Lines        2712     2726   +14     
=======================================
+ Hits         2711     2725   +14     
  Misses          1        1           
Files with missing lines Coverage Δ
numcodecs/tests/test_zstd.py 100.00% <100.00%> (ø)
🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@mkitti
Copy link
Contributor Author

mkitti commented Jun 27, 2025

Before

In [1]: import numcodecs

In [2]: numcodecs.__version__
Out[2]: '0.16.1'

In [3]: codec = numcodecs.Zstd()

In [4]: bytes_val = b'(\xb5/\xfd\x00Xa\x00\x00Hello World!'

In [5]: codec.decode(bytes_val)
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
Cell In[5], line 1
----> 1 codec.decode(bytes_val)

File numcodecs/zstd.pyx:261, in numcodecs.zstd.Zstd.decode()

File numcodecs/zstd.pyx:191, in numcodecs.zstd.decompress()

RuntimeError: Zstd decompression error: invalid input data

In [6]: bytes3 = b'(\xb5/\xfd\x00X$\x02\x00\xa4\x03ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz\x01\x00:\xfc\xdfs\x05\x05L\x00\x00\x08s\x01\x00\xfc\xff9\x10\x02L\x00\x00
\x08k\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08c\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08[\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08S\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08K\x01\x00\xfc\xf
f9\x10\x02L\x00\x00\x08C\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08u\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08m\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08e\x01\x00\xfc\xff9\x10\x02L\x00\x00\
      ⋮ x08]\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08U\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08M\x01\x00\xfc\xff9\x10\x02M\x00\x00\x08E\x01\x00\xfc\x7f\x1d\x08\x01'

In [7]: codec.decode(bytes3)
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
Cell In[7], line 1
----> 1 codec.decode(bytes3)

File numcodecs/zstd.pyx:261, in numcodecs.zstd.Zstd.decode()

File numcodecs/zstd.pyx:191, in numcodecs.zstd.decompress()

RuntimeError: Zstd decompression error: invalid input data

After

In [1]: import numcodecs

In [2]: codec = numcodecs.Zstd()

In [3]: bytes_val = b'(\xb5/\xfd\x00Xa\x00\x00Hello World!'

In [4]: codec.decode(bytes_val)
Out[4]: b'Hello World!'

In [5]: bytes3 = b'(\xb5/\xfd\x00X$\x02\x00\xa4\x03ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz\x01\x00:\xfc\xdfs\x05\x05L\x00\x00\x08s\x01\x00\xfc\xff9\x10\x02L\x00\x00
      ⋮ \x08k\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08c\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08[\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08S\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08K\x01\x00\xfc\xf
      ⋮ f9\x10\x02L\x00\x00\x08C\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08u\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08m\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08e\x01\x00\xfc\xff9\x10\x02L\x00\x00\
      ⋮ x08]\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08U\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08M\x01\x00\xfc\xff9\x10\x02M\x00\x00\x08E\x01\x00\xfc\x7f\x1d\x08\x01'

In [6]: codec.decode(bytes3) == b'ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz' * 32 * 1024
Out[6]: True

@mkitti mkitti marked this pull request as ready for review June 27, 2025 23:37
@mkitti
Copy link
Contributor Author

mkitti commented Jun 27, 2025

@d-v-b @jakirkham could you review?

@mkitti
Copy link
Contributor Author

mkitti commented Jun 28, 2025

Consider the following script:

import zarr, tensorstore as ts, numpy as np
arr = ts.open({
    'driver': 'zarr',
    'kvstore': {
        'driver': 'file',
        'path': 'ts_zarr2_zstd',
    },
    'metadata': {
        'compressor': {
            'id': 'zstd',
            'level': 3,
        },
        'shape': [1024, 1024],
        'chunks': [64, 64],
        'dtype': '|u1',
    }
}, create=True, delete_existing=True).result()
arr[:,:] = np.random.randint(0, 9, size=(1024,1024), dtype='u1')
arr2 = zarr.open_array("ts_zarr2_zstd")
print(arr2[:,:])

Before

$ python ts_zarr2_zstd.py 
Traceback (most recent call last):
  File "/home/mkitti/src/numcodecs/before/ts_zarr2_zstd.py", line 20, in <module>
    print(arr2[:,:])
          ~~~~^^^^^
  File "/home/mkitti/src/numcodecs/before/.pixi/envs/default/lib/python3.12/site-packages/zarr/core/array.py", line 2441, in __getitem__
    return self.get_orthogonal_selection(pure_selection, fields=fields)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/mkitti/src/numcodecs/before/.pixi/envs/default/lib/python3.12/site-packages/zarr/_compat.py", line 43, in inner_f
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/home/mkitti/src/numcodecs/before/.pixi/envs/default/lib/python3.12/site-packages/zarr/core/array.py", line 2883, in get_orthogonal_selection
    return sync(
           ^^^^^
  File "/home/mkitti/src/numcodecs/before/.pixi/envs/default/lib/python3.12/site-packages/zarr/core/sync.py", line 163, in sync
    raise return_result
  File "/home/mkitti/src/numcodecs/before/.pixi/envs/default/lib/python3.12/site-packages/zarr/core/sync.py", line 119, in _runner
    return await coro
           ^^^^^^^^^^
  File "/home/mkitti/src/numcodecs/before/.pixi/envs/default/lib/python3.12/site-packages/zarr/core/array.py", line 1298, in _get_selection
    await self.codec_pipeline.read(
  File "/home/mkitti/src/numcodecs/before/.pixi/envs/default/lib/python3.12/site-packages/zarr/core/codec_pipeline.py", line 464, in read
    await concurrent_map(
  File "/home/mkitti/src/numcodecs/before/.pixi/envs/default/lib/python3.12/site-packages/zarr/core/common.py", line 69, in concurrent_map
    return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/mkitti/src/numcodecs/before/.pixi/envs/default/lib/python3.12/site-packages/zarr/core/common.py", line 67, in run
    return await func(*item)
           ^^^^^^^^^^^^^^^^^
  File "/home/mkitti/src/numcodecs/before/.pixi/envs/default/lib/python3.12/site-packages/zarr/core/codec_pipeline.py", line 270, in read_batch
    chunk_array_batch = await self.decode_batch(
                        ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/mkitti/src/numcodecs/before/.pixi/envs/default/lib/python3.12/site-packages/zarr/core/codec_pipeline.py", line 190, in decode_batch
    chunk_array_batch = await ab_codec.decode(
                        ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/mkitti/src/numcodecs/before/.pixi/envs/default/lib/python3.12/site-packages/zarr/abc/codec.py", line 129, in decode
    return await _batching_helper(self._decode_single, chunks_and_specs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/mkitti/src/numcodecs/before/.pixi/envs/default/lib/python3.12/site-packages/zarr/abc/codec.py", line 407, in _batching_helper
    return await concurrent_map(
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/mkitti/src/numcodecs/before/.pixi/envs/default/lib/python3.12/site-packages/zarr/core/common.py", line 69, in concurrent_map
    return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/mkitti/src/numcodecs/before/.pixi/envs/default/lib/python3.12/site-packages/zarr/core/common.py", line 67, in run
    return await func(*item)
           ^^^^^^^^^^^^^^^^^
  File "/home/mkitti/src/numcodecs/before/.pixi/envs/default/lib/python3.12/site-packages/zarr/abc/codec.py", line 420, in wrap
    return await func(chunk, chunk_spec)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/mkitti/src/numcodecs/before/.pixi/envs/default/lib/python3.12/site-packages/zarr/codecs/_v2.py", line 36, in _decode_single
    chunk = await asyncio.to_thread(self.compressor.decode, cdata)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/mkitti/src/numcodecs/before/.pixi/envs/default/lib/python3.12/asyncio/threads.py", line 25, in to_thread
    return await loop.run_in_executor(None, func_call)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/mkitti/src/numcodecs/before/.pixi/envs/default/lib/python3.12/concurrent/futures/thread.py", line 59, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "numcodecs/zstd.pyx", line 261, in numcodecs.zstd.Zstd.decode
  File "numcodecs/zstd.pyx", line 191, in numcodecs.zstd.decompress
RuntimeError: Zstd decompression error: invalid input data

After

$ python before/ts_zarr2_zstd.py 
[[4 3 6 ... 3 4 2]
 [1 7 7 ... 0 8 5]
 [2 4 2 ... 0 1 2]
 ...
 [7 1 3 ... 4 5 4]
 [0 6 1 ... 4 1 4]
 [5 6 8 ... 6 1 7]]

@d-v-b
Copy link
Contributor

d-v-b commented Jun 28, 2025

it seems like compatibility with tensorstore is part of the goal here. Would it make sense to add an integration test that uses tensorstore?

@mkitti
Copy link
Contributor Author

mkitti commented Jun 28, 2025

it seems like compatibility with tensorstore is part of the goal here. Would it make sense to add an integration test that uses tensorstore?

This be better done in another pull request. It would be good to add zarr and tensorstore as optional test dependencies.

Also note that the zstd compatability issue only occurs when writing Zarr v2 with tensorstore.

The closest analog to numcodecs for tensorstore is riegeli where most of the compression routines are implemented.

@mkitti
Copy link
Contributor Author

mkitti commented Jun 30, 2025

@normanrz , you may be interested in taking a look as well with regard to Zstandard.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

zarr-python cannot read arrays saved by tensorstore using the zstd compressor
2 participants