Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions example-streamer-uses/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,47 @@ To better track the messages you can also check that the payload is listed with
Further, you will see printed the deserialized `HelloResponse` ProtoBuf object:

> Here we received response: HelloResponse { message: "The response to the request: someip_client@i=n", special_fields: SpecialFields { unknown_fields: UnknownFields { fields: None }, cached_size: CachedSize { size: 0 } } }

## CLI parameter overrides

All 12 example binaries now support a common local-entity override surface:

- `--uauthority <string>`
- `--uentity <u32 decimal|hex>`
- `--uversion <u8 decimal|hex>`
- `--resource <u16 decimal|hex>`

Additional role-specific flags:

- Client binaries: `--target-authority --target-uentity --target-uversion --target-resource`
- Subscriber binaries: `--source-authority --source-uentity --source-uversion --source-resource`

Transport-specific flags:

- MQTT binaries: `--broker-uri` (default `localhost:1883`)
- Zenoh binaries: `--endpoint` (existing behavior, now composed with URI overrides)
- SOME/IP binaries: `--vsomeip-config` and `--remote-authority`

Running with no extra flags keeps prior behavior (defaults are aligned with previous constants).

### Numeric formats

Numeric URI flags accept decimal and `0x`/`0X` prefixed hex.

Decimal example:

```bash
cargo run -p example-streamer-uses --bin mqtt_publisher --features mqtt-transport -- --uauthority authority-a --uentity 23456 --uversion 1 --resource 32769
```

Hex example:

```bash
cargo run -p example-streamer-uses --bin mqtt_publisher --features mqtt-transport -- --uauthority authority-a --uentity 0x5BA0 --uversion 0x1 --resource 0x8001
```

Invalid formats (for example underscores in numeric values) are rejected with deterministic errors that include the flag name, raw value, and expected range.

### SOME/IP caveat

SOME/IP binaries accept URI overrides, but runtime compatibility can still depend on application/service IDs configured in the selected `--vsomeip-config` file. If `--uentity` is overridden, the binaries emit a startup warning when that override may conflict with vsomeip config expectations.
157 changes: 157 additions & 0 deletions example-streamer-uses/src/bin/common/cli.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
#[cfg(feature = "vsomeip-transport")]
use std::fs::canonicalize;
#[cfg(feature = "vsomeip-transport")]
use std::path::PathBuf;
use up_rust::{UCode, UStatus, UUri};

fn format_parse_error(flag: &str, raw: &str, expected: &str, max: u128) -> String {
format!(
"invalid value for {flag}: '{raw}' (expected {expected} in decimal or 0x-prefixed hex, range 0..={max})"
)
}

fn parse_unsigned(flag: &str, raw: &str, expected: &str, max: u128) -> Result<u128, String> {
if raw.is_empty() || raw.chars().any(char::is_whitespace) || raw.contains('_') {
return Err(format_parse_error(flag, raw, expected, max));
}

let parsed = if let Some(hex_digits) = raw.strip_prefix("0x").or_else(|| raw.strip_prefix("0X"))
{
if hex_digits.is_empty() {
return Err(format_parse_error(flag, raw, expected, max));
}
u128::from_str_radix(hex_digits, 16)
.map_err(|_| format_parse_error(flag, raw, expected, max))?
} else {
raw.parse::<u128>()
.map_err(|_| format_parse_error(flag, raw, expected, max))?
};

if parsed > max {
return Err(format_parse_error(flag, raw, expected, max));
}

Ok(parsed)
}

pub(crate) fn parse_u32_flag(flag: &str, raw: &str) -> Result<u32, String> {
parse_unsigned(flag, raw, "u32", u32::MAX as u128).map(|value| value as u32)
}

pub(crate) fn parse_u16_flag(flag: &str, raw: &str) -> Result<u16, String> {
parse_unsigned(flag, raw, "u16", u16::MAX as u128).map(|value| value as u16)
}

pub(crate) fn parse_u8_flag(flag: &str, raw: &str) -> Result<u8, String> {
parse_unsigned(flag, raw, "u8", u8::MAX as u128).map(|value| value as u8)
}

pub(crate) fn invalid_argument_status(message: impl Into<String>) -> UStatus {
UStatus::fail_with_code(UCode::INVALID_ARGUMENT, message.into())
}

pub(crate) fn parse_u32_status(flag: &str, raw: &str) -> Result<u32, UStatus> {
parse_u32_flag(flag, raw).map_err(invalid_argument_status)
}

pub(crate) fn parse_u16_status(flag: &str, raw: &str) -> Result<u16, UStatus> {
parse_u16_flag(flag, raw).map_err(invalid_argument_status)
}

