From 5905360baabda61d206034675d6229428e500ecc Mon Sep 17 00:00:00 2001 From: Mehul Arora Date: Tue, 24 Feb 2026 09:19:25 +0530 Subject: [PATCH 01/12] feat: support creating resources from spec --- Cargo.lock | 1 + cli/src/apply.rs | 265 ++++++++++++++++++++++++++++ cli/src/cli.rs | 20 ++- cli/src/error.rs | 4 + cli/src/main.rs | 11 +- lite/Cargo.toml | 3 +- lite/src/init.rs | 431 +++++++++++++++++++++++++++++++++++++++++++++ lite/src/lib.rs | 1 + lite/src/server.rs | 14 +- 9 files changed, 746 insertions(+), 4 deletions(-) create mode 100644 cli/src/apply.rs create mode 100644 lite/src/init.rs diff --git a/Cargo.lock b/Cargo.lock index 727b9e5d..248d3a3f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4630,6 +4630,7 @@ dependencies = [ "eyre", "futures", "http 1.4.0", + "humantime", "indexmap", "itertools", "mimalloc", diff --git a/cli/src/apply.rs b/cli/src/apply.rs new file mode 100644 index 00000000..36d774ec --- /dev/null +++ b/cli/src/apply.rs @@ -0,0 +1,265 @@ +//! Declarative basin/stream configuration via a JSON spec file. + +use std::path::Path; + +use colored::Colorize; +use s2_lite::init::{ + BasinConfigSpec, DeleteOnEmptySpec, ResourcesSpec, RetentionPolicySpec, StorageClassSpec, + StreamConfigSpec, TimestampingModeSpec, TimestampingSpec, +}; +use s2_sdk::{ + S2, + types::{ + BasinName, BasinReconfiguration, CreateBasinInput, CreateStreamInput, DeleteOnEmptyConfig, + DeleteOnEmptyReconfiguration, ErrorResponse, ReconfigureBasinInput, ReconfigureStreamInput, + S2Error, StreamName, StreamReconfiguration, TimestampingConfig, TimestampingReconfiguration, + }, +}; + +fn stream_config_to_sdk(s: StreamConfigSpec) -> s2_sdk::types::StreamConfig { + let mut c = s2_sdk::types::StreamConfig::new(); + if let Some(sc) = s.storage_class { + c = c.with_storage_class(storage_class_to_sdk(sc)); + } + if let Some(rp) = s.retention_policy { + c = c.with_retention_policy(retention_policy_to_sdk(rp)); + } + if let Some(ts) = s.timestamping { + c = c.with_timestamping(timestamping_to_sdk(ts)); + } + if let Some(doe) = s.delete_on_empty { + c = c.with_delete_on_empty(delete_on_empty_to_sdk(doe)); + } + c +} + +fn basin_config_to_sdk(s: BasinConfigSpec) -> s2_sdk::types::BasinConfig { + let mut c = s2_sdk::types::BasinConfig::new(); + if let Some(dsc) = s.default_stream_config { + c = c.with_default_stream_config(stream_config_to_sdk(dsc)); + } + if let Some(v) = s.create_stream_on_append { + c = c.with_create_stream_on_append(v); + } + if let Some(v) = s.create_stream_on_read { + c = c.with_create_stream_on_read(v); + } + c +} + +fn stream_config_to_reconfig(s: StreamConfigSpec) -> StreamReconfiguration { + let mut r = StreamReconfiguration::new(); + if let Some(sc) = s.storage_class { + r = r.with_storage_class(storage_class_to_sdk(sc)); + } + if let Some(rp) = s.retention_policy { + r = r.with_retention_policy(retention_policy_to_sdk(rp)); + } + if let Some(ts) = s.timestamping { + let mut tsr = TimestampingReconfiguration::new(); + if let Some(m) = ts.mode { + tsr = tsr.with_mode(timestamping_mode_to_sdk(m)); + } + if let Some(u) = ts.uncapped { + tsr = tsr.with_uncapped(u); + } + r = r.with_timestamping(tsr); + } + if let Some(doe) = s.delete_on_empty { + let mut doer = DeleteOnEmptyReconfiguration::new(); + if let Some(ma) = doe.min_age { + doer = doer.with_min_age(ma.0); + } + r = r.with_delete_on_empty(doer); + } + r +} + +fn basin_config_to_reconfig(s: BasinConfigSpec) -> BasinReconfiguration { + let mut r = BasinReconfiguration::new(); + if let Some(dsc) = s.default_stream_config { + r = r.with_default_stream_config(stream_config_to_reconfig(dsc)); + } + if let Some(v) = s.create_stream_on_append { + r = r.with_create_stream_on_append(v); + } + if let Some(v) = s.create_stream_on_read { + r = r.with_create_stream_on_read(v); + } + r +} + +fn storage_class_to_sdk(s: StorageClassSpec) -> s2_sdk::types::StorageClass { + match s { + StorageClassSpec::Standard => s2_sdk::types::StorageClass::Standard, + StorageClassSpec::Express => s2_sdk::types::StorageClass::Express, + } +} + +fn retention_policy_to_sdk(rp: RetentionPolicySpec) -> s2_sdk::types::RetentionPolicy { + match rp.age_secs() { + Some(secs) => s2_sdk::types::RetentionPolicy::Age(secs), + None => s2_sdk::types::RetentionPolicy::Infinite, + } +} + +fn timestamping_mode_to_sdk(m: TimestampingModeSpec) -> s2_sdk::types::TimestampingMode { + match m { + TimestampingModeSpec::ClientPrefer => s2_sdk::types::TimestampingMode::ClientPrefer, + TimestampingModeSpec::ClientRequire => s2_sdk::types::TimestampingMode::ClientRequire, + TimestampingModeSpec::Arrival => s2_sdk::types::TimestampingMode::Arrival, + } +} + +fn timestamping_to_sdk(ts: TimestampingSpec) -> TimestampingConfig { + let mut tsc = TimestampingConfig::new(); + if let Some(m) = ts.mode { + tsc = tsc.with_mode(timestamping_mode_to_sdk(m)); + } + if let Some(u) = ts.uncapped { + tsc = tsc.with_uncapped(u); + } + tsc +} + +fn delete_on_empty_to_sdk(doe: DeleteOnEmptySpec) -> DeleteOnEmptyConfig { + let mut doec = DeleteOnEmptyConfig::new(); + if let Some(ma) = doe.min_age { + doec = doec.with_min_age(ma.0); + } + doec +} + +fn is_already_exists(err: &S2Error) -> bool { + matches!( + err, + S2Error::Server(ErrorResponse { code, .. }) if code == "resource_already_exists" + ) +} + +pub fn load(path: &Path) -> miette::Result { + let contents = std::fs::read_to_string(path) + .map_err(|e| miette::miette!("failed to read spec file {:?}: {}", path.display(), e))?; + let spec: ResourcesSpec = serde_json::from_str(&contents) + .map_err(|e| miette::miette!("failed to parse spec file {:?}: {}", path.display(), e))?; + Ok(spec) +} + +pub async fn apply(s2: &S2, spec: ResourcesSpec) -> miette::Result<()> { + for basin_spec in spec.basins { + let basin: BasinName = basin_spec + .name + .parse() + .map_err(|e| miette::miette!("invalid basin name {:?}: {}", basin_spec.name, e))?; + + apply_basin(s2, basin.clone(), basin_spec.config).await?; + + for stream_spec in basin_spec.streams { + let stream: StreamName = stream_spec.name.parse().map_err(|e| { + miette::miette!("invalid stream name {:?}: {}", stream_spec.name, e) + })?; + apply_stream(s2, basin.clone(), stream, stream_spec.config).await?; + } + } + Ok(()) +} + +async fn apply_basin( + s2: &S2, + basin: BasinName, + config: Option, +) -> miette::Result<()> { + let sdk_config = config + .as_ref() + .cloned() + .map(basin_config_to_sdk) + .unwrap_or_default(); + + let input = CreateBasinInput::new(basin.clone()).with_config(sdk_config); + match s2.create_basin(input).await { + Ok(_) => { + eprintln!("{}", format!(" basin {basin}").green().bold()); + } + Err(ref e) if is_already_exists(e) => { + if let Some(config) = config { + let reconfig = basin_config_to_reconfig(config); + s2.reconfigure_basin(ReconfigureBasinInput::new(basin.clone(), reconfig)) + .await + .map_err(|e| { + miette::miette!("failed to reconfigure basin {:?}: {}", basin.as_ref(), e) + })?; + eprintln!( + "{}", + format!(" basin {basin} (reconfigured)").yellow().bold() + ); + } else { + eprintln!("{}", format!(" basin {basin} (exists)").blue().bold()); + } + } + Err(e) => { + return Err(miette::miette!( + "failed to create basin {:?}: {}", + basin.as_ref(), + e + )); + } + } + Ok(()) +} + +async fn apply_stream( + s2: &S2, + basin: BasinName, + stream: StreamName, + config: Option, +) -> miette::Result<()> { + let sdk_config = config + .as_ref() + .cloned() + .map(stream_config_to_sdk) + .unwrap_or_default(); + + let basin_client = s2.basin(basin.clone()); + let input = CreateStreamInput::new(stream.clone()).with_config(sdk_config); + match basin_client.create_stream(input).await { + Ok(_) => { + eprintln!("{}", format!(" stream {basin}/{stream}").green().bold()); + } + Err(ref e) if is_already_exists(e) => { + if let Some(config) = config { + let reconfig = stream_config_to_reconfig(config); + basin_client + .reconfigure_stream(ReconfigureStreamInput::new(stream.clone(), reconfig)) + .await + .map_err(|e| { + miette::miette!( + "failed to reconfigure stream {:?}/{:?}: {}", + basin.as_ref(), + stream.as_ref(), + e + ) + })?; + eprintln!( + "{}", + format!(" stream {basin}/{stream} (reconfigured)") + .yellow() + .bold() + ); + } else { + eprintln!( + "{}", + format!(" stream {basin}/{stream} (exists)").blue().bold() + ); + } + } + Err(e) => { + return Err(miette::miette!( + "failed to create stream {:?}/{:?}: {}", + basin.as_ref(), + stream.as_ref(), + e + )); + } + } + Ok(()) +} diff --git a/cli/src/cli.rs b/cli/src/cli.rs index fa2a49ca..1f39ce96 100644 --- a/cli/src/cli.rs +++ b/cli/src/cli.rs @@ -1,4 +1,4 @@ -use std::num::NonZeroU64; +use std::{num::NonZeroU64, path::PathBuf}; use clap::{Args, Parser, Subcommand, builder::styling}; use s2_sdk::types::{ @@ -161,6 +161,17 @@ pub enum Command { /// Benchmark a stream to measure throughput and latency. Bench(BenchArgs), + /// Apply a declarative spec file, creating or reconfiguring basins and streams. + /// + /// Reads a JSON file and ensures the declared basins and streams exist with the + /// specified configuration. Basins and streams that already exist + /// are reconfigured to match the spec. Only the fields present in the spec are + /// updated. + /// + /// Example spec file: + /// {"basins":[{"name":"my-basin","streams":[{"name":"events"}]}]} + Apply(ApplyArgs), + /// Run S2 Lite server backed by object storage. /// /// Starts a lightweight S2-compatible server that can be backed by @@ -519,6 +530,13 @@ pub struct TailArgs { pub output: RecordsOut, } +#[derive(Args, Debug)] +pub struct ApplyArgs { + /// Path to a JSON spec file defining basins and streams to create or reconfigure. + #[arg(short = 'f', long, value_name = "FILE")] + pub file: PathBuf, +} + #[derive(Args, Debug)] pub struct BenchArgs { /// Name of the basin to use for the test. diff --git a/cli/src/error.rs b/cli/src/error.rs index 927abca7..c217a123 100644 --- a/cli/src/error.rs +++ b/cli/src/error.rs @@ -61,6 +61,10 @@ pub enum CliError { #[error("S2 Lite server error: {0}")] #[diagnostic(help("{}", HELP))] LiteServer(String), + + #[error("Apply failed: {0}")] + #[diagnostic(help("{}", HELP))] + Apply(String), } impl CliError { diff --git a/cli/src/main.rs b/cli/src/main.rs index 28bd79e8..5b95ba8f 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -1,5 +1,6 @@ //! S2 command-line interface. +mod apply; mod bench; mod cli; mod config; @@ -16,7 +17,7 @@ static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; use std::{pin::Pin, time::Duration}; use clap::{CommandFactory, Parser}; -use cli::{Cli, Command, ConfigCommand, ListBasinsArgs, ListStreamsArgs}; +use cli::{ApplyArgs, Cli, Command, ConfigCommand, ListBasinsArgs, ListStreamsArgs}; use colored::Colorize; use config::{ ConfigKey, load_cli_config, load_config_file, sdk_config, set_config_value, unset_config_value, @@ -525,6 +526,14 @@ async fn run() -> Result<(), CliError> { } } + Command::Apply(ApplyArgs { file }) => { + let spec = apply::load(&file).map_err(|e| CliError::InvalidArgs(e.into()))?; + apply::apply(&s2, spec) + .await + .map_err(|e| CliError::Apply(e.to_string()))?; + eprintln!("{}", "✓ Done".green().bold()); + } + Command::Bench(args) => { let basin_name = args.basin.0.clone(); let stream_name: StreamName = format!("bench/{}", uuid::Uuid::new_v4()) diff --git a/lite/Cargo.toml b/lite/Cargo.toml index ecd5164c..69e06487 100644 --- a/lite/Cargo.toml +++ b/lite/Cargo.toml @@ -29,13 +29,14 @@ axum-server = { workspace = true, features = ["tls-rustls"] } blake3 = { workspace = true } bytes = { workspace = true } bytesize = { workspace = true } -clap = { workspace = true, features = ["derive"] } +clap = { workspace = true, features = ["derive", "env"] } compact_str = { workspace = true } dashmap = { workspace = true } enum-ordinalize = { workspace = true } eyre = { workspace = true } futures = { workspace = true } http = { workspace = true } +humantime = { workspace = true } indexmap = { workspace = true } itertools = { workspace = true } mimalloc = { workspace = true } diff --git a/lite/src/init.rs b/lite/src/init.rs new file mode 100644 index 00000000..b5ab16b7 --- /dev/null +++ b/lite/src/init.rs @@ -0,0 +1,431 @@ +//! Declarative basin/stream initialization from a JSON spec file. +//! +//! Loaded at startup when `--init-file` / `S2LITE_INIT_FILE` is set. + +use std::{path::Path, time::Duration}; + +use s2_common::types::{ + basin::BasinName, + config::{ + BasinConfig, OptionalDeleteOnEmptyConfig, OptionalStreamConfig, OptionalTimestampingConfig, + RetentionPolicy, StorageClass, TimestampingMode, + }, + resources::CreateMode, + stream::StreamName, +}; +use serde::Deserialize; +use tracing::info; + +use crate::backend::{ + Backend, + error::{CreateBasinError, CreateStreamError}, +}; + +#[derive(Debug, Deserialize, Default)] +pub struct ResourcesSpec { + #[serde(default)] + pub basins: Vec, +} + +#[derive(Debug, Deserialize)] +pub struct BasinSpec { + pub name: String, + #[serde(default)] + pub config: Option, + #[serde(default)] + pub streams: Vec, +} + +#[derive(Debug, Deserialize)] +pub struct StreamSpec { + pub name: String, + #[serde(default)] + pub config: Option, +} + +#[derive(Debug, Clone, Deserialize, Default)] +pub struct BasinConfigSpec { + #[serde(default)] + pub default_stream_config: Option, + #[serde(default)] + pub create_stream_on_append: Option, + #[serde(default)] + pub create_stream_on_read: Option, +} + +#[derive(Debug, Clone, Deserialize, Default)] +pub struct StreamConfigSpec { + #[serde(default)] + pub storage_class: Option, + #[serde(default)] + pub retention_policy: Option, + #[serde(default)] + pub timestamping: Option, + #[serde(default)] + pub delete_on_empty: Option, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum StorageClassSpec { + Standard, + Express, +} + +impl From for StorageClass { + fn from(s: StorageClassSpec) -> Self { + match s { + StorageClassSpec::Standard => StorageClass::Standard, + StorageClassSpec::Express => StorageClass::Express, + } + } +} + +/// Accepts `"infinite"` or a humantime duration string such as `"7d"`, `"1w"`. +#[derive(Debug, Clone, Copy)] +pub struct RetentionPolicySpec(pub RetentionPolicy); + +impl RetentionPolicySpec { + pub fn age_secs(self) -> Option { + self.0.age().map(|d| d.as_secs()) + } +} + +impl TryFrom for RetentionPolicySpec { + type Error = String; + + fn try_from(s: String) -> Result { + if s.eq_ignore_ascii_case("infinite") { + return Ok(RetentionPolicySpec(RetentionPolicy::Infinite())); + } + let d = humantime::parse_duration(&s) + .map_err(|e| format!("invalid retention_policy {:?}: {}", s, e))?; + Ok(RetentionPolicySpec(RetentionPolicy::Age(d))) + } +} + +impl<'de> Deserialize<'de> for RetentionPolicySpec { + fn deserialize>(d: D) -> Result { + let s = String::deserialize(d)?; + RetentionPolicySpec::try_from(s).map_err(serde::de::Error::custom) + } +} + +#[derive(Debug, Clone, Deserialize)] +pub struct TimestampingSpec { + #[serde(default)] + pub mode: Option, + #[serde(default)] + pub uncapped: Option, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum TimestampingModeSpec { + ClientPrefer, + ClientRequire, + Arrival, +} + +impl From for TimestampingMode { + fn from(m: TimestampingModeSpec) -> Self { + match m { + TimestampingModeSpec::ClientPrefer => TimestampingMode::ClientPrefer, + TimestampingModeSpec::ClientRequire => TimestampingMode::ClientRequire, + TimestampingModeSpec::Arrival => TimestampingMode::Arrival, + } + } +} + +#[derive(Debug, Clone, Deserialize)] +pub struct DeleteOnEmptySpec { + #[serde(default)] + pub min_age: Option, +} + +/// A `std::time::Duration` deserialized from a humantime string (e.g. `"1d"`, `"2h 30m"`). +#[derive(Debug, Clone, Copy)] +pub struct HumanDuration(pub Duration); + +impl TryFrom for HumanDuration { + type Error = String; + + fn try_from(s: String) -> Result { + humantime::parse_duration(&s) + .map(HumanDuration) + .map_err(|e| format!("invalid duration {:?}: {}", s, e)) + } +} + +impl<'de> Deserialize<'de> for HumanDuration { + fn deserialize>(d: D) -> Result { + let s = String::deserialize(d)?; + HumanDuration::try_from(s).map_err(serde::de::Error::custom) + } +} + +impl From for BasinConfig { + fn from(s: BasinConfigSpec) -> Self { + BasinConfig { + default_stream_config: s + .default_stream_config + .map(OptionalStreamConfig::from) + .unwrap_or_default(), + create_stream_on_append: s.create_stream_on_append.unwrap_or(false), + create_stream_on_read: s.create_stream_on_read.unwrap_or(false), + } + } +} + +impl From for OptionalStreamConfig { + fn from(s: StreamConfigSpec) -> Self { + OptionalStreamConfig { + storage_class: s.storage_class.map(StorageClass::from), + retention_policy: s.retention_policy.map(|r| r.0), + timestamping: s + .timestamping + .map(|t| OptionalTimestampingConfig { + mode: t.mode.map(TimestampingMode::from), + uncapped: t.uncapped, + }) + .unwrap_or_default(), + delete_on_empty: s + .delete_on_empty + .map(|d| OptionalDeleteOnEmptyConfig { + min_age: d.min_age.map(|h| h.0), + }) + .unwrap_or_default(), + } + } +} + +pub fn load(path: &Path) -> eyre::Result { + let contents = std::fs::read_to_string(path) + .map_err(|e| eyre::eyre!("failed to read init file {:?}: {}", path, e))?; + let spec: ResourcesSpec = serde_json::from_str(&contents) + .map_err(|e| eyre::eyre!("failed to parse init file {:?}: {}", path, e))?; + Ok(spec) +} + +pub async fn apply(backend: &Backend, spec: ResourcesSpec) -> eyre::Result<()> { + for basin_spec in spec.basins { + let basin: BasinName = basin_spec + .name + .parse() + .map_err(|e| eyre::eyre!("invalid basin name {:?}: {}", basin_spec.name, e))?; + + let mode = if basin_spec.config.is_some() { + CreateMode::CreateOrReconfigure + } else { + CreateMode::CreateOnly(None) + }; + let config = basin_spec.config.map(BasinConfig::from).unwrap_or_default(); + + match backend.create_basin(basin.clone(), config, mode).await { + Ok(_) => info!(basin = basin.as_ref(), "basin applied"), + Err(CreateBasinError::BasinAlreadyExists(_)) => { + info!(basin = basin.as_ref(), "basin already exists, skipping") + } + Err(e) => { + return Err(eyre::eyre!( + "failed to create/reconfigure basin {:?}: {}", + basin, + e + )); + } + } + + for stream_spec in basin_spec.streams { + let stream: StreamName = stream_spec + .name + .parse() + .map_err(|e| eyre::eyre!("invalid stream name {:?}: {}", stream_spec.name, e))?; + + let mode = if stream_spec.config.is_some() { + CreateMode::CreateOrReconfigure + } else { + CreateMode::CreateOnly(None) + }; + let config = stream_spec + .config + .map(OptionalStreamConfig::from) + .unwrap_or_default(); + + match backend + .create_stream(basin.clone(), stream.clone(), config, mode) + .await + { + Ok(_) => info!( + basin = basin.as_ref(), + stream = stream.as_ref(), + "stream applied" + ), + Err(CreateStreamError::StreamAlreadyExists(_)) => info!( + basin = basin.as_ref(), + stream = stream.as_ref(), + "stream already exists, skipping" + ), + Err(e) => { + return Err(eyre::eyre!( + "failed to create/reconfigure stream {:?}/{:?}: {}", + basin, + stream, + e + )); + } + } + } + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn parse_spec(json: &str) -> ResourcesSpec { + serde_json::from_str(json).expect("valid JSON") + } + + #[test] + fn empty_spec() { + let spec = parse_spec("{}"); + assert!(spec.basins.is_empty()); + } + + #[test] + fn basin_no_config() { + let spec = parse_spec(r#"{"basins":[{"name":"my-basin"}]}"#); + assert_eq!(spec.basins.len(), 1); + assert_eq!(spec.basins[0].name, "my-basin"); + assert!(spec.basins[0].config.is_none()); + assert!(spec.basins[0].streams.is_empty()); + } + + #[test] + fn retention_policy_infinite() { + let rp: RetentionPolicySpec = serde_json::from_str(r#""infinite""#).expect("deserialize"); + assert!(matches!(rp.0, RetentionPolicy::Infinite())); + } + + #[test] + fn retention_policy_duration() { + let rp: RetentionPolicySpec = serde_json::from_str(r#""7days""#).expect("deserialize"); + assert!(matches!(rp.0, RetentionPolicy::Age(_))); + if let RetentionPolicy::Age(d) = rp.0 { + assert_eq!(d, Duration::from_secs(7 * 24 * 3600)); + } + } + + #[test] + fn retention_policy_invalid() { + let err = serde_json::from_str::(r#""not-a-duration""#); + assert!(err.is_err()); + } + + #[test] + fn human_duration() { + let hd: HumanDuration = serde_json::from_str(r#""1day""#).expect("deserialize"); + assert_eq!(hd.0, Duration::from_secs(86400)); + } + + #[test] + fn full_spec_roundtrip() { + let json = r#" + { + "basins": [ + { + "name": "my-basin", + "config": { + "create_stream_on_append": true, + "create_stream_on_read": false, + "default_stream_config": { + "storage_class": "express", + "retention_policy": "7days", + "timestamping": { + "mode": "client-prefer", + "uncapped": false + }, + "delete_on_empty": { + "min_age": "1day" + } + } + }, + "streams": [ + { + "name": "events", + "config": { + "storage_class": "standard", + "retention_policy": "infinite" + } + } + ] + } + ] + }"#; + + let spec = parse_spec(json); + assert_eq!(spec.basins.len(), 1); + let basin = &spec.basins[0]; + assert_eq!(basin.name, "my-basin"); + + let config = basin.config.as_ref().unwrap(); + assert_eq!(config.create_stream_on_append, Some(true)); + assert_eq!(config.create_stream_on_read, Some(false)); + + let dsc = config.default_stream_config.as_ref().unwrap(); + assert!(matches!(dsc.storage_class, Some(StorageClassSpec::Express))); + assert!(matches!( + dsc.retention_policy.as_ref().map(|r| &r.0), + Some(RetentionPolicy::Age(_)) + )); + + let ts = dsc.timestamping.as_ref().unwrap(); + assert!(matches!(ts.mode, Some(TimestampingModeSpec::ClientPrefer))); + assert_eq!(ts.uncapped, Some(false)); + + let doe = dsc.delete_on_empty.as_ref().unwrap(); + assert_eq!( + doe.min_age.as_ref().map(|h| h.0), + Some(Duration::from_secs(86400)) + ); + + assert_eq!(basin.streams.len(), 1); + let stream = &basin.streams[0]; + assert_eq!(stream.name, "events"); + let sc = stream.config.as_ref().unwrap(); + assert!(matches!(sc.storage_class, Some(StorageClassSpec::Standard))); + assert!(matches!( + sc.retention_policy.as_ref().map(|r| &r.0), + Some(RetentionPolicy::Infinite()) + )); + } + + #[test] + fn basin_config_conversion() { + let spec = BasinConfigSpec { + default_stream_config: None, + create_stream_on_append: Some(true), + create_stream_on_read: None, + }; + let config = BasinConfig::from(spec); + assert!(config.create_stream_on_append); + assert!(!config.create_stream_on_read); + } + + #[test] + fn stream_config_conversion() { + let spec = StreamConfigSpec { + storage_class: Some(StorageClassSpec::Standard), + retention_policy: Some(RetentionPolicySpec(RetentionPolicy::Infinite())), + timestamping: None, + delete_on_empty: None, + }; + let config = OptionalStreamConfig::from(spec); + assert!(matches!(config.storage_class, Some(StorageClass::Standard))); + assert!(matches!( + config.retention_policy, + Some(RetentionPolicy::Infinite()) + )); + } +} diff --git a/lite/src/lib.rs b/lite/src/lib.rs index 1935ea41..60eb0ec5 100644 --- a/lite/src/lib.rs +++ b/lite/src/lib.rs @@ -2,5 +2,6 @@ pub mod backend; pub mod handlers; +pub mod init; pub mod metrics; pub mod server; diff --git a/lite/src/server.rs b/lite/src/server.rs index 2459ee7d..afef0f27 100644 --- a/lite/src/server.rs +++ b/lite/src/server.rs @@ -15,7 +15,7 @@ use tower_http::{ }; use tracing::info; -use crate::{backend::Backend, handlers}; +use crate::{backend::Backend, handlers, init}; #[derive(clap::Args, Debug, Clone)] pub struct TlsConfig { @@ -68,6 +68,13 @@ pub struct LiteArgs { /// should be denied at the HTTP layer. #[arg(long)] pub no_cors: bool, + + /// Path to a JSON file defining basins and streams to create at startup. + /// + /// Uses create-or-reconfigure semantics, so it is safe to run on repeated + /// restarts. Can also be set via S2LITE_INIT_FILE environment variable. + #[arg(long, env = "S2LITE_INIT_FILE")] + pub init_file: Option, } #[derive(Debug, Clone)] @@ -145,6 +152,11 @@ pub async fn run(args: LiteArgs) -> eyre::Result<()> { let backend = Backend::new(db, append_inflight_max); crate::backend::bgtasks::spawn(&backend); + if let Some(init_file) = &args.init_file { + let spec = init::load(init_file)?; + init::apply(&backend, spec).await?; + } + let mut app = handlers::router().with_state(backend).layer( TraceLayer::new_for_http() .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO)) From acaebc53ab1f72ee5e38cd477f5846149ca1c8a4 Mon Sep 17 00:00:00 2001 From: Mehul Arora Date: Tue, 24 Feb 2026 09:26:50 +0530 Subject: [PATCH 02/12] .. --- cli/src/apply.rs | 67 ++++++++++++++++++++---------------------------- 1 file changed, 28 insertions(+), 39 deletions(-) diff --git a/cli/src/apply.rs b/cli/src/apply.rs index 36d774ec..98726d84 100644 --- a/cli/src/apply.rs +++ b/cli/src/apply.rs @@ -181,20 +181,16 @@ async fn apply_basin( eprintln!("{}", format!(" basin {basin}").green().bold()); } Err(ref e) if is_already_exists(e) => { - if let Some(config) = config { - let reconfig = basin_config_to_reconfig(config); - s2.reconfigure_basin(ReconfigureBasinInput::new(basin.clone(), reconfig)) - .await - .map_err(|e| { - miette::miette!("failed to reconfigure basin {:?}: {}", basin.as_ref(), e) - })?; - eprintln!( - "{}", - format!(" basin {basin} (reconfigured)").yellow().bold() - ); - } else { - eprintln!("{}", format!(" basin {basin} (exists)").blue().bold()); - } + let reconfig = config.map(basin_config_to_reconfig).unwrap_or_default(); + s2.reconfigure_basin(ReconfigureBasinInput::new(basin.clone(), reconfig)) + .await + .map_err(|e| { + miette::miette!("failed to reconfigure basin {:?}: {}", basin.as_ref(), e) + })?; + eprintln!( + "{}", + format!(" basin {basin} (reconfigured)").yellow().bold() + ); } Err(e) => { return Err(miette::miette!( @@ -226,31 +222,24 @@ async fn apply_stream( eprintln!("{}", format!(" stream {basin}/{stream}").green().bold()); } Err(ref e) if is_already_exists(e) => { - if let Some(config) = config { - let reconfig = stream_config_to_reconfig(config); - basin_client - .reconfigure_stream(ReconfigureStreamInput::new(stream.clone(), reconfig)) - .await - .map_err(|e| { - miette::miette!( - "failed to reconfigure stream {:?}/{:?}: {}", - basin.as_ref(), - stream.as_ref(), - e - ) - })?; - eprintln!( - "{}", - format!(" stream {basin}/{stream} (reconfigured)") - .yellow() - .bold() - ); - } else { - eprintln!( - "{}", - format!(" stream {basin}/{stream} (exists)").blue().bold() - ); - } + let reconfig = config.map(stream_config_to_reconfig).unwrap_or_default(); + basin_client + .reconfigure_stream(ReconfigureStreamInput::new(stream.clone(), reconfig)) + .await + .map_err(|e| { + miette::miette!( + "failed to reconfigure stream {:?}/{:?}: {}", + basin.as_ref(), + stream.as_ref(), + e + ) + })?; + eprintln!( + "{}", + format!(" stream {basin}/{stream} (reconfigured)") + .yellow() + .bold() + ); } Err(e) => { return Err(miette::miette!( From 04fee8ab775891a8ddffaa2c14d54121706a066f Mon Sep 17 00:00:00 2001 From: Mehul Arora Date: Tue, 24 Feb 2026 10:49:53 +0530 Subject: [PATCH 03/12] .. --- cli/src/apply.rs | 164 +++++++++++++--------------------------------- sdk/src/api.rs | 93 ++++++++++++++++++-------- sdk/src/client.rs | 4 ++ sdk/src/ops.rs | 57 ++++++++++++++-- sdk/src/types.rs | 122 ++++++++++++++++++++++++++++++++++ 5 files changed, 291 insertions(+), 149 deletions(-) diff --git a/cli/src/apply.rs b/cli/src/apply.rs index 98726d84..47f793cf 100644 --- a/cli/src/apply.rs +++ b/cli/src/apply.rs @@ -10,14 +10,14 @@ use s2_lite::init::{ use s2_sdk::{ S2, types::{ - BasinName, BasinReconfiguration, CreateBasinInput, CreateStreamInput, DeleteOnEmptyConfig, - DeleteOnEmptyReconfiguration, ErrorResponse, ReconfigureBasinInput, ReconfigureStreamInput, - S2Error, StreamName, StreamReconfiguration, TimestampingConfig, TimestampingReconfiguration, + BasinConfig, BasinName, CreateOrReconfigureBasinInput, CreateOrReconfigureStreamInput, + CreateOrReconfigured, DeleteOnEmptyConfig, RetentionPolicy, StorageClass, StreamConfig, + StreamName, TimestampingConfig, TimestampingMode, }, }; -fn stream_config_to_sdk(s: StreamConfigSpec) -> s2_sdk::types::StreamConfig { - let mut c = s2_sdk::types::StreamConfig::new(); +fn stream_config_to_sdk(s: StreamConfigSpec) -> StreamConfig { + let mut c = StreamConfig::new(); if let Some(sc) = s.storage_class { c = c.with_storage_class(storage_class_to_sdk(sc)); } @@ -33,8 +33,8 @@ fn stream_config_to_sdk(s: StreamConfigSpec) -> s2_sdk::types::StreamConfig { c } -fn basin_config_to_sdk(s: BasinConfigSpec) -> s2_sdk::types::BasinConfig { - let mut c = s2_sdk::types::BasinConfig::new(); +fn basin_config_to_sdk(s: BasinConfigSpec) -> BasinConfig { + let mut c = BasinConfig::new(); if let Some(dsc) = s.default_stream_config { c = c.with_default_stream_config(stream_config_to_sdk(dsc)); } @@ -47,67 +47,25 @@ fn basin_config_to_sdk(s: BasinConfigSpec) -> s2_sdk::types::BasinConfig { c } -fn stream_config_to_reconfig(s: StreamConfigSpec) -> StreamReconfiguration { - let mut r = StreamReconfiguration::new(); - if let Some(sc) = s.storage_class { - r = r.with_storage_class(storage_class_to_sdk(sc)); - } - if let Some(rp) = s.retention_policy { - r = r.with_retention_policy(retention_policy_to_sdk(rp)); - } - if let Some(ts) = s.timestamping { - let mut tsr = TimestampingReconfiguration::new(); - if let Some(m) = ts.mode { - tsr = tsr.with_mode(timestamping_mode_to_sdk(m)); - } - if let Some(u) = ts.uncapped { - tsr = tsr.with_uncapped(u); - } - r = r.with_timestamping(tsr); - } - if let Some(doe) = s.delete_on_empty { - let mut doer = DeleteOnEmptyReconfiguration::new(); - if let Some(ma) = doe.min_age { - doer = doer.with_min_age(ma.0); - } - r = r.with_delete_on_empty(doer); - } - r -} - -fn basin_config_to_reconfig(s: BasinConfigSpec) -> BasinReconfiguration { - let mut r = BasinReconfiguration::new(); - if let Some(dsc) = s.default_stream_config { - r = r.with_default_stream_config(stream_config_to_reconfig(dsc)); - } - if let Some(v) = s.create_stream_on_append { - r = r.with_create_stream_on_append(v); - } - if let Some(v) = s.create_stream_on_read { - r = r.with_create_stream_on_read(v); - } - r -} - -fn storage_class_to_sdk(s: StorageClassSpec) -> s2_sdk::types::StorageClass { +fn storage_class_to_sdk(s: StorageClassSpec) -> StorageClass { match s { - StorageClassSpec::Standard => s2_sdk::types::StorageClass::Standard, - StorageClassSpec::Express => s2_sdk::types::StorageClass::Express, + StorageClassSpec::Standard => StorageClass::Standard, + StorageClassSpec::Express => StorageClass::Express, } } -fn retention_policy_to_sdk(rp: RetentionPolicySpec) -> s2_sdk::types::RetentionPolicy { +fn retention_policy_to_sdk(rp: RetentionPolicySpec) -> RetentionPolicy { match rp.age_secs() { - Some(secs) => s2_sdk::types::RetentionPolicy::Age(secs), - None => s2_sdk::types::RetentionPolicy::Infinite, + Some(secs) => RetentionPolicy::Age(secs), + None => RetentionPolicy::Infinite, } } -fn timestamping_mode_to_sdk(m: TimestampingModeSpec) -> s2_sdk::types::TimestampingMode { +fn timestamping_mode_to_sdk(m: TimestampingModeSpec) -> TimestampingMode { match m { - TimestampingModeSpec::ClientPrefer => s2_sdk::types::TimestampingMode::ClientPrefer, - TimestampingModeSpec::ClientRequire => s2_sdk::types::TimestampingMode::ClientRequire, - TimestampingModeSpec::Arrival => s2_sdk::types::TimestampingMode::Arrival, + TimestampingModeSpec::ClientPrefer => TimestampingMode::ClientPrefer, + TimestampingModeSpec::ClientRequire => TimestampingMode::ClientRequire, + TimestampingModeSpec::Arrival => TimestampingMode::Arrival, } } @@ -130,13 +88,6 @@ fn delete_on_empty_to_sdk(doe: DeleteOnEmptySpec) -> DeleteOnEmptyConfig { doec } -fn is_already_exists(err: &S2Error) -> bool { - matches!( - err, - S2Error::Server(ErrorResponse { code, .. }) if code == "resource_already_exists" - ) -} - pub fn load(path: &Path) -> miette::Result { let contents = std::fs::read_to_string(path) .map_err(|e| miette::miette!("failed to read spec file {:?}: {}", path.display(), e))?; @@ -169,36 +120,24 @@ async fn apply_basin( basin: BasinName, config: Option, ) -> miette::Result<()> { - let sdk_config = config - .as_ref() - .cloned() - .map(basin_config_to_sdk) - .unwrap_or_default(); - - let input = CreateBasinInput::new(basin.clone()).with_config(sdk_config); - match s2.create_basin(input).await { - Ok(_) => { + let mut input = CreateOrReconfigureBasinInput::new(basin.clone()); + if let Some(c) = config { + input = input.with_config(basin_config_to_sdk(c)); + } + match s2 + .create_or_reconfigure_basin(input) + .await + .map_err(|e| miette::miette!("failed to apply basin {:?}: {}", basin.as_ref(), e))? + { + CreateOrReconfigured::Created(_) => { eprintln!("{}", format!(" basin {basin}").green().bold()); } - Err(ref e) if is_already_exists(e) => { - let reconfig = config.map(basin_config_to_reconfig).unwrap_or_default(); - s2.reconfigure_basin(ReconfigureBasinInput::new(basin.clone(), reconfig)) - .await - .map_err(|e| { - miette::miette!("failed to reconfigure basin {:?}: {}", basin.as_ref(), e) - })?; + CreateOrReconfigured::Reconfigured(_) => { eprintln!( "{}", format!(" basin {basin} (reconfigured)").yellow().bold() ); } - Err(e) => { - return Err(miette::miette!( - "failed to create basin {:?}: {}", - basin.as_ref(), - e - )); - } } Ok(()) } @@ -209,31 +148,26 @@ async fn apply_stream( stream: StreamName, config: Option, ) -> miette::Result<()> { - let sdk_config = config - .as_ref() - .cloned() - .map(stream_config_to_sdk) - .unwrap_or_default(); - + let mut input = CreateOrReconfigureStreamInput::new(stream.clone()); + if let Some(c) = config { + input = input.with_config(stream_config_to_sdk(c)); + } let basin_client = s2.basin(basin.clone()); - let input = CreateStreamInput::new(stream.clone()).with_config(sdk_config); - match basin_client.create_stream(input).await { - Ok(_) => { + match basin_client + .create_or_reconfigure_stream(input) + .await + .map_err(|e| { + miette::miette!( + "failed to apply stream {:?}/{:?}: {}", + basin.as_ref(), + stream.as_ref(), + e + ) + })? { + CreateOrReconfigured::Created(_) => { eprintln!("{}", format!(" stream {basin}/{stream}").green().bold()); } - Err(ref e) if is_already_exists(e) => { - let reconfig = config.map(stream_config_to_reconfig).unwrap_or_default(); - basin_client - .reconfigure_stream(ReconfigureStreamInput::new(stream.clone(), reconfig)) - .await - .map_err(|e| { - miette::miette!( - "failed to reconfigure stream {:?}/{:?}: {}", - basin.as_ref(), - stream.as_ref(), - e - ) - })?; + CreateOrReconfigured::Reconfigured(_) => { eprintln!( "{}", format!(" stream {basin}/{stream} (reconfigured)") @@ -241,14 +175,6 @@ async fn apply_stream( .bold() ); } - Err(e) => { - return Err(miette::miette!( - "failed to create stream {:?}/{:?}: {}", - basin.as_ref(), - stream.as_ref(), - e - )); - } } Ok(()) } diff --git a/sdk/src/api.rs b/sdk/src/api.rs index 34939a26..73c75b7e 100644 --- a/sdk/src/api.rs +++ b/sdk/src/api.rs @@ -1,40 +1,48 @@ -use crate::client::{self, StreamingResponse, UnaryResponse}; -use crate::retry::{RetryBackoff, RetryBackoffBuilder}; -use crate::types::{ - AccessTokenId, BasinAuthority, BasinName, Compression, RetryConfig, S2Config, S2Endpoints, - StreamName, -}; +use std::{ops::Deref, pin::Pin, sync::Arc, time::Duration}; + use async_stream::try_stream; use async_trait::async_trait; use bytes::BytesMut; use futures::{Stream, StreamExt}; -use http::header::InvalidHeaderValue; -use http::header::{ACCEPT, AUTHORIZATION, CONTENT_TYPE}; -use http::{HeaderMap, HeaderValue, StatusCode}; -use prost::{self, Message}; -use s2_api::v1::access::{ - AccessTokenInfo, IssueAccessTokenResponse, ListAccessTokensRequest, ListAccessTokensResponse, -}; -use s2_api::v1::basin::{BasinInfo, CreateBasinRequest, ListBasinsRequest, ListBasinsResponse}; -use s2_api::v1::config::{BasinConfig, BasinReconfiguration, StreamConfig, StreamReconfiguration}; -use s2_api::v1::metrics::{ - AccountMetricSetRequest, BasinMetricSetRequest, MetricSetResponse, StreamMetricSetRequest, +use http::{ + HeaderMap, HeaderValue, StatusCode, + header::{ACCEPT, AUTHORIZATION, CONTENT_TYPE, InvalidHeaderValue}, }; -use s2_api::v1::stream::s2s::{self, FrameDecoder, SessionMessage, TerminalMessage}; -use s2_api::v1::stream::{ - AppendConditionFailed, CreateStreamRequest, ListStreamsRequest, ListStreamsResponse, ReadEnd, - ReadStart, StreamInfo, TailResponse, - proto::{AppendAck, AppendInput, ReadBatch}, +use prost::{self, Message}; +use s2_api::v1::{ + access::{ + AccessTokenInfo, IssueAccessTokenResponse, ListAccessTokensRequest, + ListAccessTokensResponse, + }, + basin::{ + BasinInfo, CreateBasinRequest, CreateOrReconfigureBasinRequest, ListBasinsRequest, + ListBasinsResponse, + }, + config::{BasinConfig, BasinReconfiguration, StreamConfig, StreamReconfiguration}, + metrics::{ + AccountMetricSetRequest, BasinMetricSetRequest, MetricSetResponse, StreamMetricSetRequest, + }, + stream::{ + AppendConditionFailed, CreateStreamRequest, ListStreamsRequest, ListStreamsResponse, + ReadEnd, ReadStart, StreamInfo, TailResponse, + proto::{AppendAck, AppendInput, ReadBatch}, + s2s::{self, FrameDecoder, SessionMessage, TerminalMessage}, + }, }; use secrecy::ExposeSecret; -use std::ops::Deref; -use std::pin::Pin; -use std::sync::Arc; -use std::time::Duration; use tokio_util::codec::Decoder; use tracing::{debug, warn}; use url::Url; +use crate::{ + client::{self, StreamingResponse, UnaryResponse}, + retry::{RetryBackoff, RetryBackoffBuilder}, + types::{ + AccessTokenId, BasinAuthority, BasinName, Compression, RetryConfig, S2Config, S2Endpoints, + StreamName, + }, +}; + const CONTENT_TYPE_S2S: &str = "s2s/proto"; const CONTENT_TYPE_PROTO: &str = "application/protobuf"; const ACCEPT_PROTO: &str = "application/protobuf"; @@ -135,6 +143,18 @@ impl AccountClient { Ok(response.json::()?) } + pub async fn create_or_reconfigure_basin( + &self, + name: BasinName, + request: Option, + ) -> Result<(bool, BasinInfo), ApiError> { + let url = self.base_url.join(&format!("v1/basins/{name}"))?; + let request = self.put(url).json(&request).build()?; + let response = self.request(request).send().await?; + let was_created = response.status() == StatusCode::CREATED; + Ok((was_created, response.json::()?)) + } + pub async fn delete_basin( &self, name: BasinName, @@ -273,6 +293,20 @@ impl BasinClient { Ok(response.json::()?) } + pub async fn create_or_reconfigure_stream( + &self, + name: StreamName, + config: Option, + ) -> Result<(bool, StreamInfo), ApiError> { + let url = self + .base_url + .join(&format!("v1/streams/{}", urlencoding::encode(&name)))?; + let request = self.put(url).json(&config).build()?; + let response = self.request(request).send().await?; + let was_created = response.status() == StatusCode::CREATED; + Ok((was_created, response.json::()?)) + } + pub async fn delete_stream( &self, name: StreamName, @@ -753,6 +787,13 @@ impl BaseClient { .compression(self.compression) } + pub fn put(&self, url: Url) -> client::RequestBuilder { + client::RequestBuilder::put(url) + .timeout(self.request_timeout) + .headers(&self.default_headers) + .compression(self.compression) + } + pub fn delete(&self, url: Url) -> client::RequestBuilder { client::RequestBuilder::delete(url) .timeout(self.request_timeout) diff --git a/sdk/src/client.rs b/sdk/src/client.rs index 390c20ae..78ad9f10 100644 --- a/sdk/src/client.rs +++ b/sdk/src/client.rs @@ -213,6 +213,10 @@ impl RequestBuilder { Self::new(Method::PATCH, url) } + pub fn put(url: Url) -> Self { + Self::new(Method::PUT, url) + } + pub fn delete(url: Url) -> Self { Self::new(Method::DELETE, url) } diff --git a/sdk/src/ops.rs b/sdk/src/ops.rs index 7f291025..7b6b3fbe 100644 --- a/sdk/src/ops.rs +++ b/sdk/src/ops.rs @@ -8,10 +8,11 @@ use crate::{ session::{self, AppendSession, AppendSessionConfig}, types::{ AccessTokenId, AccessTokenInfo, AppendAck, AppendInput, BasinConfig, BasinInfo, BasinName, - BasinState, CreateBasinInput, CreateStreamInput, DeleteBasinInput, DeleteStreamInput, - GetAccountMetricsInput, GetBasinMetricsInput, GetStreamMetricsInput, IssueAccessTokenInput, - ListAccessTokensInput, ListAllAccessTokensInput, ListAllBasinsInput, ListAllStreamsInput, - ListBasinsInput, ListStreamsInput, Metric, Page, ReadBatch, ReadInput, + BasinState, CreateBasinInput, CreateOrReconfigureBasinInput, + CreateOrReconfigureStreamInput, CreateOrReconfigured, CreateStreamInput, DeleteBasinInput, + DeleteStreamInput, GetAccountMetricsInput, GetBasinMetricsInput, GetStreamMetricsInput, + IssueAccessTokenInput, ListAccessTokensInput, ListAllAccessTokensInput, ListAllBasinsInput, + ListAllStreamsInput, ListBasinsInput, ListStreamsInput, Metric, Page, ReadBatch, ReadInput, ReconfigureBasinInput, ReconfigureStreamInput, S2Config, S2Error, StreamConfig, StreamInfo, StreamName, StreamPosition, Streaming, }, @@ -103,6 +104,30 @@ impl S2 { Ok(info.into()) } + /// Create or reconfigure a basin. + /// + /// Creates the basin if it doesn't exist, or reconfigures it to match the provided + /// configuration if it does. Uses HTTP PUT semantics — always idempotent. + /// + /// Returns [`CreateOrReconfigured::Created`] with the basin info if the basin was newly + /// created, or [`CreateOrReconfigured::Reconfigured`] if it already existed. + pub async fn create_or_reconfigure_basin( + &self, + input: CreateOrReconfigureBasinInput, + ) -> Result, S2Error> { + let (name, request) = input.into(); + let (was_created, info) = self + .client + .create_or_reconfigure_basin(name, request) + .await?; + let info = info.into(); + Ok(if was_created { + CreateOrReconfigured::Created(info) + } else { + CreateOrReconfigured::Reconfigured(info) + }) + } + /// Get basin configuration. pub async fn get_basin_config(&self, name: BasinName) -> Result { let config = self.client.get_basin_config(name).await?; @@ -295,6 +320,30 @@ impl S2Basin { Ok(info.try_into()?) } + /// Create or reconfigure a stream. + /// + /// Creates the stream if it doesn't exist, or reconfigures it to match the provided + /// configuration if it does. Uses HTTP PUT semantics — always idempotent. + /// + /// Returns [`CreateOrReconfigured::Created`] with the stream info if the stream was newly + /// created, or [`CreateOrReconfigured::Reconfigured`] if it already existed. + pub async fn create_or_reconfigure_stream( + &self, + input: CreateOrReconfigureStreamInput, + ) -> Result, S2Error> { + let (name, config) = input.into(); + let (was_created, info) = self + .client + .create_or_reconfigure_stream(name, config) + .await?; + let info = info.try_into()?; + Ok(if was_created { + CreateOrReconfigured::Created(info) + } else { + CreateOrReconfigured::Reconfigured(info) + }) + } + /// Get stream configuration. pub async fn get_stream_config(&self, name: StreamName) -> Result { let config = self.client.get_stream_config(name).await?; diff --git a/sdk/src/types.rs b/sdk/src/types.rs index aaa41a36..09257b7e 100644 --- a/sdk/src/types.rs +++ b/sdk/src/types.rs @@ -862,6 +862,32 @@ impl From for api::basin::BasinScope { } } +/// Result of a create-or-reconfigure operation. +/// +/// Indicates whether the resource was newly created or already existed and was +/// reconfigured. Both variants hold the resource's current state. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum CreateOrReconfigured { + /// Resource was newly created. + Created(T), + /// Resource already existed and was reconfigured to match the spec. + Reconfigured(T), +} + +impl CreateOrReconfigured { + /// Returns `true` if the resource was newly created. + pub fn is_created(&self) -> bool { + matches!(self, Self::Created(_)) + } + + /// Unwrap the inner value regardless of variant. + pub fn into_inner(self) -> T { + match self { + Self::Created(t) | Self::Reconfigured(t) => t, + } + } +} + #[derive(Debug, Clone)] #[non_exhaustive] /// Input for [`create_basin`](crate::S2::create_basin) operation. @@ -920,6 +946,68 @@ impl From for (api::basin::CreateBasinRequest, String) { } } +#[derive(Debug, Clone)] +#[non_exhaustive] +/// Input for [`create_or_reconfigure_basin`](crate::S2::create_or_reconfigure_basin) operation. +pub struct CreateOrReconfigureBasinInput { + /// Basin name. + pub name: BasinName, + /// Configuration for the basin. + /// + /// If `None`, the basin is created with default configuration or left unchanged if it exists. + pub config: Option, + /// Scope of the basin. + /// + /// Defaults to [`AwsUsEast1`](BasinScope::AwsUsEast1). Cannot be changed once set. + pub scope: Option, +} + +impl CreateOrReconfigureBasinInput { + /// Create a new [`CreateOrReconfigureBasinInput`] with the given basin name. + pub fn new(name: BasinName) -> Self { + Self { + name, + config: None, + scope: None, + } + } + + /// Set the configuration for the basin. + pub fn with_config(self, config: BasinConfig) -> Self { + Self { + config: Some(config), + ..self + } + } + + /// Set the scope of the basin. + pub fn with_scope(self, scope: BasinScope) -> Self { + Self { + scope: Some(scope), + ..self + } + } +} + +impl From + for ( + BasinName, + Option, + ) +{ + fn from(value: CreateOrReconfigureBasinInput) -> Self { + let request = if value.config.is_some() || value.scope.is_some() { + Some(api::basin::CreateOrReconfigureBasinRequest { + config: value.config.map(Into::into), + scope: value.scope.map(Into::into), + }) + } else { + None + }; + (value.name, request) + } +} + #[derive(Debug, Clone, Default)] #[non_exhaustive] /// Input for [`list_basins`](crate::S2::list_basins) operation. @@ -2577,6 +2665,40 @@ impl From for (api::stream::CreateStreamRequest, String) { } } +#[derive(Debug, Clone)] +#[non_exhaustive] +/// Input for [`create_or_reconfigure_stream`](crate::S2Basin::create_or_reconfigure_stream) +/// operation. +pub struct CreateOrReconfigureStreamInput { + /// Stream name. + pub name: StreamName, + /// Configuration for the stream. + /// + /// If `None`, the stream is created with default configuration or left unchanged if it exists. + pub config: Option, +} + +impl CreateOrReconfigureStreamInput { + /// Create a new [`CreateOrReconfigureStreamInput`] with the given stream name. + pub fn new(name: StreamName) -> Self { + Self { name, config: None } + } + + /// Set the configuration for the stream. + pub fn with_config(self, config: StreamConfig) -> Self { + Self { + config: Some(config), + ..self + } + } +} + +impl From for (StreamName, Option) { + fn from(value: CreateOrReconfigureStreamInput) -> Self { + (value.name, value.config.map(Into::into)) + } +} + #[derive(Debug, Clone)] #[non_exhaustive] /// Input of [`delete_stream`](crate::S2Basin::delete_stream) operation. From 40f128a159bb45ea108b60082c509760f5acac0b Mon Sep 17 00:00:00 2001 From: Mehul Arora Date: Tue, 24 Feb 2026 18:40:06 +0530 Subject: [PATCH 04/12] .. --- lite/src/backend/basins.rs | 32 ++++--- lite/src/backend/read.rs | 4 +- lite/src/backend/streams.rs | 37 +++++--- lite/src/init.rs | 185 ++++++++++++++++++++---------------- 4 files changed, 150 insertions(+), 108 deletions(-) diff --git a/lite/src/backend/basins.rs b/lite/src/backend/basins.rs index b4bdd9b6..2bea1bf2 100644 --- a/lite/src/backend/basins.rs +++ b/lite/src/backend/basins.rs @@ -75,22 +75,23 @@ impl Backend { pub async fn create_basin( &self, basin: BasinName, - config: BasinConfig, + config: impl Into, mode: CreateMode, ) -> Result, CreateBasinError> { + let config = config.into(); let meta_key = kv::basin_meta::ser_key(&basin); let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?; let new_creation_idempotency_key = match &mode { CreateMode::CreateOnly(Some(req_token)) => { - Some(creation_idempotency_key(req_token, &config)) + let resolved = BasinConfig::default().reconfigure(config.clone()); + Some(creation_idempotency_key(req_token, &resolved)) } _ => None, }; - let mut existing_created_at = None; - let mut existing_creation_idempotency_key = None; + let mut existing_meta_opt = None; if let Some(existing_meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? { @@ -112,18 +113,27 @@ impl Backend { }; } CreateMode::CreateOrReconfigure => { - existing_created_at = Some(existing_meta.created_at); - existing_creation_idempotency_key = existing_meta.creation_idempotency_key; + existing_meta_opt = Some(existing_meta); } } } - let created_at = existing_created_at.unwrap_or_else(OffsetDateTime::now_utc); - let creation_idempotency_key = - existing_creation_idempotency_key.or(new_creation_idempotency_key); + let is_reconfigure = existing_meta_opt.is_some(); + let (resolved, created_at, creation_idempotency_key) = match existing_meta_opt { + Some(existing) => ( + existing.config.reconfigure(config), + existing.created_at, + existing.creation_idempotency_key, + ), + None => ( + BasinConfig::default().reconfigure(config), + OffsetDateTime::now_utc(), + new_creation_idempotency_key, + ), + }; let meta = kv::basin_meta::BasinMeta { - config, + config: resolved, created_at, deleted_at: None, creation_idempotency_key, @@ -142,7 +152,7 @@ impl Backend { state: BasinState::Active, }; - Ok(if existing_created_at.is_some() { + Ok(if is_reconfigure { CreatedOrReconfigured::Reconfigured(info) } else { CreatedOrReconfigured::Created(info) diff --git a/lite/src/backend/read.rs b/lite/src/backend/read.rs index af148377..fd19be48 100644 --- a/lite/src/backend/read.rs +++ b/lite/src/backend/read.rs @@ -349,7 +349,7 @@ mod tests { read_extent::{ReadLimit, ReadUntil}, types::{ basin::BasinName, - config::OptionalStreamConfig, + config::{BasinConfig, OptionalStreamConfig}, resources::CreateMode, stream::{ AppendInput, AppendRecordBatch, AppendRecordParts, ReadEnd, ReadFrom, ReadStart, @@ -424,7 +424,7 @@ mod tests { backend .create_basin( basin.clone(), - Default::default(), + BasinConfig::default(), CreateMode::CreateOnly(None), ) .await diff --git a/lite/src/backend/streams.rs b/lite/src/backend/streams.rs index 81476cc6..9a7c5f6d 100644 --- a/lite/src/backend/streams.rs +++ b/lite/src/backend/streams.rs @@ -77,9 +77,10 @@ impl Backend { &self, basin: BasinName, stream: StreamName, - mut config: OptionalStreamConfig, + config: impl Into, mode: CreateMode, ) -> Result, CreateStreamError> { + let config = config.into(); let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?; let Some(basin_meta) = db_txn_get( @@ -100,12 +101,13 @@ impl Backend { let creation_idempotency_key = match &mode { CreateMode::CreateOnly(Some(req_token)) => { - Some(creation_idempotency_key(req_token, &config)) + let resolved = OptionalStreamConfig::default().reconfigure(config.clone()); + Some(creation_idempotency_key(req_token, &resolved)) } _ => None, }; - let mut existing_created_at = None; + let mut existing_meta_opt = None; let mut prior_doe_min_age = None; if let Some(existing_meta) = @@ -136,16 +138,27 @@ impl Backend { }; } CreateMode::CreateOrReconfigure => { - existing_created_at = Some(existing_meta.created_at); + existing_meta_opt = Some(existing_meta); } } } - config = config.merge(basin_meta.config.default_stream_config).into(); + let is_reconfigure = existing_meta_opt.is_some(); + let (resolved, created_at) = match existing_meta_opt { + Some(existing) => ( + existing.config.reconfigure(config), + existing.created_at, + ), + None => ( + OptionalStreamConfig::default().reconfigure(config), + OffsetDateTime::now_utc(), + ), + }; + let resolved: OptionalStreamConfig = + resolved.merge(basin_meta.config.default_stream_config).into(); - let created_at = existing_created_at.unwrap_or_else(OffsetDateTime::now_utc); let meta = kv::stream_meta::StreamMeta { - config: config.clone(), + config: resolved.clone(), created_at, deleted_at: None, creation_idempotency_key, @@ -153,7 +166,7 @@ impl Backend { txn.put(&stream_meta_key, kv::stream_meta::ser_value(&meta))?; let stream_id = StreamId::new(&basin, &stream); - if existing_created_at.is_none() { + if !is_reconfigure { txn.put( kv::stream_id_mapping::ser_key(stream_id), kv::stream_id_mapping::ser_value(&basin, &stream), @@ -195,10 +208,8 @@ impl Backend { }; txn.commit_with_options(&WRITE_OPTS).await?; - if existing_created_at.is_some() - && let Some(client) = self.streamer_client_if_active(&basin, &stream) - { - client.advise_reconfig(config); + if is_reconfigure && let Some(client) = self.streamer_client_if_active(&basin, &stream) { + client.advise_reconfig(resolved); } let info = StreamInfo { @@ -207,7 +218,7 @@ impl Backend { deleted_at: None, }; - Ok(if existing_created_at.is_some() { + Ok(if is_reconfigure { CreatedOrReconfigured::Reconfigured(info) } else { CreatedOrReconfigured::Created(info) diff --git a/lite/src/init.rs b/lite/src/init.rs index b5ab16b7..c87b87c2 100644 --- a/lite/src/init.rs +++ b/lite/src/init.rs @@ -4,22 +4,22 @@ use std::{path::Path, time::Duration}; -use s2_common::types::{ - basin::BasinName, - config::{ - BasinConfig, OptionalDeleteOnEmptyConfig, OptionalStreamConfig, OptionalTimestampingConfig, - RetentionPolicy, StorageClass, TimestampingMode, +use s2_common::{ + maybe::Maybe, + types::{ + basin::BasinName, + config::{ + BasinReconfiguration, DeleteOnEmptyReconfiguration, RetentionPolicy, StorageClass, + StreamReconfiguration, TimestampingMode, TimestampingReconfiguration, + }, + resources::CreateMode, + stream::StreamName, }, - resources::CreateMode, - stream::StreamName, }; use serde::Deserialize; use tracing::info; -use crate::backend::{ - Backend, - error::{CreateBasinError, CreateStreamError}, -}; +use crate::backend::Backend; #[derive(Debug, Deserialize, Default)] pub struct ResourcesSpec { @@ -164,37 +164,60 @@ impl<'de> Deserialize<'de> for HumanDuration { } } -impl From for BasinConfig { +impl From for BasinReconfiguration { fn from(s: BasinConfigSpec) -> Self { - BasinConfig { + BasinReconfiguration { default_stream_config: s .default_stream_config - .map(OptionalStreamConfig::from) - .unwrap_or_default(), - create_stream_on_append: s.create_stream_on_append.unwrap_or(false), - create_stream_on_read: s.create_stream_on_read.unwrap_or(false), + .map(|dsc| Some(StreamReconfiguration::from(dsc))) + .map_or(Maybe::Unspecified, Maybe::Specified), + create_stream_on_append: s + .create_stream_on_append + .map_or(Maybe::Unspecified, Maybe::Specified), + create_stream_on_read: s + .create_stream_on_read + .map_or(Maybe::Unspecified, Maybe::Specified), } } } -impl From for OptionalStreamConfig { +impl From for StreamReconfiguration { fn from(s: StreamConfigSpec) -> Self { - OptionalStreamConfig { - storage_class: s.storage_class.map(StorageClass::from), - retention_policy: s.retention_policy.map(|r| r.0), + StreamReconfiguration { + storage_class: s + .storage_class + .map(|sc| Some(StorageClass::from(sc))) + .map_or(Maybe::Unspecified, Maybe::Specified), + retention_policy: s + .retention_policy + .map(|rp| Some(rp.0)) + .map_or(Maybe::Unspecified, Maybe::Specified), timestamping: s .timestamping - .map(|t| OptionalTimestampingConfig { - mode: t.mode.map(TimestampingMode::from), - uncapped: t.uncapped, + .map(|ts| { + Some(TimestampingReconfiguration { + mode: ts + .mode + .map(|m| Some(TimestampingMode::from(m))) + .map_or(Maybe::Unspecified, Maybe::Specified), + uncapped: ts + .uncapped + .map(Some) + .map_or(Maybe::Unspecified, Maybe::Specified), + }) }) - .unwrap_or_default(), + .map_or(Maybe::Unspecified, Maybe::Specified), delete_on_empty: s .delete_on_empty - .map(|d| OptionalDeleteOnEmptyConfig { - min_age: d.min_age.map(|h| h.0), + .map(|doe| { + Some(DeleteOnEmptyReconfiguration { + min_age: doe + .min_age + .map(|h| Some(h.0)) + .map_or(Maybe::Unspecified, Maybe::Specified), + }) }) - .unwrap_or_default(), + .map_or(Maybe::Unspecified, Maybe::Specified), } } } @@ -214,26 +237,21 @@ pub async fn apply(backend: &Backend, spec: ResourcesSpec) -> eyre::Result<()> { .parse() .map_err(|e| eyre::eyre!("invalid basin name {:?}: {}", basin_spec.name, e))?; - let mode = if basin_spec.config.is_some() { - CreateMode::CreateOrReconfigure - } else { - CreateMode::CreateOnly(None) - }; - let config = basin_spec.config.map(BasinConfig::from).unwrap_or_default(); + let reconfiguration = basin_spec + .config + .map(BasinReconfiguration::from) + .unwrap_or_default(); - match backend.create_basin(basin.clone(), config, mode).await { - Ok(_) => info!(basin = basin.as_ref(), "basin applied"), - Err(CreateBasinError::BasinAlreadyExists(_)) => { - info!(basin = basin.as_ref(), "basin already exists, skipping") - } - Err(e) => { - return Err(eyre::eyre!( - "failed to create/reconfigure basin {:?}: {}", - basin, - e - )); - } - } + backend + .create_basin( + basin.clone(), + reconfiguration, + CreateMode::CreateOrReconfigure, + ) + .await + .map_err(|e| eyre::eyre!("failed to apply basin {:?}: {}", basin.as_ref(), e))?; + + info!(basin = basin.as_ref(), "basin applied"); for stream_spec in basin_spec.streams { let stream: StreamName = stream_spec @@ -241,39 +259,33 @@ pub async fn apply(backend: &Backend, spec: ResourcesSpec) -> eyre::Result<()> { .parse() .map_err(|e| eyre::eyre!("invalid stream name {:?}: {}", stream_spec.name, e))?; - let mode = if stream_spec.config.is_some() { - CreateMode::CreateOrReconfigure - } else { - CreateMode::CreateOnly(None) - }; - let config = stream_spec + let reconfiguration = stream_spec .config - .map(OptionalStreamConfig::from) + .map(StreamReconfiguration::from) .unwrap_or_default(); - match backend - .create_stream(basin.clone(), stream.clone(), config, mode) + backend + .create_stream( + basin.clone(), + stream.clone(), + reconfiguration, + CreateMode::CreateOrReconfigure, + ) .await - { - Ok(_) => info!( - basin = basin.as_ref(), - stream = stream.as_ref(), - "stream applied" - ), - Err(CreateStreamError::StreamAlreadyExists(_)) => info!( - basin = basin.as_ref(), - stream = stream.as_ref(), - "stream already exists, skipping" - ), - Err(e) => { - return Err(eyre::eyre!( - "failed to create/reconfigure stream {:?}/{:?}: {}", - basin, - stream, + .map_err(|e| { + eyre::eyre!( + "failed to apply stream {:?}/{:?}: {}", + basin.as_ref(), + stream.as_ref(), e - )); - } - } + ) + })?; + + info!( + basin = basin.as_ref(), + stream = stream.as_ref(), + "stream applied" + ); } } Ok(()) @@ -408,9 +420,13 @@ mod tests { create_stream_on_append: Some(true), create_stream_on_read: None, }; - let config = BasinConfig::from(spec); - assert!(config.create_stream_on_append); - assert!(!config.create_stream_on_read); + let reconfig = BasinReconfiguration::from(spec); + assert!(matches!( + reconfig.create_stream_on_append, + Maybe::Specified(true) + )); + assert!(matches!(reconfig.create_stream_on_read, Maybe::Unspecified)); + assert!(matches!(reconfig.default_stream_config, Maybe::Unspecified)); } #[test] @@ -421,11 +437,16 @@ mod tests { timestamping: None, delete_on_empty: None, }; - let config = OptionalStreamConfig::from(spec); - assert!(matches!(config.storage_class, Some(StorageClass::Standard))); + let reconfig = StreamReconfiguration::from(spec); assert!(matches!( - config.retention_policy, - Some(RetentionPolicy::Infinite()) + reconfig.storage_class, + Maybe::Specified(Some(StorageClass::Standard)) + )); + assert!(matches!( + reconfig.retention_policy, + Maybe::Specified(Some(RetentionPolicy::Infinite())) )); + assert!(matches!(reconfig.timestamping, Maybe::Unspecified)); + assert!(matches!(reconfig.delete_on_empty, Maybe::Unspecified)); } } From 8d0d04ff689b8c12ecb8afcf09f6b4bf23fd7bdf Mon Sep 17 00:00:00 2001 From: Mehul Arora Date: Tue, 24 Feb 2026 19:18:34 +0530 Subject: [PATCH 05/12] idea --- cli/src/apply.rs | 364 +++++++++++++++++++++++++++++++++++- cli/src/cli.rs | 3 + cli/src/main.rs | 18 +- lite/src/backend/streams.rs | 10 +- 4 files changed, 380 insertions(+), 15 deletions(-) diff --git a/cli/src/apply.rs b/cli/src/apply.rs index 47f793cf..465f2591 100644 --- a/cli/src/apply.rs +++ b/cli/src/apply.rs @@ -1,6 +1,6 @@ //! Declarative basin/stream configuration via a JSON spec file. -use std::path::Path; +use std::{path::Path, time::Duration}; use colored::Colorize; use s2_lite::init::{ @@ -11,8 +11,8 @@ use s2_sdk::{ S2, types::{ BasinConfig, BasinName, CreateOrReconfigureBasinInput, CreateOrReconfigureStreamInput, - CreateOrReconfigured, DeleteOnEmptyConfig, RetentionPolicy, StorageClass, StreamConfig, - StreamName, TimestampingConfig, TimestampingMode, + CreateOrReconfigured, DeleteOnEmptyConfig, ErrorResponse, RetentionPolicy, S2Error, + StorageClass, StreamConfig, StreamName, TimestampingConfig, TimestampingMode, }, }; @@ -178,3 +178,361 @@ async fn apply_stream( } Ok(()) } + +enum ResourceAction { + Create, + Reconfigure(Vec), + Unchanged, +} + +struct FieldDiff { + field: &'static str, + old: String, + new: String, +} + +fn is_not_found_error(e: &S2Error) -> bool { + matches!(e, S2Error::Server(ErrorResponse { code, .. }) if code == "basin_not_found" || code == "stream_not_found") +} + +fn format_storage_class(sc: StorageClass) -> &'static str { + match sc { + StorageClass::Standard => "standard", + StorageClass::Express => "express", + } +} + +fn format_retention_policy(rp: RetentionPolicy) -> String { + match rp { + RetentionPolicy::Age(secs) => { + humantime::format_duration(Duration::from_secs(secs)).to_string() + } + RetentionPolicy::Infinite => "infinite".to_string(), + } +} + +fn format_timestamping_mode(m: TimestampingMode) -> &'static str { + match m { + TimestampingMode::ClientPrefer => "client-prefer", + TimestampingMode::ClientRequire => "client-require", + TimestampingMode::Arrival => "arrival", + } +} + +fn diff_basin_config(existing: &BasinConfig, spec: &BasinConfigSpec) -> Vec { + let mut diffs = Vec::new(); + + if let Some(v) = spec.create_stream_on_append + && existing.create_stream_on_append != v + { + diffs.push(FieldDiff { + field: "create_stream_on_append", + old: existing.create_stream_on_append.to_string(), + new: v.to_string(), + }); + } + + if let Some(v) = spec.create_stream_on_read + && existing.create_stream_on_read != v + { + diffs.push(FieldDiff { + field: "create_stream_on_read", + old: existing.create_stream_on_read.to_string(), + new: v.to_string(), + }); + } + + if let Some(ref spec_dsc) = spec.default_stream_config + && let Some(ref existing_dsc) = existing.default_stream_config + { + let stream_diffs = diff_stream_config(existing_dsc, spec_dsc); + for sd in stream_diffs { + diffs.push(FieldDiff { + field: sd.field, + old: sd.old, + new: sd.new, + }); + } + } + + diffs +} + +fn diff_stream_config(existing: &StreamConfig, spec: &StreamConfigSpec) -> Vec { + let mut diffs = Vec::new(); + + if let Some(ref sc) = spec.storage_class { + let existing_sc = existing.storage_class.unwrap_or(StorageClass::Standard); + let spec_sc = storage_class_to_sdk(sc.clone()); + if existing_sc != spec_sc { + diffs.push(FieldDiff { + field: "storage_class", + old: format_storage_class(existing_sc).to_string(), + new: format_storage_class(spec_sc).to_string(), + }); + } + } + + if let Some(ref rp) = spec.retention_policy { + let existing_rp = existing + .retention_policy + .unwrap_or(RetentionPolicy::Infinite); + let spec_rp = retention_policy_to_sdk(*rp); + if existing_rp != spec_rp { + diffs.push(FieldDiff { + field: "retention_policy", + old: format_retention_policy(existing_rp), + new: format_retention_policy(spec_rp), + }); + } + } + + if let Some(ref ts) = spec.timestamping + && let Some(ref existing_ts) = existing.timestamping + { + if let Some(ref mode) = ts.mode { + let spec_mode = timestamping_mode_to_sdk(mode.clone()); + if existing_ts.mode.unwrap_or(TimestampingMode::Arrival) != spec_mode { + diffs.push(FieldDiff { + field: "timestamping.mode", + old: format_timestamping_mode( + existing_ts.mode.unwrap_or(TimestampingMode::Arrival), + ) + .to_string(), + new: format_timestamping_mode(spec_mode).to_string(), + }); + } + } + if let Some(uncapped) = ts.uncapped + && existing_ts.uncapped != uncapped + { + diffs.push(FieldDiff { + field: "timestamping.uncapped", + old: existing_ts.uncapped.to_string(), + new: uncapped.to_string(), + }); + } + } + + if let Some(ref doe) = spec.delete_on_empty + && let Some(ref existing_doe) = existing.delete_on_empty + && let Some(ref min_age) = doe.min_age + && existing_doe.min_age_secs != min_age.0.as_secs() + { + diffs.push(FieldDiff { + field: "delete_on_empty.min_age", + old: humantime::format_duration(Duration::from_secs(existing_doe.min_age_secs)) + .to_string(), + new: humantime::format_duration(min_age.0).to_string(), + }); + } + + diffs +} + +fn spec_basin_fields(spec: &BasinConfigSpec) -> Vec { + let mut fields = Vec::new(); + + if let Some(v) = spec.create_stream_on_append { + fields.push(FieldDiff { + field: "create_stream_on_append", + old: String::new(), + new: v.to_string(), + }); + } + if let Some(v) = spec.create_stream_on_read { + fields.push(FieldDiff { + field: "create_stream_on_read", + old: String::new(), + new: v.to_string(), + }); + } + if let Some(ref dsc) = spec.default_stream_config { + for f in spec_stream_fields(dsc) { + fields.push(f); + } + } + + fields +} + +fn spec_stream_fields(spec: &StreamConfigSpec) -> Vec { + let mut fields = Vec::new(); + + if let Some(ref sc) = spec.storage_class { + fields.push(FieldDiff { + field: "storage_class", + old: String::new(), + new: format_storage_class(storage_class_to_sdk(sc.clone())).to_string(), + }); + } + if let Some(ref rp) = spec.retention_policy { + fields.push(FieldDiff { + field: "retention_policy", + old: String::new(), + new: format_retention_policy(retention_policy_to_sdk(*rp)), + }); + } + if let Some(ref ts) = spec.timestamping { + if let Some(ref mode) = ts.mode { + fields.push(FieldDiff { + field: "timestamping.mode", + old: String::new(), + new: format_timestamping_mode(timestamping_mode_to_sdk(mode.clone())).to_string(), + }); + } + if let Some(uncapped) = ts.uncapped { + fields.push(FieldDiff { + field: "timestamping.uncapped", + old: String::new(), + new: uncapped.to_string(), + }); + } + } + if let Some(ref doe) = spec.delete_on_empty + && let Some(ref min_age) = doe.min_age + { + fields.push(FieldDiff { + field: "delete_on_empty.min_age", + old: String::new(), + new: humantime::format_duration(min_age.0).to_string(), + }); + } + + fields +} + +fn print_basin_result(basin: &str, action: &ResourceAction) { + match action { + ResourceAction::Create => { + println!("{}", format!("+ basin {basin}").green().bold()); + } + ResourceAction::Reconfigure(diffs) => { + println!("{}", format!("~ basin {basin}").yellow().bold()); + for diff in diffs { + println!(" {}: {} → {}", diff.field, diff.old.dimmed(), diff.new); + } + } + ResourceAction::Unchanged => { + println!("{}", format!("= basin {basin}").dimmed()); + } + } +} + +fn print_stream_result(basin: &str, stream: &str, action: &ResourceAction) { + match action { + ResourceAction::Create => { + println!("{}", format!(" + stream {basin}/{stream}").green().bold()); + } + ResourceAction::Reconfigure(diffs) => { + println!("{}", format!(" ~ stream {basin}/{stream}").yellow().bold()); + for diff in diffs { + println!(" {}: {} → {}", diff.field, diff.old.dimmed(), diff.new); + } + } + ResourceAction::Unchanged => { + println!("{}", format!(" = stream {basin}/{stream}").dimmed()); + } + } +} + +fn print_basin_create(basin: &str, spec: &Option) { + println!("{}", format!("+ basin {basin}").green().bold()); + if let Some(config) = spec { + for field in spec_basin_fields(config) { + println!(" {}: {}", field.field, field.new); + } + } +} + +fn print_stream_create(basin: &str, stream: &str, spec: &Option) { + println!("{}", format!(" + stream {basin}/{stream}").green().bold()); + if let Some(config) = spec { + for field in spec_stream_fields(config) { + println!(" {}: {}", field.field, field.new); + } + } +} + +pub async fn dry_run(s2: &S2, spec: ResourcesSpec) -> miette::Result<()> { + for basin_spec in spec.basins { + let basin: BasinName = basin_spec + .name + .parse() + .map_err(|e| miette::miette!("invalid basin name {:?}: {}", basin_spec.name, e))?; + + let basin_action = match s2.get_basin_config(basin.clone()).await { + Ok(existing) => { + if let Some(ref config) = basin_spec.config { + let diffs = diff_basin_config(&existing, config); + if diffs.is_empty() { + ResourceAction::Unchanged + } else { + ResourceAction::Reconfigure(diffs) + } + } else { + ResourceAction::Unchanged + } + } + Err(e) if is_not_found_error(&e) => ResourceAction::Create, + Err(e) => { + return Err(miette::miette!( + "failed to check basin {:?}: {}", + basin.as_ref(), + e + )); + } + }; + + match &basin_action { + ResourceAction::Create => { + print_basin_create(basin.as_ref(), &basin_spec.config); + } + action => { + print_basin_result(basin.as_ref(), action); + } + } + + let basin_client = s2.basin(basin.clone()); + + for stream_spec in basin_spec.streams { + let stream: StreamName = stream_spec.name.parse().map_err(|e| { + miette::miette!("invalid stream name {:?}: {}", stream_spec.name, e) + })?; + + let stream_action = match basin_client.get_stream_config(stream.clone()).await { + Ok(existing) => { + if let Some(ref config) = stream_spec.config { + let diffs = diff_stream_config(&existing, config); + if diffs.is_empty() { + ResourceAction::Unchanged + } else { + ResourceAction::Reconfigure(diffs) + } + } else { + ResourceAction::Unchanged + } + } + Err(e) if is_not_found_error(&e) => ResourceAction::Create, + Err(e) => { + return Err(miette::miette!( + "failed to check stream {:?}/{:?}: {}", + basin.as_ref(), + stream.as_ref(), + e + )); + } + }; + + match &stream_action { + ResourceAction::Create => { + print_stream_create(basin.as_ref(), stream.as_ref(), &stream_spec.config); + } + action => { + print_stream_result(basin.as_ref(), stream.as_ref(), action); + } + } + } + } + Ok(()) +} diff --git a/cli/src/cli.rs b/cli/src/cli.rs index 1f39ce96..e80598b1 100644 --- a/cli/src/cli.rs +++ b/cli/src/cli.rs @@ -535,6 +535,9 @@ pub struct ApplyArgs { /// Path to a JSON spec file defining basins and streams to create or reconfigure. #[arg(short = 'f', long, value_name = "FILE")] pub file: PathBuf, + /// Preview changes without making any mutations. + #[arg(long)] + pub dry_run: bool, } #[derive(Args, Debug)] diff --git a/cli/src/main.rs b/cli/src/main.rs index 5b95ba8f..51d2124a 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -526,12 +526,18 @@ async fn run() -> Result<(), CliError> { } } - Command::Apply(ApplyArgs { file }) => { - let spec = apply::load(&file).map_err(|e| CliError::InvalidArgs(e.into()))?; - apply::apply(&s2, spec) - .await - .map_err(|e| CliError::Apply(e.to_string()))?; - eprintln!("{}", "✓ Done".green().bold()); + Command::Apply(ApplyArgs { file, dry_run }) => { + let spec = apply::load(&file).map_err(CliError::InvalidArgs)?; + if dry_run { + apply::dry_run(&s2, spec) + .await + .map_err(|e| CliError::Apply(e.to_string()))?; + } else { + apply::apply(&s2, spec) + .await + .map_err(|e| CliError::Apply(e.to_string()))?; + eprintln!("{}", "✓ Done".green().bold()); + } } Command::Bench(args) => { diff --git a/lite/src/backend/streams.rs b/lite/src/backend/streams.rs index 9a7c5f6d..c131b50c 100644 --- a/lite/src/backend/streams.rs +++ b/lite/src/backend/streams.rs @@ -145,17 +145,15 @@ impl Backend { let is_reconfigure = existing_meta_opt.is_some(); let (resolved, created_at) = match existing_meta_opt { - Some(existing) => ( - existing.config.reconfigure(config), - existing.created_at, - ), + Some(existing) => (existing.config.reconfigure(config), existing.created_at), None => ( OptionalStreamConfig::default().reconfigure(config), OffsetDateTime::now_utc(), ), }; - let resolved: OptionalStreamConfig = - resolved.merge(basin_meta.config.default_stream_config).into(); + let resolved: OptionalStreamConfig = resolved + .merge(basin_meta.config.default_stream_config) + .into(); let meta = kv::stream_meta::StreamMeta { config: resolved.clone(), From cf6909403972efb0550bd93f2ca7b76164c0d3d6 Mon Sep 17 00:00:00 2001 From: Mehul Arora Date: Tue, 24 Feb 2026 20:36:10 +0530 Subject: [PATCH 06/12] .. --- Cargo.lock | 42 +++++++++ Cargo.toml | 1 + cli/src/apply.rs | 8 ++ cli/src/cli.rs | 12 ++- cli/src/main.rs | 15 +++- lite/Cargo.toml | 1 + lite/src/init.rs | 230 ++++++++++++++++++++++++++++++++++++++++++++--- 7 files changed, 296 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 248d3a3f..7fe8b604 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1628,6 +1628,12 @@ dependencies = [ "winnow 0.6.26", ] +[[package]] +name = "dyn-clone" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" + [[package]] name = "either" version = "1.15.0" @@ -4643,6 +4649,7 @@ dependencies = [ "rustls", "s2-api", "s2-common", + "schemars", "serde", "serde_json", "slatedb", @@ -4723,6 +4730,30 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "schemars" +version = "0.8.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fbf2ae1b8bc8e02df939598064d22402220cd5bbcca1c76f7d6a310974d5615" +dependencies = [ + "dyn-clone", + "schemars_derive", + "serde", + "serde_json", +] + +[[package]] +name = "schemars_derive" +version = "0.8.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32e265784ad618884abaea0600a9adf15393368d840e0222d101a072f3f7534d" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn 2.0.116", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -4815,6 +4846,17 @@ dependencies = [ "syn 2.0.116", ] +[[package]] +name = "serde_derive_internals" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.116", +] + [[package]] name = "serde_json" version = "1.0.149" diff --git a/Cargo.toml b/Cargo.toml index 883f2c72..3348337a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ s2-api = { path = "api", version = "0.27" } s2-common = { path = "common", version = "0.28" } s2-lite = { path = "lite", version = "0.29" } s2-sdk = { path = "sdk", version = "0.24" } +schemars = "0.8" serde = "1.0" serde_json = "1.0" slatedb = "0.10" diff --git a/cli/src/apply.rs b/cli/src/apply.rs index 465f2591..734fb6fb 100644 --- a/cli/src/apply.rs +++ b/cli/src/apply.rs @@ -88,6 +88,10 @@ fn delete_on_empty_to_sdk(doe: DeleteOnEmptySpec) -> DeleteOnEmptyConfig { doec } +pub fn validate(spec: &ResourcesSpec) -> miette::Result<()> { + s2_lite::init::validate(spec).map_err(|e| miette::miette!("{}", e)) +} + pub fn load(path: &Path) -> miette::Result { let contents = std::fs::read_to_string(path) .map_err(|e| miette::miette!("failed to read spec file {:?}: {}", path.display(), e))?; @@ -97,6 +101,8 @@ pub fn load(path: &Path) -> miette::Result { } pub async fn apply(s2: &S2, spec: ResourcesSpec) -> miette::Result<()> { + validate(&spec)?; + for basin_spec in spec.basins { let basin: BasinName = basin_spec .name @@ -455,6 +461,8 @@ fn print_stream_create(basin: &str, stream: &str, spec: &Option miette::Result<()> { + validate(&spec)?; + for basin_spec in spec.basins { let basin: BasinName = basin_spec .name diff --git a/cli/src/cli.rs b/cli/src/cli.rs index e80598b1..a64bf796 100644 --- a/cli/src/cli.rs +++ b/cli/src/cli.rs @@ -533,11 +533,19 @@ pub struct TailArgs { #[derive(Args, Debug)] pub struct ApplyArgs { /// Path to a JSON spec file defining basins and streams to create or reconfigure. - #[arg(short = 'f', long, value_name = "FILE")] - pub file: PathBuf, + #[arg( + short = 'f', + long, + value_name = "FILE", + required_unless_present = "schema" + )] + pub file: Option, /// Preview changes without making any mutations. #[arg(long)] pub dry_run: bool, + /// Print the JSON Schema for the spec file format to stdout and exit. + #[arg(long)] + pub schema: bool, } #[derive(Args, Debug)] diff --git a/cli/src/main.rs b/cli/src/main.rs index 51d2124a..6f73e41d 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -526,7 +526,20 @@ async fn run() -> Result<(), CliError> { } } - Command::Apply(ApplyArgs { file, dry_run }) => { + Command::Apply(ApplyArgs { + file, + dry_run, + schema, + }) => { + if schema { + let schema = s2_lite::init::json_schema(); + println!( + "{}", + serde_json::to_string_pretty(&schema).expect("valid schema") + ); + return Ok(()); + } + let file = file.expect("--file is required when --schema is not set"); let spec = apply::load(&file).map_err(CliError::InvalidArgs)?; if dry_run { apply::dry_run(&s2, spec) diff --git a/lite/Cargo.toml b/lite/Cargo.toml index 69e06487..bea93b08 100644 --- a/lite/Cargo.toml +++ b/lite/Cargo.toml @@ -48,6 +48,7 @@ rcgen = { workspace = true } rustls = { workspace = true, features = ["aws-lc-rs"] } s2-api = { workspace = true, features = ["axum"] } s2-common = { workspace = true, features = ["clap"] } +schemars = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } slatedb = { workspace = true } diff --git a/lite/src/init.rs b/lite/src/init.rs index c87b87c2..04b21cd2 100644 --- a/lite/src/init.rs +++ b/lite/src/init.rs @@ -16,18 +16,18 @@ use s2_common::{ stream::StreamName, }, }; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use tracing::info; use crate::backend::Backend; -#[derive(Debug, Deserialize, Default)] +#[derive(Debug, Deserialize, Default, schemars::JsonSchema)] pub struct ResourcesSpec { #[serde(default)] pub basins: Vec, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, schemars::JsonSchema)] pub struct BasinSpec { pub name: String, #[serde(default)] @@ -36,42 +36,70 @@ pub struct BasinSpec { pub streams: Vec, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, schemars::JsonSchema)] pub struct StreamSpec { pub name: String, #[serde(default)] pub config: Option, } -#[derive(Debug, Clone, Deserialize, Default)] +#[derive(Debug, Clone, Deserialize, Default, schemars::JsonSchema)] pub struct BasinConfigSpec { #[serde(default)] pub default_stream_config: Option, + /// Create stream on append if it doesn't exist, using the default stream configuration. #[serde(default)] pub create_stream_on_append: Option, + /// Create stream on read if it doesn't exist, using the default stream configuration. #[serde(default)] pub create_stream_on_read: Option, } -#[derive(Debug, Clone, Deserialize, Default)] +#[derive(Debug, Clone, Deserialize, Default, schemars::JsonSchema)] pub struct StreamConfigSpec { + /// Storage class for recent writes. #[serde(default)] pub storage_class: Option, + /// Retention policy for the stream. If unspecified, the default is to retain records for 7 + /// days. #[serde(default)] pub retention_policy: Option, + /// Timestamping behavior. #[serde(default)] pub timestamping: Option, + /// Delete-on-empty configuration. #[serde(default)] pub delete_on_empty: Option, } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] #[serde(rename_all = "kebab-case")] pub enum StorageClassSpec { Standard, Express, } +impl schemars::JsonSchema for StorageClassSpec { + fn schema_name() -> String { + "StorageClassSpec".to_string() + } + + fn json_schema(_: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema { + schemars::schema::Schema::Object(schemars::schema::SchemaObject { + instance_type: Some(schemars::schema::InstanceType::String.into()), + metadata: Some(Box::new(schemars::schema::Metadata { + description: Some("Storage class for recent writes.".to_string()), + ..Default::default() + })), + enum_values: Some(vec![ + serde_json::Value::String("standard".to_string()), + serde_json::Value::String("express".to_string()), + ]), + ..Default::default() + }) + } +} + impl From for StorageClass { fn from(s: StorageClassSpec) -> Self { match s { @@ -111,15 +139,44 @@ impl<'de> Deserialize<'de> for RetentionPolicySpec { } } -#[derive(Debug, Clone, Deserialize)] +impl schemars::JsonSchema for RetentionPolicySpec { + fn schema_name() -> String { + "RetentionPolicySpec".to_string() + } + + fn json_schema(_: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema { + schemars::schema::Schema::Object(schemars::schema::SchemaObject { + instance_type: Some(schemars::schema::InstanceType::String.into()), + metadata: Some(Box::new(schemars::schema::Metadata { + description: Some( + "Retain records unless explicitly trimmed (\"infinite\"), or automatically \ + trim records older than the given duration (e.g. \"7days\", \"1week\")." + .to_string(), + ), + examples: vec![ + serde_json::Value::String("infinite".to_string()), + serde_json::Value::String("7days".to_string()), + serde_json::Value::String("1week".to_string()), + ], + ..Default::default() + })), + ..Default::default() + }) + } +} + +#[derive(Debug, Clone, Deserialize, schemars::JsonSchema)] pub struct TimestampingSpec { + /// Timestamping mode for appends that influences how timestamps are handled. #[serde(default)] pub mode: Option, + /// Allow client-specified timestamps to exceed the arrival time. + /// If this is `false` or not set, client timestamps will be capped at the arrival time. #[serde(default)] pub uncapped: Option, } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] #[serde(rename_all = "kebab-case")] pub enum TimestampingModeSpec { ClientPrefer, @@ -127,6 +184,31 @@ pub enum TimestampingModeSpec { Arrival, } +impl schemars::JsonSchema for TimestampingModeSpec { + fn schema_name() -> String { + "TimestampingModeSpec".to_string() + } + + fn json_schema(_: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema { + schemars::schema::Schema::Object(schemars::schema::SchemaObject { + instance_type: Some(schemars::schema::InstanceType::String.into()), + metadata: Some(Box::new(schemars::schema::Metadata { + description: Some( + "Timestamping mode for appends that influences how timestamps are handled." + .to_string(), + ), + ..Default::default() + })), + enum_values: Some(vec![ + serde_json::Value::String("client-prefer".to_string()), + serde_json::Value::String("client-require".to_string()), + serde_json::Value::String("arrival".to_string()), + ]), + ..Default::default() + }) + } +} + impl From for TimestampingMode { fn from(m: TimestampingModeSpec) -> Self { match m { @@ -137,8 +219,10 @@ impl From for TimestampingMode { } } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, schemars::JsonSchema)] pub struct DeleteOnEmptySpec { + /// Minimum age before an empty stream can be deleted. + /// Set to 0 (default) to disable delete-on-empty (don't delete automatically). #[serde(default)] pub min_age: Option, } @@ -164,6 +248,29 @@ impl<'de> Deserialize<'de> for HumanDuration { } } +impl schemars::JsonSchema for HumanDuration { + fn schema_name() -> String { + "HumanDuration".to_string() + } + + fn json_schema(_: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema { + schemars::schema::Schema::Object(schemars::schema::SchemaObject { + instance_type: Some(schemars::schema::InstanceType::String.into()), + metadata: Some(Box::new(schemars::schema::Metadata { + description: Some( + "A duration string in humantime format, e.g. \"1day\", \"2h 30m\"".to_string(), + ), + examples: vec![ + serde_json::Value::String("1day".to_string()), + serde_json::Value::String("2h 30m".to_string()), + ], + ..Default::default() + })), + ..Default::default() + }) + } +} + impl From for BasinReconfiguration { fn from(s: BasinConfigSpec) -> Self { BasinReconfiguration { @@ -222,6 +329,48 @@ impl From for StreamReconfiguration { } } +pub fn json_schema() -> serde_json::Value { + serde_json::to_value(schemars::schema_for!(ResourcesSpec)).unwrap() +} + +pub fn validate(spec: &ResourcesSpec) -> eyre::Result<()> { + let mut errors = Vec::new(); + let mut seen_basins = std::collections::HashSet::new(); + + for basin_spec in &spec.basins { + if !seen_basins.insert(basin_spec.name.clone()) { + errors.push(format!("duplicate basin name {:?}", basin_spec.name)); + } + + if let Err(e) = basin_spec.name.parse::() { + errors.push(format!("invalid basin name {:?}: {}", basin_spec.name, e)); + continue; + } + + let mut seen_streams = std::collections::HashSet::new(); + for stream_spec in &basin_spec.streams { + if !seen_streams.insert(stream_spec.name.clone()) { + errors.push(format!( + "duplicate stream name {:?} in basin {:?}", + stream_spec.name, basin_spec.name + )); + } + if let Err(e) = stream_spec.name.parse::() { + errors.push(format!( + "invalid stream name {:?} in basin {:?}: {}", + stream_spec.name, basin_spec.name, e + )); + } + } + } + + if errors.is_empty() { + Ok(()) + } else { + Err(eyre::eyre!("{}", errors.join("\n"))) + } +} + pub fn load(path: &Path) -> eyre::Result { let contents = std::fs::read_to_string(path) .map_err(|e| eyre::eyre!("failed to read init file {:?}: {}", path, e))?; @@ -231,6 +380,8 @@ pub fn load(path: &Path) -> eyre::Result { } pub async fn apply(backend: &Backend, spec: ResourcesSpec) -> eyre::Result<()> { + validate(&spec)?; + for basin_spec in spec.basins { let basin: BasinName = basin_spec .name @@ -429,6 +580,65 @@ mod tests { assert!(matches!(reconfig.default_stream_config, Maybe::Unspecified)); } + #[test] + fn validate_valid_spec() { + let spec = parse_spec( + r#"{"basins":[{"name":"my-basin","streams":[{"name":"events"},{"name":"logs"}]}]}"#, + ); + assert!(validate(&spec).is_ok()); + } + + #[test] + fn validate_invalid_basin_name() { + let spec = parse_spec(r#"{"basins":[{"name":"INVALID_BASIN"}]}"#); + let err = validate(&spec).unwrap_err(); + assert!(err.to_string().contains("invalid basin name")); + } + + #[test] + fn validate_invalid_stream_name() { + let spec = parse_spec(r#"{"basins":[{"name":"my-basin","streams":[{"name":""}]}]}"#); + let err = validate(&spec).unwrap_err(); + assert!(err.to_string().contains("invalid stream name")); + } + + #[test] + fn validate_duplicate_basin_names() { + let spec = parse_spec(r#"{"basins":[{"name":"my-basin"},{"name":"my-basin"}]}"#); + let err = validate(&spec).unwrap_err(); + assert!(err.to_string().contains("duplicate basin name")); + } + + #[test] + fn validate_duplicate_stream_names() { + let spec = parse_spec( + r#"{"basins":[{"name":"my-basin","streams":[{"name":"events"},{"name":"events"}]}]}"#, + ); + let err = validate(&spec).unwrap_err(); + assert!(err.to_string().contains("duplicate stream name")); + } + + #[test] + fn validate_multiple_errors() { + let spec = parse_spec(r#"{"basins":[{"name":"INVALID"},{"name":"INVALID"}]}"#); + let err = validate(&spec).unwrap_err(); + let msg = err.to_string(); + assert!(msg.contains("invalid basin name")); + assert!(msg.contains("duplicate basin name")); + } + + #[test] + fn json_schema_is_valid() { + let schema = json_schema(); + assert!(schema.is_object()); + let schema_obj = schema.as_object().unwrap(); + // Should have at minimum a definitions/properties structure + assert!( + schema_obj.contains_key("definitions") || schema_obj.contains_key("properties"), + "schema should have definitions or properties" + ); + } + #[test] fn stream_config_conversion() { let spec = StreamConfigSpec { From 2ba13040f0b7de29481c40286087a17971fe8c40 Mon Sep 17 00:00:00 2001 From: Mehul Arora Date: Tue, 24 Feb 2026 23:35:42 +0530 Subject: [PATCH 07/12] . --- api/src/v1/basin.rs | 6 +- cli/src/apply.rs | 179 ++++++++++++++++++-------------- cli/src/cli.rs | 12 ++- lite/src/handlers/v1/basins.rs | 2 +- lite/src/handlers/v1/streams.rs | 6 +- sdk/src/api.rs | 20 +++- sdk/src/client.rs | 1 + sdk/src/ops.rs | 17 ++- sdk/src/types.rs | 31 ++++-- 9 files changed, 171 insertions(+), 103 deletions(-) diff --git a/api/src/v1/basin.rs b/api/src/v1/basin.rs index 7948008c..2f2513e6 100644 --- a/api/src/v1/basin.rs +++ b/api/src/v1/basin.rs @@ -4,7 +4,7 @@ use s2_common::types::{ }; use serde::{Deserialize, Serialize}; -use super::config::BasinConfig; +use super::config::{BasinConfig, BasinReconfiguration}; #[rustfmt::skip] #[derive(Debug, Clone, Serialize, Deserialize)] @@ -116,8 +116,8 @@ impl From for BasinState { #[derive(Debug, Clone, Serialize, Deserialize)] #[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))] pub struct CreateOrReconfigureBasinRequest { - /// Basin configuration. - pub config: Option, + /// Basin reconfiguration. + pub config: Option, /// Basin scope. /// This cannot be reconfigured. pub scope: Option, diff --git a/cli/src/apply.rs b/cli/src/apply.rs index 734fb6fb..19330c46 100644 --- a/cli/src/apply.rs +++ b/cli/src/apply.rs @@ -10,58 +10,29 @@ use s2_lite::init::{ use s2_sdk::{ S2, types::{ - BasinConfig, BasinName, CreateOrReconfigureBasinInput, CreateOrReconfigureStreamInput, - CreateOrReconfigured, DeleteOnEmptyConfig, ErrorResponse, RetentionPolicy, S2Error, - StorageClass, StreamConfig, StreamName, TimestampingConfig, TimestampingMode, + BasinConfig, BasinName, BasinReconfiguration, CreateOrReconfigureBasinInput, + CreateOrReconfigureStreamInput, CreateOrReconfigured, DeleteOnEmptyConfig, + DeleteOnEmptyReconfiguration, ErrorResponse, RetentionPolicy, S2Error, StorageClass, + StreamConfig, StreamName, StreamReconfiguration, TimestampingConfig, TimestampingMode, + TimestampingReconfiguration, }, }; -fn stream_config_to_sdk(s: StreamConfigSpec) -> StreamConfig { - let mut c = StreamConfig::new(); - if let Some(sc) = s.storage_class { - c = c.with_storage_class(storage_class_to_sdk(sc)); - } - if let Some(rp) = s.retention_policy { - c = c.with_retention_policy(retention_policy_to_sdk(rp)); - } - if let Some(ts) = s.timestamping { - c = c.with_timestamping(timestamping_to_sdk(ts)); - } - if let Some(doe) = s.delete_on_empty { - c = c.with_delete_on_empty(delete_on_empty_to_sdk(doe)); - } - c -} - -fn basin_config_to_sdk(s: BasinConfigSpec) -> BasinConfig { - let mut c = BasinConfig::new(); - if let Some(dsc) = s.default_stream_config { - c = c.with_default_stream_config(stream_config_to_sdk(dsc)); - } - if let Some(v) = s.create_stream_on_append { - c = c.with_create_stream_on_append(v); - } - if let Some(v) = s.create_stream_on_read { - c = c.with_create_stream_on_read(v); - } - c -} - -fn storage_class_to_sdk(s: StorageClassSpec) -> StorageClass { +fn storage_class_from_spec(s: StorageClassSpec) -> StorageClass { match s { StorageClassSpec::Standard => StorageClass::Standard, StorageClassSpec::Express => StorageClass::Express, } } -fn retention_policy_to_sdk(rp: RetentionPolicySpec) -> RetentionPolicy { +fn retention_policy_from_spec(rp: RetentionPolicySpec) -> RetentionPolicy { match rp.age_secs() { Some(secs) => RetentionPolicy::Age(secs), None => RetentionPolicy::Infinite, } } -fn timestamping_mode_to_sdk(m: TimestampingModeSpec) -> TimestampingMode { +fn timestamping_mode_from_spec(m: TimestampingModeSpec) -> TimestampingMode { match m { TimestampingModeSpec::ClientPrefer => TimestampingMode::ClientPrefer, TimestampingModeSpec::ClientRequire => TimestampingMode::ClientRequire, @@ -69,23 +40,56 @@ fn timestamping_mode_to_sdk(m: TimestampingModeSpec) -> TimestampingMode { } } -fn timestamping_to_sdk(ts: TimestampingSpec) -> TimestampingConfig { - let mut tsc = TimestampingConfig::new(); +fn timestamping_reconfiguration_from_spec(ts: TimestampingSpec) -> TimestampingReconfiguration { + let mut tsr = TimestampingReconfiguration::new(); if let Some(m) = ts.mode { - tsc = tsc.with_mode(timestamping_mode_to_sdk(m)); + tsr = tsr.with_mode(timestamping_mode_from_spec(m)); } if let Some(u) = ts.uncapped { - tsc = tsc.with_uncapped(u); + tsr = tsr.with_uncapped(u); } - tsc + tsr } -fn delete_on_empty_to_sdk(doe: DeleteOnEmptySpec) -> DeleteOnEmptyConfig { - let mut doec = DeleteOnEmptyConfig::new(); +fn delete_on_empty_reconfiguration_from_spec( + doe: DeleteOnEmptySpec, +) -> DeleteOnEmptyReconfiguration { + let mut doer = DeleteOnEmptyReconfiguration::new(); if let Some(ma) = doe.min_age { - doec = doec.with_min_age(ma.0); + doer = doer.with_min_age(ma.0); } - doec + doer +} + +fn stream_reconfiguration_from_spec(s: StreamConfigSpec) -> StreamReconfiguration { + let mut r = StreamReconfiguration::new(); + if let Some(sc) = s.storage_class { + r = r.with_storage_class(storage_class_from_spec(sc)); + } + if let Some(rp) = s.retention_policy { + r = r.with_retention_policy(retention_policy_from_spec(rp)); + } + if let Some(ts) = s.timestamping { + r = r.with_timestamping(timestamping_reconfiguration_from_spec(ts)); + } + if let Some(doe) = s.delete_on_empty { + r = r.with_delete_on_empty(delete_on_empty_reconfiguration_from_spec(doe)); + } + r +} + +fn basin_reconfiguration_from_spec(s: BasinConfigSpec) -> BasinReconfiguration { + let mut r = BasinReconfiguration::new(); + if let Some(dsc) = s.default_stream_config { + r = r.with_default_stream_config(stream_reconfiguration_from_spec(dsc)); + } + if let Some(v) = s.create_stream_on_append { + r = r.with_create_stream_on_append(v); + } + if let Some(v) = s.create_stream_on_read { + r = r.with_create_stream_on_read(v); + } + r } pub fn validate(spec: &ResourcesSpec) -> miette::Result<()> { @@ -128,7 +132,7 @@ async fn apply_basin( ) -> miette::Result<()> { let mut input = CreateOrReconfigureBasinInput::new(basin.clone()); if let Some(c) = config { - input = input.with_config(basin_config_to_sdk(c)); + input = input.with_config(basin_reconfiguration_from_spec(c)); } match s2 .create_or_reconfigure_basin(input) @@ -156,7 +160,7 @@ async fn apply_stream( ) -> miette::Result<()> { let mut input = CreateOrReconfigureStreamInput::new(stream.clone()); if let Some(c) = config { - input = input.with_config(stream_config_to_sdk(c)); + input = input.with_config(stream_reconfiguration_from_spec(c)); } let basin_client = s2.basin(basin.clone()); match basin_client @@ -225,6 +229,32 @@ fn format_timestamping_mode(m: TimestampingMode) -> &'static str { } } +fn effective_storage_class(sc: Option) -> StorageClass { + // Server-side default when not explicitly configured. + sc.unwrap_or(StorageClass::Express) +} + +fn effective_retention_policy(rp: Option) -> RetentionPolicy { + // Server-side default when not explicitly configured. + rp.unwrap_or(RetentionPolicy::Age(7 * 24 * 60 * 60)) +} + +fn effective_timestamping_mode(ts: Option<&TimestampingConfig>) -> TimestampingMode { + // Server-side default when not explicitly configured. + ts.and_then(|cfg| cfg.mode) + .unwrap_or(TimestampingMode::ClientPrefer) +} + +fn effective_timestamping_uncapped(ts: Option<&TimestampingConfig>) -> bool { + // Server-side default when not explicitly configured. + ts.map(|cfg| cfg.uncapped).unwrap_or(false) +} + +fn effective_delete_on_empty_min_age_secs(doe: Option<&DeleteOnEmptyConfig>) -> u64 { + // Server-side default when not explicitly configured. + doe.map(|cfg| cfg.min_age_secs).unwrap_or(0) +} + fn diff_basin_config(existing: &BasinConfig, spec: &BasinConfigSpec) -> Vec { let mut diffs = Vec::new(); @@ -248,10 +278,9 @@ fn diff_basin_config(existing: &BasinConfig, spec: &BasinConfigSpec) -> Vec Vec Vec Vec Vec { fields.push(FieldDiff { field: "storage_class", old: String::new(), - new: format_storage_class(storage_class_to_sdk(sc.clone())).to_string(), + new: format_storage_class(storage_class_from_spec(sc.clone())).to_string(), }); } if let Some(ref rp) = spec.retention_policy { fields.push(FieldDiff { field: "retention_policy", old: String::new(), - new: format_retention_policy(retention_policy_to_sdk(*rp)), + new: format_retention_policy(retention_policy_from_spec(*rp)), }); } if let Some(ref ts) = spec.timestamping { @@ -384,7 +410,8 @@ fn spec_stream_fields(spec: &StreamConfigSpec) -> Vec { fields.push(FieldDiff { field: "timestamping.mode", old: String::new(), - new: format_timestamping_mode(timestamping_mode_to_sdk(mode.clone())).to_string(), + new: format_timestamping_mode(timestamping_mode_from_spec(mode.clone())) + .to_string(), }); } if let Some(uncapped) = ts.uncapped { diff --git a/cli/src/cli.rs b/cli/src/cli.rs index a64bf796..e20bf1f9 100644 --- a/cli/src/cli.rs +++ b/cli/src/cli.rs @@ -168,6 +168,11 @@ pub enum Command { /// are reconfigured to match the spec. Only the fields present in the spec are /// updated. /// + /// Dry-run output legend: + /// + create + /// ~ reconfigure + /// = unchanged + /// /// Example spec file: /// {"basins":[{"name":"my-basin","streams":[{"name":"events"}]}]} Apply(ApplyArgs), @@ -541,9 +546,14 @@ pub struct ApplyArgs { )] pub file: Option, /// Preview changes without making any mutations. + /// + /// Dry-run output legend: + /// + create + /// ~ reconfigure + /// = unchanged #[arg(long)] pub dry_run: bool, - /// Print the JSON Schema for the spec file format to stdout and exit. + /// Print the JSON Schema for the spec file format to stdout. #[arg(long)] pub schema: bool, } diff --git a/lite/src/handlers/v1/basins.rs b/lite/src/handlers/v1/basins.rs index a5b8d2bf..56eaf0bf 100644 --- a/lite/src/handlers/v1/basins.rs +++ b/lite/src/handlers/v1/basins.rs @@ -161,7 +161,7 @@ pub async fn create_or_reconfigure_basin( request: JsonOpt(request), }: CreateOrReconfigureArgs, ) -> Result<(StatusCode, Json), ServiceError> { - let config: BasinConfig = request + let config: BasinReconfiguration = request .and_then(|req| req.config) .map(TryInto::try_into) .transpose()? diff --git a/lite/src/handlers/v1/streams.rs b/lite/src/handlers/v1/streams.rs index f11f186a..13bee62c 100644 --- a/lite/src/handlers/v1/streams.rs +++ b/lite/src/handlers/v1/streams.rs @@ -180,7 +180,7 @@ pub struct CreateOrReconfigureArgs { basin: BasinName, #[from_request(via(Path))] stream: StreamName, - config: JsonOpt, + config: JsonOpt, } /// Create or reconfigure a stream. @@ -188,7 +188,7 @@ pub struct CreateOrReconfigureArgs { put, path = super::paths::streams::CREATE_OR_RECONFIGURE, tag = super::paths::streams::TAG, - request_body = Option, + request_body = Option, params(v1t::StreamNamePathSegment), responses( (status = StatusCode::OK, body = v1t::stream::StreamInfo), @@ -215,7 +215,7 @@ pub async fn create_or_reconfigure_stream( config: JsonOpt(config), }: CreateOrReconfigureArgs, ) -> Result<(StatusCode, Json), ServiceError> { - let config: OptionalStreamConfig = config + let config: StreamReconfiguration = config .map(TryInto::try_into) .transpose()? .unwrap_or_default(); diff --git a/sdk/src/api.rs b/sdk/src/api.rs index 73c75b7e..778a16b0 100644 --- a/sdk/src/api.rs +++ b/sdk/src/api.rs @@ -15,8 +15,7 @@ use s2_api::v1::{ ListAccessTokensResponse, }, basin::{ - BasinInfo, CreateBasinRequest, CreateOrReconfigureBasinRequest, ListBasinsRequest, - ListBasinsResponse, + BasinInfo, CreateBasinRequest, ListBasinsRequest, ListBasinsResponse, }, config::{BasinConfig, BasinReconfiguration, StreamConfig, StreamReconfiguration}, metrics::{ @@ -33,6 +32,8 @@ use secrecy::ExposeSecret; use tokio_util::codec::Decoder; use tracing::{debug, warn}; use url::Url; +#[cfg(feature = "_hidden")] +use s2_api::v1::basin::CreateOrReconfigureBasinRequest; use crate::{ client::{self, StreamingResponse, UnaryResponse}, @@ -143,13 +144,17 @@ impl AccountClient { Ok(response.json::()?) } + #[cfg(feature = "_hidden")] pub async fn create_or_reconfigure_basin( &self, name: BasinName, request: Option, ) -> Result<(bool, BasinInfo), ApiError> { let url = self.base_url.join(&format!("v1/basins/{name}"))?; - let request = self.put(url).json(&request).build()?; + let request = match request { + Some(body) => self.put(url).json(&body).build()?, + None => self.put(url).build()?, + }; let response = self.request(request).send().await?; let was_created = response.status() == StatusCode::CREATED; Ok((was_created, response.json::()?)) @@ -293,15 +298,19 @@ impl BasinClient { Ok(response.json::()?) } + #[cfg(feature = "_hidden")] pub async fn create_or_reconfigure_stream( &self, name: StreamName, - config: Option, + config: Option, ) -> Result<(bool, StreamInfo), ApiError> { let url = self .base_url .join(&format!("v1/streams/{}", urlencoding::encode(&name)))?; - let request = self.put(url).json(&config).build()?; + let request = match config { + Some(body) => self.put(url).json(&body).build()?, + None => self.put(url).build()?, + }; let response = self.request(request).send().await?; let was_created = response.status() == StatusCode::CREATED; Ok((was_created, response.json::()?)) @@ -787,6 +796,7 @@ impl BaseClient { .compression(self.compression) } + #[cfg(feature = "_hidden")] pub fn put(&self, url: Url) -> client::RequestBuilder { client::RequestBuilder::put(url) .timeout(self.request_timeout) diff --git a/sdk/src/client.rs b/sdk/src/client.rs index 78ad9f10..75784ed4 100644 --- a/sdk/src/client.rs +++ b/sdk/src/client.rs @@ -213,6 +213,7 @@ impl RequestBuilder { Self::new(Method::PATCH, url) } + #[cfg(feature = "_hidden")] pub fn put(url: Url) -> Self { Self::new(Method::PUT, url) } diff --git a/sdk/src/ops.rs b/sdk/src/ops.rs index 7b6b3fbe..b4a006e7 100644 --- a/sdk/src/ops.rs +++ b/sdk/src/ops.rs @@ -2,17 +2,20 @@ use futures::StreamExt; #[cfg(feature = "_hidden")] use crate::client::Connect; +#[cfg(feature = "_hidden")] +use crate::types::{ + CreateOrReconfigureBasinInput, CreateOrReconfigureStreamInput, CreateOrReconfigured, +}; use crate::{ api::{AccountClient, BaseClient, BasinClient}, producer::{Producer, ProducerConfig}, session::{self, AppendSession, AppendSessionConfig}, types::{ AccessTokenId, AccessTokenInfo, AppendAck, AppendInput, BasinConfig, BasinInfo, BasinName, - BasinState, CreateBasinInput, CreateOrReconfigureBasinInput, - CreateOrReconfigureStreamInput, CreateOrReconfigured, CreateStreamInput, DeleteBasinInput, - DeleteStreamInput, GetAccountMetricsInput, GetBasinMetricsInput, GetStreamMetricsInput, - IssueAccessTokenInput, ListAccessTokensInput, ListAllAccessTokensInput, ListAllBasinsInput, - ListAllStreamsInput, ListBasinsInput, ListStreamsInput, Metric, Page, ReadBatch, ReadInput, + BasinState, CreateBasinInput, CreateStreamInput, DeleteBasinInput, DeleteStreamInput, + GetAccountMetricsInput, GetBasinMetricsInput, GetStreamMetricsInput, IssueAccessTokenInput, + ListAccessTokensInput, ListAllAccessTokensInput, ListAllBasinsInput, ListAllStreamsInput, + ListBasinsInput, ListStreamsInput, Metric, Page, ReadBatch, ReadInput, ReconfigureBasinInput, ReconfigureStreamInput, S2Config, S2Error, StreamConfig, StreamInfo, StreamName, StreamPosition, Streaming, }, @@ -111,6 +114,8 @@ impl S2 { /// /// Returns [`CreateOrReconfigured::Created`] with the basin info if the basin was newly /// created, or [`CreateOrReconfigured::Reconfigured`] if it already existed. + #[doc(hidden)] + #[cfg(feature = "_hidden")] pub async fn create_or_reconfigure_basin( &self, input: CreateOrReconfigureBasinInput, @@ -327,6 +332,8 @@ impl S2Basin { /// /// Returns [`CreateOrReconfigured::Created`] with the stream info if the stream was newly /// created, or [`CreateOrReconfigured::Reconfigured`] if it already existed. + #[doc(hidden)] + #[cfg(feature = "_hidden")] pub async fn create_or_reconfigure_stream( &self, input: CreateOrReconfigureStreamInput, diff --git a/sdk/src/types.rs b/sdk/src/types.rs index 09257b7e..fc261d81 100644 --- a/sdk/src/types.rs +++ b/sdk/src/types.rs @@ -866,6 +866,8 @@ impl From for api::basin::BasinScope { /// /// Indicates whether the resource was newly created or already existed and was /// reconfigured. Both variants hold the resource's current state. +#[doc(hidden)] +#[cfg(feature = "_hidden")] #[derive(Debug, Clone, PartialEq, Eq)] pub enum CreateOrReconfigured { /// Resource was newly created. @@ -874,6 +876,7 @@ pub enum CreateOrReconfigured { Reconfigured(T), } +#[cfg(feature = "_hidden")] impl CreateOrReconfigured { /// Returns `true` if the resource was newly created. pub fn is_created(&self) -> bool { @@ -949,19 +952,22 @@ impl From for (api::basin::CreateBasinRequest, String) { #[derive(Debug, Clone)] #[non_exhaustive] /// Input for [`create_or_reconfigure_basin`](crate::S2::create_or_reconfigure_basin) operation. +#[doc(hidden)] +#[cfg(feature = "_hidden")] pub struct CreateOrReconfigureBasinInput { /// Basin name. pub name: BasinName, - /// Configuration for the basin. + /// Reconfiguration for the basin. /// /// If `None`, the basin is created with default configuration or left unchanged if it exists. - pub config: Option, + pub config: Option, /// Scope of the basin. /// /// Defaults to [`AwsUsEast1`](BasinScope::AwsUsEast1). Cannot be changed once set. pub scope: Option, } +#[cfg(feature = "_hidden")] impl CreateOrReconfigureBasinInput { /// Create a new [`CreateOrReconfigureBasinInput`] with the given basin name. pub fn new(name: BasinName) -> Self { @@ -972,8 +978,8 @@ impl CreateOrReconfigureBasinInput { } } - /// Set the configuration for the basin. - pub fn with_config(self, config: BasinConfig) -> Self { + /// Set the reconfiguration for the basin. + pub fn with_config(self, config: BasinReconfiguration) -> Self { Self { config: Some(config), ..self @@ -989,6 +995,7 @@ impl CreateOrReconfigureBasinInput { } } +#[cfg(feature = "_hidden")] impl From for ( BasinName, @@ -2669,23 +2676,26 @@ impl From for (api::stream::CreateStreamRequest, String) { #[non_exhaustive] /// Input for [`create_or_reconfigure_stream`](crate::S2Basin::create_or_reconfigure_stream) /// operation. +#[doc(hidden)] +#[cfg(feature = "_hidden")] pub struct CreateOrReconfigureStreamInput { /// Stream name. pub name: StreamName, - /// Configuration for the stream. + /// Reconfiguration for the stream. /// /// If `None`, the stream is created with default configuration or left unchanged if it exists. - pub config: Option, + pub config: Option, } +#[cfg(feature = "_hidden")] impl CreateOrReconfigureStreamInput { /// Create a new [`CreateOrReconfigureStreamInput`] with the given stream name. pub fn new(name: StreamName) -> Self { Self { name, config: None } } - /// Set the configuration for the stream. - pub fn with_config(self, config: StreamConfig) -> Self { + /// Set the reconfiguration for the stream. + pub fn with_config(self, config: StreamReconfiguration) -> Self { Self { config: Some(config), ..self @@ -2693,7 +2703,10 @@ impl CreateOrReconfigureStreamInput { } } -impl From for (StreamName, Option) { +#[cfg(feature = "_hidden")] +impl From + for (StreamName, Option) +{ fn from(value: CreateOrReconfigureStreamInput) -> Self { (value.name, value.config.map(Into::into)) } From 6cd3371b36a92a595f9cc115cf7eca878ba7f6e6 Mon Sep 17 00:00:00 2001 From: Mehul Arora Date: Tue, 24 Feb 2026 23:38:35 +0530 Subject: [PATCH 08/12] . --- cli/src/apply.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/cli/src/apply.rs b/cli/src/apply.rs index 19330c46..91f912cd 100644 --- a/cli/src/apply.rs +++ b/cli/src/apply.rs @@ -230,28 +230,23 @@ fn format_timestamping_mode(m: TimestampingMode) -> &'static str { } fn effective_storage_class(sc: Option) -> StorageClass { - // Server-side default when not explicitly configured. sc.unwrap_or(StorageClass::Express) } fn effective_retention_policy(rp: Option) -> RetentionPolicy { - // Server-side default when not explicitly configured. rp.unwrap_or(RetentionPolicy::Age(7 * 24 * 60 * 60)) } fn effective_timestamping_mode(ts: Option<&TimestampingConfig>) -> TimestampingMode { - // Server-side default when not explicitly configured. ts.and_then(|cfg| cfg.mode) .unwrap_or(TimestampingMode::ClientPrefer) } fn effective_timestamping_uncapped(ts: Option<&TimestampingConfig>) -> bool { - // Server-side default when not explicitly configured. ts.map(|cfg| cfg.uncapped).unwrap_or(false) } fn effective_delete_on_empty_min_age_secs(doe: Option<&DeleteOnEmptyConfig>) -> u64 { - // Server-side default when not explicitly configured. doe.map(|cfg| cfg.min_age_secs).unwrap_or(0) } From d850cb5675c3c948404f5227f21bba764e5bb787 Mon Sep 17 00:00:00 2001 From: Mehul Arora Date: Tue, 24 Feb 2026 23:42:06 +0530 Subject: [PATCH 09/12] .. --- cli/src/cli.rs | 12 ++++++------ cli/src/tui/ui.rs | 5 ++++- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/cli/src/cli.rs b/cli/src/cli.rs index e20bf1f9..d945c50f 100644 --- a/cli/src/cli.rs +++ b/cli/src/cli.rs @@ -169,9 +169,9 @@ pub enum Command { /// updated. /// /// Dry-run output legend: - /// + create - /// ~ reconfigure - /// = unchanged + /// `+` create + /// `~` reconfigure + /// `=` unchanged /// /// Example spec file: /// {"basins":[{"name":"my-basin","streams":[{"name":"events"}]}]} @@ -548,9 +548,9 @@ pub struct ApplyArgs { /// Preview changes without making any mutations. /// /// Dry-run output legend: - /// + create - /// ~ reconfigure - /// = unchanged + /// `+` create + /// `~` reconfigure + /// `=` unchanged #[arg(long)] pub dry_run: bool, /// Print the JSON Schema for the spec file format to stdout. diff --git a/cli/src/tui/ui.rs b/cli/src/tui/ui.rs index efe6db7b..4b112233 100644 --- a/cli/src/tui/ui.rs +++ b/cli/src/tui/ui.rs @@ -3771,7 +3771,10 @@ fn draw_append_view(f: &mut Frame, area: Rect, state: &AppendViewState) { // Show progress if appending from file if let Some((done, total)) = state.file_append_progress { - let pct = if total > 0 { (done * 100) / total } else { 0 }; + let pct = done + .checked_mul(100) + .and_then(|v| v.checked_div(total)) + .unwrap_or(0); lines.push(Line::from(vec![ Span::styled(" ", Style::default()), Span::styled( From 9ca95aecd04bad9c6f6f6d6e3235149a1b0dd013 Mon Sep 17 00:00:00 2001 From: Mehul Arora Date: Wed, 25 Feb 2026 00:01:51 +0530 Subject: [PATCH 10/12] ci --- .github/workflows/ci.yml | 20 ++++ cli/schema.json | 219 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 239 insertions(+) create mode 100644 cli/schema.json diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 92ad0c8a..f85f4edd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,6 +15,7 @@ jobs: runs-on: ubuntu-latest outputs: sdk: ${{ steps.filter.outputs.sdk }} + apply_schema: ${{ steps.filter.outputs.apply_schema }} steps: - uses: actions/checkout@v4 - uses: dorny/paths-filter@v3 @@ -23,6 +24,25 @@ jobs: filters: | sdk: - 'sdk/**' + apply_schema: + - 'cli/src/cli.rs' + - 'cli/src/main.rs' + - 'lite/src/init.rs' + - 'cli/schema.json' + + cli-schema-drift: + name: CLI Schema Drift + needs: [changes] + if: needs.changes.outputs.apply_schema == 'true' + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + - uses: Swatinem/rust-cache@v2 + - name: Generate apply schema + run: cargo run -q -p s2-cli -- apply --schema > /tmp/apply.schema.json + - name: Check for schema drift + run: diff -u cli/schema.json /tmp/apply.schema.json fmt: name: Format diff --git a/cli/schema.json b/cli/schema.json new file mode 100644 index 00000000..860d2114 --- /dev/null +++ b/cli/schema.json @@ -0,0 +1,219 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "ResourcesSpec", + "type": "object", + "properties": { + "basins": { + "type": "array", + "items": { + "$ref": "#/definitions/BasinSpec" + } + } + }, + "definitions": { + "BasinConfigSpec": { + "type": "object", + "properties": { + "create_stream_on_append": { + "description": "Create stream on append if it doesn't exist, using the default stream configuration.", + "default": null, + "type": [ + "boolean", + "null" + ] + }, + "create_stream_on_read": { + "description": "Create stream on read if it doesn't exist, using the default stream configuration.", + "default": null, + "type": [ + "boolean", + "null" + ] + }, + "default_stream_config": { + "anyOf": [ + { + "$ref": "#/definitions/StreamConfigSpec" + }, + { + "type": "null" + } + ] + } + } + }, + "BasinSpec": { + "type": "object", + "required": [ + "name" + ], + "properties": { + "config": { + "anyOf": [ + { + "$ref": "#/definitions/BasinConfigSpec" + }, + { + "type": "null" + } + ] + }, + "name": { + "type": "string" + }, + "streams": { + "type": "array", + "items": { + "$ref": "#/definitions/StreamSpec" + } + } + } + }, + "DeleteOnEmptySpec": { + "type": "object", + "properties": { + "min_age": { + "description": "Minimum age before an empty stream can be deleted. Set to 0 (default) to disable delete-on-empty (don't delete automatically).", + "anyOf": [ + { + "$ref": "#/definitions/HumanDuration" + }, + { + "type": "null" + } + ] + } + } + }, + "HumanDuration": { + "description": "A duration string in humantime format, e.g. \"1day\", \"2h 30m\"", + "examples": [ + "1day", + "2h 30m" + ], + "type": "string" + }, + "RetentionPolicySpec": { + "description": "Retain records unless explicitly trimmed (\"infinite\"), or automatically trim records older than the given duration (e.g. \"7days\", \"1week\").", + "examples": [ + "infinite", + "7days", + "1week" + ], + "type": "string" + }, + "StorageClassSpec": { + "description": "Storage class for recent writes.", + "type": "string", + "enum": [ + "standard", + "express" + ] + }, + "StreamConfigSpec": { + "type": "object", + "properties": { + "delete_on_empty": { + "description": "Delete-on-empty configuration.", + "anyOf": [ + { + "$ref": "#/definitions/DeleteOnEmptySpec" + }, + { + "type": "null" + } + ] + }, + "retention_policy": { + "description": "Retention policy for the stream. If unspecified, the default is to retain records for 7 days.", + "anyOf": [ + { + "$ref": "#/definitions/RetentionPolicySpec" + }, + { + "type": "null" + } + ] + }, + "storage_class": { + "description": "Storage class for recent writes.", + "default": null, + "anyOf": [ + { + "$ref": "#/definitions/StorageClassSpec" + }, + { + "type": "null" + } + ] + }, + "timestamping": { + "description": "Timestamping behavior.", + "anyOf": [ + { + "$ref": "#/definitions/TimestampingSpec" + }, + { + "type": "null" + } + ] + } + } + }, + "StreamSpec": { + "type": "object", + "required": [ + "name" + ], + "properties": { + "config": { + "anyOf": [ + { + "$ref": "#/definitions/StreamConfigSpec" + }, + { + "type": "null" + } + ] + }, + "name": { + "type": "string" + } + } + }, + "TimestampingModeSpec": { + "description": "Timestamping mode for appends that influences how timestamps are handled.", + "type": "string", + "enum": [ + "client-prefer", + "client-require", + "arrival" + ] + }, + "TimestampingSpec": { + "type": "object", + "properties": { + "mode": { + "description": "Timestamping mode for appends that influences how timestamps are handled.", + "default": null, + "anyOf": [ + { + "$ref": "#/definitions/TimestampingModeSpec" + }, + { + "type": "null" + } + ] + }, + "uncapped": { + "description": "Allow client-specified timestamps to exceed the arrival time. If this is `false` or not set, client timestamps will be capped at the arrival time.", + "default": null, + "type": [ + "boolean", + "null" + ] + } + } + } + } +} From 20fe53cea6339e8beee86f84e15e1056de9e17b3 Mon Sep 17 00:00:00 2001 From: Mehul Arora Date: Wed, 25 Feb 2026 00:16:20 +0530 Subject: [PATCH 11/12] .. --- cli/schema.json | 18 ++++++++++++------ cli/src/cli.rs | 8 +++++++- lite/src/init.rs | 6 ++++++ 3 files changed, 25 insertions(+), 7 deletions(-) diff --git a/cli/schema.json b/cli/schema.json index 860d2114..3951b843 100644 --- a/cli/schema.json +++ b/cli/schema.json @@ -40,7 +40,8 @@ } ] } - } + }, + "additionalProperties": false }, "BasinSpec": { "type": "object", @@ -67,7 +68,8 @@ "$ref": "#/definitions/StreamSpec" } } - } + }, + "additionalProperties": false }, "DeleteOnEmptySpec": { "type": "object", @@ -83,7 +85,8 @@ } ] } - } + }, + "additionalProperties": false }, "HumanDuration": { "description": "A duration string in humantime format, e.g. \"1day\", \"2h 30m\"", @@ -158,7 +161,8 @@ } ] } - } + }, + "additionalProperties": false }, "StreamSpec": { "type": "object", @@ -179,7 +183,8 @@ "name": { "type": "string" } - } + }, + "additionalProperties": false }, "TimestampingModeSpec": { "description": "Timestamping mode for appends that influences how timestamps are handled.", @@ -213,7 +218,8 @@ "null" ] } - } + }, + "additionalProperties": false } } } diff --git a/cli/src/cli.rs b/cli/src/cli.rs index d945c50f..236ef2f1 100644 --- a/cli/src/cli.rs +++ b/cli/src/cli.rs @@ -173,8 +173,14 @@ pub enum Command { /// `~` reconfigure /// `=` unchanged /// + /// For IDE validation/autocomplete, add `$schema` at the top of each spec file: + /// {"$schema":"https://raw.githubusercontent.com/s2-streamstore/s2/main/cli/schema.json","basins":[]} + /// + /// For local-only use, point to a local path/URI instead: + /// {"$schema":"./cli/schema.json","basins":[]} + /// /// Example spec file: - /// {"basins":[{"name":"my-basin","streams":[{"name":"events"}]}]} + /// {"$schema":"https://raw.githubusercontent.com/s2-streamstore/s2/main/cli/schema.json","basins":[{"name":"my-basin","streams":[{"name":"events"}]}]} Apply(ApplyArgs), /// Run S2 Lite server backed by object storage. diff --git a/lite/src/init.rs b/lite/src/init.rs index 04b21cd2..b5b8d1c1 100644 --- a/lite/src/init.rs +++ b/lite/src/init.rs @@ -28,6 +28,7 @@ pub struct ResourcesSpec { } #[derive(Debug, Deserialize, schemars::JsonSchema)] +#[serde(deny_unknown_fields)] pub struct BasinSpec { pub name: String, #[serde(default)] @@ -37,6 +38,7 @@ pub struct BasinSpec { } #[derive(Debug, Deserialize, schemars::JsonSchema)] +#[serde(deny_unknown_fields)] pub struct StreamSpec { pub name: String, #[serde(default)] @@ -44,6 +46,7 @@ pub struct StreamSpec { } #[derive(Debug, Clone, Deserialize, Default, schemars::JsonSchema)] +#[serde(deny_unknown_fields)] pub struct BasinConfigSpec { #[serde(default)] pub default_stream_config: Option, @@ -56,6 +59,7 @@ pub struct BasinConfigSpec { } #[derive(Debug, Clone, Deserialize, Default, schemars::JsonSchema)] +#[serde(deny_unknown_fields)] pub struct StreamConfigSpec { /// Storage class for recent writes. #[serde(default)] @@ -166,6 +170,7 @@ impl schemars::JsonSchema for RetentionPolicySpec { } #[derive(Debug, Clone, Deserialize, schemars::JsonSchema)] +#[serde(deny_unknown_fields)] pub struct TimestampingSpec { /// Timestamping mode for appends that influences how timestamps are handled. #[serde(default)] @@ -220,6 +225,7 @@ impl From for TimestampingMode { } #[derive(Debug, Clone, Deserialize, schemars::JsonSchema)] +#[serde(deny_unknown_fields)] pub struct DeleteOnEmptySpec { /// Minimum age before an empty stream can be deleted. /// Set to 0 (default) to disable delete-on-empty (don't delete automatically). From d01030d74267c6bf3fb80498cfd58771dc054fd6 Mon Sep 17 00:00:00 2001 From: Mehul Arora Date: Wed, 25 Feb 2026 00:39:43 +0530 Subject: [PATCH 12/12] .. --- cli/src/main.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/cli/src/main.rs b/cli/src/main.rs index 6f73e41d..15d79be4 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -140,6 +140,15 @@ async fn run() -> Result<(), CliError> { return Ok(()); } + if let Command::Apply(ApplyArgs { schema: true, .. }) = &command { + let schema = s2_lite::init::json_schema(); + println!( + "{}", + serde_json::to_string_pretty(&schema).expect("valid schema") + ); + return Ok(()); + } + let cli_config = load_cli_config()?; let sdk_config = sdk_config(&cli_config)?; let s2 = S2::new(sdk_config.clone()).map_err(CliError::SdkInit)?; @@ -529,16 +538,8 @@ async fn run() -> Result<(), CliError> { Command::Apply(ApplyArgs { file, dry_run, - schema, + schema: _, }) => { - if schema { - let schema = s2_lite::init::json_schema(); - println!( - "{}", - serde_json::to_string_pretty(&schema).expect("valid schema") - ); - return Ok(()); - } let file = file.expect("--file is required when --schema is not set"); let spec = apply::load(&file).map_err(CliError::InvalidArgs)?; if dry_run {