Skip to content

Commit 6ab0c15

Browse files
authored
Merge pull request #234 from pycompression/issue230
Fix multithreaded flushing implementation.
2 parents 993f529 + 0fef0a9 commit 6ab0c15

File tree

3 files changed

+29
-14
lines changed

3 files changed

+29
-14
lines changed

CHANGELOG.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ Changelog
1111
version 1.8.0-dev
1212
-----------------
1313
+ Python 3.8 is no longer supported.
14+
+ Fix an issue where flushing using igzip_threaded caused a gzip end of stream
15+
and started a new gzip stream, in essence creating a concatenated gzip
16+
stream. Now it is consistent with how single-threaded gzip streams
17+
are flushed using Z_SYNC_FLUSH.
1418
+ Change build backend to setuptools-scm which is more commonly used and
1519
supported.
1620
+ Include test packages in the source distribution, so source distribution

src/isal/igzip_threaded.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -316,30 +316,26 @@ def write(self, b) -> int:
316316
self.input_queues[worker_index].put((data, zdict))
317317
return len(data)
318318

319-
def _end_gzip_stream(self):
319+
def flush(self):
320320
self._check_closed()
321321
# Wait for all data to be compressed
322322
for in_q in self.input_queues:
323323
in_q.join()
324324
# Wait for all data to be written
325325
for out_q in self.output_queues:
326326
out_q.join()
327-
# Write an empty deflate block with a last block marker.
327+
self.raw.flush()
328+
329+
def close(self) -> None:
330+
if self._closed:
331+
return
332+
self.flush()
328333
self.raw.write(isal_zlib.compress(b"", wbits=-15))
329334
trailer = struct.pack("<II", self._crc, self._size & 0xFFFFFFFF)
330335
self.raw.write(trailer)
331336
self._crc = 0
332337
self._size = 0
333338
self.raw.flush()
334-
335-
def flush(self):
336-
self._end_gzip_stream()
337-
self._write_gzip_header()
338-
339-
def close(self) -> None:
340-
if self._closed:
341-
return
342-
self._end_gzip_stream()
343339
self.stop()
344340
if self.exception:
345341
self.raw.close()

tests/test_igzip_threaded.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import subprocess
1313
import sys
1414
import tempfile
15+
import zlib
1516
from pathlib import Path
1617

1718
from isal import igzip_threaded
@@ -243,15 +244,29 @@ def test_threaded_program_can_exit_on_error(tmp_path, mode, threads):
243244

244245
@pytest.mark.parametrize("threads", [1, 2])
245246
def test_flush(tmp_path, threads):
247+
empty_block_end = b"\x00\x00\xff\xff"
248+
compressobj = zlib.compressobj(wbits=-15)
249+
deflate_last_block = compressobj.compress(b"") + compressobj.flush()
246250
test_file = tmp_path / "output.gz"
247251
with igzip_threaded.open(test_file, "wb", threads=threads) as f:
248252
f.write(b"1")
249253
f.flush()
250-
assert gzip.decompress(test_file.read_bytes()) == b"1"
254+
data = test_file.read_bytes()
255+
assert data[-4:] == empty_block_end
256+
# Cut off gzip header and end data with an explicit last block to
257+
# test if the data was compressed correctly.
258+
deflate_block = data[10:] + deflate_last_block
259+
assert zlib.decompress(deflate_block, wbits=-15) == b"1"
251260
f.write(b"2")
252261
f.flush()
253-
assert gzip.decompress(test_file.read_bytes()) == b"12"
262+
data = test_file.read_bytes()
263+
assert data[-4:] == empty_block_end
264+
deflate_block = data[10:] + deflate_last_block
265+
assert zlib.decompress(deflate_block, wbits=-15) == b"12"
254266
f.write(b"3")
255267
f.flush()
256-
assert gzip.decompress(test_file.read_bytes()) == b"123"
268+
data = test_file.read_bytes()
269+
assert data[-4:] == empty_block_end
270+
deflate_block = data[10:] + deflate_last_block
271+
assert zlib.decompress(deflate_block, wbits=-15) == b"123"
257272
assert gzip.decompress(test_file.read_bytes()) == b"123"

0 commit comments

Comments
 (0)