Skip to content

Commit 424801c

Browse files
authored
feat: reduce memory use when mapping default isolates
- Replace memory-buffering bowtie2 execution with streaming processing. - Reorganize candidate OTU code. - Remove unused functions and clean up dead code.
1 parent 900cc1e commit 424801c

File tree

4 files changed

+316
-481
lines changed

4 files changed

+316
-481
lines changed

src/candidates.rs

Lines changed: 300 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,300 @@
1+
use std::collections::HashSet;
2+
use log::info;
3+
use pyo3::prelude::*;
4+
use pyo3::exceptions::PyIOError;
5+
6+
7+
8+
9+
/// Parse a single SAM line and extract candidate OTU information
10+
///
11+
/// This function processes one SAM line and determines if the read meets the score cutoff.
12+
/// Used for testing and by the streaming functions.
13+
///
14+
/// # Arguments
15+
/// * `line` - A SAM format line as string
16+
/// * `p_score_cutoff` - Minimum score threshold (AS:i score + read length)
17+
///
18+
/// # Returns
19+
/// Option containing the reference name if the read meets the cutoff, None otherwise
20+
pub fn parse_sam_line(line: &str, p_score_cutoff: f64) -> Option<String> {
21+
// Skip header lines and empty lines
22+
if line.starts_with('@') || line.trim().is_empty() {
23+
return None;
24+
}
25+
26+
// Parse SAM line - tab-separated format
27+
let fields: Vec<&str> = line.split('\t').collect();
28+
29+
// SAM format requires at least 11 fields
30+
if fields.len() < 11 {
31+
return None;
32+
}
33+
34+
// Extract key fields:
35+
// 1: FLAG
36+
// 2: RNAME (reference name)
37+
// 9: SEQ (read sequence)
38+
let flag: u16 = fields[1].parse().unwrap_or(4); // Default to unmapped if parse fails
39+
let ref_name = fields[2];
40+
let seq_len = fields[9].len() as f64;
41+
42+
// Skip unmapped reads (flag & 4 != 0) or reads mapping to "*"
43+
if (flag & 4) != 0 || ref_name == "*" {
44+
return None;
45+
}
46+
47+
// Find AS:i score in the optional fields (starting from field 11)
48+
let mut as_score: Option<f64> = None;
49+
for field in &fields[11..] {
50+
if let Some(stripped) = field.strip_prefix("AS:i:") {
51+
if let Ok(score) = stripped.parse::<i32>() {
52+
as_score = Some(score as f64);
53+
break;
54+
}
55+
}
56+
}
57+
58+
// Skip if no AS score found
59+
if let Some(as_score) = as_score {
60+
// Calculate total score (AS score + read length)
61+
let total_score = as_score + seq_len;
62+
63+
// Apply score cutoff
64+
if total_score >= p_score_cutoff {
65+
return Some(ref_name.to_string());
66+
}
67+
}
68+
69+
None
70+
}
71+
72+
/// Extract candidate OTU reference IDs from SAM text data
73+
///
74+
/// This function parses SAM format data directly from text without using rust-htslib.
75+
/// Used for testing and can be called by other functions that need SAM text parsing.
76+
///
77+
/// # Arguments
78+
/// * `sam_text` - Raw SAM format data as string
79+
/// * `p_score_cutoff` - Minimum score threshold (AS:i score + read length)
80+
///
81+
/// # Returns
82+
/// HashSet of reference IDs that have reads meeting the score cutoff
83+
pub fn extract_candidates_from_sam_text(
84+
sam_text: &str,
85+
p_score_cutoff: f64,
86+
) -> HashSet<String> {
87+
let mut candidate_otus = HashSet::new();
88+
89+
for line in sam_text.lines() {
90+
if let Some(ref_name) = parse_sam_line(line, p_score_cutoff) {
91+
candidate_otus.insert(ref_name);
92+
}
93+
}
94+
95+
candidate_otus
96+
}
97+
98+
/// Extract candidate OTU reference IDs by running bowtie2 directly with streaming
99+
///
100+
/// This function spawns a bowtie2 process directly from Rust and streams its output
101+
/// to avoid memory issues with large SAM files. It processes SAM lines as they arrive
102+
/// and returns only the unique reference IDs that meet the score cutoff.
103+
///
104+
/// # Arguments
105+
/// * `bowtie_index_path` - Path to the bowtie2 index
106+
/// * `read_paths` - List of paths to the input read files
107+
/// * `proc` - Number of processor threads for bowtie2
108+
/// * `p_score_cutoff` - Minimum score threshold (AS:i score + read length)
109+
///
110+
/// # Returns
111+
/// Set of reference IDs that have reads meeting the score cutoff
112+
pub fn find_candidate_otus_with_bowtie2(
113+
py: Python,
114+
bowtie_index_path: &str,
115+
read_paths: Vec<String>,
116+
proc: i32,
117+
p_score_cutoff: f64,
118+
) -> PyResult<HashSet<String>> {
119+
use std::process::{Command, Stdio};
120+
use std::io::{BufRead, BufReader};
121+
122+
info!("running bowtie2 directly from rust: index={}, reads={:?}, cutoff={}",
123+
bowtie_index_path, read_paths, p_score_cutoff);
124+
py.allow_threads(|| {
125+
let mut cmd = Command::new("bowtie2");
126+
cmd.arg("-p").arg(proc.to_string())
127+
.arg("--local")
128+
.arg("--no-unal")
129+
.arg("--score-min").arg("L,20,1.0")
130+
.arg("-N").arg("0")
131+
.arg("-L").arg("15")
132+
.arg("-x").arg(bowtie_index_path)
133+
.arg("-U").arg(read_paths.join(","))
134+
.stdout(Stdio::piped())
135+
.stderr(Stdio::piped());
136+
137+
info!("spawning bowtie2 process");
138+
let mut child = cmd.spawn()
139+
.map_err(|e| PyErr::new::<PyIOError, _>(format!("Failed to spawn bowtie2: {}", e)))?;
140+
141+
let stdout = child.stdout.take().unwrap();
142+
let reader = BufReader::new(stdout);
143+
144+
let mut candidate_otus = HashSet::new();
145+
let mut line_count = 0u64;
146+
let mut passing_count = 0u64;
147+
148+
for line_result in reader.lines() {
149+
let line = line_result
150+
.map_err(|e| PyErr::new::<PyIOError, _>(format!("Error reading bowtie2 output: {}", e)))?;
151+
152+
line_count += 1;
153+
154+
// Use the extracted SAM parsing function
155+
if let Some(ref_name) = parse_sam_line(&line, p_score_cutoff) {
156+
candidate_otus.insert(ref_name);
157+
passing_count += 1;
158+
}
159+
}
160+
161+
// Wait for bowtie2 to finish and check exit status
162+
let status = child.wait()
163+
.map_err(|e| PyErr::new::<PyIOError, _>(format!("Error waiting for bowtie2: {}", e)))?;
164+
165+
if !status.success() {
166+
// Read stderr for error details
167+
let stderr_output = if let Some(mut stderr) = child.stderr.take() {
168+
let mut buf = String::new();
169+
let _ = std::io::Read::read_to_string(&mut stderr, &mut buf);
170+
buf
171+
} else {
172+
"Unknown error".to_string()
173+
};
174+
175+
return Err(PyErr::new::<PyIOError, _>(format!(
176+
"bowtie2 failed with exit code {:?}: {}",
177+
status.code(),
178+
stderr_output
179+
)));
180+
}
181+
182+
info!("processed {} sam lines, {} passed cutoff, found {} unique otus",
183+
line_count, passing_count, candidate_otus.len());
184+
185+
Ok(candidate_otus)
186+
})
187+
}
188+
189+
190+
#[cfg(test)]
191+
mod tests {
192+
use super::*;
193+
194+
#[test]
195+
fn test_parse_sam_line_basic() {
196+
let line = "read1\t0\tref1\t100\t255\t50M\t*\t0\t0\tAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\t*\tAS:i:45";
197+
let result = parse_sam_line(line, 0.01);
198+
199+
// AS:i:45 + seq_len(50) = 95.0, should pass cutoff of 0.01
200+
assert_eq!(result, Some("ref1".to_string()));
201+
}
202+
203+
#[test]
204+
fn test_parse_sam_line_below_cutoff() {
205+
let line = "read1\t0\tref1\t100\t255\t50M\t*\t0\t0\tAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\t*\tAS:i:45";
206+
let result = parse_sam_line(line, 100.0);
207+
208+
// AS:i:45 + seq_len(50) = 95.0, should not pass cutoff of 100.0
209+
assert_eq!(result, None);
210+
}
211+
212+
#[test]
213+
fn test_parse_sam_line_unmapped() {
214+
let line = "read1\t4\t*\t0\t0\t*\t*\t0\t0\tAAAAA\t*";
215+
let result = parse_sam_line(line, 0.01);
216+
217+
// Unmapped read (flag & 4 != 0), should return None
218+
assert_eq!(result, None);
219+
}
220+
221+
#[test]
222+
fn test_parse_sam_line_no_as_score() {
223+
let line = "read1\t0\tref1\t100\t255\t50M\t*\t0\t0\tAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\t*";
224+
let result = parse_sam_line(line, 0.01);
225+
226+
// No AS:i score, should return None
227+
assert_eq!(result, None);
228+
}
229+
230+
#[test]
231+
fn test_parse_sam_line_header() {
232+
let line = "@HD\tVN:1.0\tSO:unsorted";
233+
let result = parse_sam_line(line, 0.01);
234+
235+
// Header line, should return None
236+
assert_eq!(result, None);
237+
}
238+
239+
#[test]
240+
fn test_extract_candidates_from_sam_text_basic() {
241+
let sam_data = "@HD\tVN:1.0\tSO:unsorted
242+
@SQ\tSN:ref1\tLN:1000
243+
@SQ\tSN:ref2\tLN:2000
244+
read1\t0\tref1\t100\t255\t50M\t*\t0\t0\tAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\t*\tAS:i:45
245+
read2\t0\tref2\t200\t255\t30M\t*\t0\t0\tTTTTTTTTTTTTTTTTTTTTTTTTTTTTTT\t*\tAS:i:25
246+
read3\t0\tref1\t300\t255\t40M\t*\t0\t0\tCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC\t*\tAS:i:2";
247+
248+
let result = extract_candidates_from_sam_text(sam_data, 0.01);
249+
250+
// Should find 2 unique references since scores are:
251+
// read1: AS:i:45 + 50 = 95.0
252+
// read2: AS:i:25 + 30 = 55.0
253+
// read3: AS:i:2 + 40 = 42.0
254+
assert_eq!(result.len(), 2, "Should find 2 unique references");
255+
assert!(result.contains("ref1"), "Should contain ref1");
256+
assert!(result.contains("ref2"), "Should contain ref2");
257+
}
258+
259+
#[test]
260+
fn test_extract_candidates_from_sam_text_with_cutoff() {
261+
let sam_data = "@HD\tVN:1.0\tSO:unsorted
262+
read1\t0\tref1\t100\t255\t50M\t*\t0\t0\tAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\t*\tAS:i:45
263+
read2\t0\tref2\t200\t255\t30M\t*\t0\t0\tTTTTTTTTTTTTTTTTTTTTTTTTTTTTTT\t*\tAS:i:25
264+
read3\t0\tref1\t300\t255\t40M\t*\t0\t0\tCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC\t*\tAS:i:2";
265+
266+
let result = extract_candidates_from_sam_text(sam_data, 50.0);
267+
268+
// Only read1 (95.0) and read2 (55.0) should pass, read3 (42.0) should be filtered
269+
assert_eq!(result.len(), 2, "Should find 2 references with high cutoff");
270+
assert!(result.contains("ref1"), "Should contain ref1");
271+
assert!(result.contains("ref2"), "Should contain ref2");
272+
}
273+
274+
#[test]
275+
fn test_extract_candidates_from_sam_text_very_high_cutoff() {
276+
let sam_data = "@HD\tVN:1.0\tSO:unsorted
277+
read1\t0\tref1\t100\t255\t50M\t*\t0\t0\tAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\t*\tAS:i:45";
278+
279+
let result = extract_candidates_from_sam_text(sam_data, 100.0);
280+
281+
// No reads should pass this cutoff (read1 is 95.0)
282+
assert_eq!(result.len(), 0, "Should find no references with very high cutoff");
283+
}
284+
285+
#[test]
286+
fn test_extract_candidates_from_sam_text_deduplication() {
287+
let sam_data = "@HD\tVN:1.0\tSO:unsorted
288+
read1\t0\tref1\t100\t255\t50M\t*\t0\t0\tAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\t*\tAS:i:45
289+
read2\t0\tref1\t200\t255\t30M\t*\t0\t0\tTTTTTTTTTTTTTTTTTTTTTTTTTTTTTT\t*\tAS:i:25";
290+
291+
let result = extract_candidates_from_sam_text(sam_data, 0.01);
292+
293+
// Even if multiple reads map to ref1, it should only appear once in the set
294+
assert_eq!(result.len(), 1, "Should deduplicate reference names");
295+
assert!(result.contains("ref1"), "Should contain ref1");
296+
let ref1_count = result.iter().filter(|&r| r == "ref1").count();
297+
assert_eq!(ref1_count, 1, "Each reference should appear only once in the result set");
298+
}
299+
300+
}

