Moved to Axum and async #33

Merged 8 commits on Feb 10, 2025
1,097 changes: 530 additions & 567 deletions Cargo.lock

Large diffs are not rendered by default.

40 changes: 20 additions & 20 deletions Cargo.toml
@@ -20,48 +20,48 @@ name = "bigtent"
path = "src/main.rs"

[dependencies]
anyhow = "1.0.86"
regex = "1.10.4"
anyhow = "1.0.95"
regex = "1.11"
md5 = "0.7.0"
sha2 = "0.10.8"
# boring = "4.7.0"
hex = "0.4.3"
bytes = "1.6.0"
bytes = "1.10"
hex-literal = "0.4.1"
shellexpand = "3.1.0"
chrono = "0.4.38"
chrono = "0.4"

# Web Framework stuff
rouille = "3.6.2"
url = "2.5.0"
serde = {version = "1.0.202", features = ["std", "derive"]}
serde_json = "1.0.117"
url = "2.5"
serde = { version = "1.0", features = ["std", "derive"] }
serde_json = "1.0"
serde_cbor = "0.11.2"

im = {version = "15.1.0", features = ["serde"]}
axum = {version = "0.8", features = ["json", "macros"]}
axum-streams = {version="0.20", features=["json", "csv"]}
futures = "0.3"
im = { version = "15.1.0", features = ["serde"] }
# tracing = "0.1.40"
# tracing-subscriber = { version = "0.3.18", features = ["env-filter"]}


# Execution stuff
tokio = { version = "1", features = ["macros", "rt-multi-thread", "io-util", "fs", "net", ] }
tokio-util = {version = "0.7", features = ["io"]}
tokio-stream = {version = "0.1"}
signal-hook = "0.3.17"
arc-swap = "1.7.1"
clap = { version = "4.5.4", features = ["derive"] }
toml = "0.8.13"
clap = { version = "4.5", features = ["derive"] }
toml = "0.8"
thousands = "0.2.0"
pipe = "0.4.0"

flume = {version = "0.11.0", features = ["spin"]}
flume = { version = "0.11", features = ["spin"] }
scopeguard = "1.2.0"

rand = "0.8.5"
rand = "0.9.0"

# Logging
log = "0.4.21"
env_logger = "0.11.3"

# override a dependency
idna = "1.0.3"
log = "0.4"
env_logger = "0.11"

[features]

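Taken together, the dependency changes swap rouille for axum 0.8 plus axum-streams, and add tokio, tokio-util, tokio-stream, and futures for the async runtime. A minimal sketch of how those pieces typically fit together (the route path and handler name are hypothetical, not taken from this PR):

```rust
// Illustrative sketch only -- handler and route names are hypothetical.
use axum::{extract::Path, routing::get, Json, Router};
use serde_json::{json, Value};

async fn lookup(Path(identifier): Path<String>) -> Json<Value> {
    // A real handler would consult the cluster index; this just echoes.
    Json(json!({ "identifier": identifier }))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // axum 0.8 uses `{param}` path syntax rather than `:param`.
    let app = Router::new().route("/item/{identifier}", get(lookup));
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
    axum::serve(app, listener).await?;
    Ok(())
}
```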
4 changes: 2 additions & 2 deletions info/files_and_formats.md
@@ -171,7 +171,7 @@ pub struct Item {

`identifier` is the primary key of the record.

`reference` is a tuple of `u64` containing the 8 most significant bytes of the the SHA256 of the file (note that this will be 0 on disk and is populated by
`reference` is a tuple of `u64` containing the 8 most significant bytes of the SHA256 of the file (note that this will be 0 on disk and is populated by
Big Tent when serving the record) and the offset of the record in the file (technically, the offset to the length field).

`connections`: an ordered list of a tuple of `EdgeType` and `String` where `EdgeType` is an enumeration of `AliasTo`, `AliasFrom`, `Contains`, `ContainedBy`,
@@ -197,7 +197,7 @@ pub struct ItemMetaData {

`file_names` contains the names of the files that were hashed to create the metadata record.

`file_type` the types of the files identified by the the tool that generated the hash (likely Goat Rodeo)
`file_type` the types of the files identified by the tool that generated the hash (likely Goat Rodeo)

`file_sub_type` the subtype of the file.

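For reference, a rough sketch of the `Item` shape the prose above describes (field and variant names follow the prose; the real definitions live in the repo and the `EdgeType` list is cut off by the diff, so treat this purely as an illustration):

```rust
// Approximate shape inferred from the documentation above; not copied from the repo.
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
pub enum EdgeType {
    AliasTo,
    AliasFrom,
    Contains,
    ContainedBy,
    // ...the diff cuts the list off here, so there may be more variants
}

#[derive(Serialize, Deserialize)]
pub struct Item {
    /// Primary key of the record.
    pub identifier: String,
    /// (top 8 bytes of the file's SHA256, offset to the record's length field);
    /// the hash half is 0 on disk and is filled in by Big Tent when serving.
    pub reference: (u64, u64),
    /// Ordered (EdgeType, identifier) pairs.
    pub connections: Vec<(EdgeType, String)>,
}
```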
86 changes: 44 additions & 42 deletions src/cluster_writer.rs
@@ -1,9 +1,9 @@
#[cfg(not(test))]
use log::{info, trace};
use tokio::{fs::File, io::AsyncWriteExt};

use std::{
collections::{BTreeMap, BTreeSet, HashSet},
fs::{self, File},
path::PathBuf,
time::Instant,
};
@@ -44,8 +44,11 @@ pub struct ClusterWriter {
}

impl ClusterWriter {
const MAX_INDEX_CNT: usize = 25 * 1024 * 1024; // 25M
const MAX_DATA_FILE_SIZE: u64 = 15 * 1024 * 1024 * 1024; // 15GB
// 15GB
const MAX_DATA_FILE_SIZE: u64 = 15 * 1024 * 1024 * 1024;
// 25M
const MAX_INDEX_CNT: usize = 25 * 1024 * 1024;

#[inline]
fn make_dest_buffer() -> ShaWriter {
ShaWriter::new(10_000_000)
@@ -56,10 +59,10 @@ impl ClusterWriter {
Vec::with_capacity(10_000)
}

pub fn new<I: Into<PathBuf>>(dir: I) -> Result<ClusterWriter> {
pub async fn new<I: Into<PathBuf>>(dir: I) -> Result<ClusterWriter> {
let dir_path: PathBuf = dir.into();
if !dir_path.exists() {
fs::create_dir_all(&dir_path)?;
tokio::fs::create_dir_all(&dir_path).await?;
}
if !dir_path.is_dir() {
bail!(
@@ -80,7 +83,7 @@ impl ClusterWriter {
items_written: 0,
};

my_writer.write_data_envelope_start()?;
my_writer.write_data_envelope_start().await?;

Ok(my_writer)
}
@@ -93,17 +96,15 @@ impl ClusterWriter {
self.previous_position
}

pub fn write_item(&mut self, item: Item) -> Result<()>
{
use std::io::Write;
pub async fn write_item(&mut self, item: Item) -> Result<()> {
let the_hash = md5hash_str(&item.identifier);
let cur_pos = self.dest_data.pos();

let item_bytes = serde_cbor::to_vec(&item)?;

write_int(&mut self.dest_data, item_bytes.len() as u32)?;
write_int(&mut self.dest_data, item_bytes.len() as u32).await?;

(&mut self.dest_data).write_all(&item_bytes)?;
(&mut self.dest_data).write_all(&item_bytes).await?;
self.index_info.push(IndexInfo {
hash: the_hash,
offset: cur_pos,
@@ -115,7 +116,7 @@ impl ClusterWriter {
if self.index_info.len() > ClusterWriter::MAX_INDEX_CNT
|| self.dest_data.pos() > ClusterWriter::MAX_DATA_FILE_SIZE
{
self.write_data_and_index()?;
self.write_data_and_index().await?;
}

self.items_written += 1;
@@ -125,23 +126,22 @@ impl ClusterWriter {
Ok(())
}

pub fn finalize_cluster(&mut self) -> Result<PathBuf> {
use std::io::Write;
pub async fn finalize_cluster(&mut self) -> Result<PathBuf> {
if self.previous_position != 0 {
self.write_data_and_index()?;
self.write_data_and_index().await?;
}
let mut cluster_file = vec![];
{
let cluster_writer = &mut cluster_file;
write_int(cluster_writer, ClusterFileMagicNumber)?;
write_int(cluster_writer, ClusterFileMagicNumber).await?;
let cluster_env = ClusterFileEnvelope {
version: 1,
magic: ClusterFileMagicNumber,
info: BTreeMap::new(),
data_files: self.seen_data_files.iter().map(|v| *v).collect(),
index_files: self.index_files.iter().map(|v| *v).collect(),
};
write_envelope(cluster_writer, &cluster_env)?;
write_envelope(cluster_writer, &cluster_env).await?;
}

// compute sha256 of index
@@ -151,21 +151,20 @@ impl ClusterWriter {
// write the .grc file

let grc_file_path = path_plus_timed(&self.dir, &format!("{:016x}.grc", grc_sha));
let mut grc_file = File::create(&grc_file_path)?;
grc_file.write_all(&cluster_file)?;
grc_file.flush()?;
let mut grc_file = File::create(&grc_file_path).await?;
grc_file.write_all(&cluster_file).await?;
grc_file.flush().await?;

Ok(grc_file_path)
}

pub fn write_data_and_index(&mut self) -> Result<()> {
use std::io::Write;
pub async fn write_data_and_index(&mut self) -> Result<()> {
let mut grd_sha = 0;
if self.previous_position != 0 {
write_short_signed(&mut self.dest_data, -1)?; // a marker that says end of file
write_short_signed(&mut self.dest_data, -1).await?; // a marker that says end of file

// write final back-pointer (to the last entry record)
write_long(&mut self.dest_data, self.previous_position)?;
write_long(&mut self.dest_data, self.previous_position).await?;

info!(
"computing grd sha {:?}",
@@ -181,9 +180,9 @@ impl ClusterWriter {

let grd_file_path = self.dir.join(format!("{:016x}.grd", grd_sha));

let mut grd_file = File::create(grd_file_path)?;
grd_file.write_all(&data)?;
grd_file.flush()?;
let mut grd_file = File::create(grd_file_path).await?;
grd_file.write_all(&data).await?;
grd_file.flush().await?;

info!(
"computed grd sha and wrote at {:?}",
@@ -192,7 +191,7 @@ impl ClusterWriter {
self.previous_position = 0;
self.previous_hash = grd_sha;
self.seen_data_files.insert(grd_sha);
self.write_data_envelope_start()?;
self.write_data_envelope_start().await?;
}

if self.index_info.len() > 0 {
Expand All @@ -209,7 +208,7 @@ impl ClusterWriter {
let mut index_file = vec![];
{
let index_writer = &mut index_file;
write_int(index_writer, IndexFileMagicNumber)?;
write_int(index_writer, IndexFileMagicNumber).await?;
let index_env = IndexEnvelope {
version: 1,
magic: IndexFileMagicNumber,
Expand All @@ -218,9 +217,9 @@ impl ClusterWriter {
encoding: "MD5/Long/Long".into(),
info: BTreeMap::new(),
};
write_envelope(index_writer, &index_env)?;
write_envelope(index_writer, &index_env).await?;
for v in &self.index_info {
index_writer.write_all(&v.hash)?;
std::io::Write::write_all(index_writer, &v.hash)?;
write_long(
index_writer,
if v.file_hash == 0 {
@@ -231,8 +230,9 @@ impl ClusterWriter {
} else {
v.file_hash
},
)?;
write_long(index_writer, v.offset)?;
)
.await?;
write_long(index_writer, v.offset).await?;
}
}

@@ -248,9 +248,9 @@ impl ClusterWriter {
// write the .gri file
{
let gri_file_path = self.dir.join(format!("{:016x}.gri", gri_sha));
let mut gri_file = File::create(gri_file_path)?;
gri_file.write_all(&index_file)?;
gri_file.flush()?;
let mut gri_file = File::create(gri_file_path).await?;
gri_file.write_all(&index_file).await?;
gri_file.flush().await?;
}
info!(
"computed gri sha and wrote index file {:?}",
@@ -262,6 +262,7 @@ impl ClusterWriter {
self.dump_file_names();
Ok(())
}

fn dump_file_names(&self) {
trace!("Data");
for d in &self.seen_data_files {
@@ -273,22 +274,23 @@ impl ClusterWriter {
trace!("{:016x}", d);
}
}
pub fn add_index(&mut self, hash: MD5Hash, file_hash: u64, offset: u64) -> Result<()> {

pub async fn add_index(&mut self, hash: MD5Hash, file_hash: u64, offset: u64) -> Result<()> {
self.index_info.push(IndexInfo {
hash,
offset,
file_hash,
});

if self.index_info.len() > ClusterWriter::MAX_INDEX_CNT {
self.write_data_and_index()?;
self.write_data_envelope_start()?;
self.write_data_and_index().await?;
self.write_data_envelope_start().await?;
}
Ok(())
}

fn write_data_envelope_start(&mut self) -> Result<()> {
write_int(&mut self.dest_data, DataFileMagicNumber)?;
async fn write_data_envelope_start(&mut self) -> Result<()> {
write_int(&mut self.dest_data, DataFileMagicNumber).await?;

let data_envelope = DataFileEnvelope {
version: 1,
@@ -299,7 +301,7 @@ impl ClusterWriter {
info: BTreeMap::new(),
};

write_envelope(&mut self.dest_data, &data_envelope)?;
write_envelope(&mut self.dest_data, &data_envelope).await?;

self.previous_position = 0;
Ok(())
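The conversion applied throughout `cluster_writer.rs` follows one pattern: swap `std::fs::File`/`std::io::Write` for `tokio::fs::File`/`AsyncWriteExt`, mark the method `async`, and `.await` each write. A generic before/after sketch of that pattern (not a function from this file):

```rust
use anyhow::Result;
use tokio::{fs::File, io::AsyncWriteExt};

// Before (blocking):
// fn write_blob(path: &std::path::Path, bytes: &[u8]) -> Result<()> {
//     use std::io::Write;
//     let mut f = std::fs::File::create(path)?;
//     f.write_all(bytes)?;
//     f.flush()?;
//     Ok(())
// }

// After (async, mirroring the tokio::fs::File + AsyncWriteExt usage above):
async fn write_blob(path: &std::path::Path, bytes: &[u8]) -> Result<()> {
    let mut f = File::create(path).await?;
    f.write_all(bytes).await?;
    f.flush().await?;
    Ok(())
}
```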
22 changes: 6 additions & 16 deletions src/config.rs
@@ -1,8 +1,7 @@
use anyhow::{bail, Result};
use clap::Parser;
use tokio::fs::File;
use std::{
fs::File,
io::Read,
net::{SocketAddr, ToSocketAddrs},
path::{Path, PathBuf},
};
@@ -15,12 +14,6 @@ pub struct Args {
#[arg(short, long)]
pub conf: Option<PathBuf>,

/// Number of threads to allow for servicing.
/// A small number if on spinning disks, larger
/// for SSD
#[arg(short, long)]
pub threads: Option<u16>,

/// hostname to bind to
#[arg(long)]
pub host: Vec<String>,
@@ -48,25 +41,22 @@ pub struct Args {
}

impl Args {
pub fn read_conf_file(&self) -> Result<Table> {
pub async fn read_conf_file(&self) -> Result<Table> {
use tokio::io::AsyncReadExt;
let mut conf_file = File::open(match &self.conf {
Some(path) if path.exists() && path.is_file() => path,
_ => {
bail!("A configuration file must be supplied")
}
})?;
}).await?;
let mut contents = String::new();
conf_file.read_to_string(&mut contents)?;
conf_file.read_to_string(&mut contents).await?;
match contents.parse::<Table>() {
Ok(v) => Ok(v),
Err(e) => bail!("Must provide a valid toml file: {:?}", e),
}
}

pub fn num_threads(&self) -> u16 {
self.threads.unwrap_or(7)
}


pub fn conf_file(&self) -> Result<PathBuf> {
match &self.conf {
Some(n) if n.exists() && n.is_file() => Ok(
Expand Down
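Because `read_conf_file` is now `async`, its callers have to await it on the tokio runtime. A hedged sketch of what a call site might look like (the module path and `main` body are assumptions, not code from this PR):

```rust
// Illustrative call site; the real main.rs may differ.
use clap::Parser;

use crate::config::Args; // assumes config.rs is a module of the binary crate

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let args = Args::parse();
    // read_conf_file is async now, so it must be awaited on the tokio runtime.
    let conf = args.read_conf_file().await?;
    println!("loaded {} top-level config keys", conf.len());
    Ok(())
}
```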