pub(crate) fn parse_u8_status(flag: &str, raw: &str) -> Result<u8, UStatus> {
parse_u8_flag(flag, raw).map_err(invalid_argument_status)
}

pub(crate) fn build_uuri(
authority: &str,
uentity: u32,
uversion: u8,
resource: u16,
) -> Result<UUri, UStatus> {
UUri::try_from_parts(authority, uentity, uversion, resource).map_err(|error| {
invalid_argument_status(format!(
"unable to build UUri from authority='{authority}', uentity={uentity:#X}, uversion={uversion:#X}, resource={resource:#X}: {error}"
))
})
}

#[cfg(feature = "vsomeip-transport")]
#[allow(dead_code)]
pub(crate) fn canonicalize_cli_path(flag: &str, raw: &str) -> Result<PathBuf, UStatus> {
canonicalize(PathBuf::from(raw)).map_err(|error| {
invalid_argument_status(format!(
"invalid value for {flag}: '{raw}' (expected an existing file path): {error}"
))
})
}

#[cfg(test)]
mod tests {
use super::*;
use clap::{CommandFactory, Parser};

#[test]
fn parse_u32_accepts_decimal_and_hex() {
assert_eq!(parse_u32_flag("--uentity", "23456").unwrap(), 23_456);
assert_eq!(parse_u32_flag("--uentity", "0x5BA0").unwrap(), 0x5BA0);
}

#[test]
fn parse_u8_rejects_underscores_and_whitespace() {
let underscore_error = parse_u8_flag("--uversion", "0x1_0").unwrap_err();
assert_eq!(
underscore_error,
"invalid value for --uversion: '0x1_0' (expected u8 in decimal or 0x-prefixed hex, range 0..=255)"
);

let whitespace_error = parse_u8_flag("--uversion", "1 0").unwrap_err();
assert_eq!(
whitespace_error,
"invalid value for --uversion: '1 0' (expected u8 in decimal or 0x-prefixed hex, range 0..=255)"
);
}

#[test]
fn parse_u16_reports_range_overflow() {
let error = parse_u16_flag("--resource", "0x1_0000").unwrap_err();
assert_eq!(
error,
"invalid value for --resource: '0x1_0000' (expected u16 in decimal or 0x-prefixed hex, range 0..=65535)"
);
}

#[test]
fn build_uuri_from_parts() {
let uuri = build_uuri("authority-a", 0x5BA0, 0x1, 0x8001).unwrap();
assert_eq!(uuri.authority_name, "authority-a");
assert_eq!(uuri.ue_id, 0x5BA0);
assert_eq!(uuri.uentity_major_version(), 0x1);
assert_eq!(uuri.resource_id(), 0x8001);
}

#[derive(Debug, Parser)]
#[command(version, about, long_about = None)]
struct RepresentativeHelpArgs {
#[arg(long, default_value = "authority-a")]
uauthority: String,
#[arg(long, default_value = "0x5BA0")]
uentity: String,
#[arg(long, default_value = "0x1")]
uversion: String,
#[arg(long, default_value = "0x8001")]
resource: String,
}

#[test]
fn representative_help_includes_common_flags_and_defaults() {
let mut command = RepresentativeHelpArgs::command();
let help = command.render_long_help().to_string();

assert!(help.contains("--uauthority <UAUTHORITY>"));
assert!(help.contains("authority-a"));
assert!(help.contains("--uentity <UENTITY>"));
assert!(help.contains("0x5BA0"));
assert!(help.contains("--uversion <UVERSION>"));
assert!(help.contains("--resource <RESOURCE>"));
}
}
2 changes: 2 additions & 0 deletions example-streamer-uses/src/bin/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub(crate) mod cli;

