Skip to content
This repository was archived by the owner on Jan 30, 2026. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
396 changes: 374 additions & 22 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ path = "src/main.rs"
async-stream = "0.3.6"
base64ct = { version = "1.8.3", features = ["alloc"] }
bytes = "1.11.0"
chrono = "0.4"
clap = { version = "4.5.54", features = ["derive"] }
color-print = "0.3.7"
colored = "3.1.1"
config = "0.15.19"
crossterm = "0.28"
dirs = "6.0.0"
futures = "0.3.31"
http = "1.4.0"
Expand All @@ -28,6 +30,7 @@ indicatif = "0.18.3"
json_to_table = "0.12.0"
miette = { version = "7.6.0", features = ["fancy"] }
rand = "0.9.2"
ratatui = "0.29"
s2-sdk = { version = "0.23.0", features = ["_hidden"] }
serde = { version = "1.0.228", features = ["derive"] }
serde_json = { version = "1.0.149", features = ["preserve_order"] }
Expand Down
71 changes: 44 additions & 27 deletions src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,20 @@ const WRITE_DONE_SENTINEL: u64 = u64::MAX;
type PendingAck =
Pin<Box<dyn Future<Output = (Instant, Result<IndexedAppendAck, S2Error>)> + Send>>;

struct BenchWriteSample {
bytes: u64,
records: u64,
elapsed: Duration,
ack_latencies: Vec<Duration>,
chain_hash: Option<u64>,
pub struct BenchWriteSample {
pub bytes: u64,
pub records: u64,
pub elapsed: Duration,
pub ack_latencies: Vec<Duration>,
pub chain_hash: Option<u64>,
}

struct BenchReadSample {
bytes: u64,
records: u64,
elapsed: Duration,
e2e_latencies: Vec<Duration>,
chain_hash: Option<u64>,
pub struct BenchReadSample {
pub bytes: u64,
pub records: u64,
pub elapsed: Duration,
pub e2e_latencies: Vec<Duration>,
pub chain_hash: Option<u64>,
}

trait BenchSample {
Expand Down Expand Up @@ -136,7 +136,7 @@ fn chain_hash(prev_hash: u64, body: &[u8]) -> u64 {
hasher.digest()
}

fn bench_write(
pub fn bench_write(
stream: S2Stream,
record_size: usize,
target_mibps: NonZeroU64,
Expand Down Expand Up @@ -258,7 +258,7 @@ fn bench_write(
}
}

fn bench_read(
pub fn bench_read(
stream: S2Stream,
record_size: usize,
write_done_records: Arc<AtomicU64>,
Expand All @@ -273,7 +273,7 @@ fn bench_read(
)
}

fn bench_read_catchup(
pub fn bench_read_catchup(
stream: S2Stream,
record_size: usize,
bench_start: Instant,
Expand Down Expand Up @@ -656,19 +656,28 @@ pub async fn run(
let mut catchup_chain_hash: Option<u64> = None;
let catchup_stream = bench_read_catchup(stream.clone(), record_size, bench_start);
let mut catchup_stream = std::pin::pin!(catchup_stream);
while let Some(result) = catchup_stream.next().await {
match result {
Ok(sample) => {
let catchup_timeout = Duration::from_secs(300);
let catchup_deadline = tokio::time::Instant::now() + catchup_timeout;
loop {
match tokio::time::timeout_at(catchup_deadline, catchup_stream.next()).await {
Ok(Some(Ok(sample))) => {
update_bench_bar(&catchup_bar, &sample);
if let Some(hash) = sample.chain_hash {
catchup_chain_hash = Some(hash);
}
catchup_sample = Some(sample);
}
Err(e) => {
Ok(Some(Err(e))) => {
catchup_bar.finish_and_clear();
return Err(e);
}
Ok(None) => break,
Err(_) => {
catchup_bar.finish_and_clear();
return Err(CliError::BenchVerification(
"catchup read timed out after 5 minutes".to_string(),
));
}
}
}

Expand All @@ -690,14 +699,22 @@ pub async fn run(
);
}

if let (Some(write_sample), Some(catchup_sample)) =
(write_sample.as_ref(), catchup_sample.as_ref())
&& write_sample.records != catchup_sample.records
{
return Err(CliError::BenchVerification(format!(
"catchup read record count mismatch: expected {}, got {}",
write_sample.records, catchup_sample.records
)));
match (write_sample.as_ref(), catchup_sample.as_ref()) {
(Some(write_sample), Some(catchup_sample))
if write_sample.records != catchup_sample.records =>
{
return Err(CliError::BenchVerification(format!(
"catchup read record count mismatch: expected {}, got {}",
write_sample.records, catchup_sample.records
)));
}
(Some(write_sample), None) if write_sample.records > 0 => {
return Err(CliError::BenchVerification(format!(
"catchup read returned no records but write produced {}",
write_sample.records
)));
}
_ => {}
}

if let (Some(expected), Some(actual)) = (write_chain_hash, catchup_chain_hash)
Expand Down
6 changes: 5 additions & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ const GENERAL_USAGE: &str = color_print::cstr!(
#[command(name = "s2", version, override_usage = GENERAL_USAGE, styles = STYLES)]
pub struct Cli {
#[command(subcommand)]
pub command: Command,
pub command: Option<Command>,

/// Launch interactive TUI mode.
#[arg(short = 'i', long = "interactive")]
pub interactive: bool,
}

#[derive(Subcommand, Debug)]
Expand Down
18 changes: 15 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod config;
mod error;
mod ops;
mod record_format;
mod tui;
mod types;

use std::pin::Pin;
Expand Down Expand Up @@ -45,7 +46,7 @@ async fn main() -> miette::Result<()> {
}

async fn run() -> Result<(), CliError> {
let commands = Cli::try_parse().unwrap_or_else(|e| {
let cli = Cli::try_parse().unwrap_or_else(|e| {
// Customize error message for metric commands to say "metric" instead of "subcommand"
let msg = e.to_string();
if msg.contains("requires a subcommand") && msg.contains("get-") && msg.contains("-metrics")
Expand All @@ -59,6 +60,17 @@ async fn run() -> Result<(), CliError> {
e.exit()
});

// Launch interactive TUI mode
if cli.interactive {
return tui::run().await;
}

// Require a command when not in interactive mode
let Some(command) = cli.command else {
eprintln!("No command specified. Use --help for usage or -i for interactive mode.");
std::process::exit(1);
};

tracing_subscriber::registry()
.with(
tracing_subscriber::fmt::layer()
Expand All @@ -70,7 +82,7 @@ async fn run() -> Result<(), CliError> {
.with(tracing_subscriber::EnvFilter::from_default_env())
.init();

if let Command::Config(config_cmd) = &commands.command {
if let Command::Config(config_cmd) = &command {
match config_cmd {
ConfigCommand::List => {
let config = load_config_file()?;
Expand Down Expand Up @@ -112,7 +124,7 @@ async fn run() -> Result<(), CliError> {
let sdk_config = sdk_config(&cli_config)?;
let s2 = S2::new(sdk_config.clone()).map_err(CliError::SdkInit)?;

match commands.command {
match command {
Command::Config(..) => unreachable!(),

Command::Ls(args) => {
Expand Down
6 changes: 5 additions & 1 deletion src/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,11 @@ pub async fn tail(
let uri = args.uri.clone();
let stream = s2.basin(uri.basin).stream(uri.stream);

let start = ReadStart::new().with_from(ReadFrom::TailOffset(args.lines));
// Use clamp_to_tail to handle empty streams gracefully - if we ask for
// TailOffset(10) but there are fewer records, clamp to the actual start
let start = ReadStart::new()
.with_from(ReadFrom::TailOffset(args.lines))
.with_clamp_to_tail(true);
let stop = if args.follow {
ReadStop::new()
} else {
Expand Down
Loading