Skip to content
Closed

[WIP] #228

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 14 additions & 17 deletions api/src/v1/access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,9 @@ pub struct AccessTokenInfo {
pub expires_at: Option<OffsetDateTime>,
/// Namespace streams based on the configured stream-level scope, which must be a prefix.
/// Stream name arguments will be automatically prefixed, and the prefix will be stripped when listing streams.
#[cfg_attr(feature = "utoipa", schema(value_type = bool, default = false, required = false))]
pub auto_prefix_streams: Option<bool>,
#[serde(default)]
#[cfg_attr(feature = "utoipa", schema(default = false))]
pub auto_prefix_streams: bool,
/// Access token scope.
pub scope: AccessTokenScope,
}
Expand All @@ -176,7 +177,7 @@ impl TryFrom<AccessTokenInfo> for types::access::IssueAccessTokenRequest {
Ok(Self {
id: value.id,
expires_at: value.expires_at,
auto_prefix_streams: value.auto_prefix_streams.unwrap_or_default(),
auto_prefix_streams: value.auto_prefix_streams,
scope: value.scope.try_into()?,
})
}
Expand All @@ -187,7 +188,7 @@ impl From<types::access::AccessTokenInfo> for AccessTokenInfo {
Self {
id: value.id,
expires_at: Some(value.expires_at),
auto_prefix_streams: Some(value.auto_prefix_streams),
auto_prefix_streams: value.auto_prefix_streams,
scope: value.scope.into(),
}
}
Expand All @@ -198,7 +199,7 @@ impl From<types::access::IssueAccessTokenRequest> for AccessTokenInfo {
Self {
id: value.id,
expires_at: value.expires_at,
auto_prefix_streams: Some(value.auto_prefix_streams),
auto_prefix_streams: value.auto_prefix_streams,
scope: value.scope.into(),
}
}
Expand Down Expand Up @@ -352,32 +353,28 @@ impl From<types::access::PermittedOperationGroups> for PermittedOperationGroups
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct ReadWritePermissions {
/// Read permission.
#[cfg_attr(feature = "utoipa", schema(value_type = bool, default = false, required = false))]
pub read: Option<bool>,
#[serde(default)]
#[cfg_attr(feature = "utoipa", schema(default = false))]
pub read: bool,
/// Write permission.
#[cfg_attr(feature = "utoipa", schema(value_type = bool, default = false, required = false))]
pub write: Option<bool>,
#[serde(default)]
#[cfg_attr(feature = "utoipa", schema(default = false))]
pub write: bool,
}

impl From<ReadWritePermissions> for types::access::ReadWritePermissions {
fn from(value: ReadWritePermissions) -> Self {
let ReadWritePermissions { read, write } = value;

Self {
read: read.unwrap_or_default(),
write: write.unwrap_or_default(),
}
Self { read, write }
}
}

impl From<types::access::ReadWritePermissions> for ReadWritePermissions {
fn from(value: types::access::ReadWritePermissions) -> Self {
let types::access::ReadWritePermissions { read, write } = value;

Self {
read: Some(read),
write: Some(write),
}
Self { read, write }
}
}

Expand Down
33 changes: 20 additions & 13 deletions api/src/v1/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,17 @@ pub struct TimestampingConfig {
/// Timestamping mode for appends that influences how timestamps are handled.
pub mode: Option<TimestampingMode>,
/// Allow client-specified timestamps to exceed the arrival time.
/// If this is `false` or not set, client timestamps will be capped at the arrival time.
pub uncapped: Option<bool>,
/// If this is `false`, client timestamps will be capped at the arrival time.
#[serde(default)]
#[cfg_attr(feature = "utoipa", schema(default = false))]
pub uncapped: bool,
}