use async_trait::async_trait;
use hello_world_protos::{
hello_world_service::{HelloRequest, HelloResponse},
Expand Down
88 changes: 63 additions & 25 deletions example-streamer-uses/src/bin/mqtt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,59 +13,97 @@

mod common;

use clap::Parser;
use common::cli;
use common::ServiceResponseListener;
use hello_world_protos::hello_world_service::HelloRequest;
use log::info;
use std::sync::Arc;
use std::time::Duration;
use up_rust::{UListener, UMessageBuilder, UStatus, UTransport, UUri};
use up_rust::{UListener, UMessageBuilder, UStatus, UTransport};
use up_transport_mqtt5::{Mqtt5Transport, Mqtt5TransportOptions, MqttClientOptions};

const SERVICE_AUTHORITY: &str = "authority-b";
const SERVICE_UE_ID: u32 = 0x1236;
const SERVICE_UE_VERSION_MAJOR: u8 = 1;
const SERVICE_RESOURCE_ID: u16 = 0x0421;
const DEFAULT_UAUTHORITY: &str = "authority-a";
const DEFAULT_UENTITY: &str = "0x4321";
const DEFAULT_UVERSION: &str = "0x1";
const DEFAULT_RESOURCE: &str = "0x0";

const CLIENT_AUTHORITY: &str = "authority-a";
const CLIENT_UE_ID: u32 = 0x4321;
const CLIENT_UE_VERSION_MAJOR: u8 = 1;
const CLIENT_RESOURCE_ID: u16 = 0;
const DEFAULT_TARGET_AUTHORITY: &str = "authority-b";
const DEFAULT_TARGET_UENTITY: &str = "0x1236";
const DEFAULT_TARGET_UVERSION: &str = "0x1";
const DEFAULT_TARGET_RESOURCE: &str = "0x0421";

const DEFAULT_BROKER_URI: &str = "localhost:1883";

const REQUEST_TTL: u32 = 1000;

#[derive(Debug, Parser)]
#[command(version, about, long_about = None)]
struct Args {
/// Authority for the local client identity
#[arg(long, default_value = DEFAULT_UAUTHORITY)]
uauthority: String,
/// UEntity ID for local client identity (decimal or 0x-prefixed hex)
#[arg(long, default_value = DEFAULT_UENTITY)]
uentity: String,
/// UEntity major version for local client identity (decimal or 0x-prefixed hex)
#[arg(long, default_value = DEFAULT_UVERSION)]
uversion: String,
/// Resource ID for local client identity (decimal or 0x-prefixed hex)
#[arg(long, default_value = DEFAULT_RESOURCE)]
resource: String,
/// Authority for the target service URI
#[arg(long, default_value = DEFAULT_TARGET_AUTHORITY)]
target_authority: String,
/// UEntity ID for target service URI (decimal or 0x-prefixed hex)
#[arg(long, default_value = DEFAULT_TARGET_UENTITY)]
target_uentity: String,
/// UEntity major version for target service URI (decimal or 0x-prefixed hex)
#[arg(long, default_value = DEFAULT_TARGET_UVERSION)]
target_uversion: String,
/// Resource ID for target service URI (decimal or 0x-prefixed hex)
#[arg(long, default_value = DEFAULT_TARGET_RESOURCE)]
target_resource: String,
/// MQTT broker URI in host:port format
#[arg(long, default_value = DEFAULT_BROKER_URI)]
broker_uri: String,
}

#[tokio::main]
async fn main() -> Result<(), UStatus> {
env_logger::init();

let args = Args::parse();

info!("Started mqtt_client.");

let uentity = cli::parse_u32_status("--uentity", &args.uentity)?;
let uversion = cli::parse_u8_status("--uversion", &args.uversion)?;
let resource = cli::parse_u16_status("--resource", &args.resource)?;
let target_uentity = cli::parse_u32_status("--target-uentity", &args.target_uentity)?;
let target_uversion = cli::parse_u8_status("--target-uversion", &args.target_uversion)?;
let target_resource = cli::parse_u16_status("--target-resource", &args.target_resource)?;

// Source represents the client (specifically the topic that the client sends to)
let source = UUri::try_from_parts(
CLIENT_AUTHORITY,
CLIENT_UE_ID,
CLIENT_UE_VERSION_MAJOR,
CLIENT_RESOURCE_ID,
)
.unwrap();
let source = cli::build_uuri(&args.uauthority, uentity, uversion, resource)?;
// Sink is the destination entity which the streamer should rout our messages to.
let sink = UUri::try_from_parts(
SERVICE_AUTHORITY,
SERVICE_UE_ID,
SERVICE_UE_VERSION_MAJOR,
SERVICE_RESOURCE_ID,
)
.unwrap();
let sink = cli::build_uuri(
&args.target_authority,
target_uentity,
target_uversion,
target_resource,
)?;

let mqtt_client_options = MqttClientOptions {
broker_uri: "localhost:1883".to_string(),
broker_uri: args.broker_uri,
..Default::default()
};
let mqtt_transport_options = Mqtt5TransportOptions {
mqtt_client_options,
..Default::default()
};
let mqtt5_transport =
Mqtt5Transport::new(mqtt_transport_options, CLIENT_AUTHORITY.to_string()).await?;
Mqtt5Transport::new(mqtt_transport_options, args.uauthority.to_string()).await?;
mqtt5_transport.connect().await?;

let client: Arc<dyn UTransport> = Arc::new(mqtt5_transport);
Expand Down
Loading