Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
ae2696b
Create migration test
macpie Oct 3, 2025
98f8b4f
Update tracker
macpie Oct 6, 2025
029dd39
Fix gateway DB calls
macpie Oct 6, 2025
6240c9d
Fix Clippy
macpie Oct 6, 2025
eb1498f
PR comments
macpie Oct 7, 2025
e0531af
Add some print to debug
macpie Oct 7, 2025
497e0e4
Fix timestamp check
macpie Oct 7, 2025
ca0c79f
Added new API for retrieving historical gateway info
connormck333 Oct 17, 2025
4dfc96c
Add v1 to historical gateway info protobuf
connormck333 Oct 17, 2025
e7def30
Refactored gateway service reducing code dupe
connormck333 Oct 17, 2025
bb64843
Changed proto branch
connormck333 Oct 22, 2025
bda0b40
Fixed formatting errors
connormck333 Oct 22, 2025
968b6be
Gateway historical info test assertions updated
connormck333 Oct 22, 2025
507a18d
Fixed formatting errors
connormck333 Oct 22, 2025
8c8f374
Fixed formatting errors
connormck333 Oct 22, 2025
9cdf3af
Increased sleep in historical info test
connormck333 Oct 22, 2025
7f6003d
Tests cleanup, removed println, changed assert! to assert_eq
connormck333 Oct 23, 2025
01376f0
Removed sleep from test
connormck333 Oct 23, 2025
5c7eb38
Updated gateway tracker test name & reduced test size
connormck333 Oct 23, 2025
7556b56
Removed println in tests
connormck333 Oct 23, 2025
f7bec6a
Add info_historical command to mobile_config_cli
connormck333 Oct 24, 2025
53078d0
Changed historical info to info_at_timestamp
connormck333 Oct 24, 2025
e83d7b8
Update mobile verifiers to use new info_at_timestamp API
connormck333 Oct 28, 2025
924d2d4
Revert "Update mobile verifiers to use new info_at_timestamp API"
connormck333 Oct 28, 2025
37e586f
Merge branch 'main' into macpie/historical
connormck333 Oct 29, 2025
9fcf30b
Merge branch 'main' into macpie/historical
connormck333 Nov 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,10 @@ aws-smithy-types-convert = { version = "0.60.9", features = [
url = "2.5.4"

### Protobuf
helium-proto = { git = "https://github.com/helium/proto", branch = "master", features = [
helium-proto = { git = "https://github.com/helium/proto", branch = "connor/historical", features = [

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert before merge

"services",
] }
beacon = { git = "https://github.com/helium/proto", branch = "master" }
beacon = { git = "https://github.com/helium/proto", branch = "connor/historical" }
# Pickup versions from above
prost = "*"
tonic = { version = "*", features = ["tls-aws-lc", "tls-native-roots"] }
Expand Down
1 change: 1 addition & 0 deletions file_store_oracles/src/traits/msg_verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ impl_msg_verify!(mobile_config::GatewayInfoStreamResV3, signature);
impl_msg_verify!(mobile_config::BoostedHexInfoStreamReqV1, signature);
impl_msg_verify!(mobile_config::BoostedHexModifiedInfoStreamReqV1, signature);
impl_msg_verify!(mobile_config::BoostedHexInfoStreamResV1, signature);
impl_msg_verify!(mobile_config::GatewayInfoAtTimestampReqV1, signature);
impl_msg_verify!(sub_dao::SubDaoEpochRewardInfoReqV1, signature);
impl_msg_verify!(sub_dao::SubDaoEpochRewardInfoResV1, signature);
impl_msg_verify!(poc_mobile::SubscriberVerifiedMappingEventReqV1, signature);
Expand Down
1 change: 1 addition & 0 deletions mobile_config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,4 @@ tls-init = { path = "../tls_init" }
[dev-dependencies]
rand = { workspace = true }
tokio-stream = { workspace = true, features = ["net"] }
tempfile = "3"
28 changes: 28 additions & 0 deletions mobile_config/migrations/20251003000000_gateways_historical.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-- 1. Drop the primary key constraint on address
ALTER TABLE
gateways DROP CONSTRAINT IF EXISTS gateways_pkey;

-- 2. Rename column updated_at -> inserted_at
ALTER TABLE
gateways RENAME COLUMN updated_at TO inserted_at;

-- 3. Backfill inserted_at with created_at values
UPDATE
gateways
SET
inserted_at = created_at;

-- 4. Ensure inserted_at is NOT NULL and has a default value of now()
ALTER TABLE
gateways
ALTER COLUMN inserted_at SET DEFAULT now(),
ALTER COLUMN inserted_at SET NOT NULL;

-- 5. Create an index on (address, inserted_at DESC)
CREATE INDEX IF NOT EXISTS gateways_address_inserted_idx ON gateways (address, inserted_at DESC);

-- 6. Create an PK on (address, inserted_at DESC)
ALTER TABLE
gateways
ADD
CONSTRAINT gateways_pkey PRIMARY KEY (address, inserted_at);
145 changes: 84 additions & 61 deletions mobile_config/src/gateway/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ pub struct Gateway {
pub gateway_type: GatewayType,
// When the record was first created from metadata DB
pub created_at: DateTime<Utc>,
// When record was last updated
pub updated_at: DateTime<Utc>,
// When record was inserted
pub inserted_at: DateTime<Utc>,
// When record was last updated from metadata DB (could be set to now if no metadata DB info)
pub refreshed_at: DateTime<Utc>,
// When location or hash last changed, set to refreshed_at (updated via SQL query see Gateway::insert)
Expand All @@ -96,6 +96,7 @@ pub struct Gateway {
pub location_changed_at: Option<DateTime<Utc>>,
pub location_asserts: Option<u32>,
}

#[derive(Debug)]
pub struct LocationChangedAtUpdate {
pub address: PublicKeyBinary,
Expand All @@ -113,7 +114,6 @@ impl Gateway {
address,
gateway_type,
created_at,
updated_at,
refreshed_at,
last_changed_at,
hash,
Expand All @@ -130,7 +130,6 @@ impl Gateway {
b.push_bind(g.address.as_ref())
.push_bind(g.gateway_type)
.push_bind(g.created_at)
.push_bind(g.updated_at)
.push_bind(g.refreshed_at)
.push_bind(g.last_changed_at)
.push_bind(g.hash.as_str())
Expand All @@ -142,31 +141,6 @@ impl Gateway {
.push_bind(g.location_asserts.map(|v| v as i64));
});

qb.push(
" ON CONFLICT (address) DO UPDATE SET
gateway_type = EXCLUDED.gateway_type,
created_at = EXCLUDED.created_at,
updated_at = EXCLUDED.updated_at,
refreshed_at = EXCLUDED.refreshed_at,
last_changed_at = CASE
WHEN gateways.location IS DISTINCT FROM EXCLUDED.location
OR gateways.hash IS DISTINCT FROM EXCLUDED.hash
THEN EXCLUDED.refreshed_at
ELSE gateways.last_changed_at
END,
hash = EXCLUDED.hash,
antenna = EXCLUDED.antenna,
elevation = EXCLUDED.elevation,
azimuth = EXCLUDED.azimuth,
location = EXCLUDED.location,
location_changed_at = CASE
WHEN gateways.location IS DISTINCT FROM EXCLUDED.location
THEN EXCLUDED.refreshed_at
ELSE gateways.location_changed_at
END,
location_asserts = EXCLUDED.location_asserts",
);

let res = qb.build().execute(pool).await?;
Ok(res.rows_affected())
}
Expand All @@ -178,7 +152,6 @@ impl Gateway {
address,
gateway_type,
created_at,
updated_at,
refreshed_at,
last_changed_at,
hash,
Expand All @@ -191,37 +164,13 @@ impl Gateway {
)
VALUES (
$1, $2, $3, $4, $5, $6, $7,
$8, $9, $10, $11, $12, $13
$8, $9, $10, $11, $12
)
ON CONFLICT (address)
DO UPDATE SET
gateway_type = EXCLUDED.gateway_type,
created_at = EXCLUDED.created_at,
updated_at = EXCLUDED.updated_at,
refreshed_at = EXCLUDED.refreshed_at,
last_changed_at = CASE
WHEN gateways.location IS DISTINCT FROM EXCLUDED.location
OR gateways.hash IS DISTINCT FROM EXCLUDED.hash
THEN EXCLUDED.refreshed_at
ELSE gateways.last_changed_at
END,
hash = EXCLUDED.hash,
antenna = EXCLUDED.antenna,
elevation = EXCLUDED.elevation,
azimuth = EXCLUDED.azimuth,
location = EXCLUDED.location,
location_changed_at = CASE
WHEN gateways.location IS DISTINCT FROM EXCLUDED.location
THEN EXCLUDED.refreshed_at
ELSE gateways.location_changed_at
END,
location_asserts = EXCLUDED.location_asserts
"#,
)
.bind(self.address.as_ref())
.bind(self.gateway_type)
.bind(self.created_at)
.bind(self.updated_at)
.bind(self.refreshed_at)
.bind(self.last_changed_at)
.bind(self.hash.as_str())
Expand All @@ -247,7 +196,7 @@ impl Gateway {
address,
gateway_type,
created_at,
updated_at,
inserted_at,
refreshed_at,
last_changed_at,
hash,
Expand All @@ -259,6 +208,8 @@ impl Gateway {
location_asserts
FROM gateways
WHERE address = $1
ORDER BY inserted_at DESC
LIMIT 1
"#,
)
.bind(address.as_ref())
Expand All @@ -268,6 +219,76 @@ impl Gateway {
Ok(gateway)
}

pub async fn get_by_addresses<'a>(
db: impl PgExecutor<'a>,
addresses: Vec<PublicKeyBinary>,
) -> anyhow::Result<Vec<Self>> {
let addr_array: Vec<Vec<u8>> = addresses.iter().map(|a| a.as_ref().to_vec()).collect();

let rows = sqlx::query_as::<_, Self>(
r#"
SELECT DISTINCT ON (address)
address,
gateway_type,
created_at,
inserted_at,
refreshed_at,
last_changed_at,
hash,
antenna,
elevation,
azimuth,
location,
location_changed_at,
location_asserts
FROM gateways
WHERE address = ANY($1)
ORDER BY address, inserted_at DESC
"#,
)
.bind(addr_array)
.fetch_all(db)
.await?;

Ok(rows)
}

pub async fn get_by_address_and_inserted_at<'a>(
db: impl PgExecutor<'a>,
address: &PublicKeyBinary,
inserted_at_max: &DateTime<Utc>,
) -> anyhow::Result<Option<Self>> {
let gateway = sqlx::query_as::<_, Self>(
r#"
SELECT
address,
gateway_type,
created_at,
inserted_at,
refreshed_at,
last_changed_at,
hash,
antenna,
elevation,
azimuth,
location,
location_changed_at,
location_asserts
FROM gateways
WHERE address = $1
AND inserted_at <= $2
ORDER BY inserted_at DESC
LIMIT 1
"#,
)
.bind(address.as_ref())
.bind(inserted_at_max)
.fetch_optional(db)
.await?;

Ok(gateway)
}

pub fn stream_by_addresses<'a>(
db: impl PgExecutor<'a> + 'a,
addresses: Vec<PublicKeyBinary>,
Expand All @@ -277,11 +298,11 @@ impl Gateway {

sqlx::query_as::<_, Self>(
r#"
SELECT
SELECT DISTINCT ON (address)
address,
gateway_type,
created_at,
updated_at,
inserted_at,
refreshed_at,
last_changed_at,
hash,
Expand All @@ -294,6 +315,7 @@ impl Gateway {
FROM gateways
WHERE address = ANY($1)
AND last_changed_at >= $2
ORDER BY address, inserted_at DESC
"#,
)
.bind(addr_array)
Expand All @@ -311,11 +333,11 @@ impl Gateway {
) -> impl Stream<Item = Self> + 'a {
sqlx::query_as::<_, Self>(
r#"
SELECT
SELECT DISTINCT ON (address)
address,
gateway_type,
created_at,
updated_at,
inserted_at,
refreshed_at,
last_changed_at,
hash,
Expand All @@ -332,6 +354,7 @@ impl Gateway {
$3::timestamptz IS NULL
OR (location IS NOT NULL AND location_changed_at >= $3)
)
ORDER BY address, inserted_at DESC
"#,
)
.bind(types)
Expand Down Expand Up @@ -395,7 +418,7 @@ impl FromRow<'_, PgRow> for Gateway {
address: PublicKeyBinary::from(row.try_get::<Vec<u8>, _>("address")?),
gateway_type: row.try_get("gateway_type")?,
created_at: row.try_get("created_at")?,
updated_at: row.try_get("updated_at")?,
inserted_at: row.try_get("inserted_at")?,
refreshed_at: row.try_get("refreshed_at")?,
last_changed_at: row.try_get("last_changed_at")?,
hash: row.try_get("hash")?,
Expand Down
13 changes: 7 additions & 6 deletions mobile_config/src/gateway/metadata_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,22 +119,23 @@ impl MobileHotspotInfo {
None => (None, None, None),
};

let refreshed_at = self.refreshed_at.unwrap_or_else(Utc::now);

Ok(Some(Gateway {
address: self.entity_key.clone(),
gateway_type: GatewayType::try_from(self.device_type.clone())?,
created_at: self.created_at,
updated_at: Utc::now(),
refreshed_at: self.refreshed_at.unwrap_or_else(Utc::now),
// Updated via SQL query see Gateway::insert
last_changed_at: Utc::now(),
inserted_at: Utc::now(),
refreshed_at,
last_changed_at: refreshed_at,
hash: self.compute_hash(),
antenna,
elevation,
azimuth,
location,
// Set to refreshed_at when hotspot has a location, None otherwise
location_changed_at: if location.is_some() {
Some(self.refreshed_at.unwrap_or_else(Utc::now))
Some(refreshed_at)
} else {
None
},
Expand Down Expand Up @@ -164,7 +165,7 @@ impl sqlx::FromRow<'_, sqlx::postgres::PgRow> for MobileHotspotInfo {
)
.map_err(|err| sqlx::Error::Decode(Box::new(err)))?,
refreshed_at: row.get::<Option<DateTime<Utc>>, &str>("refreshed_at"),
created_at: row.get::<DateTime<Utc>, &str>("refreshed_at"),
created_at: row.get::<DateTime<Utc>, &str>("created_at"),
location: row.get::<Option<i64>, &str>("location"),
is_full_hotspot: row.get::<Option<bool>, &str>("is_full_hotspot"),
num_location_asserts: row.get::<Option<i32>, &str>("num_location_asserts"),
Expand Down
9 changes: 9 additions & 0 deletions mobile_config/src/gateway/service/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,3 +453,12 @@ pub fn stream_by_types<'a>(

Gateway::stream_by_types(db, gateway_types, min_date, None).map(|gateway| gateway.into())
}

pub async fn get_by_address_and_inserted_at(
db: impl PgExecutor<'_>,
pubkey_bin: &PublicKeyBinary,
inserted_at_max: &DateTime<Utc>,
) -> anyhow::Result<Option<GatewayInfo>> {
let gateway = Gateway::get_by_address_and_inserted_at(db, pubkey_bin, inserted_at_max).await?;
Ok(gateway.map(|g| g.into()))
}
Loading