impl TimestampingConfig {
pub fn to_opt(config: types::config::OptionalTimestampingConfig) -> Option<Self> {
let config = TimestampingConfig {
mode: config.mode.map(Into::into),
uncapped: config.uncapped,
uncapped: config.uncapped.unwrap_or_default(),
};
if config == Self::default() {
None
Expand All @@ -135,7 +137,7 @@ impl From<types::config::TimestampingConfig> for TimestampingConfig {
fn from(value: types::config::TimestampingConfig) -> Self {
Self {
mode: Some(value.mode.into()),
uncapped: Some(value.uncapped),
uncapped: value.uncapped,
}
}
}
Expand All @@ -144,7 +146,7 @@ impl From<TimestampingConfig> for types::config::OptionalTimestampingConfig {
fn from(value: TimestampingConfig) -> Self {
Self {
mode: value.mode.map(Into::into),
uncapped: value.uncapped,
uncapped: Some(value.uncapped),
}
}
}
Expand All @@ -160,14 +162,14 @@ pub struct TimestampingReconfiguration {
/// Allow client-specified timestamps to exceed the arrival time.
#[serde(default, skip_serializing_if = "Maybe::is_unspecified")]
#[cfg_attr(feature = "utoipa", schema(value_type = Option<bool>))]
pub uncapped: Maybe<Option<bool>>,
pub uncapped: Maybe<bool>,
}

impl From<TimestampingReconfiguration> for types::config::TimestampingReconfiguration {
fn from(value: TimestampingReconfiguration) -> Self {
Self {
mode: value.mode.map_opt(Into::into),
uncapped: value.uncapped,
uncapped: value.uncapped.map(Some),
}
}
}
Expand All @@ -176,7 +178,7 @@ impl From<types::config::TimestampingReconfiguration> for TimestampingReconfigur
fn from(value: types::config::TimestampingReconfiguration) -> Self {
Self {
mode: value.mode.map_opt(Into::into),
uncapped: value.uncapped,
uncapped: value.uncapped.map(|v| v.unwrap_or_default()),
}
}
}
Expand Down Expand Up @@ -406,9 +408,11 @@ pub struct BasinConfig {
pub default_stream_config: Option<StreamConfig>,
/// Create stream on append if it doesn't exist, using the default stream configuration.
#[serde(default)]
#[cfg_attr(feature = "utoipa", schema(default = false))]
pub create_stream_on_append: bool,
/// Create stream on read if it doesn't exist, using the default stream configuration.
#[serde(default)]
#[cfg_attr(feature = "utoipa", schema(default = false))]
pub create_stream_on_read: bool,
}

Expand Down Expand Up @@ -527,10 +531,7 @@ mod tests {
}

fn gen_timestamping_config() -> impl Strategy<Value = TimestampingConfig> {
(
proptest::option::of(gen_timestamping_mode()),
proptest::option::of(any::<bool>()),
)
(proptest::option::of(gen_timestamping_mode()), any::<bool>())
.prop_map(|(mode, uncapped)| TimestampingConfig { mode, uncapped })
}

Expand Down Expand Up @@ -602,7 +603,13 @@ mod tests {
}

fn gen_timestamping_reconfiguration() -> impl Strategy<Value = TimestampingReconfiguration> {
(gen_maybe(gen_timestamping_mode()), gen_maybe(any::<bool>()))
(
gen_maybe(gen_timestamping_mode()),
prop_oneof![
Just(Maybe::Unspecified),
any::<bool>().prop_map(Maybe::Specified),
],
)
.prop_map(|(mode, uncapped)| TimestampingReconfiguration { mode, uncapped })
}

Expand Down
20 changes: 10 additions & 10 deletions sdk/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ impl From<api::config::TimestampingConfig> for TimestampingConfig {
fn from(value: api::config::TimestampingConfig) -> Self {
Self {
mode: value.mode.map(Into::into),
uncapped: value.uncapped.unwrap_or_default(),
uncapped: value.uncapped,
}
}
}
Expand All @@ -641,7 +641,7 @@ impl From<TimestampingConfig> for api::config::TimestampingConfig {
fn from(value: TimestampingConfig) -> Self {
Self {
mode: value.mode.map(Into::into),
uncapped: Some(value.uncapped),
uncapped: value.uncapped,
}
}
}
Expand Down Expand Up @@ -1105,7 +1105,7 @@ pub struct TimestampingReconfiguration {
/// Override for the existing [`mode`](TimestampingConfig::mode).
pub mode: Maybe<Option<TimestampingMode>>,
/// Override for the existing [`uncapped`](TimestampingConfig::uncapped) setting.
pub uncapped: Maybe<Option<bool>>,
pub uncapped: Maybe<bool>,
}

impl TimestampingReconfiguration {
Expand All @@ -1125,7 +1125,7 @@ impl TimestampingReconfiguration {
/// Set the override for the existing [`uncapped`](TimestampingConfig::uncapped).
pub fn with_uncapped(self, uncapped: bool) -> Self {
Self {
uncapped: Maybe::Specified(Some(uncapped)),
uncapped: Maybe::Specified(uncapped),
..self
}
}
Expand Down Expand Up @@ -1430,7 +1430,7 @@ impl TryFrom<api::access::AccessTokenInfo> for AccessTokenInfo {
Ok(Self {
id: value.id,
expires_at,
auto_prefix_streams: value.auto_prefix_streams.unwrap_or(false),
auto_prefix_streams: value.auto_prefix_streams,
scope: value.scope.into(),
})
}
Expand Down Expand Up @@ -1523,17 +1523,17 @@ impl ReadWritePermissions {
impl From<ReadWritePermissions> for api::access::ReadWritePermissions {
fn from(value: ReadWritePermissions) -> Self {
Self {
read: Some(value.read),
write: Some(value.write),
read: value.read,
write: value.write,
}
}
}

impl From<api::access::ReadWritePermissions> for ReadWritePermissions {
fn from(value: api::access::ReadWritePermissions) -> Self {
Self {
read: value.read.unwrap_or_default(),
write: value.write.unwrap_or_default(),
read: value.read,
write: value.write,
}
}
}
Expand Down Expand Up @@ -1980,7 +1980,7 @@ impl From<IssueAccessTokenInput> for api::access::AccessTokenInfo {
Self {
id: value.id,
expires_at: value.expires_at.map(Into::into),
auto_prefix_streams: value.auto_prefix_streams.then_some(true),
auto_prefix_streams: value.auto_prefix_streams,
scope: value.scope.into(),
}
}
Expand Down
Loading