diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index aecc938..c69d9e4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,6 +15,7 @@ jobs: runs-on: ubuntu-latest outputs: sdk: ${{ steps.filter.outputs.sdk }} + apply_schema: ${{ steps.filter.outputs.apply_schema }} steps: - uses: actions/checkout@v4 - uses: dorny/paths-filter@v3 @@ -23,6 +24,25 @@ jobs: filters: | sdk: - 'sdk/**' + apply_schema: + - 'cli/src/cli.rs' + - 'cli/src/main.rs' + - 'lite/src/init.rs' + - 'cli/schema.json' + + cli-schema-drift: + name: CLI Schema Drift + needs: [changes] + if: needs.changes.outputs.apply_schema == 'true' + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + - uses: Swatinem/rust-cache@v2 + - name: Generate apply schema + run: cargo run -q -p s2-cli -- apply --schema > /tmp/apply.schema.json + - name: Check for schema drift + run: diff -u cli/schema.json /tmp/apply.schema.json fmt: name: Format diff --git a/Cargo.lock b/Cargo.lock index 727b9e5..7fe8b60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1628,6 +1628,12 @@ dependencies = [ "winnow 0.6.26", ] +[[package]] +name = "dyn-clone" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" + [[package]] name = "either" version = "1.15.0" @@ -4630,6 +4636,7 @@ dependencies = [ "eyre", "futures", "http 1.4.0", + "humantime", "indexmap", "itertools", "mimalloc", @@ -4642,6 +4649,7 @@ dependencies = [ "rustls", "s2-api", "s2-common", + "schemars", "serde", "serde_json", "slatedb", @@ -4722,6 +4730,30 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "schemars" +version = "0.8.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fbf2ae1b8bc8e02df939598064d22402220cd5bbcca1c76f7d6a310974d5615" +dependencies = [ + "dyn-clone", + "schemars_derive", + "serde", + "serde_json", +] + +[[package]] +name = "schemars_derive" +version = "0.8.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32e265784ad618884abaea0600a9adf15393368d840e0222d101a072f3f7534d" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn 2.0.116", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -4814,6 +4846,17 @@ dependencies = [ "syn 2.0.116", ] +[[package]] +name = "serde_derive_internals" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.116", +] + [[package]] name = "serde_json" version = "1.0.149" diff --git a/Cargo.toml b/Cargo.toml index 883f2c7..3348337 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ s2-api = { path = "api", version = "0.27" } s2-common = { path = "common", version = "0.28" } s2-lite = { path = "lite", version = "0.29" } s2-sdk = { path = "sdk", version = "0.24" } +schemars = "0.8" serde = "1.0" serde_json = "1.0" slatedb = "0.10" diff --git a/api/src/v1/basin.rs b/api/src/v1/basin.rs index 7948008..2f2513e 100644 --- a/api/src/v1/basin.rs +++ b/api/src/v1/basin.rs @@ -4,7 +4,7 @@ use s2_common::types::{ }; use serde::{Deserialize, Serialize}; -use super::config::BasinConfig; +use super::config::{BasinConfig, BasinReconfiguration}; #[rustfmt::skip] #[derive(Debug, Clone, Serialize, Deserialize)] @@ -116,8 +116,8 @@ impl From for BasinState { #[derive(Debug, Clone, Serialize, Deserialize)] #[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))] pub struct CreateOrReconfigureBasinRequest { - /// Basin configuration. - pub config: Option, + /// Basin reconfiguration. + pub config: Option, /// Basin scope. /// This cannot be reconfigured. pub scope: Option, diff --git a/cli/schema.json b/cli/schema.json new file mode 100644 index 0000000..3951b84 --- /dev/null +++ b/cli/schema.json @@ -0,0 +1,225 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "ResourcesSpec", + "type": "object", + "properties": { + "basins": { + "type": "array", + "items": { + "$ref": "#/definitions/BasinSpec" + } + } + }, + "definitions": { + "BasinConfigSpec": { + "type": "object", + "properties": { + "create_stream_on_append": { + "description": "Create stream on append if it doesn't exist, using the default stream configuration.", + "default": null, + "type": [ + "boolean", + "null" + ] + }, + "create_stream_on_read": { + "description": "Create stream on read if it doesn't exist, using the default stream configuration.", + "default": null, + "type": [ + "boolean", + "null" + ] + }, + "default_stream_config": { + "anyOf": [ + { + "$ref": "#/definitions/StreamConfigSpec" + }, + { + "type": "null" + } + ] + } + }, + "additionalProperties": false + }, + "BasinSpec": { + "type": "object", + "required": [ + "name" + ], + "properties": { + "config": { + "anyOf": [ + { + "$ref": "#/definitions/BasinConfigSpec" + }, + { + "type": "null" + } + ] + }, + "name": { + "type": "string" + }, + "streams": { + "type": "array", + "items": { + "$ref": "#/definitions/StreamSpec" + } + } + }, + "additionalProperties": false + }, + "DeleteOnEmptySpec": { + "type": "object", + "properties": { + "min_age": { + "description": "Minimum age before an empty stream can be deleted. Set to 0 (default) to disable delete-on-empty (don't delete automatically).", + "anyOf": [ + { + "$ref": "#/definitions/HumanDuration" + }, + { + "type": "null" + } + ] + } + }, + "additionalProperties": false + }, + "HumanDuration": { + "description": "A duration string in humantime format, e.g. \"1day\", \"2h 30m\"", + "examples": [ + "1day", + "2h 30m" + ], + "type": "string" + }, + "RetentionPolicySpec": { + "description": "Retain records unless explicitly trimmed (\"infinite\"), or automatically trim records older than the given duration (e.g. \"7days\", \"1week\").", + "examples": [ + "infinite", + "7days", + "1week" + ], + "type": "string" + }, + "StorageClassSpec": { + "description": "Storage class for recent writes.", + "type": "string", + "enum": [ + "standard", + "express" + ] + }, + "StreamConfigSpec": { + "type": "object", + "properties": { + "delete_on_empty": { + "description": "Delete-on-empty configuration.", + "anyOf": [ + { + "$ref": "#/definitions/DeleteOnEmptySpec" + }, + { + "type": "null" + } + ] + }, + "retention_policy": { + "description": "Retention policy for the stream. If unspecified, the default is to retain records for 7 days.", + "anyOf": [ + { + "$ref": "#/definitions/RetentionPolicySpec" + }, + { + "type": "null" + } + ] + }, + "storage_class": { + "description": "Storage class for recent writes.", + "default": null, + "anyOf": [ + { + "$ref": "#/definitions/StorageClassSpec" + }, + { + "type": "null" + } + ] + }, + "timestamping": { + "description": "Timestamping behavior.", + "anyOf": [ + { + "$ref": "#/definitions/TimestampingSpec" + }, + { + "type": "null" + } + ] + } + }, + "additionalProperties": false + }, + "StreamSpec": { + "type": "object", + "required": [ + "name" + ], + "properties": { + "config": { + "anyOf": [ + { + "$ref": "#/definitions/StreamConfigSpec" + }, + { + "type": "null" + } + ] + }, + "name": { + "type": "string" + } + }, + "additionalProperties": false + }, + "TimestampingModeSpec": { + "description": "Timestamping mode for appends that influences how timestamps are handled.", + "type": "string", + "enum": [ + "client-prefer", + "client-require", + "arrival" + ] + }, + "TimestampingSpec": { + "type": "object", + "properties": { + "mode": { + "description": "Timestamping mode for appends that influences how timestamps are handled.", + "default": null, + "anyOf": [ + { + "$ref": "#/definitions/TimestampingModeSpec" + }, + { + "type": "null" + } + ] + }, + "uncapped": { + "description": "Allow client-specified timestamps to exceed the arrival time. If this is `false` or not set, client timestamps will be capped at the arrival time.", + "default": null, + "type": [ + "boolean", + "null" + ] + } + }, + "additionalProperties": false + } + } +} diff --git a/cli/src/apply.rs b/cli/src/apply.rs new file mode 100644 index 0000000..91f912c --- /dev/null +++ b/cli/src/apply.rs @@ -0,0 +1,568 @@ +//! Declarative basin/stream configuration via a JSON spec file. + +use std::{path::Path, time::Duration}; + +use colored::Colorize; +use s2_lite::init::{ + BasinConfigSpec, DeleteOnEmptySpec, ResourcesSpec, RetentionPolicySpec, StorageClassSpec, + StreamConfigSpec, TimestampingModeSpec, TimestampingSpec, +}; +use s2_sdk::{ + S2, + types::{ + BasinConfig, BasinName, BasinReconfiguration, CreateOrReconfigureBasinInput, + CreateOrReconfigureStreamInput, CreateOrReconfigured, DeleteOnEmptyConfig, + DeleteOnEmptyReconfiguration, ErrorResponse, RetentionPolicy, S2Error, StorageClass, + StreamConfig, StreamName, StreamReconfiguration, TimestampingConfig, TimestampingMode, + TimestampingReconfiguration, + }, +}; + +fn storage_class_from_spec(s: StorageClassSpec) -> StorageClass { + match s { + StorageClassSpec::Standard => StorageClass::Standard, + StorageClassSpec::Express => StorageClass::Express, + } +} + +fn retention_policy_from_spec(rp: RetentionPolicySpec) -> RetentionPolicy { + match rp.age_secs() { + Some(secs) => RetentionPolicy::Age(secs), + None => RetentionPolicy::Infinite, + } +} + +fn timestamping_mode_from_spec(m: TimestampingModeSpec) -> TimestampingMode { + match m { + TimestampingModeSpec::ClientPrefer => TimestampingMode::ClientPrefer, + TimestampingModeSpec::ClientRequire => TimestampingMode::ClientRequire, + TimestampingModeSpec::Arrival => TimestampingMode::Arrival, + } +} + +fn timestamping_reconfiguration_from_spec(ts: TimestampingSpec) -> TimestampingReconfiguration { + let mut tsr = TimestampingReconfiguration::new(); + if let Some(m) = ts.mode { + tsr = tsr.with_mode(timestamping_mode_from_spec(m)); + } + if let Some(u) = ts.uncapped { + tsr = tsr.with_uncapped(u); + } + tsr +} + +fn delete_on_empty_reconfiguration_from_spec( + doe: DeleteOnEmptySpec, +) -> DeleteOnEmptyReconfiguration { + let mut doer = DeleteOnEmptyReconfiguration::new(); + if let Some(ma) = doe.min_age { + doer = doer.with_min_age(ma.0); + } + doer +} + +fn stream_reconfiguration_from_spec(s: StreamConfigSpec) -> StreamReconfiguration { + let mut r = StreamReconfiguration::new(); + if let Some(sc) = s.storage_class { + r = r.with_storage_class(storage_class_from_spec(sc)); + } + if let Some(rp) = s.retention_policy { + r = r.with_retention_policy(retention_policy_from_spec(rp)); + } + if let Some(ts) = s.timestamping { + r = r.with_timestamping(timestamping_reconfiguration_from_spec(ts)); + } + if let Some(doe) = s.delete_on_empty { + r = r.with_delete_on_empty(delete_on_empty_reconfiguration_from_spec(doe)); + } + r +} + +fn basin_reconfiguration_from_spec(s: BasinConfigSpec) -> BasinReconfiguration { + let mut r = BasinReconfiguration::new(); + if let Some(dsc) = s.default_stream_config { + r = r.with_default_stream_config(stream_reconfiguration_from_spec(dsc)); + } + if let Some(v) = s.create_stream_on_append { + r = r.with_create_stream_on_append(v); + } + if let Some(v) = s.create_stream_on_read { + r = r.with_create_stream_on_read(v); + } + r +} + +pub fn validate(spec: &ResourcesSpec) -> miette::Result<()> { + s2_lite::init::validate(spec).map_err(|e| miette::miette!("{}", e)) +} + +pub fn load(path: &Path) -> miette::Result { + let contents = std::fs::read_to_string(path) + .map_err(|e| miette::miette!("failed to read spec file {:?}: {}", path.display(), e))?; + let spec: ResourcesSpec = serde_json::from_str(&contents) + .map_err(|e| miette::miette!("failed to parse spec file {:?}: {}", path.display(), e))?; + Ok(spec) +} + +pub async fn apply(s2: &S2, spec: ResourcesSpec) -> miette::Result<()> { + validate(&spec)?; + + for basin_spec in spec.basins { + let basin: BasinName = basin_spec + .name + .parse() + .map_err(|e| miette::miette!("invalid basin name {:?}: {}", basin_spec.name, e))?; + + apply_basin(s2, basin.clone(), basin_spec.config).await?; + + for stream_spec in basin_spec.streams { + let stream: StreamName = stream_spec.name.parse().map_err(|e| { + miette::miette!("invalid stream name {:?}: {}", stream_spec.name, e) + })?; + apply_stream(s2, basin.clone(), stream, stream_spec.config).await?; + } + } + Ok(()) +} + +async fn apply_basin( + s2: &S2, + basin: BasinName, + config: Option, +) -> miette::Result<()> { + let mut input = CreateOrReconfigureBasinInput::new(basin.clone()); + if let Some(c) = config { + input = input.with_config(basin_reconfiguration_from_spec(c)); + } + match s2 + .create_or_reconfigure_basin(input) + .await + .map_err(|e| miette::miette!("failed to apply basin {:?}: {}", basin.as_ref(), e))? + { + CreateOrReconfigured::Created(_) => { + eprintln!("{}", format!(" basin {basin}").green().bold()); + } + CreateOrReconfigured::Reconfigured(_) => { + eprintln!( + "{}", + format!(" basin {basin} (reconfigured)").yellow().bold() + ); + } + } + Ok(()) +} + +async fn apply_stream( + s2: &S2, + basin: BasinName, + stream: StreamName, + config: Option, +) -> miette::Result<()> { + let mut input = CreateOrReconfigureStreamInput::new(stream.clone()); + if let Some(c) = config { + input = input.with_config(stream_reconfiguration_from_spec(c)); + } + let basin_client = s2.basin(basin.clone()); + match basin_client + .create_or_reconfigure_stream(input) + .await + .map_err(|e| { + miette::miette!( + "failed to apply stream {:?}/{:?}: {}", + basin.as_ref(), + stream.as_ref(), + e + ) + })? { + CreateOrReconfigured::Created(_) => { + eprintln!("{}", format!(" stream {basin}/{stream}").green().bold()); + } + CreateOrReconfigured::Reconfigured(_) => { + eprintln!( + "{}", + format!(" stream {basin}/{stream} (reconfigured)") + .yellow() + .bold() + ); + } + } + Ok(()) +} + +enum ResourceAction { + Create, + Reconfigure(Vec), + Unchanged, +} + +struct FieldDiff { + field: &'static str, + old: String, + new: String, +} + +fn is_not_found_error(e: &S2Error) -> bool { + matches!(e, S2Error::Server(ErrorResponse { code, .. }) if code == "basin_not_found" || code == "stream_not_found") +} + +fn format_storage_class(sc: StorageClass) -> &'static str { + match sc { + StorageClass::Standard => "standard", + StorageClass::Express => "express", + } +} + +fn format_retention_policy(rp: RetentionPolicy) -> String { + match rp { + RetentionPolicy::Age(secs) => { + humantime::format_duration(Duration::from_secs(secs)).to_string() + } + RetentionPolicy::Infinite => "infinite".to_string(), + } +} + +fn format_timestamping_mode(m: TimestampingMode) -> &'static str { + match m { + TimestampingMode::ClientPrefer => "client-prefer", + TimestampingMode::ClientRequire => "client-require", + TimestampingMode::Arrival => "arrival", + } +} + +fn effective_storage_class(sc: Option) -> StorageClass { + sc.unwrap_or(StorageClass::Express) +} + +fn effective_retention_policy(rp: Option) -> RetentionPolicy { + rp.unwrap_or(RetentionPolicy::Age(7 * 24 * 60 * 60)) +} + +fn effective_timestamping_mode(ts: Option<&TimestampingConfig>) -> TimestampingMode { + ts.and_then(|cfg| cfg.mode) + .unwrap_or(TimestampingMode::ClientPrefer) +} + +fn effective_timestamping_uncapped(ts: Option<&TimestampingConfig>) -> bool { + ts.map(|cfg| cfg.uncapped).unwrap_or(false) +} + +fn effective_delete_on_empty_min_age_secs(doe: Option<&DeleteOnEmptyConfig>) -> u64 { + doe.map(|cfg| cfg.min_age_secs).unwrap_or(0) +} + +fn diff_basin_config(existing: &BasinConfig, spec: &BasinConfigSpec) -> Vec { + let mut diffs = Vec::new(); + + if let Some(v) = spec.create_stream_on_append + && existing.create_stream_on_append != v + { + diffs.push(FieldDiff { + field: "create_stream_on_append", + old: existing.create_stream_on_append.to_string(), + new: v.to_string(), + }); + } + + if let Some(v) = spec.create_stream_on_read + && existing.create_stream_on_read != v + { + diffs.push(FieldDiff { + field: "create_stream_on_read", + old: existing.create_stream_on_read.to_string(), + new: v.to_string(), + }); + } + + if let Some(ref spec_dsc) = spec.default_stream_config { + let existing_dsc = existing.default_stream_config.clone().unwrap_or_default(); + let stream_diffs = diff_stream_config(&existing_dsc, spec_dsc); + for sd in stream_diffs { + diffs.push(FieldDiff { + field: sd.field, + old: sd.old, + new: sd.new, + }); + } + } + + diffs +} + +fn diff_stream_config(existing: &StreamConfig, spec: &StreamConfigSpec) -> Vec { + let mut diffs = Vec::new(); + + if let Some(ref sc) = spec.storage_class { + let existing_sc = effective_storage_class(existing.storage_class); + let spec_sc = storage_class_from_spec(sc.clone()); + if existing_sc != spec_sc { + diffs.push(FieldDiff { + field: "storage_class", + old: format_storage_class(existing_sc).to_string(), + new: format_storage_class(spec_sc).to_string(), + }); + } + } + + if let Some(ref rp) = spec.retention_policy { + let existing_rp = effective_retention_policy(existing.retention_policy); + let spec_rp = retention_policy_from_spec(*rp); + if existing_rp != spec_rp { + diffs.push(FieldDiff { + field: "retention_policy", + old: format_retention_policy(existing_rp), + new: format_retention_policy(spec_rp), + }); + } + } + + if let Some(ref ts) = spec.timestamping { + let existing_ts = existing.timestamping.as_ref(); + if let Some(ref mode) = ts.mode { + let spec_mode = timestamping_mode_from_spec(mode.clone()); + if effective_timestamping_mode(existing_ts) != spec_mode { + diffs.push(FieldDiff { + field: "timestamping.mode", + old: format_timestamping_mode(effective_timestamping_mode(existing_ts)) + .to_string(), + new: format_timestamping_mode(spec_mode).to_string(), + }); + } + } + if let Some(uncapped) = ts.uncapped + && effective_timestamping_uncapped(existing_ts) != uncapped + { + diffs.push(FieldDiff { + field: "timestamping.uncapped", + old: effective_timestamping_uncapped(existing_ts).to_string(), + new: uncapped.to_string(), + }); + } + } + + if let Some(ref doe) = spec.delete_on_empty + && let Some(ref min_age) = doe.min_age + && effective_delete_on_empty_min_age_secs(existing.delete_on_empty.as_ref()) + != min_age.0.as_secs() + { + diffs.push(FieldDiff { + field: "delete_on_empty.min_age", + old: humantime::format_duration(Duration::from_secs( + effective_delete_on_empty_min_age_secs(existing.delete_on_empty.as_ref()), + )) + .to_string(), + new: humantime::format_duration(min_age.0).to_string(), + }); + } + + diffs +} + +fn spec_basin_fields(spec: &BasinConfigSpec) -> Vec { + let mut fields = Vec::new(); + + if let Some(v) = spec.create_stream_on_append { + fields.push(FieldDiff { + field: "create_stream_on_append", + old: String::new(), + new: v.to_string(), + }); + } + if let Some(v) = spec.create_stream_on_read { + fields.push(FieldDiff { + field: "create_stream_on_read", + old: String::new(), + new: v.to_string(), + }); + } + if let Some(ref dsc) = spec.default_stream_config { + for f in spec_stream_fields(dsc) { + fields.push(f); + } + } + + fields +} + +fn spec_stream_fields(spec: &StreamConfigSpec) -> Vec { + let mut fields = Vec::new(); + + if let Some(ref sc) = spec.storage_class { + fields.push(FieldDiff { + field: "storage_class", + old: String::new(), + new: format_storage_class(storage_class_from_spec(sc.clone())).to_string(), + }); + } + if let Some(ref rp) = spec.retention_policy { + fields.push(FieldDiff { + field: "retention_policy", + old: String::new(), + new: format_retention_policy(retention_policy_from_spec(*rp)), + }); + } + if let Some(ref ts) = spec.timestamping { + if let Some(ref mode) = ts.mode { + fields.push(FieldDiff { + field: "timestamping.mode", + old: String::new(), + new: format_timestamping_mode(timestamping_mode_from_spec(mode.clone())) + .to_string(), + }); + } + if let Some(uncapped) = ts.uncapped { + fields.push(FieldDiff { + field: "timestamping.uncapped", + old: String::new(), + new: uncapped.to_string(), + }); + } + } + if let Some(ref doe) = spec.delete_on_empty + && let Some(ref min_age) = doe.min_age + { + fields.push(FieldDiff { + field: "delete_on_empty.min_age", + old: String::new(), + new: humantime::format_duration(min_age.0).to_string(), + }); + } + + fields +} + +fn print_basin_result(basin: &str, action: &ResourceAction) { + match action { + ResourceAction::Create => { + println!("{}", format!("+ basin {basin}").green().bold()); + } + ResourceAction::Reconfigure(diffs) => { + println!("{}", format!("~ basin {basin}").yellow().bold()); + for diff in diffs { + println!(" {}: {} → {}", diff.field, diff.old.dimmed(), diff.new); + } + } + ResourceAction::Unchanged => { + println!("{}", format!("= basin {basin}").dimmed()); + } + } +} + +fn print_stream_result(basin: &str, stream: &str, action: &ResourceAction) { + match action { + ResourceAction::Create => { + println!("{}", format!(" + stream {basin}/{stream}").green().bold()); + } + ResourceAction::Reconfigure(diffs) => { + println!("{}", format!(" ~ stream {basin}/{stream}").yellow().bold()); + for diff in diffs { + println!(" {}: {} → {}", diff.field, diff.old.dimmed(), diff.new); + } + } + ResourceAction::Unchanged => { + println!("{}", format!(" = stream {basin}/{stream}").dimmed()); + } + } +} + +fn print_basin_create(basin: &str, spec: &Option) { + println!("{}", format!("+ basin {basin}").green().bold()); + if let Some(config) = spec { + for field in spec_basin_fields(config) { + println!(" {}: {}", field.field, field.new); + } + } +} + +fn print_stream_create(basin: &str, stream: &str, spec: &Option) { + println!("{}", format!(" + stream {basin}/{stream}").green().bold()); + if let Some(config) = spec { + for field in spec_stream_fields(config) { + println!(" {}: {}", field.field, field.new); + } + } +} + +pub async fn dry_run(s2: &S2, spec: ResourcesSpec) -> miette::Result<()> { + validate(&spec)?; + + for basin_spec in spec.basins { + let basin: BasinName = basin_spec + .name + .parse() + .map_err(|e| miette::miette!("invalid basin name {:?}: {}", basin_spec.name, e))?; + + let basin_action = match s2.get_basin_config(basin.clone()).await { + Ok(existing) => { + if let Some(ref config) = basin_spec.config { + let diffs = diff_basin_config(&existing, config); + if diffs.is_empty() { + ResourceAction::Unchanged + } else { + ResourceAction::Reconfigure(diffs) + } + } else { + ResourceAction::Unchanged + } + } + Err(e) if is_not_found_error(&e) => ResourceAction::Create, + Err(e) => { + return Err(miette::miette!( + "failed to check basin {:?}: {}", + basin.as_ref(), + e + )); + } + }; + + match &basin_action { + ResourceAction::Create => { + print_basin_create(basin.as_ref(), &basin_spec.config); + } + action => { + print_basin_result(basin.as_ref(), action); + } + } + + let basin_client = s2.basin(basin.clone()); + + for stream_spec in basin_spec.streams { + let stream: StreamName = stream_spec.name.parse().map_err(|e| { + miette::miette!("invalid stream name {:?}: {}", stream_spec.name, e) + })?; + + let stream_action = match basin_client.get_stream_config(stream.clone()).await { + Ok(existing) => { + if let Some(ref config) = stream_spec.config { + let diffs = diff_stream_config(&existing, config); + if diffs.is_empty() { + ResourceAction::Unchanged + } else { + ResourceAction::Reconfigure(diffs) + } + } else { + ResourceAction::Unchanged + } + } + Err(e) if is_not_found_error(&e) => ResourceAction::Create, + Err(e) => { + return Err(miette::miette!( + "failed to check stream {:?}/{:?}: {}", + basin.as_ref(), + stream.as_ref(), + e + )); + } + }; + + match &stream_action { + ResourceAction::Create => { + print_stream_create(basin.as_ref(), stream.as_ref(), &stream_spec.config); + } + action => { + print_stream_result(basin.as_ref(), stream.as_ref(), action); + } + } + } + } + Ok(()) +} diff --git a/cli/src/cli.rs b/cli/src/cli.rs index fa2a49c..236ef2f 100644 --- a/cli/src/cli.rs +++ b/cli/src/cli.rs @@ -1,4 +1,4 @@ -use std::num::NonZeroU64; +use std::{num::NonZeroU64, path::PathBuf}; use clap::{Args, Parser, Subcommand, builder::styling}; use s2_sdk::types::{ @@ -161,6 +161,28 @@ pub enum Command { /// Benchmark a stream to measure throughput and latency. Bench(BenchArgs), + /// Apply a declarative spec file, creating or reconfiguring basins and streams. + /// + /// Reads a JSON file and ensures the declared basins and streams exist with the + /// specified configuration. Basins and streams that already exist + /// are reconfigured to match the spec. Only the fields present in the spec are + /// updated. + /// + /// Dry-run output legend: + /// `+` create + /// `~` reconfigure + /// `=` unchanged + /// + /// For IDE validation/autocomplete, add `$schema` at the top of each spec file: + /// {"$schema":"https://raw.githubusercontent.com/s2-streamstore/s2/main/cli/schema.json","basins":[]} + /// + /// For local-only use, point to a local path/URI instead: + /// {"$schema":"./cli/schema.json","basins":[]} + /// + /// Example spec file: + /// {"$schema":"https://raw.githubusercontent.com/s2-streamstore/s2/main/cli/schema.json","basins":[{"name":"my-basin","streams":[{"name":"events"}]}]} + Apply(ApplyArgs), + /// Run S2 Lite server backed by object storage. /// /// Starts a lightweight S2-compatible server that can be backed by @@ -519,6 +541,29 @@ pub struct TailArgs { pub output: RecordsOut, } +#[derive(Args, Debug)] +pub struct ApplyArgs { + /// Path to a JSON spec file defining basins and streams to create or reconfigure. + #[arg( + short = 'f', + long, + value_name = "FILE", + required_unless_present = "schema" + )] + pub file: Option, + /// Preview changes without making any mutations. + /// + /// Dry-run output legend: + /// `+` create + /// `~` reconfigure + /// `=` unchanged + #[arg(long)] + pub dry_run: bool, + /// Print the JSON Schema for the spec file format to stdout. + #[arg(long)] + pub schema: bool, +} + #[derive(Args, Debug)] pub struct BenchArgs { /// Name of the basin to use for the test. diff --git a/cli/src/error.rs b/cli/src/error.rs index 927abca..c217a12 100644 --- a/cli/src/error.rs +++ b/cli/src/error.rs @@ -61,6 +61,10 @@ pub enum CliError { #[error("S2 Lite server error: {0}")] #[diagnostic(help("{}", HELP))] LiteServer(String), + + #[error("Apply failed: {0}")] + #[diagnostic(help("{}", HELP))] + Apply(String), } impl CliError { diff --git a/cli/src/main.rs b/cli/src/main.rs index 28bd79e..15d79be 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -1,5 +1,6 @@ //! S2 command-line interface. +mod apply; mod bench; mod cli; mod config; @@ -16,7 +17,7 @@ static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; use std::{pin::Pin, time::Duration}; use clap::{CommandFactory, Parser}; -use cli::{Cli, Command, ConfigCommand, ListBasinsArgs, ListStreamsArgs}; +use cli::{ApplyArgs, Cli, Command, ConfigCommand, ListBasinsArgs, ListStreamsArgs}; use colored::Colorize; use config::{ ConfigKey, load_cli_config, load_config_file, sdk_config, set_config_value, unset_config_value, @@ -139,6 +140,15 @@ async fn run() -> Result<(), CliError> { return Ok(()); } + if let Command::Apply(ApplyArgs { schema: true, .. }) = &command { + let schema = s2_lite::init::json_schema(); + println!( + "{}", + serde_json::to_string_pretty(&schema).expect("valid schema") + ); + return Ok(()); + } + let cli_config = load_cli_config()?; let sdk_config = sdk_config(&cli_config)?; let s2 = S2::new(sdk_config.clone()).map_err(CliError::SdkInit)?; @@ -525,6 +535,25 @@ async fn run() -> Result<(), CliError> { } } + Command::Apply(ApplyArgs { + file, + dry_run, + schema: _, + }) => { + let file = file.expect("--file is required when --schema is not set"); + let spec = apply::load(&file).map_err(CliError::InvalidArgs)?; + if dry_run { + apply::dry_run(&s2, spec) + .await + .map_err(|e| CliError::Apply(e.to_string()))?; + } else { + apply::apply(&s2, spec) + .await + .map_err(|e| CliError::Apply(e.to_string()))?; + eprintln!("{}", "✓ Done".green().bold()); + } + } + Command::Bench(args) => { let basin_name = args.basin.0.clone(); let stream_name: StreamName = format!("bench/{}", uuid::Uuid::new_v4()) diff --git a/cli/src/tui/ui.rs b/cli/src/tui/ui.rs index efe6db7..4b11223 100644 --- a/cli/src/tui/ui.rs +++ b/cli/src/tui/ui.rs @@ -3771,7 +3771,10 @@ fn draw_append_view(f: &mut Frame, area: Rect, state: &AppendViewState) { // Show progress if appending from file if let Some((done, total)) = state.file_append_progress { - let pct = if total > 0 { (done * 100) / total } else { 0 }; + let pct = done + .checked_mul(100) + .and_then(|v| v.checked_div(total)) + .unwrap_or(0); lines.push(Line::from(vec![ Span::styled(" ", Style::default()), Span::styled( diff --git a/lite/Cargo.toml b/lite/Cargo.toml index ecd5164..bea93b0 100644 --- a/lite/Cargo.toml +++ b/lite/Cargo.toml @@ -29,13 +29,14 @@ axum-server = { workspace = true, features = ["tls-rustls"] } blake3 = { workspace = true } bytes = { workspace = true } bytesize = { workspace = true } -clap = { workspace = true, features = ["derive"] } +clap = { workspace = true, features = ["derive", "env"] } compact_str = { workspace = true } dashmap = { workspace = true } enum-ordinalize = { workspace = true } eyre = { workspace = true } futures = { workspace = true } http = { workspace = true } +humantime = { workspace = true } indexmap = { workspace = true } itertools = { workspace = true } mimalloc = { workspace = true } @@ -47,6 +48,7 @@ rcgen = { workspace = true } rustls = { workspace = true, features = ["aws-lc-rs"] } s2-api = { workspace = true, features = ["axum"] } s2-common = { workspace = true, features = ["clap"] } +schemars = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } slatedb = { workspace = true } diff --git a/lite/src/backend/basins.rs b/lite/src/backend/basins.rs index b4bdd9b..2bea1bf 100644 --- a/lite/src/backend/basins.rs +++ b/lite/src/backend/basins.rs @@ -75,22 +75,23 @@ impl Backend { pub async fn create_basin( &self, basin: BasinName, - config: BasinConfig, + config: impl Into, mode: CreateMode, ) -> Result, CreateBasinError> { + let config = config.into(); let meta_key = kv::basin_meta::ser_key(&basin); let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?; let new_creation_idempotency_key = match &mode { CreateMode::CreateOnly(Some(req_token)) => { - Some(creation_idempotency_key(req_token, &config)) + let resolved = BasinConfig::default().reconfigure(config.clone()); + Some(creation_idempotency_key(req_token, &resolved)) } _ => None, }; - let mut existing_created_at = None; - let mut existing_creation_idempotency_key = None; + let mut existing_meta_opt = None; if let Some(existing_meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? { @@ -112,18 +113,27 @@ impl Backend { }; } CreateMode::CreateOrReconfigure => { - existing_created_at = Some(existing_meta.created_at); - existing_creation_idempotency_key = existing_meta.creation_idempotency_key; + existing_meta_opt = Some(existing_meta); } } } - let created_at = existing_created_at.unwrap_or_else(OffsetDateTime::now_utc); - let creation_idempotency_key = - existing_creation_idempotency_key.or(new_creation_idempotency_key); + let is_reconfigure = existing_meta_opt.is_some(); + let (resolved, created_at, creation_idempotency_key) = match existing_meta_opt { + Some(existing) => ( + existing.config.reconfigure(config), + existing.created_at, + existing.creation_idempotency_key, + ), + None => ( + BasinConfig::default().reconfigure(config), + OffsetDateTime::now_utc(), + new_creation_idempotency_key, + ), + }; let meta = kv::basin_meta::BasinMeta { - config, + config: resolved, created_at, deleted_at: None, creation_idempotency_key, @@ -142,7 +152,7 @@ impl Backend { state: BasinState::Active, }; - Ok(if existing_created_at.is_some() { + Ok(if is_reconfigure { CreatedOrReconfigured::Reconfigured(info) } else { CreatedOrReconfigured::Created(info) diff --git a/lite/src/backend/read.rs b/lite/src/backend/read.rs index af14837..fd19be4 100644 --- a/lite/src/backend/read.rs +++ b/lite/src/backend/read.rs @@ -349,7 +349,7 @@ mod tests { read_extent::{ReadLimit, ReadUntil}, types::{ basin::BasinName, - config::OptionalStreamConfig, + config::{BasinConfig, OptionalStreamConfig}, resources::CreateMode, stream::{ AppendInput, AppendRecordBatch, AppendRecordParts, ReadEnd, ReadFrom, ReadStart, @@ -424,7 +424,7 @@ mod tests { backend .create_basin( basin.clone(), - Default::default(), + BasinConfig::default(), CreateMode::CreateOnly(None), ) .await diff --git a/lite/src/backend/streams.rs b/lite/src/backend/streams.rs index 81476cc..c131b50 100644 --- a/lite/src/backend/streams.rs +++ b/lite/src/backend/streams.rs @@ -77,9 +77,10 @@ impl Backend { &self, basin: BasinName, stream: StreamName, - mut config: OptionalStreamConfig, + config: impl Into, mode: CreateMode, ) -> Result, CreateStreamError> { + let config = config.into(); let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?; let Some(basin_meta) = db_txn_get( @@ -100,12 +101,13 @@ impl Backend { let creation_idempotency_key = match &mode { CreateMode::CreateOnly(Some(req_token)) => { - Some(creation_idempotency_key(req_token, &config)) + let resolved = OptionalStreamConfig::default().reconfigure(config.clone()); + Some(creation_idempotency_key(req_token, &resolved)) } _ => None, }; - let mut existing_created_at = None; + let mut existing_meta_opt = None; let mut prior_doe_min_age = None; if let Some(existing_meta) = @@ -136,16 +138,25 @@ impl Backend { }; } CreateMode::CreateOrReconfigure => { - existing_created_at = Some(existing_meta.created_at); + existing_meta_opt = Some(existing_meta); } } } - config = config.merge(basin_meta.config.default_stream_config).into(); + let is_reconfigure = existing_meta_opt.is_some(); + let (resolved, created_at) = match existing_meta_opt { + Some(existing) => (existing.config.reconfigure(config), existing.created_at), + None => ( + OptionalStreamConfig::default().reconfigure(config), + OffsetDateTime::now_utc(), + ), + }; + let resolved: OptionalStreamConfig = resolved + .merge(basin_meta.config.default_stream_config) + .into(); - let created_at = existing_created_at.unwrap_or_else(OffsetDateTime::now_utc); let meta = kv::stream_meta::StreamMeta { - config: config.clone(), + config: resolved.clone(), created_at, deleted_at: None, creation_idempotency_key, @@ -153,7 +164,7 @@ impl Backend { txn.put(&stream_meta_key, kv::stream_meta::ser_value(&meta))?; let stream_id = StreamId::new(&basin, &stream); - if existing_created_at.is_none() { + if !is_reconfigure { txn.put( kv::stream_id_mapping::ser_key(stream_id), kv::stream_id_mapping::ser_value(&basin, &stream), @@ -195,10 +206,8 @@ impl Backend { }; txn.commit_with_options(&WRITE_OPTS).await?; - if existing_created_at.is_some() - && let Some(client) = self.streamer_client_if_active(&basin, &stream) - { - client.advise_reconfig(config); + if is_reconfigure && let Some(client) = self.streamer_client_if_active(&basin, &stream) { + client.advise_reconfig(resolved); } let info = StreamInfo { @@ -207,7 +216,7 @@ impl Backend { deleted_at: None, }; - Ok(if existing_created_at.is_some() { + Ok(if is_reconfigure { CreatedOrReconfigured::Reconfigured(info) } else { CreatedOrReconfigured::Created(info) diff --git a/lite/src/handlers/v1/basins.rs b/lite/src/handlers/v1/basins.rs index a5b8d2b..56eaf0b 100644 --- a/lite/src/handlers/v1/basins.rs +++ b/lite/src/handlers/v1/basins.rs @@ -161,7 +161,7 @@ pub async fn create_or_reconfigure_basin( request: JsonOpt(request), }: CreateOrReconfigureArgs, ) -> Result<(StatusCode, Json), ServiceError> { - let config: BasinConfig = request + let config: BasinReconfiguration = request .and_then(|req| req.config) .map(TryInto::try_into) .transpose()? diff --git a/lite/src/handlers/v1/streams.rs b/lite/src/handlers/v1/streams.rs index f11f186..13bee62 100644 --- a/lite/src/handlers/v1/streams.rs +++ b/lite/src/handlers/v1/streams.rs @@ -180,7 +180,7 @@ pub struct CreateOrReconfigureArgs { basin: BasinName, #[from_request(via(Path))] stream: StreamName, - config: JsonOpt, + config: JsonOpt, } /// Create or reconfigure a stream. @@ -188,7 +188,7 @@ pub struct CreateOrReconfigureArgs { put, path = super::paths::streams::CREATE_OR_RECONFIGURE, tag = super::paths::streams::TAG, - request_body = Option, + request_body = Option, params(v1t::StreamNamePathSegment), responses( (status = StatusCode::OK, body = v1t::stream::StreamInfo), @@ -215,7 +215,7 @@ pub async fn create_or_reconfigure_stream( config: JsonOpt(config), }: CreateOrReconfigureArgs, ) -> Result<(StatusCode, Json), ServiceError> { - let config: OptionalStreamConfig = config + let config: StreamReconfiguration = config .map(TryInto::try_into) .transpose()? .unwrap_or_default(); diff --git a/lite/src/init.rs b/lite/src/init.rs new file mode 100644 index 0000000..b5b8d1c --- /dev/null +++ b/lite/src/init.rs @@ -0,0 +1,668 @@ +//! Declarative basin/stream initialization from a JSON spec file. +//! +//! Loaded at startup when `--init-file` / `S2LITE_INIT_FILE` is set. + +use std::{path::Path, time::Duration}; + +use s2_common::{ + maybe::Maybe, + types::{ + basin::BasinName, + config::{ + BasinReconfiguration, DeleteOnEmptyReconfiguration, RetentionPolicy, StorageClass, + StreamReconfiguration, TimestampingMode, TimestampingReconfiguration, + }, + resources::CreateMode, + stream::StreamName, + }, +}; +use serde::{Deserialize, Serialize}; +use tracing::info; + +use crate::backend::Backend; + +#[derive(Debug, Deserialize, Default, schemars::JsonSchema)] +pub struct ResourcesSpec { + #[serde(default)] + pub basins: Vec, +} + +#[derive(Debug, Deserialize, schemars::JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct BasinSpec { + pub name: String, + #[serde(default)] + pub config: Option, + #[serde(default)] + pub streams: Vec, +} + +#[derive(Debug, Deserialize, schemars::JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct StreamSpec { + pub name: String, + #[serde(default)] + pub config: Option, +} + +#[derive(Debug, Clone, Deserialize, Default, schemars::JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct BasinConfigSpec { + #[serde(default)] + pub default_stream_config: Option, + /// Create stream on append if it doesn't exist, using the default stream configuration. + #[serde(default)] + pub create_stream_on_append: Option, + /// Create stream on read if it doesn't exist, using the default stream configuration. + #[serde(default)] + pub create_stream_on_read: Option, +} + +#[derive(Debug, Clone, Deserialize, Default, schemars::JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct StreamConfigSpec { + /// Storage class for recent writes. + #[serde(default)] + pub storage_class: Option, + /// Retention policy for the stream. If unspecified, the default is to retain records for 7 + /// days. + #[serde(default)] + pub retention_policy: Option, + /// Timestamping behavior. + #[serde(default)] + pub timestamping: Option, + /// Delete-on-empty configuration. + #[serde(default)] + pub delete_on_empty: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "kebab-case")] +pub enum StorageClassSpec { + Standard, + Express, +} + +impl schemars::JsonSchema for StorageClassSpec { + fn schema_name() -> String { + "StorageClassSpec".to_string() + } + + fn json_schema(_: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema { + schemars::schema::Schema::Object(schemars::schema::SchemaObject { + instance_type: Some(schemars::schema::InstanceType::String.into()), + metadata: Some(Box::new(schemars::schema::Metadata { + description: Some("Storage class for recent writes.".to_string()), + ..Default::default() + })), + enum_values: Some(vec![ + serde_json::Value::String("standard".to_string()), + serde_json::Value::String("express".to_string()), + ]), + ..Default::default() + }) + } +} + +impl From for StorageClass { + fn from(s: StorageClassSpec) -> Self { + match s { + StorageClassSpec::Standard => StorageClass::Standard, + StorageClassSpec::Express => StorageClass::Express, + } + } +} + +/// Accepts `"infinite"` or a humantime duration string such as `"7d"`, `"1w"`. +#[derive(Debug, Clone, Copy)] +pub struct RetentionPolicySpec(pub RetentionPolicy); + +impl RetentionPolicySpec { + pub fn age_secs(self) -> Option { + self.0.age().map(|d| d.as_secs()) + } +} + +impl TryFrom for RetentionPolicySpec { + type Error = String; + + fn try_from(s: String) -> Result { + if s.eq_ignore_ascii_case("infinite") { + return Ok(RetentionPolicySpec(RetentionPolicy::Infinite())); + } + let d = humantime::parse_duration(&s) + .map_err(|e| format!("invalid retention_policy {:?}: {}", s, e))?; + Ok(RetentionPolicySpec(RetentionPolicy::Age(d))) + } +} + +impl<'de> Deserialize<'de> for RetentionPolicySpec { + fn deserialize>(d: D) -> Result { + let s = String::deserialize(d)?; + RetentionPolicySpec::try_from(s).map_err(serde::de::Error::custom) + } +} + +impl schemars::JsonSchema for RetentionPolicySpec { + fn schema_name() -> String { + "RetentionPolicySpec".to_string() + } + + fn json_schema(_: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema { + schemars::schema::Schema::Object(schemars::schema::SchemaObject { + instance_type: Some(schemars::schema::InstanceType::String.into()), + metadata: Some(Box::new(schemars::schema::Metadata { + description: Some( + "Retain records unless explicitly trimmed (\"infinite\"), or automatically \ + trim records older than the given duration (e.g. \"7days\", \"1week\")." + .to_string(), + ), + examples: vec![ + serde_json::Value::String("infinite".to_string()), + serde_json::Value::String("7days".to_string()), + serde_json::Value::String("1week".to_string()), + ], + ..Default::default() + })), + ..Default::default() + }) + } +} + +#[derive(Debug, Clone, Deserialize, schemars::JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct TimestampingSpec { + /// Timestamping mode for appends that influences how timestamps are handled. + #[serde(default)] + pub mode: Option, + /// Allow client-specified timestamps to exceed the arrival time. + /// If this is `false` or not set, client timestamps will be capped at the arrival time. + #[serde(default)] + pub uncapped: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "kebab-case")] +pub enum TimestampingModeSpec { + ClientPrefer, + ClientRequire, + Arrival, +} + +impl schemars::JsonSchema for TimestampingModeSpec { + fn schema_name() -> String { + "TimestampingModeSpec".to_string() + } + + fn json_schema(_: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema { + schemars::schema::Schema::Object(schemars::schema::SchemaObject { + instance_type: Some(schemars::schema::InstanceType::String.into()), + metadata: Some(Box::new(schemars::schema::Metadata { + description: Some( + "Timestamping mode for appends that influences how timestamps are handled." + .to_string(), + ), + ..Default::default() + })), + enum_values: Some(vec![ + serde_json::Value::String("client-prefer".to_string()), + serde_json::Value::String("client-require".to_string()), + serde_json::Value::String("arrival".to_string()), + ]), + ..Default::default() + }) + } +} + +impl From for TimestampingMode { + fn from(m: TimestampingModeSpec) -> Self { + match m { + TimestampingModeSpec::ClientPrefer => TimestampingMode::ClientPrefer, + TimestampingModeSpec::ClientRequire => TimestampingMode::ClientRequire, + TimestampingModeSpec::Arrival => TimestampingMode::Arrival, + } + } +} + +#[derive(Debug, Clone, Deserialize, schemars::JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct DeleteOnEmptySpec { + /// Minimum age before an empty stream can be deleted. + /// Set to 0 (default) to disable delete-on-empty (don't delete automatically). + #[serde(default)] + pub min_age: Option, +} + +/// A `std::time::Duration` deserialized from a humantime string (e.g. `"1d"`, `"2h 30m"`). +#[derive(Debug, Clone, Copy)] +pub struct HumanDuration(pub Duration); + +impl TryFrom for HumanDuration { + type Error = String; + + fn try_from(s: String) -> Result { + humantime::parse_duration(&s) + .map(HumanDuration) + .map_err(|e| format!("invalid duration {:?}: {}", s, e)) + } +} + +impl<'de> Deserialize<'de> for HumanDuration { + fn deserialize>(d: D) -> Result { + let s = String::deserialize(d)?; + HumanDuration::try_from(s).map_err(serde::de::Error::custom) + } +} + +impl schemars::JsonSchema for HumanDuration { + fn schema_name() -> String { + "HumanDuration".to_string() + } + + fn json_schema(_: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema { + schemars::schema::Schema::Object(schemars::schema::SchemaObject { + instance_type: Some(schemars::schema::InstanceType::String.into()), + metadata: Some(Box::new(schemars::schema::Metadata { + description: Some( + "A duration string in humantime format, e.g. \"1day\", \"2h 30m\"".to_string(), + ), + examples: vec![ + serde_json::Value::String("1day".to_string()), + serde_json::Value::String("2h 30m".to_string()), + ], + ..Default::default() + })), + ..Default::default() + }) + } +} + +impl From for BasinReconfiguration { + fn from(s: BasinConfigSpec) -> Self { + BasinReconfiguration { + default_stream_config: s + .default_stream_config + .map(|dsc| Some(StreamReconfiguration::from(dsc))) + .map_or(Maybe::Unspecified, Maybe::Specified), + create_stream_on_append: s + .create_stream_on_append + .map_or(Maybe::Unspecified, Maybe::Specified), + create_stream_on_read: s + .create_stream_on_read + .map_or(Maybe::Unspecified, Maybe::Specified), + } + } +} + +impl From for StreamReconfiguration { + fn from(s: StreamConfigSpec) -> Self { + StreamReconfiguration { + storage_class: s + .storage_class + .map(|sc| Some(StorageClass::from(sc))) + .map_or(Maybe::Unspecified, Maybe::Specified), + retention_policy: s + .retention_policy + .map(|rp| Some(rp.0)) + .map_or(Maybe::Unspecified, Maybe::Specified), + timestamping: s + .timestamping + .map(|ts| { + Some(TimestampingReconfiguration { + mode: ts + .mode + .map(|m| Some(TimestampingMode::from(m))) + .map_or(Maybe::Unspecified, Maybe::Specified), + uncapped: ts + .uncapped + .map(Some) + .map_or(Maybe::Unspecified, Maybe::Specified), + }) + }) + .map_or(Maybe::Unspecified, Maybe::Specified), + delete_on_empty: s + .delete_on_empty + .map(|doe| { + Some(DeleteOnEmptyReconfiguration { + min_age: doe + .min_age + .map(|h| Some(h.0)) + .map_or(Maybe::Unspecified, Maybe::Specified), + }) + }) + .map_or(Maybe::Unspecified, Maybe::Specified), + } + } +} + +pub fn json_schema() -> serde_json::Value { + serde_json::to_value(schemars::schema_for!(ResourcesSpec)).unwrap() +} + +pub fn validate(spec: &ResourcesSpec) -> eyre::Result<()> { + let mut errors = Vec::new(); + let mut seen_basins = std::collections::HashSet::new(); + + for basin_spec in &spec.basins { + if !seen_basins.insert(basin_spec.name.clone()) { + errors.push(format!("duplicate basin name {:?}", basin_spec.name)); + } + + if let Err(e) = basin_spec.name.parse::() { + errors.push(format!("invalid basin name {:?}: {}", basin_spec.name, e)); + continue; + } + + let mut seen_streams = std::collections::HashSet::new(); + for stream_spec in &basin_spec.streams { + if !seen_streams.insert(stream_spec.name.clone()) { + errors.push(format!( + "duplicate stream name {:?} in basin {:?}", + stream_spec.name, basin_spec.name + )); + } + if let Err(e) = stream_spec.name.parse::() { + errors.push(format!( + "invalid stream name {:?} in basin {:?}: {}", + stream_spec.name, basin_spec.name, e + )); + } + } + } + + if errors.is_empty() { + Ok(()) + } else { + Err(eyre::eyre!("{}", errors.join("\n"))) + } +} + +pub fn load(path: &Path) -> eyre::Result { + let contents = std::fs::read_to_string(path) + .map_err(|e| eyre::eyre!("failed to read init file {:?}: {}", path, e))?; + let spec: ResourcesSpec = serde_json::from_str(&contents) + .map_err(|e| eyre::eyre!("failed to parse init file {:?}: {}", path, e))?; + Ok(spec) +} + +pub async fn apply(backend: &Backend, spec: ResourcesSpec) -> eyre::Result<()> { + validate(&spec)?; + + for basin_spec in spec.basins { + let basin: BasinName = basin_spec + .name + .parse() + .map_err(|e| eyre::eyre!("invalid basin name {:?}: {}", basin_spec.name, e))?; + + let reconfiguration = basin_spec + .config + .map(BasinReconfiguration::from) + .unwrap_or_default(); + + backend + .create_basin( + basin.clone(), + reconfiguration, + CreateMode::CreateOrReconfigure, + ) + .await + .map_err(|e| eyre::eyre!("failed to apply basin {:?}: {}", basin.as_ref(), e))?; + + info!(basin = basin.as_ref(), "basin applied"); + + for stream_spec in basin_spec.streams { + let stream: StreamName = stream_spec + .name + .parse() + .map_err(|e| eyre::eyre!("invalid stream name {:?}: {}", stream_spec.name, e))?; + + let reconfiguration = stream_spec + .config + .map(StreamReconfiguration::from) + .unwrap_or_default(); + + backend + .create_stream( + basin.clone(), + stream.clone(), + reconfiguration, + CreateMode::CreateOrReconfigure, + ) + .await + .map_err(|e| { + eyre::eyre!( + "failed to apply stream {:?}/{:?}: {}", + basin.as_ref(), + stream.as_ref(), + e + ) + })?; + + info!( + basin = basin.as_ref(), + stream = stream.as_ref(), + "stream applied" + ); + } + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn parse_spec(json: &str) -> ResourcesSpec { + serde_json::from_str(json).expect("valid JSON") + } + + #[test] + fn empty_spec() { + let spec = parse_spec("{}"); + assert!(spec.basins.is_empty()); + } + + #[test] + fn basin_no_config() { + let spec = parse_spec(r#"{"basins":[{"name":"my-basin"}]}"#); + assert_eq!(spec.basins.len(), 1); + assert_eq!(spec.basins[0].name, "my-basin"); + assert!(spec.basins[0].config.is_none()); + assert!(spec.basins[0].streams.is_empty()); + } + + #[test] + fn retention_policy_infinite() { + let rp: RetentionPolicySpec = serde_json::from_str(r#""infinite""#).expect("deserialize"); + assert!(matches!(rp.0, RetentionPolicy::Infinite())); + } + + #[test] + fn retention_policy_duration() { + let rp: RetentionPolicySpec = serde_json::from_str(r#""7days""#).expect("deserialize"); + assert!(matches!(rp.0, RetentionPolicy::Age(_))); + if let RetentionPolicy::Age(d) = rp.0 { + assert_eq!(d, Duration::from_secs(7 * 24 * 3600)); + } + } + + #[test] + fn retention_policy_invalid() { + let err = serde_json::from_str::(r#""not-a-duration""#); + assert!(err.is_err()); + } + + #[test] + fn human_duration() { + let hd: HumanDuration = serde_json::from_str(r#""1day""#).expect("deserialize"); + assert_eq!(hd.0, Duration::from_secs(86400)); + } + + #[test] + fn full_spec_roundtrip() { + let json = r#" + { + "basins": [ + { + "name": "my-basin", + "config": { + "create_stream_on_append": true, + "create_stream_on_read": false, + "default_stream_config": { + "storage_class": "express", + "retention_policy": "7days", + "timestamping": { + "mode": "client-prefer", + "uncapped": false + }, + "delete_on_empty": { + "min_age": "1day" + } + } + }, + "streams": [ + { + "name": "events", + "config": { + "storage_class": "standard", + "retention_policy": "infinite" + } + } + ] + } + ] + }"#; + + let spec = parse_spec(json); + assert_eq!(spec.basins.len(), 1); + let basin = &spec.basins[0]; + assert_eq!(basin.name, "my-basin"); + + let config = basin.config.as_ref().unwrap(); + assert_eq!(config.create_stream_on_append, Some(true)); + assert_eq!(config.create_stream_on_read, Some(false)); + + let dsc = config.default_stream_config.as_ref().unwrap(); + assert!(matches!(dsc.storage_class, Some(StorageClassSpec::Express))); + assert!(matches!( + dsc.retention_policy.as_ref().map(|r| &r.0), + Some(RetentionPolicy::Age(_)) + )); + + let ts = dsc.timestamping.as_ref().unwrap(); + assert!(matches!(ts.mode, Some(TimestampingModeSpec::ClientPrefer))); + assert_eq!(ts.uncapped, Some(false)); + + let doe = dsc.delete_on_empty.as_ref().unwrap(); + assert_eq!( + doe.min_age.as_ref().map(|h| h.0), + Some(Duration::from_secs(86400)) + ); + + assert_eq!(basin.streams.len(), 1); + let stream = &basin.streams[0]; + assert_eq!(stream.name, "events"); + let sc = stream.config.as_ref().unwrap(); + assert!(matches!(sc.storage_class, Some(StorageClassSpec::Standard))); + assert!(matches!( + sc.retention_policy.as_ref().map(|r| &r.0), + Some(RetentionPolicy::Infinite()) + )); + } + + #[test] + fn basin_config_conversion() { + let spec = BasinConfigSpec { + default_stream_config: None, + create_stream_on_append: Some(true), + create_stream_on_read: None, + }; + let reconfig = BasinReconfiguration::from(spec); + assert!(matches!( + reconfig.create_stream_on_append, + Maybe::Specified(true) + )); + assert!(matches!(reconfig.create_stream_on_read, Maybe::Unspecified)); + assert!(matches!(reconfig.default_stream_config, Maybe::Unspecified)); + } + + #[test] + fn validate_valid_spec() { + let spec = parse_spec( + r#"{"basins":[{"name":"my-basin","streams":[{"name":"events"},{"name":"logs"}]}]}"#, + ); + assert!(validate(&spec).is_ok()); + } + + #[test] + fn validate_invalid_basin_name() { + let spec = parse_spec(r#"{"basins":[{"name":"INVALID_BASIN"}]}"#); + let err = validate(&spec).unwrap_err(); + assert!(err.to_string().contains("invalid basin name")); + } + + #[test] + fn validate_invalid_stream_name() { + let spec = parse_spec(r#"{"basins":[{"name":"my-basin","streams":[{"name":""}]}]}"#); + let err = validate(&spec).unwrap_err(); + assert!(err.to_string().contains("invalid stream name")); + } + + #[test] + fn validate_duplicate_basin_names() { + let spec = parse_spec(r#"{"basins":[{"name":"my-basin"},{"name":"my-basin"}]}"#); + let err = validate(&spec).unwrap_err(); + assert!(err.to_string().contains("duplicate basin name")); + } + + #[test] + fn validate_duplicate_stream_names() { + let spec = parse_spec( + r#"{"basins":[{"name":"my-basin","streams":[{"name":"events"},{"name":"events"}]}]}"#, + ); + let err = validate(&spec).unwrap_err(); + assert!(err.to_string().contains("duplicate stream name")); + } + + #[test] + fn validate_multiple_errors() { + let spec = parse_spec(r#"{"basins":[{"name":"INVALID"},{"name":"INVALID"}]}"#); + let err = validate(&spec).unwrap_err(); + let msg = err.to_string(); + assert!(msg.contains("invalid basin name")); + assert!(msg.contains("duplicate basin name")); + } + + #[test] + fn json_schema_is_valid() { + let schema = json_schema(); + assert!(schema.is_object()); + let schema_obj = schema.as_object().unwrap(); + // Should have at minimum a definitions/properties structure + assert!( + schema_obj.contains_key("definitions") || schema_obj.contains_key("properties"), + "schema should have definitions or properties" + ); + } + + #[test] + fn stream_config_conversion() { + let spec = StreamConfigSpec { + storage_class: Some(StorageClassSpec::Standard), + retention_policy: Some(RetentionPolicySpec(RetentionPolicy::Infinite())), + timestamping: None, + delete_on_empty: None, + }; + let reconfig = StreamReconfiguration::from(spec); + assert!(matches!( + reconfig.storage_class, + Maybe::Specified(Some(StorageClass::Standard)) + )); + assert!(matches!( + reconfig.retention_policy, + Maybe::Specified(Some(RetentionPolicy::Infinite())) + )); + assert!(matches!(reconfig.timestamping, Maybe::Unspecified)); + assert!(matches!(reconfig.delete_on_empty, Maybe::Unspecified)); + } +} diff --git a/lite/src/lib.rs b/lite/src/lib.rs index 1935ea4..60eb0ec 100644 --- a/lite/src/lib.rs +++ b/lite/src/lib.rs @@ -2,5 +2,6 @@ pub mod backend; pub mod handlers; +pub mod init; pub mod metrics; pub mod server; diff --git a/lite/src/server.rs b/lite/src/server.rs index 2459ee7..afef0f2 100644 --- a/lite/src/server.rs +++ b/lite/src/server.rs @@ -15,7 +15,7 @@ use tower_http::{ }; use tracing::info; -use crate::{backend::Backend, handlers}; +use crate::{backend::Backend, handlers, init}; #[derive(clap::Args, Debug, Clone)] pub struct TlsConfig { @@ -68,6 +68,13 @@ pub struct LiteArgs { /// should be denied at the HTTP layer. #[arg(long)] pub no_cors: bool, + + /// Path to a JSON file defining basins and streams to create at startup. + /// + /// Uses create-or-reconfigure semantics, so it is safe to run on repeated + /// restarts. Can also be set via S2LITE_INIT_FILE environment variable. + #[arg(long, env = "S2LITE_INIT_FILE")] + pub init_file: Option, } #[derive(Debug, Clone)] @@ -145,6 +152,11 @@ pub async fn run(args: LiteArgs) -> eyre::Result<()> { let backend = Backend::new(db, append_inflight_max); crate::backend::bgtasks::spawn(&backend); + if let Some(init_file) = &args.init_file { + let spec = init::load(init_file)?; + init::apply(&backend, spec).await?; + } + let mut app = handlers::router().with_state(backend).layer( TraceLayer::new_for_http() .make_span_with(DefaultMakeSpan::new().level(tracing::Level::INFO)) diff --git a/sdk/src/api.rs b/sdk/src/api.rs index 34939a2..778a16b 100644 --- a/sdk/src/api.rs +++ b/sdk/src/api.rs @@ -1,39 +1,48 @@ -use crate::client::{self, StreamingResponse, UnaryResponse}; -use crate::retry::{RetryBackoff, RetryBackoffBuilder}; -use crate::types::{ - AccessTokenId, BasinAuthority, BasinName, Compression, RetryConfig, S2Config, S2Endpoints, - StreamName, -}; +use std::{ops::Deref, pin::Pin, sync::Arc, time::Duration}; + use async_stream::try_stream; use async_trait::async_trait; use bytes::BytesMut; use futures::{Stream, StreamExt}; -use http::header::InvalidHeaderValue; -use http::header::{ACCEPT, AUTHORIZATION, CONTENT_TYPE}; -use http::{HeaderMap, HeaderValue, StatusCode}; -use prost::{self, Message}; -use s2_api::v1::access::{ - AccessTokenInfo, IssueAccessTokenResponse, ListAccessTokensRequest, ListAccessTokensResponse, -}; -use s2_api::v1::basin::{BasinInfo, CreateBasinRequest, ListBasinsRequest, ListBasinsResponse}; -use s2_api::v1::config::{BasinConfig, BasinReconfiguration, StreamConfig, StreamReconfiguration}; -use s2_api::v1::metrics::{ - AccountMetricSetRequest, BasinMetricSetRequest, MetricSetResponse, StreamMetricSetRequest, +use http::{ + HeaderMap, HeaderValue, StatusCode, + header::{ACCEPT, AUTHORIZATION, CONTENT_TYPE, InvalidHeaderValue}, }; -use s2_api::v1::stream::s2s::{self, FrameDecoder, SessionMessage, TerminalMessage}; -use s2_api::v1::stream::{ - AppendConditionFailed, CreateStreamRequest, ListStreamsRequest, ListStreamsResponse, ReadEnd, - ReadStart, StreamInfo, TailResponse, - proto::{AppendAck, AppendInput, ReadBatch}, +use prost::{self, Message}; +use s2_api::v1::{ + access::{ + AccessTokenInfo, IssueAccessTokenResponse, ListAccessTokensRequest, + ListAccessTokensResponse, + }, + basin::{ + BasinInfo, CreateBasinRequest, ListBasinsRequest, ListBasinsResponse, + }, + config::{BasinConfig, BasinReconfiguration, StreamConfig, StreamReconfiguration}, + metrics::{ + AccountMetricSetRequest, BasinMetricSetRequest, MetricSetResponse, StreamMetricSetRequest, + }, + stream::{ + AppendConditionFailed, CreateStreamRequest, ListStreamsRequest, ListStreamsResponse, + ReadEnd, ReadStart, StreamInfo, TailResponse, + proto::{AppendAck, AppendInput, ReadBatch}, + s2s::{self, FrameDecoder, SessionMessage, TerminalMessage}, + }, }; use secrecy::ExposeSecret; -use std::ops::Deref; -use std::pin::Pin; -use std::sync::Arc; -use std::time::Duration; use tokio_util::codec::Decoder; use tracing::{debug, warn}; use url::Url; +#[cfg(feature = "_hidden")] +use s2_api::v1::basin::CreateOrReconfigureBasinRequest; + +use crate::{ + client::{self, StreamingResponse, UnaryResponse}, + retry::{RetryBackoff, RetryBackoffBuilder}, + types::{ + AccessTokenId, BasinAuthority, BasinName, Compression, RetryConfig, S2Config, S2Endpoints, + StreamName, + }, +}; const CONTENT_TYPE_S2S: &str = "s2s/proto"; const CONTENT_TYPE_PROTO: &str = "application/protobuf"; @@ -135,6 +144,22 @@ impl AccountClient { Ok(response.json::()?) } + #[cfg(feature = "_hidden")] + pub async fn create_or_reconfigure_basin( + &self, + name: BasinName, + request: Option, + ) -> Result<(bool, BasinInfo), ApiError> { + let url = self.base_url.join(&format!("v1/basins/{name}"))?; + let request = match request { + Some(body) => self.put(url).json(&body).build()?, + None => self.put(url).build()?, + }; + let response = self.request(request).send().await?; + let was_created = response.status() == StatusCode::CREATED; + Ok((was_created, response.json::()?)) + } + pub async fn delete_basin( &self, name: BasinName, @@ -273,6 +298,24 @@ impl BasinClient { Ok(response.json::()?) } + #[cfg(feature = "_hidden")] + pub async fn create_or_reconfigure_stream( + &self, + name: StreamName, + config: Option, + ) -> Result<(bool, StreamInfo), ApiError> { + let url = self + .base_url + .join(&format!("v1/streams/{}", urlencoding::encode(&name)))?; + let request = match config { + Some(body) => self.put(url).json(&body).build()?, + None => self.put(url).build()?, + }; + let response = self.request(request).send().await?; + let was_created = response.status() == StatusCode::CREATED; + Ok((was_created, response.json::()?)) + } + pub async fn delete_stream( &self, name: StreamName, @@ -753,6 +796,14 @@ impl BaseClient { .compression(self.compression) } + #[cfg(feature = "_hidden")] + pub fn put(&self, url: Url) -> client::RequestBuilder { + client::RequestBuilder::put(url) + .timeout(self.request_timeout) + .headers(&self.default_headers) + .compression(self.compression) + } + pub fn delete(&self, url: Url) -> client::RequestBuilder { client::RequestBuilder::delete(url) .timeout(self.request_timeout) diff --git a/sdk/src/client.rs b/sdk/src/client.rs index 390c20a..75784ed 100644 --- a/sdk/src/client.rs +++ b/sdk/src/client.rs @@ -213,6 +213,11 @@ impl RequestBuilder { Self::new(Method::PATCH, url) } + #[cfg(feature = "_hidden")] + pub fn put(url: Url) -> Self { + Self::new(Method::PUT, url) + } + pub fn delete(url: Url) -> Self { Self::new(Method::DELETE, url) } diff --git a/sdk/src/ops.rs b/sdk/src/ops.rs index 7f29102..b4a006e 100644 --- a/sdk/src/ops.rs +++ b/sdk/src/ops.rs @@ -2,6 +2,10 @@ use futures::StreamExt; #[cfg(feature = "_hidden")] use crate::client::Connect; +#[cfg(feature = "_hidden")] +use crate::types::{ + CreateOrReconfigureBasinInput, CreateOrReconfigureStreamInput, CreateOrReconfigured, +}; use crate::{ api::{AccountClient, BaseClient, BasinClient}, producer::{Producer, ProducerConfig}, @@ -103,6 +107,32 @@ impl S2 { Ok(info.into()) } + /// Create or reconfigure a basin. + /// + /// Creates the basin if it doesn't exist, or reconfigures it to match the provided + /// configuration if it does. Uses HTTP PUT semantics — always idempotent. + /// + /// Returns [`CreateOrReconfigured::Created`] with the basin info if the basin was newly + /// created, or [`CreateOrReconfigured::Reconfigured`] if it already existed. + #[doc(hidden)] + #[cfg(feature = "_hidden")] + pub async fn create_or_reconfigure_basin( + &self, + input: CreateOrReconfigureBasinInput, + ) -> Result, S2Error> { + let (name, request) = input.into(); + let (was_created, info) = self + .client + .create_or_reconfigure_basin(name, request) + .await?; + let info = info.into(); + Ok(if was_created { + CreateOrReconfigured::Created(info) + } else { + CreateOrReconfigured::Reconfigured(info) + }) + } + /// Get basin configuration. pub async fn get_basin_config(&self, name: BasinName) -> Result { let config = self.client.get_basin_config(name).await?; @@ -295,6 +325,32 @@ impl S2Basin { Ok(info.try_into()?) } + /// Create or reconfigure a stream. + /// + /// Creates the stream if it doesn't exist, or reconfigures it to match the provided + /// configuration if it does. Uses HTTP PUT semantics — always idempotent. + /// + /// Returns [`CreateOrReconfigured::Created`] with the stream info if the stream was newly + /// created, or [`CreateOrReconfigured::Reconfigured`] if it already existed. + #[doc(hidden)] + #[cfg(feature = "_hidden")] + pub async fn create_or_reconfigure_stream( + &self, + input: CreateOrReconfigureStreamInput, + ) -> Result, S2Error> { + let (name, config) = input.into(); + let (was_created, info) = self + .client + .create_or_reconfigure_stream(name, config) + .await?; + let info = info.try_into()?; + Ok(if was_created { + CreateOrReconfigured::Created(info) + } else { + CreateOrReconfigured::Reconfigured(info) + }) + } + /// Get stream configuration. pub async fn get_stream_config(&self, name: StreamName) -> Result { let config = self.client.get_stream_config(name).await?; diff --git a/sdk/src/types.rs b/sdk/src/types.rs index aaa41a3..fc261d8 100644 --- a/sdk/src/types.rs +++ b/sdk/src/types.rs @@ -862,6 +862,35 @@ impl From for api::basin::BasinScope { } } +/// Result of a create-or-reconfigure operation. +/// +/// Indicates whether the resource was newly created or already existed and was +/// reconfigured. Both variants hold the resource's current state. +#[doc(hidden)] +#[cfg(feature = "_hidden")] +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum CreateOrReconfigured { + /// Resource was newly created. + Created(T), + /// Resource already existed and was reconfigured to match the spec. + Reconfigured(T), +} + +#[cfg(feature = "_hidden")] +impl CreateOrReconfigured { + /// Returns `true` if the resource was newly created. + pub fn is_created(&self) -> bool { + matches!(self, Self::Created(_)) + } + + /// Unwrap the inner value regardless of variant. + pub fn into_inner(self) -> T { + match self { + Self::Created(t) | Self::Reconfigured(t) => t, + } + } +} + #[derive(Debug, Clone)] #[non_exhaustive] /// Input for [`create_basin`](crate::S2::create_basin) operation. @@ -920,6 +949,72 @@ impl From for (api::basin::CreateBasinRequest, String) { } } +#[derive(Debug, Clone)] +#[non_exhaustive] +/// Input for [`create_or_reconfigure_basin`](crate::S2::create_or_reconfigure_basin) operation. +#[doc(hidden)] +#[cfg(feature = "_hidden")] +pub struct CreateOrReconfigureBasinInput { + /// Basin name. + pub name: BasinName, + /// Reconfiguration for the basin. + /// + /// If `None`, the basin is created with default configuration or left unchanged if it exists. + pub config: Option, + /// Scope of the basin. + /// + /// Defaults to [`AwsUsEast1`](BasinScope::AwsUsEast1). Cannot be changed once set. + pub scope: Option, +} + +#[cfg(feature = "_hidden")] +impl CreateOrReconfigureBasinInput { + /// Create a new [`CreateOrReconfigureBasinInput`] with the given basin name. + pub fn new(name: BasinName) -> Self { + Self { + name, + config: None, + scope: None, + } + } + + /// Set the reconfiguration for the basin. + pub fn with_config(self, config: BasinReconfiguration) -> Self { + Self { + config: Some(config), + ..self + } + } + + /// Set the scope of the basin. + pub fn with_scope(self, scope: BasinScope) -> Self { + Self { + scope: Some(scope), + ..self + } + } +} + +#[cfg(feature = "_hidden")] +impl From + for ( + BasinName, + Option, + ) +{ + fn from(value: CreateOrReconfigureBasinInput) -> Self { + let request = if value.config.is_some() || value.scope.is_some() { + Some(api::basin::CreateOrReconfigureBasinRequest { + config: value.config.map(Into::into), + scope: value.scope.map(Into::into), + }) + } else { + None + }; + (value.name, request) + } +} + #[derive(Debug, Clone, Default)] #[non_exhaustive] /// Input for [`list_basins`](crate::S2::list_basins) operation. @@ -2577,6 +2672,46 @@ impl From for (api::stream::CreateStreamRequest, String) { } } +#[derive(Debug, Clone)] +#[non_exhaustive] +/// Input for [`create_or_reconfigure_stream`](crate::S2Basin::create_or_reconfigure_stream) +/// operation. +#[doc(hidden)] +#[cfg(feature = "_hidden")] +pub struct CreateOrReconfigureStreamInput { + /// Stream name. + pub name: StreamName, + /// Reconfiguration for the stream. + /// + /// If `None`, the stream is created with default configuration or left unchanged if it exists. + pub config: Option, +} + +#[cfg(feature = "_hidden")] +impl CreateOrReconfigureStreamInput { + /// Create a new [`CreateOrReconfigureStreamInput`] with the given stream name. + pub fn new(name: StreamName) -> Self { + Self { name, config: None } + } + + /// Set the reconfiguration for the stream. + pub fn with_config(self, config: StreamReconfiguration) -> Self { + Self { + config: Some(config), + ..self + } + } +} + +#[cfg(feature = "_hidden")] +impl From + for (StreamName, Option) +{ + fn from(value: CreateOrReconfigureStreamInput) -> Self { + (value.name, value.config.map(Into::into)) + } +} + #[derive(Debug, Clone)] #[non_exhaustive] /// Input of [`delete_stream`](crate::S2Basin::delete_stream) operation.