diff --git a/Cargo.lock b/Cargo.lock index c0f58d916..7d61445c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4947,6 +4947,7 @@ dependencies = [ "strum", "strum_macros", "task-manager", + "tempfile", "thiserror 1.0.69", "tls-init", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 3de5c52e5..55a480b32 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -126,10 +126,10 @@ aws-smithy-types-convert = { version = "0.60.9", features = [ url = "2.5.4" ### Protobuf -helium-proto = { git = "https://github.com/helium/proto", branch = "master", features = [ +helium-proto = { git = "https://github.com/helium/proto", branch = "connor/historical", features = [ "services", ] } -beacon = { git = "https://github.com/helium/proto", branch = "master" } +beacon = { git = "https://github.com/helium/proto", branch = "connor/historical" } # Pickup versions from above prost = "*" tonic = { version = "*", features = ["tls-aws-lc", "tls-native-roots"] } diff --git a/file_store_oracles/src/traits/msg_verify.rs b/file_store_oracles/src/traits/msg_verify.rs index 52356530e..121fb5653 100644 --- a/file_store_oracles/src/traits/msg_verify.rs +++ b/file_store_oracles/src/traits/msg_verify.rs @@ -102,6 +102,7 @@ impl_msg_verify!(mobile_config::GatewayInfoStreamResV3, signature); impl_msg_verify!(mobile_config::BoostedHexInfoStreamReqV1, signature); impl_msg_verify!(mobile_config::BoostedHexModifiedInfoStreamReqV1, signature); impl_msg_verify!(mobile_config::BoostedHexInfoStreamResV1, signature); +impl_msg_verify!(mobile_config::GatewayInfoAtTimestampReqV1, signature); impl_msg_verify!(sub_dao::SubDaoEpochRewardInfoReqV1, signature); impl_msg_verify!(sub_dao::SubDaoEpochRewardInfoResV1, signature); impl_msg_verify!(poc_mobile::SubscriberVerifiedMappingEventReqV1, signature); diff --git a/mobile_config/Cargo.toml b/mobile_config/Cargo.toml index 8d5ad731c..d0a08b3b7 100644 --- a/mobile_config/Cargo.toml +++ b/mobile_config/Cargo.toml @@ -61,3 +61,4 @@ tls-init = { path = "../tls_init" } [dev-dependencies] rand = { workspace = true } tokio-stream = { workspace = true, features = ["net"] } +tempfile = "3" diff --git a/mobile_config/migrations/20251003000000_gateways_historical.sql b/mobile_config/migrations/20251003000000_gateways_historical.sql new file mode 100644 index 000000000..5fb7aa774 --- /dev/null +++ b/mobile_config/migrations/20251003000000_gateways_historical.sql @@ -0,0 +1,28 @@ +-- 1. Drop the primary key constraint on address +ALTER TABLE + gateways DROP CONSTRAINT IF EXISTS gateways_pkey; + +-- 2. Rename column updated_at -> inserted_at +ALTER TABLE + gateways RENAME COLUMN updated_at TO inserted_at; + +-- 3. Backfill inserted_at with created_at values +UPDATE + gateways +SET + inserted_at = created_at; + +-- 4. Ensure inserted_at is NOT NULL and has a default value of now() +ALTER TABLE + gateways +ALTER COLUMN inserted_at SET DEFAULT now(), +ALTER COLUMN inserted_at SET NOT NULL; + +-- 5. Create an index on (address, inserted_at DESC) +CREATE INDEX IF NOT EXISTS gateways_address_inserted_idx ON gateways (address, inserted_at DESC); + +-- 6. Create an PK on (address, inserted_at DESC) +ALTER TABLE + gateways +ADD + CONSTRAINT gateways_pkey PRIMARY KEY (address, inserted_at); \ No newline at end of file diff --git a/mobile_config/src/gateway/db.rs b/mobile_config/src/gateway/db.rs index c7a4c039a..7821f752e 100644 --- a/mobile_config/src/gateway/db.rs +++ b/mobile_config/src/gateway/db.rs @@ -81,8 +81,8 @@ pub struct Gateway { pub gateway_type: GatewayType, // When the record was first created from metadata DB pub created_at: DateTime, - // When record was last updated - pub updated_at: DateTime, + // When record was inserted + pub inserted_at: DateTime, // When record was last updated from metadata DB (could be set to now if no metadata DB info) pub refreshed_at: DateTime, // When location or hash last changed, set to refreshed_at (updated via SQL query see Gateway::insert) @@ -96,6 +96,7 @@ pub struct Gateway { pub location_changed_at: Option>, pub location_asserts: Option, } + #[derive(Debug)] pub struct LocationChangedAtUpdate { pub address: PublicKeyBinary, @@ -113,7 +114,6 @@ impl Gateway { address, gateway_type, created_at, - updated_at, refreshed_at, last_changed_at, hash, @@ -130,7 +130,6 @@ impl Gateway { b.push_bind(g.address.as_ref()) .push_bind(g.gateway_type) .push_bind(g.created_at) - .push_bind(g.updated_at) .push_bind(g.refreshed_at) .push_bind(g.last_changed_at) .push_bind(g.hash.as_str()) @@ -142,31 +141,6 @@ impl Gateway { .push_bind(g.location_asserts.map(|v| v as i64)); }); - qb.push( - " ON CONFLICT (address) DO UPDATE SET - gateway_type = EXCLUDED.gateway_type, - created_at = EXCLUDED.created_at, - updated_at = EXCLUDED.updated_at, - refreshed_at = EXCLUDED.refreshed_at, - last_changed_at = CASE - WHEN gateways.location IS DISTINCT FROM EXCLUDED.location - OR gateways.hash IS DISTINCT FROM EXCLUDED.hash - THEN EXCLUDED.refreshed_at - ELSE gateways.last_changed_at - END, - hash = EXCLUDED.hash, - antenna = EXCLUDED.antenna, - elevation = EXCLUDED.elevation, - azimuth = EXCLUDED.azimuth, - location = EXCLUDED.location, - location_changed_at = CASE - WHEN gateways.location IS DISTINCT FROM EXCLUDED.location - THEN EXCLUDED.refreshed_at - ELSE gateways.location_changed_at - END, - location_asserts = EXCLUDED.location_asserts", - ); - let res = qb.build().execute(pool).await?; Ok(res.rows_affected()) } @@ -178,7 +152,6 @@ impl Gateway { address, gateway_type, created_at, - updated_at, refreshed_at, last_changed_at, hash, @@ -191,37 +164,13 @@ impl Gateway { ) VALUES ( $1, $2, $3, $4, $5, $6, $7, - $8, $9, $10, $11, $12, $13 + $8, $9, $10, $11, $12 ) - ON CONFLICT (address) - DO UPDATE SET - gateway_type = EXCLUDED.gateway_type, - created_at = EXCLUDED.created_at, - updated_at = EXCLUDED.updated_at, - refreshed_at = EXCLUDED.refreshed_at, - last_changed_at = CASE - WHEN gateways.location IS DISTINCT FROM EXCLUDED.location - OR gateways.hash IS DISTINCT FROM EXCLUDED.hash - THEN EXCLUDED.refreshed_at - ELSE gateways.last_changed_at - END, - hash = EXCLUDED.hash, - antenna = EXCLUDED.antenna, - elevation = EXCLUDED.elevation, - azimuth = EXCLUDED.azimuth, - location = EXCLUDED.location, - location_changed_at = CASE - WHEN gateways.location IS DISTINCT FROM EXCLUDED.location - THEN EXCLUDED.refreshed_at - ELSE gateways.location_changed_at - END, - location_asserts = EXCLUDED.location_asserts "#, ) .bind(self.address.as_ref()) .bind(self.gateway_type) .bind(self.created_at) - .bind(self.updated_at) .bind(self.refreshed_at) .bind(self.last_changed_at) .bind(self.hash.as_str()) @@ -247,7 +196,7 @@ impl Gateway { address, gateway_type, created_at, - updated_at, + inserted_at, refreshed_at, last_changed_at, hash, @@ -259,6 +208,8 @@ impl Gateway { location_asserts FROM gateways WHERE address = $1 + ORDER BY inserted_at DESC + LIMIT 1 "#, ) .bind(address.as_ref()) @@ -268,6 +219,76 @@ impl Gateway { Ok(gateway) } + pub async fn get_by_addresses<'a>( + db: impl PgExecutor<'a>, + addresses: Vec, + ) -> anyhow::Result> { + let addr_array: Vec> = addresses.iter().map(|a| a.as_ref().to_vec()).collect(); + + let rows = sqlx::query_as::<_, Self>( + r#" + SELECT DISTINCT ON (address) + address, + gateway_type, + created_at, + inserted_at, + refreshed_at, + last_changed_at, + hash, + antenna, + elevation, + azimuth, + location, + location_changed_at, + location_asserts + FROM gateways + WHERE address = ANY($1) + ORDER BY address, inserted_at DESC + "#, + ) + .bind(addr_array) + .fetch_all(db) + .await?; + + Ok(rows) + } + + pub async fn get_by_address_and_inserted_at<'a>( + db: impl PgExecutor<'a>, + address: &PublicKeyBinary, + inserted_at_max: &DateTime, + ) -> anyhow::Result> { + let gateway = sqlx::query_as::<_, Self>( + r#" + SELECT + address, + gateway_type, + created_at, + inserted_at, + refreshed_at, + last_changed_at, + hash, + antenna, + elevation, + azimuth, + location, + location_changed_at, + location_asserts + FROM gateways + WHERE address = $1 + AND inserted_at <= $2 + ORDER BY inserted_at DESC + LIMIT 1 + "#, + ) + .bind(address.as_ref()) + .bind(inserted_at_max) + .fetch_optional(db) + .await?; + + Ok(gateway) + } + pub fn stream_by_addresses<'a>( db: impl PgExecutor<'a> + 'a, addresses: Vec, @@ -277,11 +298,11 @@ impl Gateway { sqlx::query_as::<_, Self>( r#" - SELECT + SELECT DISTINCT ON (address) address, gateway_type, created_at, - updated_at, + inserted_at, refreshed_at, last_changed_at, hash, @@ -294,6 +315,7 @@ impl Gateway { FROM gateways WHERE address = ANY($1) AND last_changed_at >= $2 + ORDER BY address, inserted_at DESC "#, ) .bind(addr_array) @@ -311,11 +333,11 @@ impl Gateway { ) -> impl Stream + 'a { sqlx::query_as::<_, Self>( r#" - SELECT + SELECT DISTINCT ON (address) address, gateway_type, created_at, - updated_at, + inserted_at, refreshed_at, last_changed_at, hash, @@ -332,6 +354,7 @@ impl Gateway { $3::timestamptz IS NULL OR (location IS NOT NULL AND location_changed_at >= $3) ) + ORDER BY address, inserted_at DESC "#, ) .bind(types) @@ -395,7 +418,7 @@ impl FromRow<'_, PgRow> for Gateway { address: PublicKeyBinary::from(row.try_get::, _>("address")?), gateway_type: row.try_get("gateway_type")?, created_at: row.try_get("created_at")?, - updated_at: row.try_get("updated_at")?, + inserted_at: row.try_get("inserted_at")?, refreshed_at: row.try_get("refreshed_at")?, last_changed_at: row.try_get("last_changed_at")?, hash: row.try_get("hash")?, diff --git a/mobile_config/src/gateway/metadata_db.rs b/mobile_config/src/gateway/metadata_db.rs index 7987579cd..983382532 100644 --- a/mobile_config/src/gateway/metadata_db.rs +++ b/mobile_config/src/gateway/metadata_db.rs @@ -119,14 +119,15 @@ impl MobileHotspotInfo { None => (None, None, None), }; + let refreshed_at = self.refreshed_at.unwrap_or_else(Utc::now); + Ok(Some(Gateway { address: self.entity_key.clone(), gateway_type: GatewayType::try_from(self.device_type.clone())?, created_at: self.created_at, - updated_at: Utc::now(), - refreshed_at: self.refreshed_at.unwrap_or_else(Utc::now), - // Updated via SQL query see Gateway::insert - last_changed_at: Utc::now(), + inserted_at: Utc::now(), + refreshed_at, + last_changed_at: refreshed_at, hash: self.compute_hash(), antenna, elevation, @@ -134,7 +135,7 @@ impl MobileHotspotInfo { location, // Set to refreshed_at when hotspot has a location, None otherwise location_changed_at: if location.is_some() { - Some(self.refreshed_at.unwrap_or_else(Utc::now)) + Some(refreshed_at) } else { None }, @@ -164,7 +165,7 @@ impl sqlx::FromRow<'_, sqlx::postgres::PgRow> for MobileHotspotInfo { ) .map_err(|err| sqlx::Error::Decode(Box::new(err)))?, refreshed_at: row.get::>, &str>("refreshed_at"), - created_at: row.get::, &str>("refreshed_at"), + created_at: row.get::, &str>("created_at"), location: row.get::, &str>("location"), is_full_hotspot: row.get::, &str>("is_full_hotspot"), num_location_asserts: row.get::, &str>("num_location_asserts"), diff --git a/mobile_config/src/gateway/service/info.rs b/mobile_config/src/gateway/service/info.rs index f64828e96..16c8ba973 100644 --- a/mobile_config/src/gateway/service/info.rs +++ b/mobile_config/src/gateway/service/info.rs @@ -453,3 +453,12 @@ pub fn stream_by_types<'a>( Gateway::stream_by_types(db, gateway_types, min_date, None).map(|gateway| gateway.into()) } + +pub async fn get_by_address_and_inserted_at( + db: impl PgExecutor<'_>, + pubkey_bin: &PublicKeyBinary, + inserted_at_max: &DateTime, +) -> anyhow::Result> { + let gateway = Gateway::get_by_address_and_inserted_at(db, pubkey_bin, inserted_at_max).await?; + Ok(gateway.map(|g| g.into())) +} diff --git a/mobile_config/src/gateway/service/mod.rs b/mobile_config/src/gateway/service/mod.rs index 0d21261c3..48ef6d4f0 100644 --- a/mobile_config/src/gateway/service/mod.rs +++ b/mobile_config/src/gateway/service/mod.rs @@ -1,5 +1,5 @@ use crate::{ - gateway::service::{info::DeviceType, info_v3::DeviceTypeV2}, + gateway::service::{info::DeviceType, info::GatewayInfo, info_v3::DeviceTypeV2}, key_cache::KeyCache, telemetry, verify_public_key, GrpcResult, GrpcStreamResult, }; @@ -13,9 +13,10 @@ use futures::{ use helium_crypto::{Keypair, PublicKey, PublicKeyBinary, Sign}; use helium_proto::{ services::mobile_config::{ - self, GatewayInfoBatchReqV1, GatewayInfoReqV1, GatewayInfoResV1, GatewayInfoResV2, - GatewayInfoStreamReqV1, GatewayInfoStreamReqV2, GatewayInfoStreamReqV3, - GatewayInfoStreamResV1, GatewayInfoStreamResV2, GatewayInfoStreamResV3, GatewayInfoV2, + self, GatewayInfoAtTimestampReqV1, GatewayInfoBatchReqV1, GatewayInfoReqV1, + GatewayInfoResV1, GatewayInfoResV2, GatewayInfoStreamReqV1, GatewayInfoStreamReqV2, + GatewayInfoStreamReqV3, GatewayInfoStreamResV1, GatewayInfoStreamResV2, + GatewayInfoStreamResV3, GatewayInfoV2, }, Message, }; @@ -52,9 +53,17 @@ impl GatewayService { Err(Status::permission_denied("unauthorized request signature")) } - fn verify_request_signature_for_info(&self, request: &GatewayInfoReqV1) -> Result<(), Status> { - let signer = verify_public_key(&request.signer)?; - let address = verify_public_key(&request.address)?; + fn verify_request_signature_for_info( + &self, + request: &R, + signer: &[u8], + address: &[u8], + ) -> Result<(), Status> + where + R: MsgVerify, + { + let signer = verify_public_key(signer)?; + let address = verify_public_key(address)?; if address == signer && request.verify(&signer).is_ok() { tracing::debug!(%signer, "self authorized"); @@ -69,6 +78,27 @@ impl GatewayService { .sign(response) .map_err(|_| Status::internal("response signing error")) } + + fn map_info_v2_response(&self, info: GatewayInfo) -> GrpcResult { + if info.metadata.is_some() { + telemetry::count_gateway_chain_lookup("asserted"); + } else { + telemetry::count_gateway_chain_lookup("not-asserted"); + }; + + let info: GatewayInfoV2 = info + .try_into() + .map_err(|_| Status::internal("error serializing historical gateway info (v2)"))?; + + let mut res = GatewayInfoResV2 { + info: Some(info), + timestamp: Utc::now().encode_timestamp(), + signer: self.signing_key.public_key().into(), + signature: vec![], + }; + res.signature = self.sign_response(&res.encode_to_vec())?; + Ok(Response::new(res)) + } } #[tonic::async_trait] @@ -80,7 +110,7 @@ impl mobile_config::Gateway for GatewayService { custom_tracing::record_b58("pub_key", &request.address); custom_tracing::record_b58("signer", &request.signer); - self.verify_request_signature_for_info(&request)?; + self.verify_request_signature_for_info(&request, &request.signer, &request.address)?; let pubkey: PublicKeyBinary = request.address.into(); tracing::debug!(pubkey = pubkey.to_string(), "fetching gateway info"); @@ -120,7 +150,7 @@ impl mobile_config::Gateway for GatewayService { custom_tracing::record_b58("pub_key", &request.address); custom_tracing::record_b58("signer", &request.signer); - self.verify_request_signature_for_info(&request)?; + self.verify_request_signature_for_info(&request, &request.signer, &request.address)?; let pubkey: PublicKeyBinary = request.address.into(); tracing::debug!(pubkey = pubkey.to_string(), "fetching gateway info (v2)"); @@ -133,26 +163,41 @@ impl mobile_config::Gateway for GatewayService { telemetry::count_gateway_chain_lookup("not-found"); Err(Status::not_found(pubkey.to_string())) }, - |info| { - if info.metadata.is_some() { - telemetry::count_gateway_chain_lookup("asserted"); - } else { - telemetry::count_gateway_chain_lookup("not-asserted"); - }; + |info| self.map_info_v2_response(info), + ) + } - let info: GatewayInfoV2 = info - .try_into() - .map_err(|_| Status::internal("error serializing gateway info (v2)"))?; + async fn info_at_timestamp( + &self, + request: Request, + ) -> GrpcResult { + let request = request.into_inner(); + telemetry::count_request("gateway", "info-v2"); + custom_tracing::record_b58("pub_key", &request.address); + custom_tracing::record_b58("signer", &request.signer); - let mut res = GatewayInfoResV2 { - info: Some(info), - timestamp: Utc::now().encode_timestamp(), - signer: self.signing_key.public_key().into(), - signature: vec![], - }; - res.signature = self.sign_response(&res.encode_to_vec())?; - Ok(Response::new(res)) + self.verify_request_signature_for_info(&request, &request.signer, &request.address)?; + + let pubkey: PublicKeyBinary = request.address.into(); + tracing::debug!( + pubkey = pubkey.to_string(), + "fetching gateway info at timestamp" + ); + + let query_time = Utc + .timestamp_opt(request.query_time as i64, 0) + .single() + .ok_or(Status::invalid_argument("Invalid query_time argument"))?; + + info::get_by_address_and_inserted_at(&self.pool, &pubkey, &query_time) + .await + .map_err(|_| Status::internal("error fetching gateway info at timestamp"))? + .map_or_else( + || { + telemetry::count_gateway_chain_lookup("not-found"); + Err(Status::not_found(pubkey.to_string())) }, + |info| self.map_info_v2_response(info), ) } diff --git a/mobile_config/src/gateway/tracker.rs b/mobile_config/src/gateway/tracker.rs index 8d40798d9..afdd171d3 100644 --- a/mobile_config/src/gateway/tracker.rs +++ b/mobile_config/src/gateway/tracker.rs @@ -2,7 +2,10 @@ use crate::gateway::{db::Gateway, metadata_db::MobileHotspotInfo}; use futures::stream::TryChunksError; use futures_util::TryStreamExt; use sqlx::{Pool, Postgres}; -use std::time::{Duration, Instant}; +use std::{ + collections::HashMap, + time::{Duration, Instant}, +}; use task_manager::ManagedTask; const EXECUTE_DURATION_METRIC: &str = @@ -62,11 +65,61 @@ pub async fn execute(pool: &Pool, metadata: &Pool) -> anyhow let total: u64 = MobileHotspotInfo::stream(metadata) .map_err(anyhow::Error::from) - .try_filter_map(|mhi| async move { mhi.to_gateway() }) + .try_filter_map(|mhi| async move { + match mhi.to_gateway() { + Ok(Some(gw)) => Ok(Some(gw)), + Ok(None) => Ok(None), + Err(e) => { + tracing::error!(?e, "error converting gateway"); + Err(e) + } + } + }) .try_chunks(BATCH_SIZE) .map_err(|TryChunksError(_gateways, err)| err) .try_fold(0, |total, batch| async move { - let affected = Gateway::insert_bulk(pool, &batch).await?; + let addresses: Vec<_> = batch.iter().map(|gw| gw.address.clone()).collect(); + let existing_gateways = Gateway::get_by_addresses(pool, addresses).await?; + let mut existing_map = existing_gateways + .into_iter() + .map(|gw| (gw.address.clone(), gw)) + .collect::>(); + + let mut to_insert = Vec::with_capacity(batch.len()); + + for mut gw in batch { + match existing_map.remove(&gw.address) { + None => { + // New gateway + to_insert.push(gw); + } + Some(last_gw) => { + let loc_changed = gw.location != last_gw.location; + let hash_changed = gw.hash != last_gw.hash; + + // FYI hash includes location + gw.last_changed_at = if hash_changed { + gw.refreshed_at + } else { + last_gw.last_changed_at + }; + + gw.location_changed_at = if loc_changed { + Some(gw.refreshed_at) + } else { + last_gw.location_changed_at + }; + + // We only add record if something changed + // FYI hash includes location + if hash_changed { + to_insert.push(gw); + } + } + } + } + + let affected = Gateway::insert_bulk(pool, &to_insert).await?; Ok(total + affected) }) .await?; diff --git a/mobile_config/tests/integrations/common/gateway_db.rs b/mobile_config/tests/integrations/common/gateway_db.rs new file mode 100644 index 000000000..7ca9dd2d0 --- /dev/null +++ b/mobile_config/tests/integrations/common/gateway_db.rs @@ -0,0 +1,71 @@ +use chrono::{DateTime, Utc}; +use helium_crypto::PublicKeyBinary; +use mobile_config::gateway::db::GatewayType; +use sqlx::PgPool; + +#[derive(Debug, Clone)] +pub struct PreHistoricalGateway { + pub address: PublicKeyBinary, + pub gateway_type: GatewayType, + // When the record was first created from metadata DB + pub created_at: DateTime, + // When record was last updated + pub updated_at: DateTime, + // When record was last updated from metadata DB (could be set to now if no metadata DB info) + pub refreshed_at: DateTime, + // When location or hash last changed, set to refreshed_at (updated via SQL query see Gateway::insert) + pub last_changed_at: DateTime, + pub hash: String, + pub antenna: Option, + pub elevation: Option, + pub azimuth: Option, + pub location: Option, + // When location last changed, set to refreshed_at (updated via SQL query see Gateway::insert) + pub location_changed_at: Option>, + pub location_asserts: Option, +} + +impl PreHistoricalGateway { + pub async fn insert(&self, pool: &PgPool) -> anyhow::Result<()> { + sqlx::query( + r#" + INSERT INTO gateways ( + address, + gateway_type, + created_at, + updated_at, + refreshed_at, + last_changed_at, + hash, + antenna, + elevation, + azimuth, + location, + location_changed_at, + location_asserts + ) + VALUES ( + $1, $2, $3, $4, $5, $6, $7, + $8, $9, $10, $11, $12, $13 + ) + "#, + ) + .bind(self.address.as_ref()) + .bind(self.gateway_type) + .bind(self.created_at) + .bind(self.updated_at) + .bind(self.refreshed_at) + .bind(self.last_changed_at) + .bind(self.hash.as_str()) + .bind(self.antenna.map(|v| v as i64)) + .bind(self.elevation.map(|v| v as i64)) + .bind(self.azimuth.map(|v| v as i64)) + .bind(self.location.map(|v| v as i64)) + .bind(self.location_changed_at) + .bind(self.location_asserts.map(|v| v as i64)) + .execute(pool) + .await?; + + Ok(()) + } +} diff --git a/mobile_config/tests/integrations/common/gateway_metadata_db.rs b/mobile_config/tests/integrations/common/gateway_metadata_db.rs index e92d4259a..40ea78a5e 100644 --- a/mobile_config/tests/integrations/common/gateway_metadata_db.rs +++ b/mobile_config/tests/integrations/common/gateway_metadata_db.rs @@ -1,76 +1,111 @@ use bs58; use chrono::{DateTime, Utc}; +use futures::{stream, StreamExt, TryStreamExt}; use helium_crypto::PublicKeyBinary; -use sqlx::PgPool; +use sqlx::{PgPool, Postgres, QueryBuilder}; -#[allow(clippy::too_many_arguments)] -pub async fn insert_gateway( +pub struct GatewayInsert { + pub asset: String, + pub location: Option, + pub device_type: String, + pub key: PublicKeyBinary, + pub created_at: DateTime, + pub refreshed_at: Option>, + pub deployment_info: Option, +} + +pub async fn insert_gateway_bulk( pool: &PgPool, - asset: &str, - location: Option, - device_type: &str, - key: PublicKeyBinary, - created_at: DateTime, - refreshed_at: Option>, - deployment_info: Option<&str>, -) { - insert_mobile_hotspot_infos( - pool, - asset, - location, - device_type, - created_at, - refreshed_at, - deployment_info, - ) - .await; - insert_asset_key(pool, asset, key).await; + gateways: &[GatewayInsert], + chunk_size: usize, +) -> anyhow::Result<()> { + stream::iter(gateways.chunks(chunk_size)) + .map(Ok) // convert chunks to a Result for try_for_each_concurrent + .try_for_each_concurrent(20, |chunk| { + let pool = pool.clone(); + async move { + // insert mobile_hotspot_infos + let mut qb = QueryBuilder::::new( + r#" + INSERT INTO mobile_hotspot_infos ( + asset, location, device_type, created_at, + refreshed_at, deployment_info, num_location_asserts + ) + "#, + ); + + qb.push_values(chunk, |mut b, gw| { + let num_locations = if gw.location.is_some() { + Some(1) + } else { + Some(0) + }; + + let device_type_json: serde_json::Value = + serde_json::from_str(&gw.device_type).unwrap(); + let deployment_info_json: serde_json::Value = + serde_json::from_str(gw.deployment_info.as_deref().unwrap_or("null")) + .unwrap(); + + b.push_bind(&gw.asset) + .push_bind(gw.location) + .push_bind(device_type_json) + .push_bind(gw.created_at) + .push_bind(gw.refreshed_at) + .push_bind(deployment_info_json) + .push_bind(num_locations); + }); + + qb.build().execute(&pool).await?; + + // insert key_to_assets + let mut qb1 = QueryBuilder::::new( + r#" + INSERT INTO key_to_assets ( + asset, entity_key + ) + "#, + ); + + qb1.push_values(chunk, |mut b, gw| { + let b58 = bs58::decode(gw.key.to_string()).into_vec().unwrap(); + b.push_bind(&gw.asset).push_bind(b58); + }); + + qb1.build().execute(&pool).await?; + + Ok::<_, anyhow::Error>(()) + } + }) + .await?; + + Ok(()) } -async fn insert_mobile_hotspot_infos( +pub async fn update_gateway( pool: &PgPool, asset: &str, - location: Option, - device_type: &str, - created_at: DateTime, - refreshed_at: Option>, - deployment_info: Option<&str>, -) { - let num_locations = if location.is_some() { Some(1) } else { Some(0) }; + location: i64, + refreshed_at: DateTime, + num_location_asserts: i32, +) -> anyhow::Result<()> { sqlx::query( r#" - INSERT INTO -"mobile_hotspot_infos" ("asset", "location", "device_type", "created_at", "refreshed_at", "deployment_info", "num_location_asserts") - VALUES -($1, $2, $3::jsonb, $4, $5, $6::jsonb, $7); - "#, + UPDATE mobile_hotspot_infos + SET location = $1, + num_location_asserts = $2, + refreshed_at = $3 + WHERE asset = $4 + "#, ) - .bind(asset) .bind(location) - .bind(device_type) - .bind(created_at) + .bind(num_location_asserts) .bind(refreshed_at) - .bind(deployment_info) - .bind(num_locations) - .execute(pool) - .await - .unwrap(); -} - -async fn insert_asset_key(pool: &PgPool, asset: &str, key: PublicKeyBinary) { - let b58 = bs58::decode(key.to_string()).into_vec().unwrap(); - sqlx::query( - r#" - INSERT INTO - "key_to_assets" ("asset", "entity_key") - VALUES ($1, $2); - "#, - ) .bind(asset) - .bind(b58) .execute(pool) - .await - .unwrap(); + .await?; + + Ok(()) } pub async fn create_tables(pool: &PgPool) { diff --git a/mobile_config/tests/integrations/common/mod.rs b/mobile_config/tests/integrations/common/mod.rs index 89e2c6cc2..a23a134f7 100644 --- a/mobile_config/tests/integrations/common/mod.rs +++ b/mobile_config/tests/integrations/common/mod.rs @@ -12,7 +12,9 @@ use std::sync::Arc; use tokio::net::TcpListener; use tonic::transport; +pub mod gateway_db; pub mod gateway_metadata_db; +pub mod partial_migrator; pub fn make_keypair() -> Keypair { Keypair::generate(KeyTag::default(), &mut rand::rngs::OsRng) diff --git a/mobile_config/tests/integrations/common/partial_migrator.rs b/mobile_config/tests/integrations/common/partial_migrator.rs new file mode 100644 index 000000000..de90e5788 --- /dev/null +++ b/mobile_config/tests/integrations/common/partial_migrator.rs @@ -0,0 +1,99 @@ +use chrono::Utc; +use sqlx::{ + migrate::{Migration, Migrator}, + PgPool, +}; + +pub struct PartialMigrator { + pool: PgPool, + versions: Vec, + migrator: Migrator, +} + +impl PartialMigrator { + pub async fn new(pool: PgPool, versions: Vec) -> anyhow::Result { + let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM _sqlx_migrations") + .fetch_one(&pool) + .await + .unwrap_or(0); + + if count > 0 { + anyhow::bail!("PartialMigrator: database already has applied migrations. Did you forget `migrations = false`?"); + } + + Ok(Self { + pool, + versions, + migrator: sqlx::migrate!(), + }) + } + + pub async fn run_partial(&self) -> anyhow::Result<()> { + { + // Run tmp_migrator to create _sqlx_migrations table + let tmp_dir = tempfile::tempdir()?; + let tmp_migrator = Migrator::new(tmp_dir.path()).await?; + tmp_migrator.run(&self.pool).await?; + } + + // Mark skipped migrations as applied first + for m in self.migrator.iter() { + if self.versions.contains(&m.version) { + tracing::info!("⏭️ Skipping migration {} {}", m.version, m.description); + self.skip_migration(m).await?; + } + } + + // Now run the migrator normally + self.migrator.run(&self.pool).await?; + + Ok(()) + } + + pub async fn run_skipped(&self) -> anyhow::Result<()> { + tracing::info!("Re applying skipped migrations... {:?}", self.versions); + + // Delete skipped migrations first + self.delete_skipped().await?; + + // Now run the migrator normally + self.migrator.run(&self.pool).await?; + + Ok(()) + } + + async fn skip_migration(&self, migration: &Migration) -> anyhow::Result<()> { + sqlx::query( + r#" + INSERT INTO _sqlx_migrations + (version, description, installed_on, success, checksum, execution_time) + VALUES ($1, $2, $3, TRUE, $4, 0) + ON CONFLICT (version) DO NOTHING + "#, + ) + .bind(migration.version) + .bind(format!("SKIPPED - {}", migration.description.clone())) + .bind(Utc::now()) + .bind(migration.checksum.as_ref()) + .execute(&self.pool) + .await?; + + Ok(()) + } + + async fn delete_skipped(&self) -> anyhow::Result<()> { + for version in &self.versions { + sqlx::query( + r#" + DELETE FROM _sqlx_migrations + WHERE version = $1 + "#, + ) + .bind(version) + .execute(&self.pool) + .await?; + } + + Ok(()) + } +} diff --git a/mobile_config/tests/integrations/gateway_db.rs b/mobile_config/tests/integrations/gateway_db.rs index de7f8cb43..4df7844fa 100644 --- a/mobile_config/tests/integrations/gateway_db.rs +++ b/mobile_config/tests/integrations/gateway_db.rs @@ -1,11 +1,10 @@ +use crate::common; use chrono::{TimeZone, Utc}; use futures::{pin_mut, StreamExt}; use helium_crypto::PublicKeyBinary; use mobile_config::gateway::db::{Gateway, GatewayType}; use sqlx::PgPool; -use crate::common; - #[sqlx::test] async fn gateway_insert_and_get_by_address(pool: PgPool) -> anyhow::Result<()> { let addr = pk_binary(); @@ -20,7 +19,7 @@ async fn gateway_insert_and_get_by_address(pool: PgPool) -> anyhow::Result<()> { assert_eq!(gateway.gateway_type, GatewayType::WifiIndoor); assert_eq!(gateway.created_at, common::nanos_trunc(now)); - assert_eq!(gateway.updated_at, common::nanos_trunc(now)); + assert!(gateway.inserted_at > now); assert_eq!(gateway.refreshed_at, common::nanos_trunc(now)); assert_eq!(gateway.last_changed_at, common::nanos_trunc(now)); // first insert: equals refreshed_at assert_eq!(gateway.location, Some(123)); @@ -29,52 +28,32 @@ async fn gateway_insert_and_get_by_address(pool: PgPool) -> anyhow::Result<()> { } #[sqlx::test] -async fn gateway_upsert_last_changed_at_on_location_or_hash_change( - pool: PgPool, -) -> anyhow::Result<()> { +async fn gateway_get_by_address_and_inserted_at(pool: PgPool) -> anyhow::Result<()> { let addr = pk_binary(); - let t0 = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap(); - let t1 = Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap(); - let t2 = Utc.with_ymd_and_hms(2025, 1, 3, 0, 0, 0).unwrap(); - let t3 = Utc.with_ymd_and_hms(2025, 1, 4, 0, 0, 0).unwrap(); + let now = Utc::now(); - // insert baseline - gw(addr.clone(), GatewayType::WifiOutdoor, t0) - .insert(&pool) - .await?; + // Insert gateway first time + let gateway = gw(addr.clone(), GatewayType::WifiIndoor, now); + gateway.insert(&pool).await?; + + // Insert gateway second time with different type + let gateway = gw(addr.clone(), GatewayType::WifiDataOnly, now); + gateway.insert(&pool).await?; - // upsert with no change (only timestamps move) - let mut same = gw(addr.clone(), GatewayType::WifiOutdoor, t0); - same.updated_at = t1; - same.refreshed_at = t1; - same.last_changed_at = t1; // should be ignored by SQL if no change - same.insert(&pool).await?; - - let after_same = Gateway::get_by_address(&pool, &addr).await?.unwrap(); - assert_eq!(after_same.refreshed_at, t1); - assert_eq!(after_same.last_changed_at, t0); // unchanged - - // upsert with location change -> last_changed_at bumps to refreshed_at (t2) - let mut loc = after_same.clone(); - loc.updated_at = t2; - loc.refreshed_at = t2; - loc.location = Some(456); - loc.insert(&pool).await?; - - let after_loc = Gateway::get_by_address(&pool, &addr).await?.unwrap(); - assert_eq!(after_loc.location, Some(456)); - assert_eq!(after_loc.last_changed_at, t2); - - // upsert with hash change (location same) -> last_changed_at bumps again - let mut h = after_loc.clone(); - h.updated_at = t3; - h.refreshed_at = t3; - h.hash = "h1".into(); - h.insert(&pool).await?; - - let after_hash = Gateway::get_by_address(&pool, &addr).await?.unwrap(); - assert_eq!(after_hash.hash, "h1"); - assert_eq!(after_hash.last_changed_at, t3); + let later = now + chrono::Duration::minutes(10); + + let gateway = Gateway::get_by_address_and_inserted_at(&pool, &addr, &later) + .await? + .expect("gateway should exist"); + + // Assert most recent gateway was returned + assert_eq!(gateway.gateway_type, GatewayType::WifiDataOnly); + assert_eq!(gateway.created_at, common::nanos_trunc(now)); + assert!(gateway.inserted_at > now); + assert_eq!(gateway.refreshed_at, common::nanos_trunc(now)); + assert_eq!(gateway.last_changed_at, common::nanos_trunc(now)); + assert_eq!(gateway.location, Some(123)); + assert_eq!(gateway.hash, "h0"); Ok(()) } @@ -99,7 +78,7 @@ async fn gateway_bulk_insert_and_get(pool: PgPool) -> anyhow::Result<()> { .await? .expect("row should exist"); assert_eq!(got.created_at, common::nanos_trunc(now)); - assert_eq!(got.updated_at, common::nanos_trunc(now)); + assert!(got.inserted_at > now); assert_eq!(got.refreshed_at, common::nanos_trunc(now)); assert_eq!(got.last_changed_at, common::nanos_trunc(now)); assert_eq!(got.location, Some(123)); @@ -109,87 +88,6 @@ async fn gateway_bulk_insert_and_get(pool: PgPool) -> anyhow::Result<()> { Ok(()) } -#[sqlx::test] -async fn gateway_bulk_upsert_updates_and_change(pool: PgPool) -> anyhow::Result<()> { - // seed two rows at t0 - let t0 = Utc::now(); - let a1 = pk_binary(); - let a2 = pk_binary(); - - let mut g1 = gw(a1.clone(), GatewayType::WifiIndoor, t0); - let mut g2 = gw(a2.clone(), GatewayType::WifiOutdoor, t0); - let _ = Gateway::insert_bulk(&pool, &[g1.clone(), g2.clone()]).await?; - - // upsert at t1: change only timestamps for g1 (no loc/hash change) - // and change location for g2 (should bump last_changed_at) - let t1 = Utc::now(); - - g1.updated_at = t1; - g1.refreshed_at = t1; - // leave g1.location / g1.hash the same - - g2.updated_at = t1; - g2.refreshed_at = t1; - g2.location = Some(456); // change => last_changed_at should bump to t1 - // g2.hash unchanged - - let affected = Gateway::insert_bulk(&pool, &[g1.clone(), g2.clone()]).await?; - // 2 rows should be affected (both upserted) - assert_eq!(affected, 2); - - // verify g1: timestamps updated, last_changed_at unchanged (no relevant change) - let got1 = Gateway::get_by_address(&pool, &a1) - .await? - .expect("row should exist"); - assert_eq!(got1.updated_at, common::nanos_trunc(t1)); - assert_eq!(got1.refreshed_at, common::nanos_trunc(t1)); - assert_eq!( - got1.last_changed_at, - common::nanos_trunc(t0), - "no loc/hash change ⇒ last_changed_at stays t0" - ); - assert_eq!(got1.location, Some(123)); - assert_eq!(got1.hash, "h0"); - - // verify g2: timestamps updated, last_changed_at bumped due to location change - let got2 = Gateway::get_by_address(&pool, &a2) - .await? - .expect("row should exist"); - assert_eq!(got2.updated_at, common::nanos_trunc(t1)); - assert_eq!(got2.refreshed_at, common::nanos_trunc(t1)); - assert_eq!( - got2.last_changed_at, - common::nanos_trunc(t1), - "location changed ⇒ last_changed_at = refreshed_at" - ); - assert_eq!(got2.location, Some(456)); - assert_eq!(got2.hash, "h0"); - - // second upsert at t2: change hash only for g1, ensure bump - let t2 = Utc::now(); - - g1.updated_at = t2; - g1.refreshed_at = t2; - g1.hash = "h1".into(); // change ⇒ bump last_changed_at - - let affected2 = Gateway::insert_bulk(&pool, &[g1.clone()]).await?; - assert_eq!(affected2, 1); - - let got1b = Gateway::get_by_address(&pool, &a1) - .await? - .expect("row should exist"); - assert_eq!(got1b.updated_at, common::nanos_trunc(t2)); - assert_eq!(got1b.refreshed_at, common::nanos_trunc(t2)); - assert_eq!( - got1b.last_changed_at, - common::nanos_trunc(t2), - "hash changed ⇒ last_changed_at = refreshed_at" - ); - assert_eq!(got1b.hash, "h1"); - - Ok(()) -} - #[sqlx::test] async fn stream_by_addresses_filters_by_min_last_changed_at(pool: PgPool) -> anyhow::Result<()> { let a1 = pk_binary(); @@ -213,8 +111,8 @@ async fn stream_by_addresses_filters_by_min_last_changed_at(pool: PgPool) -> any // bump g1.last_changed_at to t2 let mut g1b = g1.clone(); - g1b.hash = "h1".to_string(); - g1b.refreshed_at = t2; + g1b.hash = "x1".to_string(); + g1b.last_changed_at = t2; g1b.insert(&pool).await?; // now we should see g1 only @@ -240,7 +138,8 @@ async fn stream_by_types_filters_by_min_date(pool: PgPool) -> anyhow::Result<()> gw(pk_binary(), GatewayType::WifiOutdoor, t1) .insert(&pool) .await?; - gw(pk_binary(), GatewayType::WifiIndoor, t2) + let key = pk_binary(); + gw(key.clone(), GatewayType::WifiIndoor, t2) .insert(&pool) .await?; @@ -249,7 +148,7 @@ async fn stream_by_types_filters_by_min_date(pool: PgPool) -> anyhow::Result<()> pin_mut!(s); let first = s.next().await.expect("row expected"); assert_eq!(first.gateway_type, GatewayType::WifiIndoor); - assert_eq!(first.created_at, t2); + assert_eq!(first.address, key); assert!(s.next().await.is_none()); Ok(()) @@ -292,9 +191,9 @@ fn gw(address: PublicKeyBinary, gateway_type: GatewayType, t: chrono::DateTime anyhow::Result<()> { // NOTE(mj): The information we're requesting does not exist in the DB for - // this test. But we're only interested in Authization Errors. + // this test. But we're only interested in Authorization Errors. let admin_key = make_keypair(); // unlimited access let gw_key = make_keypair(); // access to self @@ -117,7 +117,7 @@ async fn gateway_stream_info_v1(pool: PgPool) -> anyhow::Result<()> { address: address1.clone().into(), gateway_type: GatewayType::WifiIndoor, created_at: now, - updated_at: now, + inserted_at: now, refreshed_at: now, last_changed_at: now, hash: "".to_string(), @@ -134,7 +134,7 @@ async fn gateway_stream_info_v1(pool: PgPool) -> anyhow::Result<()> { address: address2.clone().into(), gateway_type: GatewayType::WifiDataOnly, created_at: now_plus_10, - updated_at: now_plus_10, + inserted_at: now_plus_10, refreshed_at: now_plus_10, last_changed_at: now_plus_10, hash: "".to_string(), @@ -182,7 +182,7 @@ async fn gateway_stream_info_v2_by_type(pool: PgPool) -> anyhow::Result<()> { address: address1.clone().into(), gateway_type: GatewayType::WifiIndoor, created_at: now, - updated_at: now, + inserted_at: now, refreshed_at: now, last_changed_at: now, hash: "".to_string(), @@ -199,7 +199,7 @@ async fn gateway_stream_info_v2_by_type(pool: PgPool) -> anyhow::Result<()> { address: address2.clone().into(), gateway_type: GatewayType::WifiDataOnly, created_at: now_plus_10, - updated_at: now_plus_10, + inserted_at: now_plus_10, refreshed_at: now_plus_10, last_changed_at: now_plus_10, hash: "".to_string(), @@ -247,21 +247,21 @@ async fn gateway_stream_info_v2(pool: PgPool) -> anyhow::Result<()> { let loc4 = 0x8c44a82aed527ff_u64; let created_at = Utc::now() - Duration::hours(5); - let updated_at = Utc::now() - Duration::hours(3); + let inserted_at = Utc::now() - Duration::hours(3); let gateway1 = Gateway { address: address1.clone().into(), gateway_type: GatewayType::WifiIndoor, created_at, - updated_at, - refreshed_at: updated_at, - last_changed_at: updated_at, + inserted_at, + refreshed_at: inserted_at, + last_changed_at: inserted_at, hash: "".to_string(), antenna: None, elevation: None, azimuth: None, location: Some(loc1), - location_changed_at: Some(updated_at), + location_changed_at: Some(inserted_at), location_asserts: Some(1), }; gateway1.insert(&pool).await?; @@ -270,15 +270,15 @@ async fn gateway_stream_info_v2(pool: PgPool) -> anyhow::Result<()> { address: address2.clone().into(), gateway_type: GatewayType::WifiIndoor, created_at, - updated_at, - refreshed_at: updated_at, - last_changed_at: updated_at, + inserted_at, + refreshed_at: inserted_at, + last_changed_at: inserted_at, hash: "".to_string(), antenna: Some(1), elevation: None, azimuth: None, location: Some(loc2), - location_changed_at: Some(updated_at), + location_changed_at: Some(inserted_at), location_asserts: Some(1), }; gateway2.insert(&pool).await?; @@ -287,15 +287,15 @@ async fn gateway_stream_info_v2(pool: PgPool) -> anyhow::Result<()> { address: address3.clone().into(), gateway_type: GatewayType::WifiDataOnly, created_at, - updated_at, - refreshed_at: updated_at, - last_changed_at: updated_at, + inserted_at, + refreshed_at: inserted_at, + last_changed_at: inserted_at, hash: "".to_string(), antenna: Some(1), elevation: Some(2), azimuth: Some(3), location: Some(loc3), - location_changed_at: Some(updated_at), + location_changed_at: Some(inserted_at), location_asserts: Some(1), }; gateway3.insert(&pool).await?; @@ -304,7 +304,7 @@ async fn gateway_stream_info_v2(pool: PgPool) -> anyhow::Result<()> { address: address4.clone().into(), gateway_type: GatewayType::WifiDataOnly, created_at, - updated_at: created_at, + inserted_at: created_at, refreshed_at: created_at, last_changed_at: created_at, hash: "".to_string(), @@ -320,8 +320,8 @@ async fn gateway_stream_info_v2(pool: PgPool) -> anyhow::Result<()> { let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; let mut client = GatewayClient::connect(addr).await?; - let res = - gateway_info_stream_v2(&mut client, &admin_key, &[], updated_at.timestamp() as u64).await?; + let res = gateway_info_stream_v2(&mut client, &admin_key, &[], inserted_at.timestamp() as u64) + .await?; assert_eq!(res.gateways.len(), 3); let gateways = res.gateways; @@ -335,7 +335,7 @@ async fn gateway_stream_info_v2(pool: PgPool) -> anyhow::Result<()> { u64::from_str_radix(&gw1.metadata.clone().unwrap().location, 16).unwrap(), loc1 ); - assert_eq!(gw1.updated_at, updated_at.timestamp() as u64); + assert_eq!(gw1.updated_at, inserted_at.timestamp() as u64); assert_eq!(gw1.metadata.clone().unwrap().deployment_info, None); let gw2 = gateways @@ -347,7 +347,7 @@ async fn gateway_stream_info_v2(pool: PgPool) -> anyhow::Result<()> { u64::from_str_radix(&gw2.metadata.clone().unwrap().location, 16).unwrap(), loc2 ); - assert_eq!(gw2.updated_at, updated_at.timestamp() as u64); + assert_eq!(gw2.updated_at, inserted_at.timestamp() as u64); let deployment_info = gw2.metadata.clone().unwrap().deployment_info.unwrap(); match deployment_info { DeploymentInfo::WifiDeploymentInfo(v) => { @@ -369,7 +369,7 @@ async fn gateway_stream_info_v2(pool: PgPool) -> anyhow::Result<()> { u64::from_str_radix(&gw3.metadata.clone().unwrap().location, 16).unwrap(), loc3 ); - assert_eq!(gw3.updated_at, updated_at.timestamp() as u64); + assert_eq!(gw3.updated_at, inserted_at.timestamp() as u64); let deployment_info = gw3.metadata.clone().unwrap().deployment_info.unwrap(); match deployment_info { DeploymentInfo::WifiDeploymentInfo(v) => { @@ -399,21 +399,21 @@ async fn gateway_info_batch_v1(pool: PgPool) -> anyhow::Result<()> { let loc2 = 631711286145955327_u64; let created_at = Utc::now() - Duration::hours(5); - let updated_at = Utc::now() - Duration::hours(3); + let inserted_at = Utc::now() - Duration::hours(3); let gateway1 = Gateway { address: address1.clone().into(), gateway_type: GatewayType::WifiIndoor, created_at, - updated_at, - refreshed_at: updated_at, - last_changed_at: updated_at, + inserted_at, + refreshed_at: inserted_at, + last_changed_at: inserted_at, hash: "".to_string(), antenna: Some(18), elevation: Some(2), azimuth: Some(161), location: Some(loc1), - location_changed_at: Some(updated_at), + location_changed_at: Some(inserted_at), location_asserts: Some(1), }; gateway1.insert(&pool).await?; @@ -422,7 +422,7 @@ async fn gateway_info_batch_v1(pool: PgPool) -> anyhow::Result<()> { address: address2.clone().into(), gateway_type: GatewayType::WifiDataOnly, created_at, - updated_at: created_at, + inserted_at: created_at, refreshed_at: created_at, last_changed_at: created_at, hash: "".to_string(), @@ -485,21 +485,21 @@ async fn gateway_info_batch_v2(pool: PgPool) -> anyhow::Result<()> { let loc2 = 631711286145955327_u64; let created_at = Utc::now() - Duration::hours(5); - let updated_at = Utc::now() - Duration::hours(3); + let inserted_at = Utc::now() - Duration::hours(3); let gateway1 = Gateway { address: address1.clone().into(), gateway_type: GatewayType::WifiIndoor, created_at, - updated_at, - refreshed_at: updated_at, - last_changed_at: updated_at, + inserted_at, + refreshed_at: inserted_at, + last_changed_at: inserted_at, hash: "".to_string(), antenna: Some(18), elevation: Some(2), azimuth: Some(161), location: Some(loc1), - location_changed_at: Some(updated_at), + location_changed_at: Some(inserted_at), location_asserts: Some(1), }; gateway1.insert(&pool).await?; @@ -508,7 +508,7 @@ async fn gateway_info_batch_v2(pool: PgPool) -> anyhow::Result<()> { address: address2.clone().into(), gateway_type: GatewayType::WifiDataOnly, created_at, - updated_at: created_at, + inserted_at: created_at, refreshed_at: created_at, last_changed_at: created_at, hash: "".to_string(), @@ -594,13 +594,13 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) -> anyhow::Result< let created_at = Utc::now() - Duration::hours(5); let refreshed_at = Utc::now() - Duration::hours(3); - let updated_at = Utc::now() - Duration::hours(4); + let inserted_at = Utc::now() - Duration::hours(4); let gateway1 = Gateway { address: address1.clone().into(), gateway_type: GatewayType::WifiIndoor, created_at, - updated_at: refreshed_at, + inserted_at: refreshed_at, refreshed_at, last_changed_at: refreshed_at, hash: "".to_string(), @@ -617,7 +617,7 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) -> anyhow::Result< address: address2.clone().into(), gateway_type: GatewayType::WifiIndoor, created_at, - updated_at: created_at, + inserted_at: created_at, refreshed_at: created_at, last_changed_at: created_at, hash: "".to_string(), @@ -634,15 +634,15 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) -> anyhow::Result< address: address3.clone().into(), gateway_type: GatewayType::WifiDataOnly, created_at, - updated_at, + inserted_at, refreshed_at, - last_changed_at: updated_at, + last_changed_at: inserted_at, hash: "".to_string(), antenna: Some(18), elevation: Some(2), azimuth: Some(161), location: Some(loc3), - location_changed_at: Some(updated_at), + location_changed_at: Some(inserted_at), location_asserts: Some(1), }; gateway3.insert(&pool).await?; @@ -651,9 +651,9 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) -> anyhow::Result< address: address4.clone().into(), gateway_type: GatewayType::WifiIndoor, created_at, - updated_at, + inserted_at, refreshed_at: created_at, - last_changed_at: updated_at, + last_changed_at: inserted_at, hash: "".to_string(), antenna: Some(18), elevation: Some(2), @@ -707,7 +707,7 @@ async fn gateway_info_batch_v2_updated_at_check(pool: PgPool) -> anyhow::Result< .find(|v| v.address == address3.to_vec()) .unwrap() .updated_at, - updated_at.timestamp() as u64 + inserted_at.timestamp() as u64 ); Ok(()) @@ -730,7 +730,7 @@ async fn gateway_info_v2(pool: PgPool) -> anyhow::Result<()> { address: address1.clone().into(), gateway_type: GatewayType::WifiIndoor, created_at, - updated_at: refreshed_at, + inserted_at: refreshed_at, refreshed_at, last_changed_at: refreshed_at, hash: "".to_string(), @@ -747,7 +747,7 @@ async fn gateway_info_v2(pool: PgPool) -> anyhow::Result<()> { address: address2.clone().into(), gateway_type: GatewayType::WifiIndoor, created_at, - updated_at: created_at, + inserted_at: created_at, refreshed_at: created_at, last_changed_at: created_at, hash: "".to_string(), @@ -812,6 +812,109 @@ async fn gateway_info_v2(pool: PgPool) -> anyhow::Result<()> { Ok(()) } +#[sqlx::test] +async fn gateway_info_at_timestamp(pool: PgPool) -> anyhow::Result<()> { + let admin_key = make_keypair(); + + let address = make_keypair().public_key().clone(); + let loc_original = 631711281837647359_u64; + let loc_recent = 631711281837647358_u64; + + let created_at = Utc::now() - Duration::hours(5); + let refreshed_at = Utc::now() - Duration::hours(3); + + let gateway_original = Gateway { + address: address.clone().into(), + gateway_type: GatewayType::WifiIndoor, + created_at, + inserted_at: refreshed_at, + refreshed_at, + last_changed_at: refreshed_at, + hash: "".to_string(), + antenna: Some(10), + elevation: Some(4), + azimuth: Some(168), + location: Some(loc_original), + location_changed_at: Some(refreshed_at), + location_asserts: Some(1), + }; + gateway_original.insert(&pool).await?; + + let pubkey = address.clone().into(); + + // Change original gateway's inserted_at value to 10 minutes ago + let new_inserted_at = Utc::now() - Duration::minutes(10); + update_gateway_inserted_at(&pool, &pubkey, &new_inserted_at).await?; + + let query_time_original = Utc::now(); + + let gateway_recent = Gateway { + address: address.clone().into(), + gateway_type: GatewayType::WifiIndoor, + created_at, + inserted_at: created_at, + refreshed_at: created_at, + last_changed_at: created_at, + hash: "".to_string(), + antenna: Some(18), + elevation: Some(2), + azimuth: Some(161), + location: Some(loc_recent), + location_changed_at: Some(created_at), + location_asserts: Some(1), + }; + gateway_recent.insert(&pool).await?; + + let (addr, _handle) = spawn_gateway_service(pool.clone(), admin_key.public_key().clone()).await; + let mut client = GatewayClient::connect(addr).await?; + + // Get most recent gateway info + let query_time_recent = Utc::now() + Duration::minutes(10); + let res = + info_at_timestamp_request(&mut client, &address, &admin_key, &query_time_recent).await; + + // Assert that recent gateway was returned + let gw_info = res?.info.unwrap(); + assert_eq!(gw_info.address, address.to_vec()); + let deployment_info = gw_info.metadata.clone().unwrap().deployment_info.unwrap(); + match deployment_info { + DeploymentInfo::WifiDeploymentInfo(v) => { + assert_eq!(v.antenna, 18); + assert_eq!(v.azimuth, 161); + assert_eq!(v.elevation, 2); + } + DeploymentInfo::CbrsDeploymentInfo(_) => panic!(), + }; + assert_eq!( + u64::from_str_radix(&gw_info.metadata.clone().unwrap().location, 16).unwrap(), + loc_recent + ); + + // Get original gateway info by using an earlier inserted_at condition + let res = + info_at_timestamp_request(&mut client, &address, &admin_key, &query_time_original).await; + + // Assert that original gateway was returned + let gw_info = res?.info.unwrap(); + assert_eq!(gw_info.address, address.to_vec()); + let deployment_info = gw_info.metadata.clone().unwrap().deployment_info.unwrap(); + match deployment_info { + DeploymentInfo::WifiDeploymentInfo(v) => { + assert_eq!(v.antenna, 10); + assert_eq!(v.azimuth, 168); + assert_eq!(v.elevation, 4); + } + DeploymentInfo::CbrsDeploymentInfo(_) => panic!(), + }; + + assert_eq!( + u64::from_str_radix(&gw_info.metadata.clone().unwrap().location, 16).unwrap(), + loc_original + ); + + Ok(()) +} + fn make_signed_info_request(address: &PublicKey, signer: &Keypair) -> proto::GatewayInfoReqV1 { let mut req = proto::GatewayInfoReqV1 { address: address.to_vec(), @@ -837,6 +940,23 @@ async fn info_request_v2( Ok(res) } +async fn info_at_timestamp_request( + client: &mut GatewayClient, + address: &PublicKey, + signer: &Keypair, + query_time: &DateTime, +) -> anyhow::Result { + let mut req = proto::GatewayInfoAtTimestampReqV1 { + address: address.to_vec(), + signer: signer.public_key().to_vec(), + signature: vec![], + query_time: query_time.timestamp() as u64, + }; + req.signature = signer.sign(&req.encode_to_vec()).unwrap(); + let res = client.info_at_timestamp(req).await?.into_inner(); + Ok(res) +} + async fn gateway_info_stream_v1( client: &mut GatewayClient, signer: &Keypair, @@ -866,14 +986,14 @@ async fn gateway_info_stream_v2( client: &mut GatewayClient, signer: &Keypair, device_types: &[DeviceType], - min_updated_at: u64, + min_inserted_at: u64, ) -> anyhow::Result { let mut req = GatewayInfoStreamReqV2 { batch_size: 10000, signer: signer.public_key().to_vec(), signature: vec![], device_types: device_types.iter().map(|v| DeviceType::into(*v)).collect(), - min_updated_at, + min_updated_at: min_inserted_at, }; req.signature = signer.sign(&req.encode_to_vec()).unwrap(); @@ -922,3 +1042,23 @@ async fn info_batch_v2( Ok(stream) } + +async fn update_gateway_inserted_at( + pool: &PgPool, + address: &PublicKeyBinary, + new_inserted_at: &DateTime, +) -> anyhow::Result<()> { + sqlx::query( + r#" + UPDATE gateways + SET inserted_at = $1 + WHERE address = $2; + "#, + ) + .bind(new_inserted_at) + .bind(address.as_ref()) + .execute(pool) + .await?; + + Ok(()) +} diff --git a/mobile_config/tests/integrations/gateway_service_v3.rs b/mobile_config/tests/integrations/gateway_service_v3.rs index c0457ec90..1ee5b311e 100644 --- a/mobile_config/tests/integrations/gateway_service_v3.rs +++ b/mobile_config/tests/integrations/gateway_service_v3.rs @@ -29,7 +29,7 @@ async fn gateway_stream_info_v3_basic(pool: PgPool) -> anyhow::Result<()> { address: address1.clone().into(), gateway_type: GatewayType::WifiIndoor, created_at: now, - updated_at: now, + inserted_at: now, refreshed_at: now, last_changed_at: now_plus_10, hash: "".to_string(), @@ -46,7 +46,7 @@ async fn gateway_stream_info_v3_basic(pool: PgPool) -> anyhow::Result<()> { address: address2.clone().into(), gateway_type: GatewayType::WifiOutdoor, created_at: now_plus_10, - updated_at: now_plus_10, + inserted_at: now_plus_10, refreshed_at: now_plus_10, last_changed_at: now_plus_10, hash: "".to_string(), @@ -107,7 +107,7 @@ async fn gateway_stream_info_v3_no_metadata(pool: PgPool) -> anyhow::Result<()> address: address1.clone().into(), gateway_type: GatewayType::WifiIndoor, created_at: now, - updated_at: now, + inserted_at: now, refreshed_at: now, last_changed_at: now_plus_10, hash: "".to_string(), @@ -153,7 +153,7 @@ async fn gateway_stream_info_v3_no_deployment_info(pool: PgPool) -> anyhow::Resu address: address1.clone().into(), gateway_type: GatewayType::WifiIndoor, created_at: now, - updated_at: now, + inserted_at: now, refreshed_at: now, last_changed_at: now_plus_10, hash: "".to_string(), @@ -205,21 +205,21 @@ async fn gateway_stream_info_v3_updated_at(pool: PgPool) -> anyhow::Result<()> { let loc2 = 631711286145955327_u64; let created_at = Utc::now() - Duration::hours(5); - let updated_at = Utc::now() - Duration::hours(3); + let inserted_at = Utc::now() - Duration::hours(3); let gateway1 = Gateway { address: address1.clone().into(), gateway_type: GatewayType::WifiIndoor, created_at, - updated_at, - refreshed_at: updated_at, - last_changed_at: updated_at, + inserted_at, + refreshed_at: inserted_at, + last_changed_at: inserted_at, hash: "".to_string(), antenna: Some(18), elevation: Some(2), azimuth: Some(161), location: Some(loc1), - location_changed_at: Some(updated_at), + location_changed_at: Some(inserted_at), location_asserts: Some(1), }; gateway1.insert(&pool).await?; @@ -228,7 +228,7 @@ async fn gateway_stream_info_v3_updated_at(pool: PgPool) -> anyhow::Result<()> { address: address2.clone().into(), gateway_type: GatewayType::WifiDataOnly, created_at, - updated_at: created_at, + inserted_at: created_at, refreshed_at: created_at, last_changed_at: created_at, hash: "".to_string(), @@ -248,7 +248,7 @@ async fn gateway_stream_info_v3_updated_at(pool: PgPool) -> anyhow::Result<()> { &mut client, &admin_key, &[], - updated_at.timestamp() as u64, + inserted_at.timestamp() as u64, 0, ) .await?; @@ -299,7 +299,7 @@ async fn gateway_stream_info_v3_min_location_changed_at_zero(pool: PgPool) -> an address: address1.clone().into(), gateway_type: GatewayType::WifiIndoor, created_at: now_minus_six, - updated_at: now_minus_six, + inserted_at: now_minus_six, refreshed_at: now_minus_six, last_changed_at: now_minus_three, hash: "".to_string(), @@ -316,7 +316,7 @@ async fn gateway_stream_info_v3_min_location_changed_at_zero(pool: PgPool) -> an address: address2.clone().into(), gateway_type: GatewayType::WifiDataOnly, created_at: now_minus_six, - updated_at: now_minus_six, + inserted_at: now_minus_six, refreshed_at: now_minus_six, last_changed_at: now_minus_three, hash: "".to_string(), @@ -370,7 +370,7 @@ async fn gateway_stream_info_v3_location_changed_at(pool: PgPool) -> anyhow::Res address: address1.clone().into(), gateway_type: GatewayType::WifiIndoor, created_at: now_minus_six, - updated_at: now_minus_six, + inserted_at: now_minus_six, refreshed_at: now, last_changed_at: now_minus_three, hash: "".to_string(), @@ -387,7 +387,7 @@ async fn gateway_stream_info_v3_location_changed_at(pool: PgPool) -> anyhow::Res address: address2.clone().into(), gateway_type: GatewayType::WifiDataOnly, created_at: now_minus_six, - updated_at: now_minus_six, + inserted_at: now_minus_six, refreshed_at: now, last_changed_at: now_minus_three, hash: "".to_string(), @@ -449,7 +449,7 @@ async fn gateway_info_stream_v3( client: &mut GatewayClient, signer: &Keypair, device_types: &[DeviceTypeV2], - min_updated_at: u64, + min_inserted_at: u64, min_location_changed_at: u64, ) -> anyhow::Result { let mut req = GatewayInfoStreamReqV3 { @@ -460,7 +460,7 @@ async fn gateway_info_stream_v3( .iter() .map(|v| DeviceTypeV2::into(*v)) .collect(), - min_updated_at, + min_updated_at: min_inserted_at, min_location_changed_at, }; req.signature = signer.sign(&req.encode_to_vec())?; diff --git a/mobile_config/tests/integrations/gateway_tracker.rs b/mobile_config/tests/integrations/gateway_tracker.rs index 79a0e7aa4..c356553ec 100644 --- a/mobile_config/tests/integrations/gateway_tracker.rs +++ b/mobile_config/tests/integrations/gateway_tracker.rs @@ -1,53 +1,120 @@ -use crate::common::{ - gateway_metadata_db::{create_tables, insert_gateway}, - make_keypair, -}; +use crate::common::{gateway_metadata_db, make_keypair}; use chrono::{Timelike, Utc}; -use custom_tracing::Settings; use mobile_config::gateway::{ db::{Gateway, GatewayType}, tracker, }; +use rand::{seq::SliceRandom, thread_rng}; use sqlx::PgPool; #[sqlx::test] -async fn execute_test(pool: PgPool) -> anyhow::Result<()> { - custom_tracing::init("mobile_config=debug,info".to_string(), Settings::default()).await?; +async fn test_gateway_tracker_updates_changed_gateways(pool: PgPool) -> anyhow::Result<()> { + const TOTAL: usize = 2_000; - let pubkey1 = make_keypair().public_key().clone(); - let hex1 = 631711281837647359_i64; let now = Utc::now() .with_nanosecond(Utc::now().timestamp_subsec_micros() * 1000) .unwrap(); - create_tables(&pool).await; - insert_gateway( - &pool, - "asset1", - Some(hex1), - "\"wifiIndoor\"", - pubkey1.clone().into(), - now, - Some(now), - None, - ) - .await; + // ensure tables exist + gateway_metadata_db::create_tables(&pool).await; + + // Prepare the bulk insert data + let gateways: Vec = (0..TOTAL) + .map(|i| { + let pubkey = make_keypair().public_key().clone(); + let hex_val = 631_711_281_837_647_359_i64 + i as i64; + + gateway_metadata_db::GatewayInsert { + asset: format!("asset{}", i), + location: Some(hex_val), + device_type: "\"wifiIndoor\"".to_string(), + key: pubkey.into(), + created_at: now, + refreshed_at: Some(now), + deployment_info: None, + } + }) + .collect(); + + // Bulk insert all gateways in one shot + gateway_metadata_db::insert_gateway_bulk(&pool, &gateways, 1000).await?; + + tracing::info!("inserted {} gateways, running tracker", TOTAL); + + // now run the tracker execute function + tracker::execute(&pool, &pool).await?; + + // Check that we have TOTAL gateways in the DB + let total = count_gateways(&pool).await?; + assert_eq!(TOTAL as i64, total); + + // Sample 100 gateways to verify + let mut rng = thread_rng(); + let sample_size = 100; + let sample: Vec<_> = gateways.choose_multiple(&mut rng, sample_size).collect(); + + let new_loc = 0_i64; + let now = Utc::now() + .with_nanosecond(Utc::now().timestamp_subsec_micros() * 1000) + .unwrap(); + + for gw_insert in sample.clone() { + let gateway = Gateway::get_by_address(&pool, &gw_insert.key) + .await? + .expect("gateway not found"); + + assert_eq!(gateway.address, gw_insert.key.clone()); + assert_eq!(gateway.gateway_type, GatewayType::WifiIndoor); + assert_eq!(gateway.created_at, gw_insert.created_at); + assert_eq!(Some(gateway.refreshed_at), gw_insert.refreshed_at); + assert_eq!(Some(gateway.last_changed_at), gw_insert.refreshed_at); + assert_eq!(gateway.antenna, None); + assert_eq!(gateway.elevation, None); + assert_eq!(gateway.azimuth, None); + assert_eq!(gateway.location, gw_insert.location.map(|v| v as u64)); + assert_eq!(gateway.location_changed_at, gw_insert.refreshed_at); // matches logic in tracker + assert_eq!(gateway.location_asserts, gw_insert.location.map(|_| 1)); + + // Update sample gateways + gateway_metadata_db::update_gateway(&pool, &gw_insert.asset, new_loc, now, 2).await?; + } + + // now run the tracker again after updates tracker::execute(&pool, &pool).await?; - let gateway1 = Gateway::get_by_address(&pool, &pubkey1.clone().into()) - .await? - .expect("asset1 gateway not found"); - - assert_eq!(gateway1.address, pubkey1.clone().into()); - assert_eq!(gateway1.gateway_type, GatewayType::WifiIndoor); - assert_eq!(gateway1.created_at, now); - assert_eq!(gateway1.refreshed_at, now); - assert_eq!(gateway1.antenna, None); - assert_eq!(gateway1.elevation, None); - assert_eq!(gateway1.azimuth, None); - assert_eq!(gateway1.location, Some(hex1 as u64)); - assert_eq!(gateway1.location_changed_at, Some(now)); - assert_eq!(gateway1.location_asserts, Some(1)); + // We should have TOTAL + sample_size gateways in the DB + let total = count_gateways(&pool).await?; + assert_eq!(TOTAL as i64 + sample_size as i64, total); + + for gw_insert in sample.clone() { + let gateway = Gateway::get_by_address(&pool, &gw_insert.key) + .await? + .expect("gateway not found"); + + assert_eq!(gateway.address, gw_insert.key.clone()); + assert_eq!(gateway.gateway_type, GatewayType::WifiIndoor); + assert_eq!(gateway.created_at, gw_insert.created_at); + assert_eq!(gateway.refreshed_at, now); + assert_eq!(gateway.last_changed_at, now); + assert_eq!(gateway.antenna, None); + assert_eq!(gateway.elevation, None); + assert_eq!(gateway.azimuth, None); + assert_eq!(gateway.location, Some(0)); + assert_eq!(gateway.location_changed_at, Some(now)); + assert_eq!(gateway.location_asserts, Some(2)); + } Ok(()) } + +async fn count_gateways(pool: &PgPool) -> anyhow::Result { + let count = sqlx::query_scalar( + r#" + SELECT COUNT(*) FROM gateways; + "#, + ) + .fetch_one(pool) + .await?; + + Ok(count) +} diff --git a/mobile_config/tests/integrations/main.rs b/mobile_config/tests/integrations/main.rs index 358b16a15..318f93eb7 100644 --- a/mobile_config/tests/integrations/main.rs +++ b/mobile_config/tests/integrations/main.rs @@ -5,3 +5,4 @@ mod gateway_db; mod gateway_service; mod gateway_service_v3; mod gateway_tracker; +mod migrations; diff --git a/mobile_config/tests/integrations/migrations.rs b/mobile_config/tests/integrations/migrations.rs new file mode 100644 index 000000000..d09fb7642 --- /dev/null +++ b/mobile_config/tests/integrations/migrations.rs @@ -0,0 +1,76 @@ +use crate::common::{self, gateway_db::PreHistoricalGateway, partial_migrator::PartialMigrator}; +use chrono::{Duration, Utc}; +use helium_crypto::PublicKeyBinary; +use mobile_config::gateway::db::{Gateway, GatewayType}; +use sqlx::PgPool; + +#[sqlx::test(migrations = false)] +async fn gateways_historical(pool: PgPool) -> anyhow::Result<()> { + let partial_migrator = PartialMigrator::new(pool.clone(), vec![20251003000000]).await?; + + partial_migrator.run_partial().await?; + + let address = pk_binary(); + let now = Utc::now(); + let one_min_ago = now - Duration::minutes(1); + + let pre_gw = PreHistoricalGateway { + address: address.clone(), + gateway_type: GatewayType::WifiIndoor, + created_at: one_min_ago, + updated_at: now, + refreshed_at: now, + last_changed_at: now, + hash: "h0".to_string(), + antenna: Some(1), + elevation: Some(2), + azimuth: Some(3), + location: Some(123), + location_changed_at: Some(now), + location_asserts: Some(1), + }; + + pre_gw.insert(&pool).await?; + + partial_migrator.run_skipped().await?; + + let gw = Gateway::get_by_address(&pool, &address) + .await? + .expect("should find gateway"); + + assert_eq!(pre_gw.address, gw.address); + assert_eq!(pre_gw.gateway_type, gw.gateway_type); + assert_eq!( + common::nanos_trunc(pre_gw.created_at), + common::nanos_trunc(gw.created_at) + ); + // The real change is updated_at renamed to inserted_at AND inserted_at = created_at; + assert_eq!( + common::nanos_trunc(pre_gw.created_at), + common::nanos_trunc(gw.inserted_at) + ); + assert_eq!( + common::nanos_trunc(pre_gw.refreshed_at), + common::nanos_trunc(gw.refreshed_at) + ); + assert_eq!( + common::nanos_trunc(pre_gw.last_changed_at), + common::nanos_trunc(gw.last_changed_at) + ); + assert_eq!(pre_gw.hash, gw.hash); + assert_eq!(pre_gw.antenna, gw.antenna); + assert_eq!(pre_gw.elevation, gw.elevation); + assert_eq!(pre_gw.azimuth, gw.azimuth); + assert_eq!(pre_gw.location, gw.location); + assert_eq!( + common::nanos_trunc(pre_gw.location_changed_at.unwrap()), + common::nanos_trunc(gw.location_changed_at.unwrap()) + ); + assert_eq!(pre_gw.location_asserts, gw.location_asserts); + + Ok(()) +} + +fn pk_binary() -> PublicKeyBinary { + common::make_keypair().public_key().clone().into() +} diff --git a/mobile_config_cli/src/client.rs b/mobile_config_cli/src/client.rs index 7a74a66ab..b4c093abb 100644 --- a/mobile_config_cli/src/client.rs +++ b/mobile_config_cli/src/client.rs @@ -12,8 +12,8 @@ use helium_proto::{ AdminAddKeyReqV1, AdminKeyResV1, AdminRemoveKeyReqV1, AuthorizationListReqV1, AuthorizationListResV1, AuthorizationVerifyReqV1, AuthorizationVerifyResV1, CarrierIncentivePromotionListReqV1, CarrierIncentivePromotionListResV1, EntityVerifyReqV1, - EntityVerifyResV1, GatewayInfoBatchReqV1, GatewayInfoReqV1, GatewayInfoResV2, - GatewayInfoStreamResV2, + EntityVerifyResV1, GatewayInfoAtTimestampReqV1, GatewayInfoBatchReqV1, GatewayInfoReqV1, + GatewayInfoResV2, GatewayInfoStreamResV2, }, Message, }; @@ -269,6 +269,27 @@ impl GatewayClient { Ok(stream) } + + pub async fn info_at_timestamp( + &mut self, + gateway: &PublicKey, + query_time: u64, + keypair: &Keypair, + ) -> Result { + let mut request = GatewayInfoAtTimestampReqV1 { + address: gateway.into(), + query_time, + signer: keypair.public_key().into(), + signature: vec![], + }; + request.signature = request.sign(keypair)?; + let response = self.client.info_at_timestamp(request).await?.into_inner(); + response.verify(&self.server_pubkey)?; + let info = response + .info + .ok_or_else(|| anyhow::anyhow!("gateway not found"))?; + GatewayInfo::try_from(info) + } } pub trait MsgSign: Message + std::clone::Clone { @@ -296,6 +317,7 @@ impl_sign!(AuthorizationListReqV1, signature); impl_sign!(EntityVerifyReqV1, signature); impl_sign!(GatewayInfoReqV1, signature); impl_sign!(GatewayInfoBatchReqV1, signature); +impl_sign!(GatewayInfoAtTimestampReqV1, signature); impl_sign!(CarrierIncentivePromotionListReqV1, signature); pub trait MsgVerify: Message + std::clone::Clone { diff --git a/mobile_config_cli/src/cmds/gateway.rs b/mobile_config_cli/src/cmds/gateway.rs index 3f74e4a7e..8c45a32ce 100644 --- a/mobile_config_cli/src/cmds/gateway.rs +++ b/mobile_config_cli/src/cmds/gateway.rs @@ -1,4 +1,4 @@ -use super::{GetHotspot, GetHotspotBatch, PathBufKeypair}; +use super::{GetHotspot, GetHotspotAtTimestamp, GetHotspotBatch, PathBufKeypair}; use crate::{client, Msg, PrettyJson, Result}; use angry_purple_tiger::AnimalName; use futures::StreamExt; @@ -59,6 +59,20 @@ pub async fn info_batch(args: GetHotspotBatch) -> Result { } } +pub async fn info_at_timestamp(args: GetHotspotAtTimestamp) -> Result { + let mut client = client::GatewayClient::new(&args.config_host, &args.config_pubkey).await?; + match client + .info_at_timestamp(&args.hotspot, args.query_time, &args.keypair.to_keypair()?) + .await + { + Ok(info) => Msg::ok(info.pretty_json()?), + Err(err) => Msg::err(format!( + "failed to retrieve {} info: {}", + &args.hotspot, err + )), + } +} + impl TryFrom for GatewayInfo { type Error = anyhow::Error; diff --git a/mobile_config_cli/src/cmds/mod.rs b/mobile_config_cli/src/cmds/mod.rs index 65baf823c..0c0808a0c 100644 --- a/mobile_config_cli/src/cmds/mod.rs +++ b/mobile_config_cli/src/cmds/mod.rs @@ -167,6 +167,8 @@ pub enum GatewayCommands { /// Retrieve the on-chain registered info for the batch of hotspots /// requested by list of Public Key Binaries InfoBatch(GetHotspotBatch), + /// Retrieve the on-chain registered info for the hotspot at a timestamp + InfoAtTimestamp(GetHotspotAtTimestamp), } #[derive(Debug, Args)] @@ -195,6 +197,20 @@ pub struct GetHotspotBatch { pub config_pubkey: String, } +#[derive(Debug, Args)] +pub struct GetHotspotAtTimestamp { + #[arg(long)] + pub hotspot: PublicKey, + #[arg(long)] + pub query_time: u64, + #[arg(from_global)] + pub keypair: PathBuf, + #[arg(from_global)] + pub config_host: String, + #[arg(from_global)] + pub config_pubkey: String, +} + #[derive(Debug, Subcommand)] pub enum EnvCommands { /// Make Environment variable to ease use diff --git a/mobile_config_cli/src/main.rs b/mobile_config_cli/src/main.rs index a219dfbdb..3919055c3 100644 --- a/mobile_config_cli/src/main.rs +++ b/mobile_config_cli/src/main.rs @@ -46,6 +46,7 @@ pub async fn handle_cli(cli: Cli) -> Result { Commands::Gateway { command } => match command { cmds::GatewayCommands::Info(args) => gateway::info(args).await, cmds::GatewayCommands::InfoBatch(args) => gateway::info_batch(args).await, + cmds::GatewayCommands::InfoAtTimestamp(args) => gateway::info_at_timestamp(args).await, }, } }