
Memory leaks #40

Merged · 12 commits · Jun 30, 2023
11 changes: 7 additions & 4 deletions fakeminimap2/Cargo.toml
@@ -6,10 +6,13 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
minimap2 = { path = "..", features = ["map-file"] }
clap = {version = "4.0.10", features = ["derive"] }
flate2 = { version = "1.0.17", features = ["zlib-ng"], default-features = false }
minimap2 = { path = ".."}
crossbeam = "0.8.2"
needletail = "0.5.1"
fern = { version = "0.6.2", features = ["colored", "chrono"] }
humantime = "2.1.0"
log = "0.4.17"
colored = { version = "2.0.0", features = ["no-color"] }

[profile.release]
opt-level = 3
@@ -20,4 +23,4 @@ codegen-units = 1
debug = true

[profile.dev.package."*"]
opt-level = 3
opt-level = 3
4 changes: 4 additions & 0 deletions fakeminimap2/README.md
@@ -0,0 +1,4 @@
# Multithreaded example of mapping

Using crossbeam and standard library threading, we can run alignments on multiple threads that share the same reference to the index.
Small example files are provided.
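
A minimal sketch of that pattern, condensed from `src/main.rs`: it assumes the bundled `hg38_chr_M.mmi` index is in the working directory, and the thread count, queue capacities, and inline toy sequences are placeholders (real input would come from a FASTA/FASTQ reader such as needletail).

```rust
use std::sync::Arc;

use crossbeam::queue::ArrayQueue;
use minimap2::*;

fn main() {
    // Build the aligner once; each worker thread gets its own clone.
    let aligner = Aligner::builder()
        .map_ont()
        .with_cigar()
        .with_index("hg38_chr_M.mmi", None)
        .expect("Unable to build index");

    let work: Arc<ArrayQueue<String>> = Arc::new(ArrayQueue::new(1024));
    let results: Arc<ArrayQueue<Vec<Mapping>>> = Arc::new(ArrayQueue::new(1024));

    // Fill the work queue up front so the workers can simply drain it.
    // Toy sequences only; swap in real reads for a meaningful run.
    for seq in ["ACGTACGTACGTACGTACGT", "TTTTTTTTACGTACGTAAAA"] {
        work.push(seq.to_string()).expect("work queue full");
    }

    let handles: Vec<_> = (0..4)
        .map(|_| {
            let aligner = aligner.clone();
            let work = Arc::clone(&work);
            let results = Arc::clone(&results);
            std::thread::spawn(move || {
                // Pop until the shared queue is empty, then exit.
                while let Some(seq) = work.pop() {
                    let mappings = aligner
                        .map(seq.as_bytes(), false, false, None, None)
                        .expect("Unable to align");
                    let _ = results.push(mappings);
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    println!("mapped {} sequences", results.len());
}
```

The full example instead keeps workers alive with a `Backoff`/`snooze` loop and a `WorkQueue` enum, since a reader thread keeps feeding the queue while results are drained on the main thread.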
Binary file added fakeminimap2/hg38_chr_M.mmi
244 changes: 148 additions & 96 deletions fakeminimap2/src/main.rs
@@ -1,140 +1,192 @@
use std::io::{BufReader, Read};
use std::sync::Arc;
use std::mem;
use std::sync::{Arc, Mutex};

use clap::Parser;
use crossbeam::queue::ArrayQueue;
use flate2::read::GzDecoder;
use fern::colors::{Color, ColoredLevelConfig};
use log::LevelFilter::*;
use minimap2::*;

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Cli {
reference: String,
query: String,
use needletail::parse_fastx_file;
use needletail::{FastxReader, Sequence};
use std::time::{Duration, SystemTime};
#[macro_use]
extern crate log;
/// Set up logging for this module.
pub fn set_up_logging(level: log::LevelFilter) {
// configure colors for the whole line
let colors_line = ColoredLevelConfig::new()
.error(Color::Red)
.warn(Color::Yellow)
// we actually don't need to specify the color for debug and info, they are white by default
.info(Color::Green)
.debug(Color::Magenta)
// depending on the terminals color scheme, this is the same as the background color
.trace(Color::BrightBlack);

let colors_level = colors_line.info(Color::Green);
fern::Dispatch::new()
.format(move |out, message, record| {
out.finish(format_args!(
"{color_line}[{date} {level} {target} {color_line}] {message}\x1B[0m",
color_line = format_args!(
"\x1B[{}m",
colors_line.get_color(&record.level()).to_fg_str()
),
date = humantime::format_rfc3339_seconds(SystemTime::now()),
target = record.target(),
level = colors_level.color(record.level()),
message = message,
));
}) // set the default log level. to filter out verbose log messages from dependencies, set
// this to Warn and overwrite the log level for your crate.
.level(log::LevelFilter::Warn)
// change log levels for individual modules. Note: This looks for the record's target
// field which defaults to the module path but can be overwritten with the `target`
// parameter:
// `info!(target="special_target", "This log message is about special_target");`
.level_for("fakeminimap2", level)
// output to stdout
.chain(std::io::stdout())
.apply()
.unwrap();
}

enum WorkQueue<T> {
Work(T),
Done,
Result(T),
}

fn main() {
let cli = Cli::parse();

let mut aligner = Aligner {
threads: 8,
..map_ont()
}
.with_cigar()
.with_index(&cli.reference, None)
.expect("Unable to build index");

let query_file = cli.query;

// Read the first 50 bytes of the file
let mut f = std::fs::File::open(&query_file).unwrap();
let mut buffer = [0; 50];
f.read(&mut buffer).unwrap();
// Close the file
drop(f);

// Check if the file is gzipped
let compression_type = detect_compression_format(&buffer).unwrap();
if compression_type != CompressionType::NONE && compression_type != CompressionType::GZIP {
panic!("Compression type is not supported");
/// Transform a nucleic acid sequence into its "normalized" form.
///
/// The normalized form is:
/// - only AGCTN and possibly - (for gaps)
/// - strip out any whitespace or line endings
/// - lowercase versions of these are uppercased
/// - U is converted to T (make everything a DNA sequence)
/// - some other punctuation is converted to gaps
/// - IUPAC bases may be converted to N's depending on the parameter passed in
/// - everything else is considered a N
pub fn normalize(seq: &[u8]) -> Option<Vec<u8>> {
let mut buf: Vec<u8> = Vec::with_capacity(seq.len());
let mut changed: bool = false;

for n in seq.iter() {
let (new_char, char_changed) = match (*n, false) {
c @ (b'A', _) | c @ (b'C', _) | c @ (b'G', _) | c @ (b'T', _) => (c.0, false),
(b'a', _) => (b'A', true),
(b'c', _) => (b'C', true),
(b'g', _) => (b'G', true),
// normalize uridine to thymine
(b't', _) | (b'u', _) | (b'U', _) => (b'T', true),
// normalize gaps
(b'.', _) | (b'~', _) => (b'-', true),
// remove all whitespace and line endings
(b' ', _) | (b'\t', _) | (b'\r', _) | (b'\n', _) => (b' ', true),
// everything else is an N
_ => (b' ', true),
};
changed = changed || char_changed;
if new_char != b' ' {
buf.push(new_char);
}
}

// If gzipped, open it with a reader...
let mut reader: Box<dyn Read> = if compression_type == CompressionType::GZIP {
Box::new(GzDecoder::new(std::fs::File::open(&query_file).unwrap()))
if changed {
Some(buf)
} else {
Box::new(std::fs::File::open(&query_file).unwrap())
};

// Check the file type
let mut buffer = [0; 4];
reader.read(&mut buffer).unwrap();
let file_type = detect_file_format(&buffer).unwrap();
if file_type != FileFormat::FASTA && file_type != FileFormat::FASTQ {
panic!("File type is not supported");
None
}
}

let work_queue = Arc::new(ArrayQueue::<WorkQueue<Sequence>>::new(128));
let results_queue = Arc::new(ArrayQueue::<WorkQueue<Vec<Mapping>>>::new(128));
fn main() {
set_up_logging(Info);
debug!("Making aligner");
let aligner = Aligner::builder()
.map_ont()
.with_cigar()
.with_index("hg38_chr_M.mmi", None)
.expect("Unable to build index");
info!("Made aligner");
let mut reader: Box<dyn FastxReader> = parse_fastx_file("testing_fake_minimap2_chrM.fasta")
.unwrap_or_else(|_| panic!("Can't find FASTA file at testing_fake_minimap2_chrM.fasta"));

let work_queue = Arc::new(ArrayQueue::<WorkQueue<String>>::new(20000));
let results_queue = Arc::new(ArrayQueue::<WorkQueue<Vec<Mapping>>>::new(20000));
// TODO: Make threads clap argument

// 8 threads
for i in 0..8 {
let sequences = Arc::new(Mutex::new(vec![]));
let mut x = sequences.lock().unwrap();
while let Some(record) = reader.next() {
let record = record.unwrap();
x.push(format!(
">{}\n{}",
String::from_utf8(record.id().to_vec()).unwrap(),
String::from_utf8(record.sequence().to_vec()).unwrap()
));
}
mem::drop(x);
// spawn 8 threads
info!("Spawnging threads");
for _ in 0..8 {
let work_queue = Arc::clone(&work_queue);
let results_queue = Arc::clone(&results_queue);

// This could/should be done in the new thread, but wanting to test out the ability to move...
let mut aligner = aligner.clone();

debug!("Get clones");
let aligner = aligner.clone();
debug!("Cloned aligner");
std::thread::spawn(move || loop {
let backoff = crossbeam::utils::Backoff::new();
let work = work_queue.pop();
match work {
Some(WorkQueue::Work(sequence)) => {
let alignment = aligner
.map(&sequence.sequence.unwrap(), false, false, None, None)
.map(sequence.as_bytes(), true, false, None, None)
.expect("Unable to align");
results_queue.push(WorkQueue::Work(alignment));
}
Some(WorkQueue::Done) => {
results_queue.push(WorkQueue::Done);
break;
match results_queue.push(WorkQueue::Result(alignment)) {
Ok(()) => {}
Err(_) => {
backoff.snooze();
}
}
}
Some(_) => {}
None => {
backoff.snooze();
}
}
});
}
let sequences_borrow = Arc::clone(&sequences);

let wq = Arc::clone(&work_queue);
std::thread::spawn(move || {
// If gzipped, open it with a reader...
let reader: Box<dyn Read> = if compression_type == CompressionType::GZIP {
Box::new(GzDecoder::new(std::fs::File::open(&query_file).unwrap()))
} else {
Box::new(std::fs::File::open(query_file).unwrap())
};

let mut reader = BufReader::new(reader);

let mut reader: Box<dyn Iterator<Item = Result<Sequence, &'static str>>> =
if file_type == FileFormat::FASTA {
Box::new(Fasta::from_buffer(&mut reader))
} else {
Box::new(Fastq::from_buffer(&mut reader))
// create threead and just spam push things onto the to be mapped queue
std::thread::spawn(move || loop {
let z = &sequences_borrow.lock().unwrap();
for seq in z.iter() {
match wq.push(WorkQueue::Work(seq.clone())) {
Ok(()) => {}
Err(_) => {
// queue full, sleep and try again
std::thread::sleep(Duration::from_millis(1));
}
};

for seq in reader {
let seq = seq.unwrap();
wq.push(WorkQueue::Work(seq));
}

for _ in 0..8 {
wq.push(WorkQueue::Done);
}
});

// Loop and pull results down
let mut num_alignments: usize = 0;
loop {
debug!("{}", results_queue.len());
let result = results_queue.pop();
match result {
Some(WorkQueue::Work(alignment)) => {
println!("{:#?}", alignment);
},
Some(WorkQueue::Done) => {
break;
},
Some(WorkQueue::Result(_alignment)) => num_alignments += 1,
Some(_) => {
error!("Found a random variant in the results queue")
}
None => {
std::thread::sleep(std::time::Duration::from_millis(100));
warn!("Popped nothing");
std::thread::sleep(std::time::Duration::from_millis(2));
}



}

std::thread::sleep(std::time::Duration::from_millis(1));
info!("Iteration over, total alignments {}", num_alignments);
}
// Work thread
}