@@ -158,6 +158,7 @@ def __init__(
158
158
read_pipe : Connection ,
159
159
write_pipe : Connection ,
160
160
need_work_queue : multiprocessing .Queue ,
161
+ file_format ,
161
162
):
162
163
super ().__init__ ()
163
164
self ._id = id_
@@ -168,6 +169,7 @@ def __init__(
168
169
self ._write_pipe = write_pipe
169
170
self ._need_work_queue = need_work_queue
170
171
self ._proxy_files = proxy_files
172
+ self ._file_format = file_format
171
173
172
174
def run (self ):
173
175
try :
@@ -189,7 +191,11 @@ def run(self):
189
191
io .BytesIO (self ._read_pipe .recv_bytes ())
190
192
for _ in range (self ._n_input_files )
191
193
]
192
- infiles = InputFiles (* files , interleaved = self ._interleaved_input )
194
+ infiles = InputFiles (
195
+ * files ,
196
+ interleaved = self ._interleaved_input ,
197
+ fileformat = self ._file_format ,
198
+ )
193
199
(n , bp1 , bp2 ) = self ._pipeline .process_reads (infiles )
194
200
stats += Statistics ().collect (n , bp1 , bp2 , [], [])
195
201
self ._send_outfiles (chunk_index , n )
@@ -320,7 +326,13 @@ def __init__(
320
326
)
321
327
self ._reader_process .daemon = True
322
328
self ._reader_process .start ()
323
- self ._input_file_format = self ._try_receive (file_format_connection_r )
329
+ self ._input_file_format : FileFormat = self ._try_receive (
330
+ file_format_connection_r
331
+ )
332
+ self ._file_format_string = self ._input_file_format .name .lower ()
333
+ if self ._file_format_string == "bam" :
334
+ # Individual BAM record chunks will have no header
335
+ self ._file_format_string = "bam_no_header"
324
336
325
337
def _start_workers (
326
338
self , pipeline , proxy_files
@@ -338,6 +350,7 @@ def _start_workers(
338
350
self ._connections [index ],
339
351
conn_w ,
340
352
self ._need_work_queue ,
353
+ file_format = self ._file_format_string ,
341
354
)
342
355
worker .daemon = True
343
356
worker .start ()
0 commit comments