2020
2121
2222def open (filename , mode = "rb" , compresslevel = igzip ._COMPRESS_LEVEL_TRADEOFF ,
23- encoding = None , errors = None , newline = None , * , threads = 1 ):
23+ encoding = None , errors = None , newline = None , * , threads = 1 ,
24+ block_size = 1024 * 1024 ):
2425 """
2526 Utilize threads to read and write gzip objects and escape the GIL.
2627 Comparable to gzip.open. This method is only usable for streamed reading
@@ -39,6 +40,8 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF,
3940 :param threads: If 0 will defer to igzip.open, if < 0 will use all threads
4041 available to the system. Reading gzip can only
4142 use one thread.
43+ :param block_size: Determines how large the blocks in the read/write
44+ queues are for threaded reading and writing.
4245 :return: An io.BufferedReader, io.BufferedWriter, or io.TextIOWrapper,
4346 depending on the mode.
4447 """
@@ -61,21 +64,27 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF,
6164 else :
6265 raise TypeError ("filename must be a str or bytes object, or a file" )
6366 if "r" in mode :
64- gzip_file = io .BufferedReader (_ThreadedGzipReader (binary_file ))
67+ gzip_file = io .BufferedReader (
68+ _ThreadedGzipReader (binary_file , block_size = block_size ))
6569 else :
6670 gzip_file = io .BufferedWriter (
67- _ThreadedGzipWriter (binary_file , compresslevel , threads ),
68- buffer_size = 1024 * 1024
71+ _ThreadedGzipWriter (
72+ fp = binary_file ,
73+ block_size = block_size ,
74+ level = compresslevel ,
75+ threads = threads
76+ ),
77+ buffer_size = block_size
6978 )
7079 if "t" in mode :
7180 return io .TextIOWrapper (gzip_file , encoding , errors , newline )
7281 return gzip_file
7382
7483
7584class _ThreadedGzipReader (io .RawIOBase ):
76- def __init__ (self , fp , queue_size = 4 , block_size = 8 * 1024 * 1024 ):
85+ def __init__ (self , fp , queue_size = 2 , block_size = 1024 * 1024 ):
7786 self .raw = fp
78- self .fileobj = igzip ._IGzipReader (fp , buffersize = 8 * 1024 * 1024 )
87+ self .fileobj = igzip ._IGzipReader (fp , buffersize = 8 * block_size )
7988 self .pos = 0
8089 self .read_file = False
8190 self .queue = queue .Queue (queue_size )
@@ -179,35 +188,53 @@ class _ThreadedGzipWriter(io.RawIOBase):
179188
180189 The writer thread reads from output queues and uses the crc32_combine
181190 function to calculate the total crc. It also writes the compressed block.
191+
192+ When only one thread is requested, only the input queue is used and
193+ compressing and output is handled in one thread.
182194 """
183195 def __init__ (self ,
184196 fp : BinaryIO ,
185197 level : int = isal_zlib .ISAL_DEFAULT_COMPRESSION ,
186198 threads : int = 1 ,
187- queue_size : int = 2 ):
188- if level < 0 or level > 3 :
189- raise ValueError (
190- f"Invalid compression level, "
191- f"level should be between 0 and 3: { level } " )
199+ queue_size : int = 1 ,
200+ block_size : int = 1024 * 1024 ,
201+ ):
192202 self .lock = threading .Lock ()
193203 self .exception : Optional [Exception ] = None
194204 self .raw = fp
195205 self .level = level
196206 self .previous_block = b""
197- self .input_queues : List [queue .Queue [Tuple [bytes , memoryview ]]] = [
198- queue .Queue (queue_size ) for _ in range (threads )]
199- self .output_queues : List [queue .Queue [Tuple [bytes , int , int ]]] = [
200- queue .Queue (queue_size ) for _ in range (threads )]
201- self .index = 0
207+ # Deflating random data results in an output a little larger than the
208+ # input. Making the output buffer 10% larger is sufficient overkill.
209+ compress_buffer_size = block_size + max (block_size // 10 , 500 )
210+ self .block_size = block_size
211+ self .compressors : List [isal_zlib ._ParallelCompress ] = [
212+ isal_zlib ._ParallelCompress (buffersize = compress_buffer_size ,
213+ level = level ) for _ in range (threads )
214+ ]
215+ if threads > 1 :
216+ self .input_queues : List [queue .Queue [Tuple [bytes , memoryview ]]] = [
217+ queue .Queue (queue_size ) for _ in range (threads )]
218+ self .output_queues : List [queue .Queue [Tuple [bytes , int , int ]]] = [
219+ queue .Queue (queue_size ) for _ in range (threads )]
220+ self .output_worker = threading .Thread (target = self ._write )
221+ self .compression_workers = [
222+ threading .Thread (target = self ._compress , args = (i ,))
223+ for i in range (threads )
224+ ]
225+ elif threads == 1 :
226+ self .input_queues = [queue .Queue (queue_size )]
227+ self .output_queues = []
228+ self .compression_workers = []
229+ self .output_worker = threading .Thread (
230+ target = self ._compress_and_write )
231+ else :
232+ raise ValueError (f"threads should be at least 1, got { threads } " )
202233 self .threads = threads
234+ self .index = 0
203235 self ._crc = 0
204236 self .running = False
205237 self ._size = 0
206- self .output_worker = threading .Thread (target = self ._write )
207- self .compression_workers = [
208- threading .Thread (target = self ._compress , args = (i ,))
209- for i in range (threads )
210- ]
211238 self ._closed = False
212239 self ._write_gzip_header ()
213240 self .start ()
@@ -246,8 +273,19 @@ def write(self, b) -> int:
246273 with self .lock :
247274 if self .exception :
248275 raise self .exception
249- index = self .index
276+ length = b .nbytes if isinstance (b , memoryview ) else len (b )
277+ if length > self .block_size :
278+ # write smaller chunks and return the result
279+ memview = memoryview (b )
280+ start = 0
281+ total_written = 0
282+ while start < length :
283+ total_written += self .write (
284+ memview [start :start + self .block_size ])
285+ start += self .block_size
286+ return total_written
250287 data = bytes (b )
288+ index = self .index
251289 zdict = memoryview (self .previous_block )[- DEFLATE_WINDOW_SIZE :]
252290 self .previous_block = data
253291 self .index += 1
@@ -289,6 +327,7 @@ def closed(self) -> bool:
289327 def _compress (self , index : int ):
290328 in_queue = self .input_queues [index ]
291329 out_queue = self .output_queues [index ]
330+ compressor : isal_zlib ._ParallelCompress = self .compressors [index ]
292331 while True :
293332 try :
294333 data , zdict = in_queue .get (timeout = 0.05 )
@@ -297,23 +336,11 @@ def _compress(self, index: int):
297336 return
298337 continue
299338 try :
300- compressor = isal_zlib .compressobj (
301- self .level , wbits = - 15 , zdict = zdict )
302- compressed = compressor .compress (data ) + compressor .flush (
303- isal_zlib .Z_SYNC_FLUSH )
304- crc = isal_zlib .crc32 (data )
339+ compressed , crc = compressor .compress_and_crc (data , zdict )
305340 except Exception as e :
306- with self .lock :
307- self .exception = e
308- # Abort everything and empty the queue
309- in_queue .task_done ()
310- self .running = False
311- while True :
312- try :
313- _ = in_queue .get (timeout = 0.05 )
314- in_queue .task_done ()
315- except queue .Empty :
316- return
341+ in_queue .task_done ()
342+ self ._set_error_and_empty_queue (e , in_queue )
343+ return
317344 data_length = len (data )
318345 out_queue .put ((compressed , crc , data_length ))
319346 in_queue .task_done ()
@@ -341,5 +368,46 @@ def _write(self):
341368 output_queue .task_done ()
342369 index += 1
343370
371+ def _compress_and_write (self ):
372+ if not self .threads == 1 :
373+ raise SystemError ("Compress_and_write is for one thread only" )
374+ fp = self .raw
375+ total_crc = 0
376+ size = 0
377+ in_queue = self .input_queues [0 ]
378+ compressor = self .compressors [0 ]
379+ while True :
380+ try :
381+ data , zdict = in_queue .get (timeout = 0.05 )
382+ except queue .Empty :
383+ if not self .running :
384+ self ._crc = total_crc
385+ self ._size = size
386+ return
387+ continue
388+ try :
389+ compressed , crc = compressor .compress_and_crc (data , zdict )
390+ except Exception as e :
391+ in_queue .task_done ()
392+ self ._set_error_and_empty_queue (e , in_queue )
393+ return
394+ data_length = len (data )
395+ total_crc = isal_zlib .crc32_combine (total_crc , crc , data_length )
396+ size += data_length
397+ fp .write (compressed )
398+ in_queue .task_done ()
399+
400+ def _set_error_and_empty_queue (self , error , q ):
401+ with self .lock :
402+ self .exception = error
403+ # Abort everything and empty the queue
404+ self .running = False
405+ while True :
406+ try :
407+ _ = q .get (timeout = 0.05 )
408+ q .task_done ()
409+ except queue .Empty :
410+ return
411+
def writable(self) -> bool:
    """Report that this object supports writing; always ``True``."""
    can_write = True
    return can_write
0 commit comments