src/lib.rs

Lines changed: 9 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ mod coverage;
33
mod matrix;
44
mod em;
55
mod parse_sam;
6-
mod stream_processor;
6+
mod candidates;
77
mod logging;
88

99
use subtraction::eliminate_subtraction;
@@ -110,11 +110,7 @@ fn rust(m: &Bound<'_, PyModule>) -> PyResult<()> {
110110
m.add_function(wrap_pyfunction!(init_logging, m)?)?;
111111
m.add_function(wrap_pyfunction!(parse_isolate_scores, m)?)?;
112112
m.add_function(wrap_pyfunction!(run_expectation_maximization, m)?)?;
113-
m.add_function(wrap_pyfunction!(run_expectation_maximization_streaming, m)?)?;
114-
m.add_function(wrap_pyfunction!(run_eliminate_subtraction, m)?)?;
115-
m.add_function(wrap_pyfunction!(calculate_coverage_from_em_results, m)?)?;
116-
m.add_function(wrap_pyfunction!(find_candidate_otus, m)?)?;
117-
m.add_function(wrap_pyfunction!(find_candidate_otus_from_bytes, m)?)?;
113+
m.add_function(wrap_pyfunction!(find_candidate_otus_with_bowtie2, m)?)?;
118114
m.add_function(wrap_pyfunction!(subtract_fastq, m)?)?;
119115
m.add_function(wrap_pyfunction!(eliminate_subtraction_and_filter_fastq, m)?)?;
120116
Ok(())
@@ -338,53 +334,19 @@ pub fn run_expectation_maximization_streaming(
338334
})
339335
}
340336

