
Commit ca72ac8

Allow multithreaded BAM reading
1 parent 4a95985 commit ca72ac8

File tree

3 files changed, +24 −4 lines:

  pyproject.toml           +1 −1
  src/cutadapt/files.py    +8 −1
  src/cutadapt/runners.py  +15 −2

pyproject.toml (+1 −1)
@@ -22,7 +22,7 @@ classifiers = [
 requires-python = ">=3.8"
 dynamic = ["version"]
 dependencies = [
-    "dnaio >= 1.2.0",
+    "dnaio >= 1.3.0",
     "xopen >= 1.6.0",
 ]
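The only change here is the dnaio floor moving from 1.2.0 to 1.3.0, presumably so that the explicit fileformat= handling added below (including the "bam_no_header" value) is available. A minimal, illustrative runtime guard for the new floor, assuming the packaging library is installed; this snippet is not part of the commit:

# Illustrative only: check that the installed dnaio meets the raised minimum
# before relying on the fileformat= support used elsewhere in this commit.
from importlib.metadata import version
from packaging.version import Version

if Version(version("dnaio")) < Version("1.3.0"):
    raise RuntimeError("cutadapt now requires dnaio >= 1.3.0")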

src/cutadapt/files.py (+8 −1)
@@ -95,14 +95,21 @@ def __init__(
         self,
         *files: BinaryIO,
         interleaved: bool = False,
+        fileformat=None,
     ):
         self._files = files
         self.interleaved = interleaved
+        self.fileformat = fileformat
         for f in self._files:
             assert f is not None

     def open(self):
-        return dnaio.open(*self._files, interleaved=self.interleaved, mode="r")
+        return dnaio.open(
+            *self._files,
+            interleaved=self.interleaved,
+            mode="r",
+            fileformat=self.fileformat,
+        )

     def close(self) -> None:
         for file in self._files:
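InputFiles now accepts an optional fileformat and forwards it to dnaio.open(), so callers can state the input format explicitly instead of relying on autodetection. A minimal usage sketch, assuming dnaio accepts a lowercase format name such as "bam" here; the file name is illustrative:

# Illustrative sketch: open a BAM input through InputFiles with an explicit
# format rather than letting dnaio detect it from the stream contents.
from cutadapt.files import InputFiles

with open("reads.bam", "rb") as raw:
    infiles = InputFiles(raw, fileformat="bam")
    reader = infiles.open()  # forwards fileformat= to dnaio.open()
    for record in reader:
        print(record.name, len(record.sequence))
    infiles.close()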

src/cutadapt/runners.py (+15 −2)
@@ -158,6 +158,7 @@ def __init__(
         read_pipe: Connection,
         write_pipe: Connection,
         need_work_queue: multiprocessing.Queue,
+        file_format,
     ):
         super().__init__()
         self._id = id_
@@ -168,6 +169,7 @@ def __init__(
         self._write_pipe = write_pipe
         self._need_work_queue = need_work_queue
         self._proxy_files = proxy_files
+        self._file_format = file_format

     def run(self):
         try:
@@ -189,7 +191,11 @@ def run(self):
                     io.BytesIO(self._read_pipe.recv_bytes())
                     for _ in range(self._n_input_files)
                 ]
-                infiles = InputFiles(*files, interleaved=self._interleaved_input)
+                infiles = InputFiles(
+                    *files,
+                    interleaved=self._interleaved_input,
+                    fileformat=self._file_format,
+                )
                 (n, bp1, bp2) = self._pipeline.process_reads(infiles)
                 stats += Statistics().collect(n, bp1, bp2, [], [])
                 self._send_outfiles(chunk_index, n)
@@ -320,7 +326,13 @@ def __init__(
         )
         self._reader_process.daemon = True
         self._reader_process.start()
-        self._input_file_format = self._try_receive(file_format_connection_r)
+        self._input_file_format: FileFormat = self._try_receive(
+            file_format_connection_r
+        )
+        self._file_format_string = self._input_file_format.name.lower()
+        if self._file_format_string == "bam":
+            # Individual BAM record chunks will have no header
+            self._file_format_string = "bam_no_header"

     def _start_workers(
         self, pipeline, proxy_files
@@ -338,6 +350,7 @@ def _start_workers(
                 self._connections[index],
                 conn_w,
                 self._need_work_queue,
+                file_format=self._file_format_string,
             )
             worker.daemon = True
             worker.start()
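The reader process detects the input format and the main process passes it on to every worker; because the reader ships raw blocks of records over a pipe, a BAM chunk arrives without its BAM header, so workers must tell dnaio to parse headerless records via fileformat="bam_no_header". A minimal sketch of that worker-side step, assuming dnaio >= 1.3.0 accepts the "bam_no_header" value used in this commit; the helper name and chunk handling are illustrative:

# Illustrative sketch: parse one received chunk of headerless BAM records by
# stating the format explicitly instead of relying on autodetection.
import io
import dnaio

def count_records(chunk: bytes) -> int:
    n = 0
    with dnaio.open(io.BytesIO(chunk), mode="r", fileformat="bam_no_header") as reader:
        for _record in reader:
            n += 1
    return n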

0 commit comments
