Skip to content

Commit 0f4551d

Browse files
committed
fix: reduce memory usage in subtraction elimination step
1 parent cf17b14 commit 0f4551d

File tree

3 files changed

+33
-129
lines changed

3 files changed

+33
-129
lines changed

Cargo.lock

Lines changed: 0 additions & 46 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ rustc-hash = "2.0"
1616
thiserror = "1.0"
1717
log = "0.4"
1818
env_logger = "0.11"
19-
rayon = "1.8"
2019

2120
[dependencies.pyo3]
2221
version = "^0.22.0"

src/subtraction.rs

Lines changed: 33 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,13 @@
11
use crate::sam::{extract_alignment_score, SamReader, CHUNK_SIZE};
22
use crate::PathoscopeError;
33
use log::info;
4-
use rayon::prelude::*;
54
use rust_htslib::bam;
65
use rust_htslib::bam::Format;
76
use rustc_hash::FxHashMap;
87
use std::collections::HashSet;
98
use std::fs::File;
109
use std::io::{BufRead, BufReader, BufWriter, Write};
1110

12-
/// Result type for parallel chunk processing
13-
type ChunkResult = (Vec<bam::Record>, HashSet<String>);
14-
1511
#[derive(Debug, Clone)]
1612
pub struct SubtractionProcessor {
1713
subtraction_scores: FxHashMap<String, f32>,
@@ -135,99 +131,54 @@ pub fn process_isolate_file(
135131
let mut writer = bam::Writer::from_path(output_path, &header, Format::Bam)
136132
.map_err(PathoscopeError::Htslib)?;
137133

138-
// Use half the threads for BAM compression, reserve the rest for rayon processing
139-
let writer_threads = (proc / 2).max(1);
140-
let rayon_threads = proc - writer_threads;
141-
142-
// Configure writer to use compression threads
143134
writer
144-
.set_threads(writer_threads)
135+
.set_threads(proc)
145136
.map_err(PathoscopeError::Htslib)?;
146137

147-
// Configure rayon thread pool for this scope
148-
let pool = rayon::ThreadPoolBuilder::new()
149-
.num_threads(rayon_threads)
150-
.build()
151-
.map_err(|e| {
152-
PathoscopeError::Parse(format!("Failed to create thread pool: {}", e))
153-
})?;
154-
155138
let mut all_subtracted_read_ids = HashSet::new();
156139
let mut write_buffer: Vec<bam::Record> = Vec::with_capacity(CHUNK_SIZE);
157140

158-
// Collect all chunks first for parallel processing
159-
let mut all_chunks = Vec::new();
160-
161141
reader.stream_chunks(|chunk| {
162-
// Clone the chunk to store it
163-
let chunk_data: Vec<bam::Record> = chunk.to_vec();
164-
all_chunks.push(chunk_data);
165-
Ok(())
166-
})?;
142+
let mut chunk_records_to_write = Vec::new();
143+
let mut chunk_subtracted_read_ids = HashSet::new();
144+
145+
for record in chunk {
146+
if record.is_unmapped() {
147+
continue;
148+
}
149+
150+
let read_id_str =
151+
unsafe { std::str::from_utf8_unchecked(record.qname()) };
152+
153+
if record.tid() < 0 {
154+
continue;
155+
}
156+
157+
let isolate_score = match extract_alignment_score(record) {
158+
Some(score) => score as f32,
159+
None => continue,
160+
};
161+
162+
if processor.should_eliminate(read_id_str, isolate_score) {
163+
chunk_subtracted_read_ids.insert(read_id_str.to_string());
164+
} else {
165+
chunk_records_to_write.push(record.clone());
166+
}
167+
}
168+
169+
all_subtracted_read_ids.extend(chunk_subtracted_read_ids);
170+
write_buffer.extend(chunk_records_to_write);
167171

168-
// Process chunks in parallel using our custom thread pool
169-
let results: Result<Vec<ChunkResult>, String> = pool.install(|| {
170-
all_chunks
171-
.into_par_iter()
172-
.map(|chunk| {
173-
let mut records_to_write = Vec::new();
174-
let mut subtracted_read_ids = HashSet::new();
175-
176-
for record in chunk {
177-
// Skip unmapped reads
178-
if record.is_unmapped() {
179-
continue;
180-
}
181-
182-
let read_id_str =
183-
unsafe { std::str::from_utf8_unchecked(record.qname()) };
184-
185-
// Skip if reference is unmapped (tid < 0 means unmapped)
186-
if record.tid() < 0 {
187-
continue;
188-
}
189-
190-
// Calculate alignment score using shared function
191-
let isolate_score = match extract_alignment_score(&record) {
192-
Some(score) => score as f32,
193-
None => continue,
194-
};
195-
196-
// Check if this read should be eliminated
197-
if processor.should_eliminate(read_id_str, isolate_score) {
198-
// Only allocate string when we need to store it
199-
subtracted_read_ids.insert(read_id_str.to_string());
200-
} else {
201-
// Add record to write buffer
202-
records_to_write.push(record);
203-
}
204-
}
205-
206-
Ok((records_to_write, subtracted_read_ids))
207-
})
208-
.collect()
209-
});
210-
211-
let chunk_results = results.map_err(PathoscopeError::Parse)?;
212-
213-
// Write results in batches and collect subtracted IDs
214-
for (records_to_write, subtracted_ids) in chunk_results {
215-
// Merge subtracted IDs
216-
all_subtracted_read_ids.extend(subtracted_ids);
217-
218-
// Add records to write buffer
219-
write_buffer.extend(records_to_write);
220-
221-
// Write in batches when buffer is full
222172
if write_buffer.len() >= CHUNK_SIZE {
223173
for record in &write_buffer {
224174
writer.write(record).map_err(PathoscopeError::Htslib)?;
225175
}
226176
write_buffer.clear();
227177
}
228-
}
229178

230-
// Write any remaining records in buffer
179+
Ok(())
180+
})?;
181+
231182
for record in &write_buffer {
232183
writer.write(record).map_err(PathoscopeError::Htslib)?;
233184
}

0 commit comments

Comments
 (0)