diff --git a/Cargo.lock b/Cargo.lock index c0f58d916..e2a66097f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1321,11 +1321,11 @@ checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#2217e1a917583ddccb6668a7bd185f3d39bcede0" +source = "git+https://github.com/helium/proto?branch=macpie%2Finfo_stream_v2#537f9bdbd89e786d508c16dffbcb6beb8eedf5b4" dependencies = [ "base64 0.22.1", "byteorder", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=macpie%2Finfo_stream_v2)", "prost", "rand 0.8.5", "rand_chacha 0.3.1", @@ -2833,7 +2833,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=macpie%2Finfo_stream_v2)", "http 1.3.1", "metrics", "poc-metrics", @@ -2875,7 +2875,7 @@ dependencies = [ "futures", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=macpie%2Finfo_stream_v2)", "hex-literal", "prost", "rust_decimal", @@ -3485,7 +3485,7 @@ dependencies = [ "futures", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=master)", "hex", "itertools 0.14.0", "jsonrpc_client", @@ -3510,6 +3510,23 @@ dependencies = [ "url", ] +[[package]] +name = "helium-proto" +version = "0.1.0" +source = "git+https://github.com/helium/proto?branch=macpie%2Finfo_stream_v2#537f9bdbd89e786d508c16dffbcb6beb8eedf5b4" +dependencies = [ + "bytes", + "prost", + "prost-build", + "serde", + "serde_json", + "strum", + "strum_macros", + "tonic", + "tonic-prost", + "tonic-prost-build", +] + [[package]] name = "helium-proto" version = "0.1.0" @@ -3556,7 +3573,7 @@ dependencies = [ "async-trait", "chrono", "derive_builder", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=macpie%2Finfo_stream_v2)", "hextree", "rust_decimal", "rust_decimal_macros", @@ -4083,7 +4100,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=macpie%2Finfo_stream_v2)", "http 1.3.1", "humantime-serde", "metrics", @@ -4165,8 +4182,9 @@ dependencies = [ "file-store-oracles", "futures", "futures-util", + "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=macpie%2Finfo_stream_v2)", "hextree", "http 1.3.1", "http-serde", @@ -4212,7 +4230,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=macpie%2Finfo_stream_v2)", "http 1.3.1", "http-serde", "humantime-serde", @@ -4255,7 +4273,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=macpie%2Finfo_stream_v2)", "http-serde", "humantime-serde", "iot-config", @@ -4926,7 +4944,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=macpie%2Finfo_stream_v2)", "hextree", "http 1.3.1", "http-serde", @@ -4972,7 +4990,7 @@ dependencies = [ "futures", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=macpie%2Finfo_stream_v2)", "mobile-config", "prost", "rand 0.8.5", @@ -5001,7 +5019,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=macpie%2Finfo_stream_v2)", "http 1.3.1", "http-serde", "humantime-serde", @@ -5049,7 +5067,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=macpie%2Finfo_stream_v2)", "hex-assignments", "hextree", "http-serde", @@ -5678,7 +5696,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=macpie%2Finfo_stream_v2)", "http 1.3.1", "hyper 0.14.32", "jsonrpsee", @@ -5793,7 +5811,7 @@ dependencies = [ "file-store-oracles", "futures", "futures-util", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=macpie%2Finfo_stream_v2)", "humantime-serde", "metrics", "metrics-exporter-prometheus", @@ -6446,7 +6464,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=macpie%2Finfo_stream_v2)", "humantime-serde", "metrics", "metrics-exporter-prometheus", diff --git a/Cargo.toml b/Cargo.toml index 3de5c52e5..d2153b5f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -126,10 +126,10 @@ aws-smithy-types-convert = { version = "0.60.9", features = [ url = "2.5.4" ### Protobuf -helium-proto = { git = "https://github.com/helium/proto", branch = "master", features = [ +helium-proto = { git = "https://github.com/helium/proto", branch = "macpie/info_stream_v2", features = [ "services", ] } -beacon = { git = "https://github.com/helium/proto", branch = "master" } +beacon = { git = "https://github.com/helium/proto", branch = "macpie/info_stream_v2" } # Pickup versions from above prost = "*" tonic = { version = "*", features = ["tls-aws-lc", "tls-native-roots"] } diff --git a/file_store_oracles/src/traits/msg_verify.rs b/file_store_oracles/src/traits/msg_verify.rs index 52356530e..858452fcb 100644 --- a/file_store_oracles/src/traits/msg_verify.rs +++ b/file_store_oracles/src/traits/msg_verify.rs @@ -73,6 +73,7 @@ impl_msg_verify!(iot_config::AdminLoadRegionReqV1, signature); impl_msg_verify!(iot_config::AdminRemoveKeyReqV1, signature); impl_msg_verify!(iot_config::GatewayInfoReqV1, signature); impl_msg_verify!(iot_config::GatewayInfoStreamReqV1, signature); +impl_msg_verify!(iot_config::GatewayInfoStreamReqV2, signature); impl_msg_verify!(iot_config::RegionParamsReqV1, signature); impl_msg_verify!(iot_config::GatewayInfoResV1, signature); impl_msg_verify!(iot_config::GatewayInfoStreamResV1, signature); diff --git a/iot_config/Cargo.toml b/iot_config/Cargo.toml index f695d3739..86f0c15ab 100644 --- a/iot_config/Cargo.toml +++ b/iot_config/Cargo.toml @@ -51,5 +51,6 @@ poc-metrics = { path = "../metrics" } task-manager = { path = "../task_manager" } [dev-dependencies] -rand = { workspace = true } backon = { version = "0", features = ["tokio-sleep"] } +h3o = { workspace = true } +rand = { workspace = true } diff --git a/iot_config/migrations/20251027000000_gateways.sql b/iot_config/migrations/20251027000000_gateways.sql new file mode 100644 index 000000000..9fdd887f3 --- /dev/null +++ b/iot_config/migrations/20251027000000_gateways.sql @@ -0,0 +1,21 @@ +CREATE TABLE IF NOT EXISTS gateways ( + address BYTEA PRIMARY KEY, + created_at TIMESTAMPTZ NOT NULL, + elevation INTEGER, + gain INTEGER, + hash TEXT, + is_active BOOLEAN, + is_full_hotspot BOOLEAN, + last_changed_at TIMESTAMPTZ NOT NULL, + location BIGINT, + location_asserts INTEGER, + location_changed_at TIMESTAMPTZ, + refreshed_at TIMESTAMPTZ NOT NULL, + updated_at TIMESTAMPTZ NOT NULL +); + +CREATE INDEX IF NOT EXISTS gateways_last_changed_idx ON gateways (last_changed_at DESC); + +CREATE INDEX IF NOT EXISTS gateways_location_changed_idx ON gateways (location_changed_at DESC) +WHERE + location IS NOT NULL; \ No newline at end of file diff --git a/iot_config/src/cli/daemon.rs b/iot_config/src/cli/daemon.rs new file mode 100644 index 000000000..4d63ac455 --- /dev/null +++ b/iot_config/src/cli/daemon.rs @@ -0,0 +1,99 @@ +use std::sync::Arc; + +use crate::gateway::tracker::Tracker; +use crate::grpc_server::GrpcServer; +use crate::sub_dao_service::SubDaoService; +use crate::{ + admin::AuthCache, admin_service::AdminService, db_cleaner::DbCleaner, + gateway::service::GatewayService, org, org_service::OrgService, region_map::RegionMapReader, + route_service::RouteService, settings::Settings, telemetry, +}; +use task_manager::TaskManager; + +#[derive(Debug, clap::Args)] +pub struct Daemon; + +impl Daemon { + pub async fn run(&self, settings: &Settings) -> anyhow::Result<()> { + custom_tracing::init(settings.log.clone(), settings.custom_tracing.clone()).await?; + + // Install prometheus metrics exporter + poc_metrics::start_metrics(&settings.metrics)?; + telemetry::initialize(); + + // Create database pool + let pool = settings.database.connect("iot-config-store").await?; + sqlx::migrate!().run(&pool).await?; + + // Create on-chain metadata pool + let metadata_pool = settings.metadata.connect("iot-config-metadata").await?; + + let (auth_updater, auth_cache) = AuthCache::new(settings.admin_pubkey()?, &pool).await?; + let (region_updater, region_map) = RegionMapReader::new(&pool).await?; + let (delegate_key_updater, delegate_key_cache) = org::delegate_keys_cache(&pool).await?; + + let signing_keypair = Arc::new(settings.signing_keypair()?); + + let gateway_svc = GatewayService::new( + signing_keypair.clone(), + pool.clone(), + region_map.clone(), + auth_cache.clone(), + delegate_key_cache, + )?; + + let route_svc = + RouteService::new(signing_keypair.clone(), auth_cache.clone(), pool.clone()); + + let org_svc = OrgService::new( + signing_keypair.clone(), + auth_cache.clone(), + pool.clone(), + route_svc.clone_update_channel(), + delegate_key_updater, + )?; + + let admin_svc = AdminService::new( + settings, + auth_cache.clone(), + auth_updater, + pool.clone(), + region_map.clone(), + region_updater, + )?; + + let subdao_svc = SubDaoService::new(settings, auth_cache, metadata_pool.clone())?; + + let listen_addr = settings.listen; + let pubkey = settings + .signing_keypair() + .map(|keypair| keypair.public_key().to_string())?; + tracing::debug!("listening on {listen_addr}"); + tracing::debug!("signing as {pubkey}"); + + let tracker = Tracker::new( + pool.clone(), + metadata_pool.clone(), + settings.gateway_tracker_interval, + ); + + let grpc_server = GrpcServer::new( + listen_addr, + gateway_svc, + route_svc, + org_svc, + admin_svc, + subdao_svc, + ); + + let db_cleaner = DbCleaner::new(pool.clone(), settings.deleted_entry_retention); + + TaskManager::builder() + .add_task(tracker) + .add_task(grpc_server) + .add_task(db_cleaner) + .build() + .start() + .await + } +} diff --git a/iot_config/src/cli/mod.rs b/iot_config/src/cli/mod.rs new file mode 100644 index 000000000..d8af7887e --- /dev/null +++ b/iot_config/src/cli/mod.rs @@ -0,0 +1,34 @@ +use crate::{cli::daemon::Daemon, Settings}; +use std::path::PathBuf; + +pub mod daemon; + +#[derive(Debug, clap::Parser)] +#[clap(version = env!("CARGO_PKG_VERSION"))] +#[clap(about = "Helium IoT Config Service")] +pub struct Cli { + /// Optional configuration file to use. If present, the toml file at the + /// given path will be loaded. Environment variables can override the + /// settings in the given file. + #[clap(short = 'c')] + config: Option, + + #[clap(subcommand)] + cmd: Cmd, +} + +impl Cli { + pub async fn run(self) -> anyhow::Result<()> { + match self.cmd { + Cmd::Server(server) => { + let settings = Settings::new(self.config)?; + server.run(&settings).await + } + } + } +} + +#[derive(Debug, clap::Subcommand)] +pub enum Cmd { + Server(Daemon), +} diff --git a/iot_config/src/client/mod.rs b/iot_config/src/client/mod.rs index 668ff0e15..062fe1c67 100644 --- a/iot_config/src/client/mod.rs +++ b/iot_config/src/client/mod.rs @@ -1,4 +1,4 @@ -use crate::gateway_info::{self, GatewayInfo, GatewayInfoStream}; +use crate::gateway::service::info::{self as gateway_info, GatewayInfo, GatewayInfoStream}; use file_store_oracles::traits::MsgVerify; use futures::stream::{self, StreamExt}; use helium_crypto::{Keypair, PublicKey, PublicKeyBinary, Sign}; diff --git a/iot_config/src/gateway/db.rs b/iot_config/src/gateway/db.rs new file mode 100644 index 000000000..91a3b8284 --- /dev/null +++ b/iot_config/src/gateway/db.rs @@ -0,0 +1,186 @@ +use chrono::{DateTime, Utc}; +use futures::{Stream, StreamExt, TryStreamExt}; +use helium_crypto::PublicKeyBinary; +use sqlx::{postgres::PgRow, FromRow, PgExecutor, PgPool, Postgres, QueryBuilder, Row}; + +#[derive(Debug, Clone)] +pub struct Gateway { + pub address: PublicKeyBinary, + // When the record was first created from metadata DB + pub created_at: DateTime, + pub elevation: Option, + pub gain: Option, + pub hash: String, + pub is_active: Option, + pub is_full_hotspot: Option, + // When location or hash last changed, set to refreshed_at (updated via SQL query see Gateway::insert) + pub last_changed_at: DateTime, + pub location: Option, + pub location_asserts: Option, + // When location last changed, set to refreshed_at (updated via SQL query see Gateway::insert) + pub location_changed_at: Option>, + // When record was last updated from metadata DB (could be set to now if no metadata DB info) + pub refreshed_at: Option>, + // When record was last updated + pub updated_at: DateTime, +} + +impl Gateway { + pub async fn insert_bulk(pool: &PgPool, rows: &[Gateway]) -> anyhow::Result { + if rows.is_empty() { + return Ok(0); + } + let mut qb = QueryBuilder::::new( + "INSERT INTO gateways ( + address, + created_at, + elevation, + gain, + hash, + is_active, + is_full_hotspot, + last_changed_at, + location, + location_asserts, + location_changed_at, + refreshed_at, + updated_at + ) ", + ); + + qb.push_values(rows, |mut b, g| { + b.push_bind(g.address.as_ref()) + .push_bind(g.created_at) + .push_bind(g.elevation.map(|v| v as i32)) + .push_bind(g.gain.map(|v| v as i32)) + .push_bind(g.hash.clone()) + .push_bind(g.is_active) + .push_bind(g.is_full_hotspot) + .push_bind(g.last_changed_at) + .push_bind(g.location.map(|v| v as i64)) + .push_bind(g.location_asserts.map(|v| v as i32)) + .push_bind(g.location_changed_at) + .push_bind(g.refreshed_at) + .push_bind(g.updated_at); + }); + + qb.push( + " ON CONFLICT (address) DO UPDATE SET + created_at = EXCLUDED.created_at, + elevation = EXCLUDED.elevation, + gain = EXCLUDED.gain, + hash = EXCLUDED.hash, + is_active = EXCLUDED.is_active, + is_full_hotspot = EXCLUDED.is_full_hotspot, + last_changed_at = CASE + WHEN gateways.location IS DISTINCT FROM EXCLUDED.location + OR gateways.hash IS DISTINCT FROM EXCLUDED.hash + THEN EXCLUDED.refreshed_at + ELSE gateways.last_changed_at + END, + location = EXCLUDED.location, + location_asserts = EXCLUDED.location_asserts, + location_changed_at = CASE + WHEN gateways.location IS DISTINCT FROM EXCLUDED.location + THEN EXCLUDED.refreshed_at + ELSE gateways.location_changed_at + END, + refreshed_at = EXCLUDED.refreshed_at, + updated_at = EXCLUDED.updated_at", + ); + + let res = qb.build().execute(pool).await?; + Ok(res.rows_affected()) + } + + pub async fn get_by_address<'a>( + db: impl PgExecutor<'a>, + address: &PublicKeyBinary, + ) -> anyhow::Result> { + let gateway = sqlx::query_as::<_, Self>( + r#" + SELECT + address, + created_at, + elevation, + gain, + hash, + is_active, + is_full_hotspot, + last_changed_at, + location, + location_asserts, + location_changed_at, + refreshed_at, + updated_at + FROM gateways + WHERE address = $1 + "#, + ) + .bind(address.as_ref()) + .fetch_optional(db) + .await?; + + Ok(gateway) + } + + pub fn stream<'a>( + db: impl PgExecutor<'a> + 'a, + min_last_changed_at: DateTime, + min_location_changed_at: Option>, + ) -> impl Stream + 'a { + sqlx::query_as::<_, Self>( + r#" + SELECT + address, + created_at, + elevation, + gain, + hash, + is_active, + is_full_hotspot, + last_changed_at, + location, + location_asserts, + location_changed_at, + refreshed_at, + updated_at + FROM gateways + WHERE last_changed_at >= $1 + AND ( + $2::timestamptz IS NULL + OR (location IS NOT NULL AND location_changed_at >= $2) + ) + "#, + ) + .bind(min_last_changed_at) + .bind(min_location_changed_at) + .fetch(db) + .map_err(anyhow::Error::from) + .filter_map(|res| async move { res.ok() }) + } +} + +impl FromRow<'_, PgRow> for Gateway { + fn from_row(row: &PgRow) -> sqlx::Result { + // helpers to map Option -> Option + let to_u64 = |v: Option| -> Option { v.map(|x| x as u64) }; + let to_u32 = |v: Option| -> Option { v.map(|x| x as u32) }; + + Ok(Self { + address: PublicKeyBinary::from(row.try_get::, _>("address")?), + created_at: row.try_get("created_at")?, + elevation: to_u32(row.try_get("elevation")?), + gain: to_u32(row.try_get("gain")?), + hash: row.try_get("hash")?, + is_active: row.try_get("is_active")?, + is_full_hotspot: row.try_get("is_full_hotspot")?, + last_changed_at: row.try_get("last_changed_at")?, + location: to_u64(row.try_get("location")?), + location_asserts: to_u32(row.try_get("location_asserts")?), + location_changed_at: row.try_get("location_changed_at")?, + refreshed_at: row.try_get("refreshed_at")?, + updated_at: row.try_get("updated_at")?, + }) + } +} diff --git a/iot_config/src/gateway/metadata_db.rs b/iot_config/src/gateway/metadata_db.rs new file mode 100644 index 000000000..7018ec072 --- /dev/null +++ b/iot_config/src/gateway/metadata_db.rs @@ -0,0 +1,114 @@ +use chrono::{DateTime, Utc}; +use futures::Stream; +use helium_crypto::PublicKeyBinary; +use sqlx::{Pool, Postgres, Row}; +use std::{ + hash::{DefaultHasher, Hasher}, + str::FromStr, +}; + +use crate::gateway::db::Gateway; + +#[derive(Debug, Clone)] +pub struct IOTHotspotInfo { + entity_key: PublicKeyBinary, + location: Option, + elevation: Option, + gain: Option, + is_full_hotspot: Option, + num_location_asserts: Option, + is_active: Option, + dc_onboarding_fee_paid: Option, + refreshed_at: Option>, + created_at: DateTime, +} + +impl IOTHotspotInfo { + fn compute_hash(&self) -> String { + let mut hasher = DefaultHasher::new(); + + hasher.write_i64(self.location.unwrap_or(0_i64)); + + hasher.write_i32(self.elevation.unwrap_or(0_i32)); + + hasher.write_i32(self.gain.unwrap_or(0_i32)); + + hasher.write_u8(self.is_full_hotspot.unwrap_or(false) as u8); + + hasher.write_i32(self.num_location_asserts.unwrap_or(0_i32)); + + hasher.write_u8(self.is_active.unwrap_or(false) as u8); + + hasher.write_i64(self.dc_onboarding_fee_paid.unwrap_or(0_i64)); + + hasher.finish().to_string() + } + + pub fn stream(pool: &Pool) -> impl Stream> + '_ { + sqlx::query_as::<_, Self>( + r#" + SELECT DISTINCT ON (kta.entity_key) + kta.entity_key, + infos.location::bigint, + infos.elevation::integer, + infos.gain::integer, + infos.is_full_hotspot, + infos.num_location_asserts, + infos.is_active, + infos.dc_onboarding_fee_paid::bigint, + infos.refreshed_at, + infos.created_at + FROM iot_hotspot_infos AS infos + JOIN key_to_assets AS kta ON infos.asset = kta.asset + ORDER BY kta.entity_key, infos.refreshed_at DESC + "#, + ) + .fetch(pool) + } + + pub fn to_gateway(&self) -> anyhow::Result> { + let location = self.location.map(|loc| loc as u64); + + Ok(Some(Gateway { + address: self.entity_key.clone(), + created_at: self.created_at, + elevation: self.elevation.map(|e| e as u32), + gain: self.gain.map(|e| e as u32), + hash: self.compute_hash(), + is_active: self.is_active, + is_full_hotspot: self.is_full_hotspot, + // Updated via SQL query see Gateway::insert + last_changed_at: Utc::now(), + location, + location_asserts: self.num_location_asserts.map(|n| n as u32), + // Set to refreshed_at when hotspot has a location, None otherwise + location_changed_at: if location.is_some() { + Some(self.refreshed_at.unwrap_or_else(Utc::now)) + } else { + None + }, + refreshed_at: self.refreshed_at, + updated_at: Utc::now(), + })) + } +} + +impl sqlx::FromRow<'_, sqlx::postgres::PgRow> for IOTHotspotInfo { + fn from_row(row: &sqlx::postgres::PgRow) -> sqlx::Result { + Ok(Self { + entity_key: PublicKeyBinary::from_str( + &bs58::encode(row.get::<&[u8], &str>("entity_key")).into_string(), + ) + .map_err(|err| sqlx::Error::Decode(Box::new(err)))?, + location: row.get::, &str>("location"), + elevation: row.get::, &str>("elevation"), + gain: row.get::, &str>("gain"), + is_full_hotspot: row.get::, &str>("is_full_hotspot"), + num_location_asserts: row.get::, &str>("num_location_asserts"), + is_active: row.get::, &str>("is_active"), + dc_onboarding_fee_paid: row.get::, &str>("dc_onboarding_fee_paid"), + refreshed_at: row.get::>, &str>("refreshed_at"), + created_at: row.get::, &str>("created_at"), + }) + } +} diff --git a/iot_config/src/gateway/mod.rs b/iot_config/src/gateway/mod.rs new file mode 100644 index 000000000..1dfa8716b --- /dev/null +++ b/iot_config/src/gateway/mod.rs @@ -0,0 +1,4 @@ +pub mod db; +pub mod metadata_db; +pub mod service; +pub mod tracker; diff --git a/iot_config/src/gateway_info.rs b/iot_config/src/gateway/service/info.rs similarity index 55% rename from iot_config/src/gateway_info.rs rename to iot_config/src/gateway/service/info.rs index 2081c67cc..9eb23b387 100644 --- a/iot_config/src/gateway_info.rs +++ b/iot_config/src/gateway/service/info.rs @@ -1,6 +1,7 @@ -use crate::region_map; +use crate::{gateway::db::Gateway, region_map}; use anyhow::anyhow; -use futures::stream::BoxStream; +use chrono::{DateTime, Utc}; +use futures::{stream::BoxStream, Stream, StreamExt}; use helium_crypto::PublicKeyBinary; use helium_proto::{ services::iot_config::{ @@ -8,9 +9,32 @@ use helium_proto::{ }, Region, }; +use sqlx::PgExecutor; pub type GatewayInfoStream = BoxStream<'static, GatewayInfo>; +// Hotspot gain default; dbi * 10 +const DEFAULT_GAIN: u32 = 12; +// Hotspot elevation default; meters above sea level +const DEFAULT_ELEVATION: u32 = 0; + +pub async fn get( + db: impl PgExecutor<'_>, + address: &PublicKeyBinary, +) -> anyhow::Result> { + let gateway = Gateway::get_by_address(db, address).await?; + Ok(gateway.map(IotMetadata::from)) +} + +pub fn stream<'a>( + db: impl PgExecutor<'a> + 'a, + min_last_changed_at: DateTime, + min_location_changed_at: Option>, +) -> impl Stream + 'a { + let stream = Gateway::stream(db, min_last_changed_at, min_location_changed_at); + stream.map(IotMetadata::from).boxed() +} + #[derive(Clone, Debug)] pub struct GatewayMetadata { pub location: u64, @@ -26,9 +50,17 @@ pub struct GatewayInfo { pub is_full_hotspot: bool, } +pub struct IotMetadata { + pub address: PublicKeyBinary, + pub location: Option, + pub elevation: i32, + pub gain: i32, + pub is_full_hotspot: bool, +} + impl GatewayInfo { pub fn chain_metadata_to_info( - meta: db::IotMetadata, + meta: IotMetadata, region_map: ®ion_map::RegionMapReader, ) -> Self { let metadata = if let Some(location) = meta.location { @@ -112,69 +144,14 @@ impl TryFrom for GatewayInfoProto { } } -pub(crate) mod db { - use futures::stream::{Stream, StreamExt}; - use helium_crypto::PublicKeyBinary; - use sqlx::{PgExecutor, Row}; - use std::str::FromStr; - - // Hotspot gain default; dbi * 10 - const DEFAULT_GAIN: i32 = 12; - // Hotspot elevation default; meters above sea level - const DEFAULT_ELEVATION: i32 = 0; - - pub struct IotMetadata { - pub address: PublicKeyBinary, - pub location: Option, - pub elevation: i32, - pub gain: i32, - pub is_full_hotspot: bool, - } - - const GET_METADATA_SQL: &str = r#" - select kta.entity_key, infos.location::bigint, CAST(infos.elevation AS integer), CAST(infos.gain as integer), infos.is_full_hotspot - from iot_hotspot_infos infos - join key_to_assets kta on infos.asset = kta.asset - "#; - - pub async fn get_info( - db: impl PgExecutor<'_>, - address: &PublicKeyBinary, - ) -> anyhow::Result> { - let entity_key = bs58::decode(address.to_string()).into_vec()?; - let mut query: sqlx::QueryBuilder = - sqlx::QueryBuilder::new(GET_METADATA_SQL); - query.push(" where kta.entity_key = $1 "); - Ok(query - .build_query_as::() - .bind(entity_key) - .fetch_optional(db) - .await?) - } - - pub fn all_info_stream<'a>( - db: impl PgExecutor<'a> + 'a, - ) -> impl Stream + 'a { - sqlx::query_as::<_, IotMetadata>(GET_METADATA_SQL) - .fetch(db) - .filter_map(|metadata| async move { metadata.ok() }) - .boxed() - } - - impl sqlx::FromRow<'_, sqlx::postgres::PgRow> for IotMetadata { - fn from_row(row: &sqlx::postgres::PgRow) -> sqlx::Result { - Ok(Self { - address: PublicKeyBinary::from_str( - &bs58::encode(row.get::<&[u8], &str>("entity_key")).into_string(), - ) - .map_err(|err| sqlx::Error::Decode(Box::new(err)))?, - location: row.get::, &str>("location").map(|v| v as u64), - elevation: row - .get::, &str>("elevation") - .unwrap_or(DEFAULT_ELEVATION), - gain: row.get::, &str>("gain").unwrap_or(DEFAULT_GAIN), - is_full_hotspot: row.get("is_full_hotspot"), - }) +impl From for IotMetadata { + fn from(gateway: Gateway) -> Self { + Self { + address: gateway.address, + location: gateway.location, + elevation: gateway.elevation.unwrap_or(DEFAULT_ELEVATION) as i32, + gain: gateway.gain.unwrap_or(DEFAULT_GAIN) as i32, + is_full_hotspot: gateway.is_full_hotspot.unwrap_or(false), } } } diff --git a/iot_config/src/gateway_service.rs b/iot_config/src/gateway/service/mod.rs similarity index 80% rename from iot_config/src/gateway_service.rs rename to iot_config/src/gateway/service/mod.rs index 5e6851481..951652f69 100644 --- a/iot_config/src/gateway_service.rs +++ b/iot_config/src/gateway/service/mod.rs @@ -1,21 +1,18 @@ use crate::{ - admin::AuthCache, - gateway_info::{self, GatewayInfo}, - org, - region_map::RegionMapReader, - telemetry, verify_public_key, GrpcResult, GrpcStreamResult, Settings, + admin::AuthCache, gateway::service::info::GatewayInfo, org, region_map::RegionMapReader, + telemetry, verify_public_key, GrpcResult, GrpcStreamResult, }; use anyhow::Result; -use chrono::Utc; +use chrono::{DateTime, TimeZone, Utc}; use file_store::traits::TimestampEncode; use file_store_oracles::traits::MsgVerify; use futures::stream::StreamExt; use helium_crypto::{Keypair, PublicKey, PublicKeyBinary, Sign}; use helium_proto::{ services::iot_config::{ - self, GatewayInfoReqV1, GatewayInfoResV1, GatewayInfoStreamReqV1, GatewayInfoStreamResV1, - GatewayLocationReqV1, GatewayLocationResV1, GatewayRegionParamsReqV1, - GatewayRegionParamsResV1, + self, GatewayInfoReqV1, GatewayInfoResV1, GatewayInfoStreamReqV1, GatewayInfoStreamReqV2, + GatewayInfoStreamResV1, GatewayLocationReqV1, GatewayLocationResV1, + GatewayRegionParamsReqV1, GatewayRegionParamsResV1, }, Message, Region, }; @@ -26,13 +23,15 @@ use std::{sync::Arc, time::Duration}; use tokio::sync::watch; use tonic::{Request, Response, Status}; +pub mod info; + const CACHE_EVICTION_FREQUENCY: Duration = Duration::from_secs(60 * 60); const CACHE_TTL: Duration = Duration::from_secs(60 * 60 * 3); pub struct GatewayService { auth_cache: AuthCache, gateway_cache: Arc>, - metadata_pool: Pool, + pool: Pool, region_map: RegionMapReader, signing_key: Arc, delegate_cache: watch::Receiver, @@ -40,8 +39,8 @@ pub struct GatewayService { impl GatewayService { pub fn new( - settings: &Settings, - metadata_pool: Pool, + signing_key: Arc, + pool: Pool, region_map: RegionMapReader, auth_cache: AuthCache, delegate_cache: watch::Receiver, @@ -53,9 +52,9 @@ impl GatewayService { Ok(Self { auth_cache, gateway_cache, - metadata_pool, + pool, region_map, - signing_key: Arc::new(settings.signing_keypair()?), + signing_key, delegate_cache, }) } @@ -104,7 +103,7 @@ impl GatewayService { Some(gateway) => Ok(gateway.value().clone()), None => { let metadata = tokio::select! { - query_result = gateway_info::db::get_info(&self.metadata_pool, pubkey) => { + query_result = info::get(&self.pool, pubkey) => { query_result.map_err(|_| Status::internal("error fetching gateway info"))? .ok_or_else(|| { telemetry::count_gateway_info_lookup("not-found"); @@ -286,7 +285,7 @@ impl iot_config::Gateway for GatewayService { tracing::debug!("fetching all gateways' info"); - let pool = self.metadata_pool.clone(); + let pool = self.pool.clone(); let signing_key = self.signing_key.clone(); let batch_size = request.batch_size; let region_map = self.region_map.clone(); @@ -294,6 +293,7 @@ impl iot_config::Gateway for GatewayService { let (tx, rx) = tokio::sync::mpsc::channel(20); tokio::spawn(async move { + let epoch: DateTime = "1970-01-01T00:00:00Z".parse().unwrap(); tokio::select! { _ = stream_all_gateways_info( &pool, @@ -301,6 +301,63 @@ impl iot_config::Gateway for GatewayService { &signing_key, region_map.clone(), batch_size, + epoch, + None, + ) => (), + } + }); + + Ok(Response::new(GrpcStreamResult::new(rx))) + } + + type info_stream_v2Stream = GrpcStreamResult; + async fn info_stream_v2( + &self, + request: Request, + ) -> GrpcResult { + let request = request.into_inner(); + telemetry::count_request("gateway", "info-stream"); + + let signer = verify_public_key(&request.signer)?; + self.verify_request_signature(&signer, &request)?; + + tracing::debug!("fetching all gateways' info"); + + let pool = self.pool.clone(); + let signing_key = self.signing_key.clone(); + let batch_size = request.batch_size; + let min_last_changed_at = Utc + .timestamp_opt(request.min_updated_at as i64, 0) + .single() + .ok_or(Status::invalid_argument( + "Invalid min_refreshed_at argument", + ))?; + + let min_location_changed_at = if request.min_location_changed_at == 0 { + None + } else { + Some( + Utc.timestamp_opt(request.min_location_changed_at as i64, 0) + .single() + .ok_or(Status::invalid_argument( + "Invalid min_location_changed_at argument", + ))?, + ) + }; + let region_map = self.region_map.clone(); + + let (tx, rx) = tokio::sync::mpsc::channel(20); + + tokio::spawn(async move { + tokio::select! { + _ = stream_all_gateways_info( + &pool, + tx.clone(), + &signing_key, + region_map.clone(), + batch_size, + min_last_changed_at, + min_location_changed_at, ) => (), } }); @@ -315,10 +372,14 @@ async fn stream_all_gateways_info( signing_key: &Keypair, region_map: RegionMapReader, batch_size: u32, + min_last_changed_at: DateTime, + min_location_changed_at: Option>, ) -> anyhow::Result<()> { let timestamp = Utc::now().encode_timestamp(); let signer: Vec = signing_key.public_key().into(); - let mut stream = gateway_info::db::all_info_stream(pool).chunks(batch_size as usize); + + let mut stream = info::stream(pool, min_last_changed_at, min_location_changed_at) + .chunks(batch_size as usize); while let Some(infos) = stream.next().await { let gateway_infos = infos .into_iter() diff --git a/iot_config/src/gateway/tracker.rs b/iot_config/src/gateway/tracker.rs new file mode 100644 index 000000000..cd6a75295 --- /dev/null +++ b/iot_config/src/gateway/tracker.rs @@ -0,0 +1,79 @@ +use crate::gateway::{db::Gateway, metadata_db::IOTHotspotInfo}; +use futures::stream::TryChunksError; +use futures_util::TryStreamExt; +use sqlx::{Pool, Postgres}; +use std::time::{Duration, Instant}; +use task_manager::ManagedTask; + +const EXECUTE_DURATION_METRIC: &str = + concat!(env!("CARGO_PKG_NAME"), "-", "tracker-execute-duration"); + +pub struct Tracker { + pool: Pool, + metadata: Pool, + interval: Duration, +} + +impl ManagedTask for Tracker { + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> task_manager::TaskLocalBoxFuture { + task_manager::spawn(self.run(shutdown)) + } +} + +impl Tracker { + pub fn new(pool: Pool, metadata: Pool, interval: Duration) -> Self { + Self { + pool, + metadata, + interval, + } + } + + async fn run(self, mut shutdown: triggered::Listener) -> anyhow::Result<()> { + tracing::info!("starting with interval: {:?}", self.interval); + let mut interval = tokio::time::interval(self.interval); + + loop { + tokio::select! { + biased; + _ = &mut shutdown => break, + _ = interval.tick() => { + if let Err(err) = execute(&self.pool, &self.metadata).await { + tracing::error!(?err, "error in tracking changes to mobile radios"); + } + } + } + } + + tracing::info!("stopping"); + + Ok(()) + } +} + +pub async fn execute(pool: &Pool, metadata: &Pool) -> anyhow::Result<()> { + tracing::info!("starting execute"); + let start = Instant::now(); + + const BATCH_SIZE: usize = 1_000; + + let total: u64 = IOTHotspotInfo::stream(metadata) + .map_err(anyhow::Error::from) + .try_filter_map(|mhi| async move { mhi.to_gateway() }) + .try_chunks(BATCH_SIZE) + .map_err(|TryChunksError(_gateways, err)| err) + .try_fold(0, |total, batch| async move { + let affected = Gateway::insert_bulk(pool, &batch).await?; + Ok(total + affected) + }) + .await?; + + let elapsed = start.elapsed(); + tracing::info!(?elapsed, affected = total, "done execute"); + metrics::histogram!(EXECUTE_DURATION_METRIC).record(elapsed); + + Ok(()) +} diff --git a/iot_config/src/grpc_server.rs b/iot_config/src/grpc_server.rs new file mode 100644 index 000000000..9374f411e --- /dev/null +++ b/iot_config/src/grpc_server.rs @@ -0,0 +1,90 @@ +use crate::sub_dao_service::SubDaoService; +use crate::{ + admin_service::AdminService, gateway::service::GatewayService, org_service::OrgService, + route_service::RouteService, +}; +use anyhow::Error; +use futures_util::TryFutureExt; +use helium_proto::services::{ + iot_config::{AdminServer, GatewayServer, OrgServer, RouteServer}, + sub_dao::SubDaoServer, +}; +use std::{net::SocketAddr, time::Duration}; +use task_manager::ManagedTask; +use tonic::transport; + +pub struct GrpcServer { + listen_addr: SocketAddr, + gateway_svc: GatewayService, + route_svc: RouteService, + org_svc: OrgService, + admin_svc: AdminService, + subdao_svc: SubDaoService, +} + +impl GrpcServer { + pub fn new( + listen_addr: SocketAddr, + gateway_svc: GatewayService, + route_svc: RouteService, + org_svc: OrgService, + admin_svc: AdminService, + subdao_svc: SubDaoService, + ) -> Self { + Self { + listen_addr, + gateway_svc, + route_svc, + org_svc, + admin_svc, + subdao_svc, + } + } +} + +impl ManagedTask for GrpcServer { + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> task_manager::TaskLocalBoxFuture { + task_manager::spawn(async move { + let grpc_server = transport::Server::builder() + .http2_keepalive_interval(Some(Duration::from_secs(250))) + .http2_keepalive_timeout(Some(Duration::from_secs(60))) + .layer(custom_tracing::grpc_layer::new_with_span(make_span)) + .add_service(GatewayServer::new(self.gateway_svc)) + .add_service(OrgServer::new(self.org_svc)) + .add_service(RouteServer::new(self.route_svc)) + .add_service(AdminServer::new(self.admin_svc)) + .add_service(SubDaoServer::new(self.subdao_svc)) + .serve(self.listen_addr) + .map_err(Error::from); + + tokio::select! { + _ = shutdown => { + tracing::warn!("grpc server shutting down"); + Ok(()) + } + res = grpc_server => { + match res { + Ok(()) => Ok(()), + Err(err) => { + tracing::error!(?err, "grpc server failed with error"); + Err(anyhow::anyhow!("grpc server exiting with error")) + } + } + } + } + }) + } +} + +fn make_span(_request: &http::request::Request) -> tracing::Span { + tracing::info_span!( + custom_tracing::DEFAULT_SPAN, + pub_key = tracing::field::Empty, + signer = tracing::field::Empty, + oui = tracing::field::Empty, + route_id = tracing::field::Empty, + ) +} diff --git a/iot_config/src/lib.rs b/iot_config/src/lib.rs index 3a493c3fd..4e9e78b99 100644 --- a/iot_config/src/lib.rs +++ b/iot_config/src/lib.rs @@ -2,10 +2,11 @@ extern crate tls_init; pub mod admin; pub mod admin_service; +pub mod cli; pub mod client; pub mod db_cleaner; -pub mod gateway_info; -pub mod gateway_service; +pub mod gateway; +pub mod grpc_server; mod helium_netids; pub mod lora_field; pub mod org; @@ -22,7 +23,7 @@ pub mod sub_dao_service; pub use admin_service::AdminService; use chrono::{DateTime, Duration, Utc}; pub use client::{Client, Settings as ClientSettings}; -pub use gateway_service::GatewayService; +pub use gateway::service::GatewayService; pub use org_service::OrgService; pub use route_service::RouteService; pub use settings::Settings; diff --git a/iot_config/src/main.rs b/iot_config/src/main.rs index 8fbe83ea4..5a729509d 100644 --- a/iot_config/src/main.rs +++ b/iot_config/src/main.rs @@ -1,193 +1,8 @@ -use anyhow::{Error, Result}; use clap::Parser; -use futures_util::TryFutureExt; -use helium_proto::services::{ - iot_config::{AdminServer, GatewayServer, OrgServer, RouteServer}, - sub_dao::SubDaoServer, -}; -use iot_config::sub_dao_service::SubDaoService; -use iot_config::{ - admin::AuthCache, admin_service::AdminService, db_cleaner::DbCleaner, - gateway_service::GatewayService, org, org_service::OrgService, region_map::RegionMapReader, - route_service::RouteService, settings::Settings, telemetry, -}; -use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; -use task_manager::{ManagedTask, TaskManager}; -use tonic::transport; - -#[derive(Debug, clap::Parser)] -#[clap(version = env!("CARGO_PKG_VERSION"))] -#[clap(about = "Helium IoT Config Service")] -pub struct Cli { - /// Optional configuration file to use. If present, the toml file at the - /// given path will be loaded. Environment variables can override the - /// settings in the given file. - #[clap(short = 'c')] - config: Option, - - #[clap(subcommand)] - cmd: Cmd, -} - -impl Cli { - pub async fn run(self) -> Result<()> { - let settings = Settings::new(self.config)?; - self.cmd.run(settings).await - } -} - -#[derive(Debug, clap::Subcommand)] -pub enum Cmd { - Server(Daemon), -} - -impl Cmd { - pub async fn run(&self, settings: Settings) -> Result<()> { - match self { - Self::Server(cmd) => cmd.run(&settings).await, - } - } -} - -#[derive(Debug, clap::Args)] -pub struct Daemon; - -impl Daemon { - pub async fn run(&self, settings: &Settings) -> Result<()> { - custom_tracing::init(settings.log.clone(), settings.custom_tracing.clone()).await?; - - // Install prometheus metrics exporter - poc_metrics::start_metrics(&settings.metrics)?; - telemetry::initialize(); - - // Create database pool - let pool = settings.database.connect("iot-config-store").await?; - sqlx::migrate!().run(&pool).await?; - - // Create on-chain metadata pool - let metadata_pool = settings.metadata.connect("iot-config-metadata").await?; - - let (auth_updater, auth_cache) = AuthCache::new(settings.admin_pubkey()?, &pool).await?; - let (region_updater, region_map) = RegionMapReader::new(&pool).await?; - let (delegate_key_updater, delegate_key_cache) = org::delegate_keys_cache(&pool).await?; - - let signing_keypair = Arc::new(settings.signing_keypair()?); - - let gateway_svc = GatewayService::new( - settings, - metadata_pool.clone(), - region_map.clone(), - auth_cache.clone(), - delegate_key_cache, - )?; - - let route_svc = - RouteService::new(signing_keypair.clone(), auth_cache.clone(), pool.clone()); - - let org_svc = OrgService::new( - signing_keypair.clone(), - auth_cache.clone(), - pool.clone(), - route_svc.clone_update_channel(), - delegate_key_updater, - )?; - - let admin_svc = AdminService::new( - settings, - auth_cache.clone(), - auth_updater, - pool.clone(), - region_map.clone(), - region_updater, - )?; - - let subdao_svc = SubDaoService::new(settings, auth_cache, metadata_pool)?; - - let listen_addr = settings.listen; - let pubkey = settings - .signing_keypair() - .map(|keypair| keypair.public_key().to_string())?; - tracing::debug!("listening on {listen_addr}"); - tracing::debug!("signing as {pubkey}"); - - let grpc_server = GrpcServer { - listen_addr, - gateway_svc, - route_svc, - org_svc, - admin_svc, - subdao_svc, - }; - - let db_cleaner = DbCleaner::new(pool.clone(), settings.deleted_entry_retention); - - TaskManager::builder() - .add_task(grpc_server) - .add_task(db_cleaner) - .build() - .start() - .await - } -} - -pub struct GrpcServer { - listen_addr: SocketAddr, - gateway_svc: GatewayService, - route_svc: RouteService, - org_svc: OrgService, - admin_svc: AdminService, - subdao_svc: SubDaoService, -} - -impl ManagedTask for GrpcServer { - fn start_task( - self: Box, - shutdown: triggered::Listener, - ) -> task_manager::TaskLocalBoxFuture { - task_manager::spawn(async move { - let grpc_server = transport::Server::builder() - .http2_keepalive_interval(Some(Duration::from_secs(250))) - .http2_keepalive_timeout(Some(Duration::from_secs(60))) - .layer(custom_tracing::grpc_layer::new_with_span(make_span)) - .add_service(GatewayServer::new(self.gateway_svc)) - .add_service(OrgServer::new(self.org_svc)) - .add_service(RouteServer::new(self.route_svc)) - .add_service(AdminServer::new(self.admin_svc)) - .add_service(SubDaoServer::new(self.subdao_svc)) - .serve(self.listen_addr) - .map_err(Error::from); - - tokio::select! { - _ = shutdown => { - tracing::warn!("grpc server shutting down"); - Ok(()) - } - res = grpc_server => { - match res { - Ok(()) => Ok(()), - Err(err) => { - tracing::error!(?err, "grpc server failed with error"); - Err(anyhow::anyhow!("grpc server exiting with error")) - } - } - } - } - }) - } -} - -fn make_span(_request: &http::request::Request) -> tracing::Span { - tracing::info_span!( - custom_tracing::DEFAULT_SPAN, - pub_key = tracing::field::Empty, - signer = tracing::field::Empty, - oui = tracing::field::Empty, - route_id = tracing::field::Empty, - ) -} +use iot_config::cli::Cli; #[tokio::main] -async fn main() -> Result<()> { +async fn main() -> anyhow::Result<()> { let cli = Cli::parse(); cli.run().await } diff --git a/iot_config/src/settings.rs b/iot_config/src/settings.rs index 95938cafa..d494661d1 100644 --- a/iot_config/src/settings.rs +++ b/iot_config/src/settings.rs @@ -21,6 +21,8 @@ pub struct Settings { #[serde(with = "humantime_serde", default = "default_deleted_entry_retention")] pub deleted_entry_retention: Duration, pub database: db_store::Settings, + #[serde(with = "humantime_serde", default = "default_gateway_tracker_interval")] + pub gateway_tracker_interval: std::time::Duration, /// Settings passed to the db_store crate for connecting to /// the database for Solana on-chain data pub metadata: db_store::Settings, @@ -39,6 +41,10 @@ fn default_deleted_entry_retention() -> Duration { humantime::parse_duration("48 hours").unwrap() } +fn default_gateway_tracker_interval() -> std::time::Duration { + humantime::parse_duration("1 hour").unwrap() +} + impl Settings { /// Settings can be loaded from a given optional path and /// can be overridden with environment variables. diff --git a/iot_config/tests/integrations/common/gateway_metadata_db.rs b/iot_config/tests/integrations/common/gateway_metadata_db.rs new file mode 100644 index 000000000..1ef397688 --- /dev/null +++ b/iot_config/tests/integrations/common/gateway_metadata_db.rs @@ -0,0 +1,164 @@ +use bs58; +use chrono::{DateTime, Utc}; +use helium_crypto::PublicKeyBinary; +use sqlx::PgPool; + +#[allow(clippy::too_many_arguments)] +pub async fn insert_gateway( + pool: &PgPool, + address: &str, + asset: &str, + location: Option, + elevation: Option, + gain: Option, + is_full_hotspot: Option, + num_location_asserts: Option, + is_active: Option, + dc_onboarding_fee_paid: Option, + created_at: DateTime, + refreshed_at: Option>, + last_block: Option, + key: PublicKeyBinary, +) -> anyhow::Result<()> { + insert_iot_hotspot_infos( + pool, + address, + asset, + location, + elevation, + gain, + is_full_hotspot, + num_location_asserts, + is_active, + dc_onboarding_fee_paid, + created_at, + refreshed_at, + last_block, + ) + .await?; + + insert_asset_key(pool, asset, key).await?; + + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +pub async fn insert_iot_hotspot_infos( + pool: &PgPool, + address: &str, + asset: &str, + location: Option, + elevation: Option, + gain: Option, + is_full_hotspot: Option, + num_location_asserts: Option, + is_active: Option, + dc_onboarding_fee_paid: Option, + created_at: DateTime, + refreshed_at: Option>, + last_block: Option, +) -> anyhow::Result<()> { + sqlx::query( + r#" + INSERT INTO iot_hotspot_infos ( + address, + asset, + location, + elevation, + gain, + is_full_hotspot, + num_location_asserts, + is_active, + dc_onboarding_fee_paid, + created_at, + refreshed_at, + last_block + ) + VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, COALESCE($12, 0) + ) + ON CONFLICT (address) + DO UPDATE SET + asset = EXCLUDED.asset, + location = EXCLUDED.location, + elevation = EXCLUDED.elevation, + gain = EXCLUDED.gain, + is_full_hotspot = EXCLUDED.is_full_hotspot, + num_location_asserts = EXCLUDED.num_location_asserts, + is_active = EXCLUDED.is_active, + dc_onboarding_fee_paid = EXCLUDED.dc_onboarding_fee_paid, + refreshed_at = EXCLUDED.refreshed_at, + last_block = EXCLUDED.last_block; + "#, + ) + .bind(address) + .bind(asset) + .bind(location) + .bind(elevation) + .bind(gain) + .bind(is_full_hotspot) + .bind(num_location_asserts) + .bind(is_active) + .bind(dc_onboarding_fee_paid) + .bind(created_at) + .bind(refreshed_at) + .bind(last_block) + .execute(pool) + .await?; + + Ok(()) +} + +async fn insert_asset_key(pool: &PgPool, asset: &str, key: PublicKeyBinary) -> anyhow::Result<()> { + let b58 = bs58::decode(key.to_string()).into_vec().unwrap(); + sqlx::query( + r#" + INSERT INTO + "key_to_assets" ("asset", "entity_key") + VALUES ($1, $2); + "#, + ) + .bind(asset) + .bind(b58) + .execute(pool) + .await?; + + Ok(()) +} + +pub async fn create_tables(pool: &PgPool) -> anyhow::Result<()> { + sqlx::query( + r#" + CREATE TABLE IF NOT EXISTS iot_hotspot_infos ( + address VARCHAR(255) NOT NULL, + asset VARCHAR(255), + bump_seed INTEGER, + location NUMERIC, + elevation NUMERIC, + gain NUMERIC, + is_full_hotspot BOOLEAN, + num_location_asserts INTEGER, + is_active BOOLEAN, + dc_onboarding_fee_paid NUMERIC, + refreshed_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL, + last_block NUMERIC NOT NULL DEFAULT 0, + PRIMARY KEY (address) + );"#, + ) + .execute(pool) + .await + .unwrap(); + + sqlx::query( + r#" + CREATE TABLE key_to_assets ( + asset character varying(255) NULL, + entity_key bytea NULL + );"#, + ) + .execute(pool) + .await?; + + Ok(()) +} diff --git a/iot_config/tests/integrations/common/mod.rs b/iot_config/tests/integrations/common/mod.rs new file mode 100644 index 000000000..6c93ebf61 --- /dev/null +++ b/iot_config/tests/integrations/common/mod.rs @@ -0,0 +1,57 @@ +use chrono::{DateTime, Duration, DurationRound, Utc}; +use helium_crypto::PublicKey; +use helium_crypto::{KeyTag, Keypair}; +use helium_proto::services::iot_config::{self as proto}; +use iot_config::admin::AuthCache; +use iot_config::region_map::RegionMapReader; +use iot_config::{org, GatewayService}; +use sqlx::PgPool; +use std::sync::Arc; +use tokio::net::TcpListener; +use tonic::transport; + +pub mod gateway_metadata_db; + +pub fn make_keypair() -> Keypair { + Keypair::generate(KeyTag::default(), &mut rand::rngs::OsRng) +} + +pub async fn spawn_gateway_service( + pool: PgPool, + admin_pub_key: PublicKey, +) -> anyhow::Result<( + String, + tokio::task::JoinHandle>, +)> { + let server_key = make_keypair(); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let (_auth_updater, auth_cache) = AuthCache::new(admin_pub_key.clone(), &pool).await?; + let (_region_updater, region_map) = RegionMapReader::new(&pool).await?; + let (_delegate_key_updater, delegate_key_cache) = org::delegate_keys_cache(&pool).await?; + + // Start the gateway server + let gws = GatewayService::new( + Arc::new(server_key), + pool.clone(), + region_map.clone(), + auth_cache.clone(), + delegate_key_cache, + )?; + + let handle = tokio::spawn( + transport::Server::builder() + .add_service(proto::GatewayServer::new(gws)) + .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)), + ); + + Ok((format!("http://{addr}"), handle)) +} + +// When retrieving a timestamp from DB, depending on the version of postgres +// the timestamp may be truncated. When comparing datetimes, to ones generated +// in a test with `Utc::now()`, you should truncate it. +pub fn nanos_trunc(ts: DateTime) -> DateTime { + ts.duration_trunc(Duration::nanoseconds(1000)).unwrap() +} diff --git a/iot_config/tests/integrations/fixtures/key_to_assets_es.sql b/iot_config/tests/integrations/fixtures/key_to_assets_es.sql new file mode 100644 index 000000000..031f32afa --- /dev/null +++ b/iot_config/tests/integrations/fixtures/key_to_assets_es.sql @@ -0,0 +1,16 @@ +CREATE TABLE public.key_to_assets ( + address character varying(255) NOT NULL, + dao character varying(255), + asset character varying(255), + entity_key bytea, + bump_seed integer, + refreshed_at timestamp with time zone, + created_at timestamp with time zone NOT NULL, + key_serialization jsonb +); + +INSERT INTO "key_to_assets" ("address", "dao", "asset", "entity_key", "bump_seed", "refreshed_at", "created_at", "key_serialization") VALUES +('4TCw73EqhtXbDp19D4WY72vWZo9QWY6gcphRFswzKBNk', 'BQ3MCuTT5zVBhNfQ4SjMh3NPVhFy73MPV8rjfq5d1zie', 'HJtATvtga22LQPViQGoSdwqoHMS8uxirNNsRyGpQK1Nc', 'Helium Mobile Mapping Rewards', 254, '2025-06-12 02:24:01.305+00', '2025-06-12 02:24:01.305+00', '"utf8"'); + +INSERT INTO "key_to_assets" ("address", "dao", "asset", "entity_key", "bump_seed", "refreshed_at", "created_at", "key_serialization") VALUES +('4RsbdRtGNiMEUPPJrmkVpSUswYXLKtZLUKaiyaGFxyd5', 'BQ3MCuTT5zVBhNfQ4SjMh3NPVhFy73MPV8rjfq5d1zie', '4xpWF7KjbcShMt3LnEJmPkbDVyEZ19zrwtJwS1e1e827', decode('0000d5a568ab7b418ba68eabe61b5b042f84afcfa86f1cdfcdba644401b6bcb65c84af622936', 'hex'), 253, '2025-04-30 16:49:26.499+00', '2023-05-04 14:36:16.429+00', '"b58"'); diff --git a/iot_config/tests/integrations/gateway_db.rs b/iot_config/tests/integrations/gateway_db.rs new file mode 100644 index 000000000..ddeb24030 --- /dev/null +++ b/iot_config/tests/integrations/gateway_db.rs @@ -0,0 +1,66 @@ +use chrono::Utc; +use helium_crypto::PublicKeyBinary; +use iot_config::gateway::db::Gateway; +use sqlx::PgPool; + +use crate::common; + +#[sqlx::test] +async fn gateway_bulk_insert_and_get(pool: PgPool) -> anyhow::Result<()> { + let now = Utc::now(); + + let a1 = pk_binary(); + let a2 = pk_binary(); + let a3 = pk_binary(); + + let g1 = gw(a1.clone(), now); + let g2 = gw(a2.clone(), now); + let g3 = gw(a3.clone(), now); + + let affected = Gateway::insert_bulk(&pool, &[g1, g2, g3]).await?; + assert_eq!(affected, 3, "should insert 3 rows"); + + for addr in [&a1, &a2, &a3] { + let got = Gateway::get_by_address(&pool, addr) + .await? + .expect("row should exist"); + assert_eq!(got.created_at, common::nanos_trunc(now)); + assert_eq!(got.elevation, Some(1)); + assert_eq!(got.gain, Some(2)); + assert_eq!(got.hash, "h0"); + assert_eq!(got.is_active, Some(true)); + assert_eq!(got.is_full_hotspot, Some(true)); + assert_eq!(got.last_changed_at, common::nanos_trunc(now)); + assert_eq!(got.location, Some(3)); + assert_eq!(got.location_asserts, Some(4)); + assert!(got.location_changed_at.is_some()); + assert_eq!(got.location_changed_at.unwrap(), common::nanos_trunc(now)); + assert!(got.refreshed_at.is_some()); + assert_eq!(got.refreshed_at.unwrap(), common::nanos_trunc(now)); + assert_eq!(got.updated_at, common::nanos_trunc(now)); + } + + Ok(()) +} + +fn pk_binary() -> PublicKeyBinary { + common::make_keypair().public_key().clone().into() +} + +fn gw(address: PublicKeyBinary, ts: chrono::DateTime) -> Gateway { + Gateway { + address, + created_at: ts, + elevation: Some(1), + gain: Some(2), + hash: "h0".to_string(), + is_active: Some(true), + is_full_hotspot: Some(true), + last_changed_at: ts, + location: Some(3), + location_asserts: Some(4), + location_changed_at: Some(ts), + refreshed_at: Some(ts), + updated_at: ts, + } +} diff --git a/iot_config/tests/integrations/gateway_service.rs b/iot_config/tests/integrations/gateway_service.rs new file mode 100644 index 000000000..6b9133a3b --- /dev/null +++ b/iot_config/tests/integrations/gateway_service.rs @@ -0,0 +1,406 @@ +use crate::common::{make_keypair, spawn_gateway_service}; +use chrono::{DateTime, Utc}; +use futures::StreamExt; +use h3o::{LatLng, Resolution}; +use helium_crypto::{Keypair, PublicKey, PublicKeyBinary, Sign}; +use helium_proto::{ + services::iot_config::{self as proto, GatewayClient, GatewayInfo}, + BlockchainRegionParamsV1, Region, +}; +use hextree::Cell; +use iot_config::{gateway::db::Gateway, region_map}; +use libflate::gzip::Encoder; +use prost::Message; +use sqlx::PgPool; +use std::io::Write; +use std::vec; +use tonic::Code; + +const DEFAULT_REGION: Region = Region::Us915; + +#[sqlx::test] +async fn gateway_info_v1_authorization_errors(pool: PgPool) -> anyhow::Result<()> { + // NOTE: The information we're requesting does not exist in the DB for + // this test. But we're only interested in Authization Errors. + + let admin_key = make_keypair(); // unlimited access + let gw_key = make_keypair(); // access to self + let unknown_key = make_keypair(); // no access + + // Start the gateway server + let (addr, _) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await?; + + // Connect with the assigned address + let mut client = GatewayClient::connect(addr).await?; + + // Request gateway info as administrator + let req = make_signed_info_request(gw_key.public_key(), &admin_key); + let err = client.info(req).await.expect_err("testing expects error"); + assert_ne!( + err.code(), + Code::PermissionDenied, + "admins have full access" + ); + + // Request gateway from unknown key + let req = make_signed_info_request(gw_key.public_key(), &unknown_key); + let err = client.info(req).await.expect_err("testing expects errors"); + assert_eq!( + err.code(), + Code::PermissionDenied, + "unknown keys are denied" + ); + + // Request self with a different signer + let mut req = make_signed_info_request(gw_key.public_key(), &gw_key); + req.signature = vec![]; + req.signature = admin_key.sign(&req.encode_to_vec()).unwrap(); + let err = client.info(req).await.expect_err("testing expects errors"); + assert_eq!( + err.code(), + Code::PermissionDenied, + "signature must match signer" + ); + + Ok(()) +} + +#[sqlx::test] +async fn gateway_location_v1(pool: PgPool) -> anyhow::Result<()> { + let admin_key = make_keypair(); + let pub_key = make_keypair().public_key().clone(); + let now = Utc::now(); + + let gateway = insert_gateway(&pool, now, pub_key.clone().into()).await?; + + let (addr, _) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await?; + + let mut client = GatewayClient::connect(addr).await?; + + let res = req_gateway_location_v1(&mut client, &pub_key, &admin_key).await?; + + let cell = Cell::from_raw(gateway.location.unwrap() as u64)?; + assert_eq!(res.location, cell.to_string()); + + Ok(()) +} + +#[sqlx::test] +async fn gateway_region_params_v1(pool: PgPool) -> anyhow::Result<()> { + let admin_key = make_keypair(); + let keypair = make_keypair(); + let pub_key = keypair.public_key().clone(); + let now = Utc::now(); + + let gateway = insert_gateway(&pool, now, pub_key.clone().into()).await?; + + let (addr, _) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await?; + + let mut client = GatewayClient::connect(addr).await?; + + let res = req_gateway_region_params_v1(&mut client, &pub_key, &keypair).await?; + + assert_eq!(res.region, DEFAULT_REGION as i32); + assert_eq!( + res.params, + Some(BlockchainRegionParamsV1 { + region_params: vec![], + }) + ); + assert_eq!(res.gain, gateway.gain.unwrap() as u64); + + Ok(()) +} + +#[sqlx::test] +async fn gateway_info_v1(pool: PgPool) -> anyhow::Result<()> { + let admin_key = make_keypair(); + let pub_key = make_keypair().public_key().clone(); + let now = Utc::now(); + + let gateway = insert_gateway(&pool, now, pub_key.clone().into()).await?; + + let (addr, _) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await?; + + let mut client = GatewayClient::connect(addr).await?; + + let res = req_gateway_info_v1(&mut client, &pub_key, &admin_key).await?; + + assert!(res.info.is_some()); + + let info = res.info.unwrap(); + assert_eq!(info.address, pub_key.to_vec()); + assert_eq!(info.is_full_hotspot, gateway.is_full_hotspot.unwrap()); + assert!(info.metadata.is_some()); + + let metadata = info.metadata.unwrap(); + let cell = Cell::from_raw(gateway.location.unwrap() as u64)?; + assert_eq!(metadata.location, cell.to_string()); + assert_eq!(metadata.region, Region::Us915 as i32); + assert_eq!(metadata.gain, gateway.gain.unwrap() as i32); + assert_eq!(metadata.elevation, gateway.elevation.unwrap() as i32); + + Ok(()) +} + +#[sqlx::test] +async fn gateway_info_stream_v1(pool: PgPool) -> anyhow::Result<()> { + let admin_key = make_keypair(); + let pub_key = make_keypair().public_key().clone(); + let now = Utc::now(); + + let gateway = insert_gateway(&pool, now, pub_key.clone().into()).await?; + + let (addr, _) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await?; + + let mut client = GatewayClient::connect(addr).await?; + + let res = req_gateway_info_stream_v1(&mut client, &admin_key).await?; + + assert_eq!(res.gateways.len(), 1); + + let info: &GatewayInfo = res.gateways.first().unwrap(); + + assert_eq!(info.address, pub_key.to_vec()); + assert_eq!(info.is_full_hotspot, gateway.is_full_hotspot.unwrap()); + assert!(info.metadata.is_some()); + + let metadata = info.metadata.clone().unwrap(); + let cell = Cell::from_raw(gateway.location.unwrap() as u64)?; + assert_eq!(metadata.location, cell.to_string()); + assert_eq!(metadata.region, Region::Us915 as i32); + assert_eq!(metadata.gain, gateway.gain.unwrap() as i32); + assert_eq!(metadata.elevation, gateway.elevation.unwrap() as i32); + + Ok(()) +} + +#[sqlx::test] +async fn gateway_info_stream_v2(pool: PgPool) -> anyhow::Result<()> { + let admin_key = make_keypair(); + let pub_key1 = make_keypair().public_key().clone(); + let pub_key2 = make_keypair().public_key().clone(); + let now_min_15 = Utc::now() - chrono::Duration::minutes(15); + let now_min_10 = Utc::now() - chrono::Duration::minutes(10); + + let gateway1 = insert_gateway(&pool, now_min_15, pub_key1.clone().into()).await?; + let gateway2 = insert_gateway(&pool, now_min_10, pub_key2.clone().into()).await?; + + let (addr, _) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await?; + + let mut client = GatewayClient::connect(addr).await?; + + // Get them ALL + let res = req_gateway_info_stream_v2(&mut client, &admin_key, 0, 0).await?; + + assert_eq!(res.gateways.len(), 2); + + let info: &GatewayInfo = res.gateways.first().unwrap(); + + assert_eq!(info.address, pub_key1.to_vec()); + assert_eq!(info.is_full_hotspot, gateway1.is_full_hotspot.unwrap()); + assert!(info.metadata.is_some()); + + let metadata = info.metadata.clone().unwrap(); + let cell = Cell::from_raw(gateway1.location.unwrap() as u64)?; + assert_eq!(metadata.location, cell.to_string()); + assert_eq!(metadata.region, Region::Us915 as i32); + assert_eq!(metadata.gain, gateway1.gain.unwrap() as i32); + assert_eq!(metadata.elevation, gateway1.elevation.unwrap() as i32); + + // Get min_updated_at = now_min_10 + let res = req_gateway_info_stream_v2(&mut client, &admin_key, now_min_10.timestamp() as u64, 0) + .await?; + + assert_eq!(res.gateways.len(), 1); + + let info: &GatewayInfo = res.gateways.first().unwrap(); + + assert_eq!(info.address, pub_key2.to_vec()); + assert_eq!(info.is_full_hotspot, gateway2.is_full_hotspot.unwrap()); + assert!(info.metadata.is_some()); + + let metadata = info.metadata.clone().unwrap(); + let cell = Cell::from_raw(gateway2.location.unwrap() as u64)?; + assert_eq!(metadata.location, cell.to_string()); + assert_eq!(metadata.region, Region::Us915 as i32); + assert_eq!(metadata.gain, gateway2.gain.unwrap() as i32); + assert_eq!(metadata.elevation, gateway2.elevation.unwrap() as i32); + + // Get min_location_changed_at = now_min_10 + let res = req_gateway_info_stream_v2(&mut client, &admin_key, 0, now_min_10.timestamp() as u64) + .await?; + + assert_eq!(res.gateways.len(), 1); + + let info: &GatewayInfo = res.gateways.first().unwrap(); + + assert_eq!(info.address, pub_key2.to_vec()); + assert_eq!(info.is_full_hotspot, gateway2.is_full_hotspot.unwrap()); + assert!(info.metadata.is_some()); + + let metadata = info.metadata.clone().unwrap(); + let cell = Cell::from_raw(gateway2.location.unwrap() as u64)?; + assert_eq!(metadata.location, cell.to_string()); + assert_eq!(metadata.region, Region::Us915 as i32); + assert_eq!(metadata.gain, gateway2.gain.unwrap() as i32); + assert_eq!(metadata.elevation, gateway2.elevation.unwrap() as i32); + + Ok(()) +} + +fn make_signed_info_request(address: &PublicKey, signer: &Keypair) -> proto::GatewayInfoReqV1 { + let mut req = proto::GatewayInfoReqV1 { + address: address.to_vec(), + signer: signer.public_key().to_vec(), + signature: vec![], + }; + req.signature = signer.sign(&req.encode_to_vec()).unwrap(); + req +} + +async fn req_gateway_location_v1( + client: &mut GatewayClient, + address: &PublicKey, + signer: &Keypair, +) -> anyhow::Result { + let mut req = proto::GatewayLocationReqV1 { + gateway: address.to_vec(), + signer: signer.public_key().to_vec(), + signature: vec![], + }; + + req.signature = signer.sign(&req.encode_to_vec()).unwrap(); + + let res = client.location(req).await?.into_inner(); + Ok(res) +} + +async fn req_gateway_region_params_v1( + client: &mut GatewayClient, + address: &PublicKey, + signer: &Keypair, +) -> anyhow::Result { + let mut req = proto::GatewayRegionParamsReqV1 { + region: 0, + address: address.to_vec(), + signature: vec![], + }; + + req.signature = signer.sign(&req.encode_to_vec()).unwrap(); + + let res = client.region_params(req).await?.into_inner(); + Ok(res) +} + +async fn req_gateway_info_v1( + client: &mut GatewayClient, + address: &PublicKey, + signer: &Keypair, +) -> anyhow::Result { + let mut req = proto::GatewayInfoReqV1 { + address: address.to_vec(), + signer: signer.public_key().to_vec(), + signature: vec![], + }; + + req.signature = signer.sign(&req.encode_to_vec()).unwrap(); + + let res = client.info(req).await?.into_inner(); + Ok(res) +} + +async fn req_gateway_info_stream_v1( + client: &mut GatewayClient, + signer: &Keypair, +) -> anyhow::Result { + let mut req = proto::GatewayInfoStreamReqV1 { + batch_size: 10_000, + signer: signer.public_key().to_vec(), + signature: vec![], + }; + + req.signature = signer.sign(&req.encode_to_vec()).unwrap(); + + let mut stream = client.info_stream(req).await?.into_inner(); + + let first = stream + .next() + .await + .transpose()? // map tonic Status into Err + .ok_or_else(|| anyhow::Error::msg("no response"))?; + + Ok(first) +} + +async fn req_gateway_info_stream_v2( + client: &mut GatewayClient, + signer: &Keypair, + min_updated_at: u64, + min_location_changed_at: u64, +) -> anyhow::Result { + let mut req = proto::GatewayInfoStreamReqV2 { + batch_size: 10_000, + min_updated_at, + min_location_changed_at, + signer: signer.public_key().to_vec(), + signature: vec![], + }; + + req.signature = signer.sign(&req.encode_to_vec()).unwrap(); + + let mut stream = client.info_stream_v2(req).await?.into_inner(); + + let first = stream + .next() + .await + .transpose()? // map tonic Status into Err + .ok_or_else(|| anyhow::Error::msg("no response"))?; + + Ok(first) +} + +async fn insert_gateway( + pool: &PgPool, + now: DateTime, + pub_key: PublicKeyBinary, +) -> anyhow::Result { + let sf = LatLng::new(37.7749, -122.4194)?; // San Francisco + let cell = sf.to_cell(Resolution::Twelve); // resolution 12 + let h3_index: u64 = cell.into(); // u64 + + let gateway = Gateway { + address: pub_key.clone(), + created_at: now, + elevation: Some(1), + gain: Some(2), + hash: "hash1".to_string(), + is_active: Some(true), + is_full_hotspot: Some(true), + last_changed_at: now, + location: Some(h3_index), + location_asserts: Some(1), + location_changed_at: Some(now), + refreshed_at: Some(now), + updated_at: now, + }; + + Gateway::insert_bulk(pool, std::slice::from_ref(&gateway)).await?; + + let loc_bytes = h3_index.to_le_bytes(); + let mut encoder = Encoder::new(Vec::new())?; + encoder.write_all(&loc_bytes)?; + let compressed = encoder.finish().into_result()?; + + region_map::update_region( + DEFAULT_REGION, + &BlockchainRegionParamsV1 { + region_params: vec![], + }, + Some(&compressed), + pool, + ) + .await?; + + Ok(gateway) +} diff --git a/iot_config/tests/integrations/gateway_tracker.rs b/iot_config/tests/integrations/gateway_tracker.rs new file mode 100644 index 000000000..fea58002b --- /dev/null +++ b/iot_config/tests/integrations/gateway_tracker.rs @@ -0,0 +1,115 @@ +use crate::common::{ + self, + gateway_metadata_db::{create_tables, insert_gateway, insert_iot_hotspot_infos}, + make_keypair, +}; +use chrono::Utc; +use custom_tracing::Settings; +use iot_config::gateway::{db::Gateway, tracker}; +use sqlx::PgPool; + +#[sqlx::test] +async fn execute_test(pool: PgPool) -> anyhow::Result<()> { + custom_tracing::init("iot_config=debug,info".to_string(), Settings::default()).await?; + + let pubkey1 = make_keypair().public_key().clone(); + let location = 631_711_281_837_647_359_i64; + let now = Utc::now() - chrono::Duration::seconds(10); + + // Ensure tables exist + create_tables(&pool).await?; + + // Insert test data into iot_hotspot_infos + insert_gateway( + &pool, + "address1", // address (PRIMARY KEY) + "asset1", // asset + Some(location), // location + Some(1), // elevation + Some(2), // gain + Some(true), // is_full_hotspot + Some(3), // num_location_asserts + Some(true), // is_active + Some(0), // dc_onboarding_fee_paid + now, // created_at + Some(now), // refreshed_at + Some(0), // last_block + pubkey1.clone().into(), // key (PublicKeyBinary) + ) + .await?; + + // Execute tracker logic + tracker::execute(&pool, &pool).await?; + + // Retrieve gateway record and assert fields + let gateway = Gateway::get_by_address(&pool, &pubkey1.clone().into()) + .await? + .expect("gateway not found"); + + assert_eq!(gateway.created_at, common::nanos_trunc(now)); + assert_eq!(gateway.elevation, Some(1)); + assert_eq!(gateway.gain, Some(2)); + assert_eq!(gateway.is_active, Some(true)); + assert_eq!(gateway.is_full_hotspot, Some(true)); + assert!(gateway.last_changed_at > now); + assert_eq!(gateway.location, Some(location as u64)); + assert_eq!(gateway.location_asserts, Some(3)); + assert!(gateway.location_changed_at.is_some()); + assert_eq!( + gateway.location_changed_at.unwrap(), + common::nanos_trunc(now) + ); + assert!(gateway.refreshed_at.is_some()); + assert_eq!(gateway.refreshed_at.unwrap(), common::nanos_trunc(now)); + assert!(gateway.updated_at > now); + + let refreshed_at = Utc::now(); + let location = 666_711_281_837_647_360_i64; + // Insert test data into iot_hotspot_infos + insert_iot_hotspot_infos( + &pool, + "address1", // address (PRIMARY KEY) + "asset1", // asset + Some(location), // location + Some(10), // elevation + Some(20), // gain + Some(false), // is_full_hotspot + Some(30), // num_location_asserts + Some(false), // is_active + Some(0), // dc_onboarding_fee_paid + now, // created_at + Some(refreshed_at), // refreshed_at + Some(0), // last_block + ) + .await?; + + // Execute tracker logic + tracker::execute(&pool, &pool).await?; + + // Retrieve gateway record and assert fields + let gateway = Gateway::get_by_address(&pool, &pubkey1.clone().into()) + .await? + .expect("gateway not found"); + + assert_eq!(gateway.created_at, common::nanos_trunc(now)); + assert_eq!(gateway.elevation, Some(10)); + assert_eq!(gateway.gain, Some(20)); + assert_eq!(gateway.is_active, Some(false)); + assert_eq!(gateway.is_full_hotspot, Some(false)); + assert_eq!(gateway.last_changed_at, common::nanos_trunc(refreshed_at)); + assert_eq!(gateway.location, Some(location as u64)); + assert_eq!(gateway.location_asserts, Some(30)); + assert!(gateway.location_changed_at.is_some()); + assert_eq!( + gateway.location_changed_at.unwrap(), + common::nanos_trunc(refreshed_at) + ); + assert!(gateway.refreshed_at.is_some()); + assert_eq!( + gateway.refreshed_at.unwrap(), + common::nanos_trunc(refreshed_at) + ); + assert!(gateway.updated_at > refreshed_at); + + Ok(()) +} diff --git a/iot_config/tests/integrations/main.rs b/iot_config/tests/integrations/main.rs new file mode 100644 index 000000000..51bd3f58a --- /dev/null +++ b/iot_config/tests/integrations/main.rs @@ -0,0 +1,6 @@ +mod common; + +mod gateway_db; +mod gateway_service; +mod gateway_tracker; +mod route_service; diff --git a/iot_config/tests/route_service.rs b/iot_config/tests/integrations/route_service.rs similarity index 100% rename from iot_config/tests/route_service.rs rename to iot_config/tests/integrations/route_service.rs diff --git a/iot_verifier/src/gateway_cache.rs b/iot_verifier/src/gateway_cache.rs index 3843eb93f..78509353a 100644 --- a/iot_verifier/src/gateway_cache.rs +++ b/iot_verifier/src/gateway_cache.rs @@ -6,7 +6,7 @@ use crate::gateway_updater::MessageReceiver; use helium_crypto::PublicKeyBinary; -use iot_config::gateway_info::GatewayInfo; +use iot_config::gateway::service::info::GatewayInfo; #[derive(Clone)] pub struct GatewayCache { diff --git a/iot_verifier/src/gateway_updater.rs b/iot_verifier/src/gateway_updater.rs index 07c926b5b..300c676a2 100644 --- a/iot_verifier/src/gateway_updater.rs +++ b/iot_verifier/src/gateway_updater.rs @@ -5,7 +5,7 @@ use futures::stream::StreamExt; use helium_crypto::PublicKeyBinary; use iot_config::client::ClientError; -use iot_config::{client::Gateways, gateway_info::GatewayInfo}; +use iot_config::{client::Gateways, gateway::service::info::GatewayInfo}; use std::{collections::HashMap, time::Duration}; use task_manager::ManagedTask; use tokio::sync::watch; diff --git a/iot_verifier/src/poc.rs b/iot_verifier/src/poc.rs index c2770e4f3..77ed8359b 100644 --- a/iot_verifier/src/poc.rs +++ b/iot_verifier/src/poc.rs @@ -26,7 +26,7 @@ use helium_proto::{ }; use iot_config::{ client::Gateways, - gateway_info::{GatewayInfo, GatewayMetadata}, + gateway::service::info::{GatewayInfo, GatewayMetadata}, }; use rust_decimal::Decimal; use sqlx::PgPool; diff --git a/iot_verifier/src/runner.rs b/iot_verifier/src/runner.rs index 0769ff215..4621d3536 100644 --- a/iot_verifier/src/runner.rs +++ b/iot_verifier/src/runner.rs @@ -56,7 +56,7 @@ use helium_proto::services::poc_lora::{ InvalidDetails, InvalidParticipantSide, InvalidReason, LoraInvalidBeaconReportV1, LoraInvalidWitnessReportV1, LoraPocV1, VerificationStatus, }; -use iot_config::{client::Gateways, gateway_info::GatewayInfo}; +use iot_config::{client::Gateways, gateway::service::info::GatewayInfo}; use rust_decimal::{Decimal, MathematicalOps}; use rust_decimal_macros::dec; use sqlx::PgPool; diff --git a/iot_verifier/tests/integrations/common/mod.rs b/iot_verifier/tests/integrations/common/mod.rs index 29dc81ded..db7b3674b 100644 --- a/iot_verifier/tests/integrations/common/mod.rs +++ b/iot_verifier/tests/integrations/common/mod.rs @@ -17,7 +17,7 @@ use helium_proto::{ }; use iot_config::{ client::RegionParamsInfo, - gateway_info::{GatewayInfo, GatewayMetadata}, + gateway::service::info::{GatewayInfo, GatewayMetadata}, }; use iot_verifier::{ entropy::Entropy, diff --git a/iot_verifier/tests/integrations/runner_tests.rs b/iot_verifier/tests/integrations/runner_tests.rs index 9d9682068..244e5b1fa 100644 --- a/iot_verifier/tests/integrations/runner_tests.rs +++ b/iot_verifier/tests/integrations/runner_tests.rs @@ -12,7 +12,7 @@ use helium_proto::Region as ProtoRegion; use iot_config::client::ClientError; use iot_config::{ client::{Gateways, RegionParamsInfo}, - gateway_info::{GatewayInfo, GatewayInfoStream}, + gateway::service::info::{GatewayInfo, GatewayInfoStream}, }; use iot_verifier::witness_updater::WitnessUpdater; use iot_verifier::{