Moved to Axum and async (#33)
* First bit of the refactor of Big Tent to use async and the axum web framework (see the sketch below)

Signed-off-by: David Pollak <[email protected]>

* Implemented additional features... should be at parity with the old version

Signed-off-by: David Pollak <[email protected]>

* Addressed an issue with a nightly feature I was using and began tackling some performance issues with BufReader

Signed-off-by: David Pollak <[email protected]>

* Fixed the performance issues by using tokio's async BufReader (see the sketch below). Fixed routing problems

Signed-off-by: David Pollak <[email protected]>

* Cleaned up some stuff based on PR comments/feedback

Signed-off-by: David Pollak <[email protected]>

* More typo and miscellaneous cleanup

Signed-off-by: David Pollak <[email protected]>

* More cleanup based on Aria's feedback

Signed-off-by: David Pollak <[email protected]>

---------

Signed-off-by: David Pollak <[email protected]>
Signed-off-by: David Pollak <[email protected]>
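
For context, a minimal sketch of the axum + tokio shape this refactor moves toward. The route, handler, and bind address are invented for illustration and are not Big Tent's actual configuration:

```rust
use axum::{routing::get, Router};

// Illustrative only: the basic axum 0.8 + tokio setup pattern.
// "/health" and the bind address are placeholders for this sketch.
#[tokio::main]
async fn main() {
    let app = Router::new().route("/health", get(|| async { "ok" }));
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
        .await
        .expect("failed to bind");
    axum::serve(listener, app).await.expect("server error");
}
```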
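Likewise, a hedged sketch of the async BufReader pattern the performance fixes refer to, assuming line-oriented reads over a tokio file; the function name and line counting are made up for illustration:

```rust
use tokio::{
    fs::File,
    io::{AsyncBufReadExt, BufReader},
};

// Illustrative only: wrapping a tokio File in BufReader serves many
// small reads from an in-memory buffer instead of one syscall each.
async fn count_lines(path: &str) -> anyhow::Result<usize> {
    let file = File::open(path).await?;
    let mut lines = BufReader::new(file).lines();
    let mut count = 0;
    while lines.next_line().await?.is_some() {
        count += 1;
    }
    Ok(count)
}
```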
dpp authored Feb 10, 2025
1 parent ca743e3 commit edff1f6
Showing 19 changed files with 1,404 additions and 1,610 deletions.
1,097 changes: 530 additions & 567 deletions Cargo.lock

Large diffs are not rendered by default.

40 changes: 20 additions & 20 deletions Cargo.toml
@@ -20,48 +20,48 @@ name = "bigtent"
path = "src/main.rs"

[dependencies]
anyhow = "1.0.86"
regex = "1.10.4"
anyhow = "1.0.95"
regex = "1.11"
md5 = "0.7.0"
sha2 = "0.10.8"
# boring = "4.7.0"
hex = "0.4.3"
bytes = "1.6.0"
bytes = "1.10"
hex-literal = "0.4.1"
shellexpand = "3.1.0"
chrono = "0.4.38"
chrono = "0.4"

# Web Framework stuff
rouille = "3.6.2"
url = "2.5.0"
serde = {version = "1.0.202", features = ["std", "derive"]}
serde_json = "1.0.117"
url = "2.5"
serde = { version = "1.0", features = ["std", "derive"] }
serde_json = "1.0"
serde_cbor = "0.11.2"

im = {version = "15.1.0", features = ["serde"]}
axum = {version = "0.8", features = ["json", "macros"]}
axum-streams = {version="0.20", features=["json", "csv"]}
futures = "0.3"
im = { version = "15.1.0", features = ["serde"] }
# tracing = "0.1.40"
# tracing-subscriber = { version = "0.3.18", features = ["env-filter"]}


# Execution stuff
tokio = { version = "1", features = ["macros", "rt-multi-thread", "io-util", "fs", "net", ] }
tokio-util = {version = "0.7", features = ["io"]}
tokio-stream = {version = "0.1"}
signal-hook = "0.3.17"
arc-swap = "1.7.1"
clap = { version = "4.5.4", features = ["derive"] }
toml = "0.8.13"
clap = { version = "4.5", features = ["derive"] }
toml = "0.8"
thousands = "0.2.0"
pipe = "0.4.0"

flume = {version = "0.11.0", features = ["spin"]}
flume = { version = "0.11", features = ["spin"] }
scopeguard = "1.2.0"

rand = "0.8.5"
rand = "0.9.0"

# Logging
log = "0.4.21"
env_logger = "0.11.3"

# override a dependency
idna = "1.0.3"
log = "0.4"
env_logger = "0.11"

[features]

4 changes: 2 additions & 2 deletions info/files_and_formats.md
@@ -171,7 +171,7 @@ pub struct Item {

`identifier` is the primary key of the record.

`reference` is a tuple of `u64` containing the 8 most significant bytes of the the SHA256 of the file (note that this will be 0 on disk and is populated by
`reference` is a tuple of `u64` containing the 8 most significant bytes of the SHA256 of the file (note that this will be 0 on disk and is populated by
Big Tent when serving the record) and the offset of the record in the file (technically, the offset to the length field).

`connections`: an ordered list of a tuple of `EdgeType` and `String` where `EdgeType` is an enumeration of `AliasTo`, `AliasFrom`, `Contains`, `ContainedBy`,
@@ -197,7 +197,7 @@ pub struct ItemMetaData {

`file_names` contains the names of the files that were hashed to create the metadata record.

`file_type` the types of the files identified by the the tool that generated the hash (likely Goat Rodeo)
`file_type` the types of the files identified by the tool that generated the hash (likely Goat Rodeo)

`file_sub_type` the subtype of the file.

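As an aside, a hedged sketch of how the first element of `reference` could be derived, per the description above, using the `sha2` crate already in Cargo.toml; the function name is illustrative, not Big Tent's actual code:

```rust
use sha2::{Digest, Sha256};

// Illustrative only: take the 8 most significant bytes of a file's
// SHA256 digest as a big-endian u64, per the `reference` description.
fn reference_hash(file_bytes: &[u8]) -> u64 {
    let digest = Sha256::digest(file_bytes);
    u64::from_be_bytes(digest[..8].try_into().expect("SHA256 is 32 bytes"))
}
```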
86 changes: 44 additions & 42 deletions src/cluster_writer.rs
@@ -1,9 +1,9 @@
#[cfg(not(test))]
use log::{info, trace};
use tokio::{fs::File, io::AsyncWriteExt};

use std::{
collections::{BTreeMap, BTreeSet, HashSet},
fs::{self, File},
path::PathBuf,
time::Instant,
};
@@ -44,8 +44,11 @@ pub struct ClusterWriter {
}

impl ClusterWriter {
const MAX_INDEX_CNT: usize = 25 * 1024 * 1024; // 25M
const MAX_DATA_FILE_SIZE: u64 = 15 * 1024 * 1024 * 1024; // 15GB
// 15GB
const MAX_DATA_FILE_SIZE: u64 = 15 * 1024 * 1024 * 1024;
// 25M
const MAX_INDEX_CNT: usize = 25 * 1024 * 1024;

#[inline]
fn make_dest_buffer() -> ShaWriter {
ShaWriter::new(10_000_000)
@@ -56,10 +59,10 @@ impl ClusterWriter {
Vec::with_capacity(10_000)
}

pub fn new<I: Into<PathBuf>>(dir: I) -> Result<ClusterWriter> {
pub async fn new<I: Into<PathBuf>>(dir: I) -> Result<ClusterWriter> {
let dir_path: PathBuf = dir.into();
if !dir_path.exists() {
fs::create_dir_all(&dir_path)?;
tokio::fs::create_dir_all(&dir_path).await?;
}
if !dir_path.is_dir() {
bail!(
@@ -80,7 +83,7 @@ impl ClusterWriter {
items_written: 0,
};

my_writer.write_data_envelope_start()?;
my_writer.write_data_envelope_start().await?;

Ok(my_writer)
}
@@ -93,17 +96,15 @@ impl ClusterWriter {
self.previous_position
}

pub fn write_item(&mut self, item: Item) -> Result<()>
{
use std::io::Write;
pub async fn write_item(&mut self, item: Item) -> Result<()> {
let the_hash = md5hash_str(&item.identifier);
let cur_pos = self.dest_data.pos();

let item_bytes = serde_cbor::to_vec(&item)?;

write_int(&mut self.dest_data, item_bytes.len() as u32)?;
write_int(&mut self.dest_data, item_bytes.len() as u32).await?;

(&mut self.dest_data).write_all(&item_bytes)?;
(&mut self.dest_data).write_all(&item_bytes).await?;
self.index_info.push(IndexInfo {
hash: the_hash,
offset: cur_pos,
@@ -115,7 +116,7 @@
if self.index_info.len() > ClusterWriter::MAX_INDEX_CNT
|| self.dest_data.pos() > ClusterWriter::MAX_DATA_FILE_SIZE
{
self.write_data_and_index()?;
self.write_data_and_index().await?;
}

self.items_written += 1;
@@ -125,23 +126,22 @@
Ok(())
}

pub fn finalize_cluster(&mut self) -> Result<PathBuf> {
use std::io::Write;
pub async fn finalize_cluster(&mut self) -> Result<PathBuf> {
if self.previous_position != 0 {
self.write_data_and_index()?;
self.write_data_and_index().await?;
}
let mut cluster_file = vec![];
{
let cluster_writer = &mut cluster_file;
write_int(cluster_writer, ClusterFileMagicNumber)?;
write_int(cluster_writer, ClusterFileMagicNumber).await?;
let cluster_env = ClusterFileEnvelope {
version: 1,
magic: ClusterFileMagicNumber,
info: BTreeMap::new(),
data_files: self.seen_data_files.iter().map(|v| *v).collect(),
index_files: self.index_files.iter().map(|v| *v).collect(),
};
write_envelope(cluster_writer, &cluster_env)?;
write_envelope(cluster_writer, &cluster_env).await?;
}

// compute sha256 of index
@@ -151,21 +151,20 @@
// write the .grc file

let grc_file_path = path_plus_timed(&self.dir, &format!("{:016x}.grc", grc_sha));
let mut grc_file = File::create(&grc_file_path)?;
grc_file.write_all(&cluster_file)?;
grc_file.flush()?;
let mut grc_file = File::create(&grc_file_path).await?;
grc_file.write_all(&cluster_file).await?;
grc_file.flush().await?;

Ok(grc_file_path)
}

pub fn write_data_and_index(&mut self) -> Result<()> {
use std::io::Write;
pub async fn write_data_and_index(&mut self) -> Result<()> {
let mut grd_sha = 0;
if self.previous_position != 0 {
write_short_signed(&mut self.dest_data, -1)?; // a marker that says end of file
write_short_signed(&mut self.dest_data, -1).await?; // a marker that says end of file

// write final back-pointer (to the last entry record)
write_long(&mut self.dest_data, self.previous_position)?;
write_long(&mut self.dest_data, self.previous_position).await?;

info!(
"computing grd sha {:?}",
@@ -181,9 +180,9 @@

let grd_file_path = self.dir.join(format!("{:016x}.grd", grd_sha));

let mut grd_file = File::create(grd_file_path)?;
grd_file.write_all(&data)?;
grd_file.flush()?;
let mut grd_file = File::create(grd_file_path).await?;
grd_file.write_all(&data).await?;
grd_file.flush().await?;

info!(
"computed grd sha and wrote at {:?}",
Expand All @@ -192,7 +191,7 @@ impl ClusterWriter {
self.previous_position = 0;
self.previous_hash = grd_sha;
self.seen_data_files.insert(grd_sha);
self.write_data_envelope_start()?;
self.write_data_envelope_start().await?;
}

if self.index_info.len() > 0 {
@@ -209,7 +208,7 @@
let mut index_file = vec![];
{
let index_writer = &mut index_file;
write_int(index_writer, IndexFileMagicNumber)?;
write_int(index_writer, IndexFileMagicNumber).await?;
let index_env = IndexEnvelope {
version: 1,
magic: IndexFileMagicNumber,
Expand All @@ -218,9 +217,9 @@ impl ClusterWriter {
encoding: "MD5/Long/Long".into(),
info: BTreeMap::new(),
};
write_envelope(index_writer, &index_env)?;
write_envelope(index_writer, &index_env).await?;
for v in &self.index_info {
index_writer.write_all(&v.hash)?;
std::io::Write::write_all(index_writer, &v.hash)?;
write_long(
index_writer,
if v.file_hash == 0 {
@@ -231,8 +230,9 @@
} else {
v.file_hash
},
)?;
write_long(index_writer, v.offset)?;
)
.await?;
write_long(index_writer, v.offset).await?;
}
}

@@ -248,9 +248,9 @@
// write the .gri file
{
let gri_file_path = self.dir.join(format!("{:016x}.gri", gri_sha));
let mut gri_file = File::create(gri_file_path)?;
gri_file.write_all(&index_file)?;
gri_file.flush()?;
let mut gri_file = File::create(gri_file_path).await?;
gri_file.write_all(&index_file).await?;
gri_file.flush().await?;
}
info!(
"computed gri sha and wrote index file {:?}",
@@ -262,6 +262,7 @@ impl ClusterWriter {
self.dump_file_names();
Ok(())
}

fn dump_file_names(&self) {
trace!("Data");
for d in &self.seen_data_files {
@@ -273,22 +274,23 @@ impl ClusterWriter {
trace!("{:016x}", d);
}
}
pub fn add_index(&mut self, hash: MD5Hash, file_hash: u64, offset: u64) -> Result<()> {

pub async fn add_index(&mut self, hash: MD5Hash, file_hash: u64, offset: u64) -> Result<()> {
self.index_info.push(IndexInfo {
hash,
offset,
file_hash,
});

if self.index_info.len() > ClusterWriter::MAX_INDEX_CNT {
self.write_data_and_index()?;
self.write_data_envelope_start()?;
self.write_data_and_index().await?;
self.write_data_envelope_start().await?;
}
Ok(())
}

fn write_data_envelope_start(&mut self) -> Result<()> {
write_int(&mut self.dest_data, DataFileMagicNumber)?;
async fn write_data_envelope_start(&mut self) -> Result<()> {
write_int(&mut self.dest_data, DataFileMagicNumber).await?;

let data_envelope = DataFileEnvelope {
version: 1,
Expand All @@ -299,7 +301,7 @@ impl ClusterWriter {
info: BTreeMap::new(),
};

write_envelope(&mut self.dest_data, &data_envelope)?;
write_envelope(&mut self.dest_data, &data_envelope).await?;

self.previous_position = 0;
Ok(())
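The conversion above follows one repeated pattern: build the bytes in memory, then create, write, and flush the file with tokio's async APIs, awaiting each call. A hedged, self-contained sketch of that pattern; the helper name is made up, not part of Big Tent:

```rust
use std::path::Path;
use tokio::{fs::File, io::AsyncWriteExt};

// Illustrative only: the same create / write_all / flush sequence
// as above, but awaiting tokio's async file I/O at each step.
async fn write_file(path: &Path, payload: &[u8]) -> anyhow::Result<()> {
    let mut file = File::create(path).await?;
    file.write_all(payload).await?;
    file.flush().await?;
    Ok(())
}
```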
22 changes: 6 additions & 16 deletions src/config.rs
@@ -1,8 +1,7 @@
use anyhow::{bail, Result};
use clap::Parser;
use tokio::fs::File;
use std::{
fs::File,
io::Read,
net::{SocketAddr, ToSocketAddrs},
path::{Path, PathBuf},
};
@@ -15,12 +14,6 @@ pub struct Args {
#[arg(short, long)]
pub conf: Option<PathBuf>,

/// Number of threads to allow for servicing.
/// A small number if on spinning disks, larger
/// for SSD
#[arg(short, long)]
pub threads: Option<u16>,

/// hostname to bind to
#[arg(long)]
pub host: Vec<String>,
@@ -48,25 +41,22 @@ pub struct Args {
}

impl Args {
pub fn read_conf_file(&self) -> Result<Table> {
pub async fn read_conf_file(&self) -> Result<Table> {
use tokio::io::AsyncReadExt;
let mut conf_file = File::open(match &self.conf {
Some(path) if path.exists() && path.is_file() => path,
_ => {
bail!("A configuration file must be supplied")
}
})?;
}).await?;
let mut contents = String::new();
conf_file.read_to_string(&mut contents)?;
conf_file.read_to_string(&mut contents).await?;
match contents.parse::<Table>() {
Ok(v) => Ok(v),
Err(e) => bail!("Must provide a valid toml file: {:?}", e),
}
}

pub fn num_threads(&self) -> u16 {
self.threads.unwrap_or(7)
}


pub fn conf_file(&self) -> Result<PathBuf> {
match &self.conf {
Some(n) if n.exists() && n.is_file() => Ok(
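A hedged sketch of the async config-loading pattern shown in `read_conf_file` above, reduced to its essentials; the function name is illustrative:

```rust
use std::path::Path;
use tokio::{fs::File, io::AsyncReadExt};
use toml::Table;

// Illustrative only: open the file with tokio, read it into a String
// asynchronously, then parse the contents as a TOML table.
async fn load_table(path: &Path) -> anyhow::Result<Table> {
    let mut file = File::open(path).await?;
    let mut contents = String::new();
    file.read_to_string(&mut contents).await?;
    Ok(contents.parse::<Table>()?)
}
```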
