Moved to Axum and async #33

Merged 8 commits on Feb 10, 2025

Changes from 4 commits

1,097 changes: 530 additions & 567 deletions Cargo.lock

Large diffs are not rendered by default.

40 changes: 20 additions & 20 deletions Cargo.toml
@@ -20,48 +20,48 @@ name = "bigtent"
path = "src/main.rs"

[dependencies]
anyhow = "1.0.86"
regex = "1.10.4"
anyhow = "1.0.95"
regex = "1.11"
md5 = "0.7.0"
sha2 = "0.10.8"
# boring = "4.7.0"
hex = "0.4.3"
bytes = "1.6.0"
bytes = "1.10"
hex-literal = "0.4.1"
shellexpand = "3.1.0"
chrono = "0.4.38"
chrono = "0.4"

# Web Framework stuff
rouille = "3.6.2"
url = "2.5.0"
serde = {version = "1.0.202", features = ["std", "derive"]}
serde_json = "1.0.117"
url = "2.5"
serde = { version = "1.0", features = ["std", "derive"] }
serde_json = "1.0"
serde_cbor = "0.11.2"

im = {version = "15.1.0", features = ["serde"]}
axum = {version = "0.8", features = ["json", "macros"]}
axum-streams = {version="0.20", features=["json", "csv"]}
futures = "0.3"
im = { version = "15.1.0", features = ["serde"] }
# tracing = "0.1.40"
# tracing-subscriber = { version = "0.3.18", features = ["env-filter"]}


# Execution stuff
tokio = { version = "1", features = ["macros", "rt-multi-thread", "io-util", "fs", "net", ] }
tokio-util = {version = "0.7", features = ["io"]}
tokio-stream = {version = "0.1"}
signal-hook = "0.3.17"
arc-swap = "1.7.1"
clap = { version = "4.5.4", features = ["derive"] }
toml = "0.8.13"
clap = { version = "4.5", features = ["derive"] }
toml = "0.8"
thousands = "0.2.0"
pipe = "0.4.0"

flume = {version = "0.11.0", features = ["spin"]}
flume = { version = "0.11", features = ["spin"] }
scopeguard = "1.2.0"

rand = "0.8.5"
rand = "0.9.0"

# Logging
log = "0.4.21"
env_logger = "0.11.3"

# override a dependency
idna = "1.0.3"
log = "0.4"
env_logger = "0.11"

[features]

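For context on the framework switch above (rouille out; axum, axum-streams, futures, and tokio in), here is a minimal sketch of the kind of Axum entry point these dependencies support. It is hypothetical, not code from this PR; the route, handler, and bind address are invented.

```rust
// Hypothetical minimal Axum + Tokio entry point; route and address are invented.
use axum::{routing::get, Json, Router};
use serde_json::{json, Value};

// A trivial async handler; axum's "json" feature supplies the Json responder.
async fn status() -> Json<Value> {
    Json(json!({ "status": "ok" }))
}

#[tokio::main]
async fn main() {
    let app = Router::new().route("/status", get(status));
    // tokio's "net" feature provides TcpListener; axum 0.8 serves from it directly.
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}
```
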
86 changes: 44 additions & 42 deletions src/cluster_writer.rs
@@ -1,9 +1,9 @@
#[cfg(not(test))]
use log::{info, trace};
use tokio::{fs::File, io::AsyncWriteExt};

use std::{
collections::{BTreeMap, BTreeSet, HashSet},
fs::{self, File},
path::PathBuf,
time::Instant,
};
@@ -44,8 +44,11 @@ pub struct ClusterWriter {
}

impl ClusterWriter {
const MAX_INDEX_CNT: usize = 25 * 1024 * 1024; // 25M
const MAX_DATA_FILE_SIZE: u64 = 15 * 1024 * 1024 * 1024; // 15GB
// 25M
const MAX_DATA_FILE_SIZE: u64 = 15 * 1024 * 1024 * 1024;
const MAX_INDEX_CNT: usize = 25 * 1024 * 1024;

// 15GB

Contributor: Comments got dissociated from their target

Contributor Author: Fixed

#[inline]
fn make_dest_buffer() -> ShaWriter {
ShaWriter::new(10_000_000)
@@ -56,10 +59,10 @@ impl ClusterWriter {
Vec::with_capacity(10_000)
}

pub fn new<I: Into<PathBuf>>(dir: I) -> Result<ClusterWriter> {
pub async fn new<I: Into<PathBuf>>(dir: I) -> Result<ClusterWriter> {
let dir_path: PathBuf = dir.into();
if !dir_path.exists() {
fs::create_dir_all(&dir_path)?;
tokio::fs::create_dir_all(&dir_path).await?;
}
if !dir_path.is_dir() {
bail!(
@@ -80,7 +83,7 @@ impl ClusterWriter {
items_written: 0,
};

my_writer.write_data_envelope_start()?;
my_writer.write_data_envelope_start().await?;

Ok(my_writer)
}
@@ -93,17 +96,15 @@
self.previous_position
}

pub fn write_item(&mut self, item: Item) -> Result<()>
{
use std::io::Write;
pub async fn write_item(&mut self, item: Item) -> Result<()> {
let the_hash = md5hash_str(&item.identifier);
let cur_pos = self.dest_data.pos();

let item_bytes = serde_cbor::to_vec(&item)?;

write_int(&mut self.dest_data, item_bytes.len() as u32)?;
write_int(&mut self.dest_data, item_bytes.len() as u32).await?;

(&mut self.dest_data).write_all(&item_bytes)?;
(&mut self.dest_data).write_all(&item_bytes).await?;
self.index_info.push(IndexInfo {
hash: the_hash,
offset: cur_pos,
@@ -115,7 +116,7 @@
if self.index_info.len() > ClusterWriter::MAX_INDEX_CNT
|| self.dest_data.pos() > ClusterWriter::MAX_DATA_FILE_SIZE
{
self.write_data_and_index()?;
self.write_data_and_index().await?;
}

self.items_written += 1;
@@ -125,23 +126,22 @@
Ok(())
}

pub fn finalize_cluster(&mut self) -> Result<PathBuf> {
use std::io::Write;
pub async fn finalize_cluster(&mut self) -> Result<PathBuf> {
if self.previous_position != 0 {
self.write_data_and_index()?;
self.write_data_and_index().await?;
}
let mut cluster_file = vec![];
{
let cluster_writer = &mut cluster_file;
write_int(cluster_writer, ClusterFileMagicNumber)?;
write_int(cluster_writer, ClusterFileMagicNumber).await?;
let cluster_env = ClusterFileEnvelope {
version: 1,
magic: ClusterFileMagicNumber,
info: BTreeMap::new(),
data_files: self.seen_data_files.iter().map(|v| *v).collect(),
index_files: self.index_files.iter().map(|v| *v).collect(),
};
write_envelope(cluster_writer, &cluster_env)?;
write_envelope(cluster_writer, &cluster_env).await?;
}

// compute sha256 of index
@@ -151,21 +151,20 @@
// write the .grc file

let grc_file_path = path_plus_timed(&self.dir, &format!("{:016x}.grc", grc_sha));
let mut grc_file = File::create(&grc_file_path)?;
grc_file.write_all(&cluster_file)?;
grc_file.flush()?;
let mut grc_file = File::create(&grc_file_path).await?;
grc_file.write_all(&cluster_file).await?;
grc_file.flush().await?;

Ok(grc_file_path)
}

pub fn write_data_and_index(&mut self) -> Result<()> {
use std::io::Write;
pub async fn write_data_and_index(&mut self) -> Result<()> {
let mut grd_sha = 0;
if self.previous_position != 0 {
write_short_signed(&mut self.dest_data, -1)?; // a marker that says end of file
write_short_signed(&mut self.dest_data, -1).await?; // a marker that says end of file

// write final back-pointer (to the last entry record)
write_long(&mut self.dest_data, self.previous_position)?;
write_long(&mut self.dest_data, self.previous_position).await?;

info!(
"computing grd sha {:?}",
@@ -181,9 +180,9 @@

let grd_file_path = self.dir.join(format!("{:016x}.grd", grd_sha));

let mut grd_file = File::create(grd_file_path)?;
grd_file.write_all(&data)?;
grd_file.flush()?;
let mut grd_file = File::create(grd_file_path).await?;
grd_file.write_all(&data).await?;
grd_file.flush().await?;

info!(
"computed grd sha and wrote at {:?}",
Expand All @@ -192,7 +191,7 @@ impl ClusterWriter {
self.previous_position = 0;
self.previous_hash = grd_sha;
self.seen_data_files.insert(grd_sha);
self.write_data_envelope_start()?;
self.write_data_envelope_start().await?;
}

if self.index_info.len() > 0 {
@@ -209,7 +208,7 @@
let mut index_file = vec![];
{
let index_writer = &mut index_file;
write_int(index_writer, IndexFileMagicNumber)?;
write_int(index_writer, IndexFileMagicNumber).await?;
let index_env = IndexEnvelope {
version: 1,
magic: IndexFileMagicNumber,
@@ -218,9 +217,9 @@
encoding: "MD5/Long/Long".into(),
info: BTreeMap::new(),
};
write_envelope(index_writer, &index_env)?;
write_envelope(index_writer, &index_env).await?;
for v in &self.index_info {
index_writer.write_all(&v.hash)?;
std::io::Write::write_all(index_writer, &v.hash)?;
write_long(
index_writer,
if v.file_hash == 0 {
@@ -231,8 +230,9 @@
} else {
v.file_hash
},
)?;
write_long(index_writer, v.offset)?;
)
.await?;
write_long(index_writer, v.offset).await?;
}
}

@@ -248,9 +248,9 @@
// write the .gri file
{
let gri_file_path = self.dir.join(format!("{:016x}.gri", gri_sha));
let mut gri_file = File::create(gri_file_path)?;
gri_file.write_all(&index_file)?;
gri_file.flush()?;
let mut gri_file = File::create(gri_file_path).await?;
gri_file.write_all(&index_file).await?;
gri_file.flush().await?;
}
info!(
"computed gri sha and wrote index file {:?}",
@@ -262,6 +262,7 @@
self.dump_file_names();
Ok(())
}

fn dump_file_names(&self) {
trace!("Data");
for d in &self.seen_data_files {
@@ -273,22 +274,23 @@
trace!("{:016x}", d);
}
}
pub fn add_index(&mut self, hash: MD5Hash, file_hash: u64, offset: u64) -> Result<()> {

pub async fn add_index(&mut self, hash: MD5Hash, file_hash: u64, offset: u64) -> Result<()> {
self.index_info.push(IndexInfo {
hash,
offset,
file_hash,
});

if self.index_info.len() > ClusterWriter::MAX_INDEX_CNT {
self.write_data_and_index()?;
self.write_data_envelope_start()?;
self.write_data_and_index().await?;
self.write_data_envelope_start().await?;
}
Ok(())
}

fn write_data_envelope_start(&mut self) -> Result<()> {
write_int(&mut self.dest_data, DataFileMagicNumber)?;
async fn write_data_envelope_start(&mut self) -> Result<()> {
write_int(&mut self.dest_data, DataFileMagicNumber).await?;

let data_envelope = DataFileEnvelope {
version: 1,
@@ -299,7 +301,7 @@
info: BTreeMap::new(),
};

write_envelope(&mut self.dest_data, &data_envelope)?;
write_envelope(&mut self.dest_data, &data_envelope).await?;

self.previous_position = 0;
Ok(())
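The recurring change in this file is mechanical: std::fs::File plus std::io::Write become tokio::fs::File plus AsyncWriteExt, and every helper in the call chain gains async and .await. A condensed sketch of that conversion under illustrative names (write_blob is not a function in this PR):

```rust
use tokio::{fs::File, io::AsyncWriteExt};

// Async counterpart of a std::fs::File + std::io::Write helper.
async fn write_blob(path: &std::path::Path, bytes: &[u8]) -> anyhow::Result<()> {
    let mut file = File::create(path).await?; // async file creation
    file.write_all(bytes).await?;             // async write of the whole buffer
    file.flush().await?;                      // push buffered bytes to the OS
    Ok(())
}
```
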
26 changes: 13 additions & 13 deletions src/config.rs
@@ -1,8 +1,7 @@
use anyhow::{bail, Result};
use clap::Parser;
use tokio::fs::File;
use std::{
fs::File,
io::Read,
net::{SocketAddr, ToSocketAddrs},
path::{Path, PathBuf},
};
@@ -15,11 +14,11 @@ pub struct Args {
#[arg(short, long)]
pub conf: Option<PathBuf>,

/// Number of threads to allow for servicing.
/// A small number if on spinning disks, larger
/// for SSD
#[arg(short, long)]
pub threads: Option<u16>,
// /// Number of threads to allow for servicing.
// /// A small number if on spinning disks, larger
// /// for SSD
// #[arg(short, long)]
// pub threads: Option<u16>,

/// hostname to bind to
#[arg(long)]
@@ -48,24 +47,25 @@
}

impl Args {
pub fn read_conf_file(&self) -> Result<Table> {
pub async fn read_conf_file(&self) -> Result<Table> {
use tokio::io::AsyncReadExt;
let mut conf_file = File::open(match &self.conf {
Some(path) if path.exists() && path.is_file() => path,
_ => {
bail!("A configuration file must be supplied")
}
})?;
}).await?;
let mut contents = String::new();
conf_file.read_to_string(&mut contents)?;
conf_file.read_to_string(&mut contents).await?;
match contents.parse::<Table>() {
Ok(v) => Ok(v),
Err(e) => bail!("Must provide a valid toml file: {:?}", e),
}
}

pub fn num_threads(&self) -> u16 {
self.threads.unwrap_or(7)
}
// pub fn num_threads(&self) -> u16 {

Reviewer: Is this no longer needed? Should it be removed?

Contributor Author: Done

// self.threads.unwrap_or(7)
// }

pub fn conf_file(&self) -> Result<PathBuf> {
match &self.conf {
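The same pattern closes out config.rs: read_conf_file now opens and reads the TOML file through tokio instead of std::io::Read. A self-contained sketch of that shape, with an illustrative function name (load_toml is not from this PR):

```rust
use tokio::{fs::File, io::AsyncReadExt};
use toml::Table;

// Async read-and-parse of a TOML config; mirrors the new read_conf_file flow.
async fn load_toml(path: &std::path::Path) -> anyhow::Result<Table> {
    let mut file = File::open(path).await?;
    let mut contents = String::new();
    file.read_to_string(&mut contents).await?; // async read of the entire file
    Ok(contents.parse::<Table>()?)             // toml::Table implements FromStr
}
```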