341-
#[pyfunction]
342-
/// Extract candidate OTU reference IDs from an alignment file (SAM/BAM)
343-
///
344-
/// This function replaces the Python line-by-line processing in map_default_isolates
345-
/// with high-performance Rust processing. It reads a SAM/BAM file and extracts
346-
/// reference IDs for reads that meet the score cutoff.
347-
///
348-
/// # Arguments
349-
/// * `alignment_path` - Path to the SAM/BAM file to process
350-
/// * `p_score_cutoff` - Minimum score threshold (AS:i score + read length)
351-
///
352-
/// # Returns
353-
/// Set of reference IDs that have reads meeting the score cutoff
354-
pub fn find_candidate_otus(
355-
py: Python,
356-
alignment_path: String,
357-
p_score_cutoff: f64,
358-
) -> PyResult<HashSet<String>> {
359-
// Release the GIL during the CPU-intensive file processing
360-
py.allow_threads(|| {
361-
stream_processor::extract_candidate_otus_from_sam_file(&alignment_path, p_score_cutoff)
362-
.map_err(|e| PyErr::new::<PyIOError, _>(e.to_string()))
363-
})
364-
}
365337

366338
#[pyfunction]
367-
/// Extract candidate OTU reference IDs from SAM text data
339+
/// Extract candidate OTU reference IDs by running bowtie2 directly with streaming
368340
///
369-
/// This function parses SAM format data directly from bytes without using rust-htslib.
370-
/// It provides memory-based processing that doesn't require temporary files or unsafe code.
371-
///
372-
/// # Arguments
373-
/// * `sam_bytes` - Raw SAM format data as bytes (typically from subprocess stdout)
374-
/// * `p_score_cutoff` - Minimum score threshold (AS:i score + read length)
375-
///
376-
/// # Returns
377-
/// Set of reference IDs that have reads meeting the score cutoff
378-
pub fn find_candidate_otus_from_bytes(
341+
/// This is a PyO3 wrapper around the function in the candidates module.
342+
pub fn find_candidate_otus_with_bowtie2(
379343
py: Python,
380-
sam_bytes: &[u8],
344+
bowtie_index_path: &str,
345+
read_paths: Vec<String>,
346+
proc: i32,
381347
p_score_cutoff: f64,
382348
) -> PyResult<HashSet<String>> {
383-
// Release the GIL during the CPU-intensive SAM parsing
384-
py.allow_threads(|| {
385-
stream_processor::extract_candidate_otus_from_bytes(sam_bytes, p_score_cutoff)
386-
.map_err(|e| PyErr::new::<PyIOError, _>(e.to_string()))
387-
})
349+
candidates::find_candidate_otus_with_bowtie2(py, bowtie_index_path, read_paths, proc, p_score_cutoff)
388350
}
389351

390352
#[pyfunction]

0 commit comments

Comments
 (0)