Skip to content

Commit af6c6c3

Browse files
committed
Implement gateway_info_stream_v2
1 parent 28eefd7 commit af6c6c3

File tree

5 files changed

+203
-25
lines changed

5 files changed

+203
-25
lines changed

Cargo.lock

Lines changed: 33 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,10 +125,10 @@ aws-smithy-types-convert = { version = "0.60.9", features = [
125125
url = "2.5.4"
126126

127127
### Protobuf
128-
helium-proto = { git = "https://github.com/helium/proto", branch = "master", features = [
128+
helium-proto = { git = "https://github.com/helium/proto", branch = "macpie/info_stream_v2", features = [
129129
"services",
130130
] }
131-
beacon = { git = "https://github.com/helium/proto", branch = "master" }
131+
beacon = { git = "https://github.com/helium/proto", branch = "macpie/info_stream_v2" }
132132
# Pickup versions from above
133133
prost = "*"
134134
tonic = { version = "*", features = ["tls-aws-lc", "tls-native-roots"] }

file_store/src/traits/msg_verify.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ impl_msg_verify!(iot_config::AdminLoadRegionReqV1, signature);
7373
impl_msg_verify!(iot_config::AdminRemoveKeyReqV1, signature);
7474
impl_msg_verify!(iot_config::GatewayInfoReqV1, signature);
7575
impl_msg_verify!(iot_config::GatewayInfoStreamReqV1, signature);
76+
impl_msg_verify!(iot_config::GatewayInfoStreamReqV2, signature);
7677
impl_msg_verify!(iot_config::RegionParamsReqV1, signature);
7778
impl_msg_verify!(iot_config::GatewayInfoResV1, signature);
7879
impl_msg_verify!(iot_config::GatewayInfoStreamResV1, signature);

iot_config/src/gateway/service/mod.rs

Lines changed: 66 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,15 @@ use crate::{
33
telemetry, verify_public_key, GrpcResult, GrpcStreamResult,
44
};
55
use anyhow::Result;
6-
use chrono::{DateTime, Utc};
6+
use chrono::{DateTime, TimeZone, Utc};
77
use file_store::traits::{MsgVerify, TimestampEncode};
88
use futures::stream::StreamExt;
99
use helium_crypto::{Keypair, PublicKey, PublicKeyBinary, Sign};
1010
use helium_proto::{
1111
services::iot_config::{
12-
self, GatewayInfoReqV1, GatewayInfoResV1, GatewayInfoStreamReqV1, GatewayInfoStreamResV1,
13-
GatewayLocationReqV1, GatewayLocationResV1, GatewayRegionParamsReqV1,
14-
GatewayRegionParamsResV1,
12+
self, GatewayInfoReqV1, GatewayInfoResV1, GatewayInfoStreamReqV1, GatewayInfoStreamReqV2,
13+
GatewayInfoStreamResV1, GatewayLocationReqV1, GatewayLocationResV1,
14+
GatewayRegionParamsReqV1, GatewayRegionParamsResV1,
1515
},
1616
Message, Region,
1717
};
@@ -292,13 +292,71 @@ impl iot_config::Gateway for GatewayService {
292292
let (tx, rx) = tokio::sync::mpsc::channel(20);
293293

294294
tokio::spawn(async move {
295+
let epoch: DateTime<Utc> = "1970-01-01T00:00:00Z".parse().unwrap();
295296
tokio::select! {
296297
_ = stream_all_gateways_info(
297298
&pool,
298299
tx.clone(),
299300
&signing_key,
300301
region_map.clone(),
301302
batch_size,
303+
epoch,
304+
None,
305+
) => (),
306+
}
307+
});
308+
309+
Ok(Response::new(GrpcStreamResult::new(rx)))
310+
}
311+
312+
type info_stream_v2Stream = GrpcStreamResult<GatewayInfoStreamResV1>;
313+
async fn info_stream_v2(
314+
&self,
315+
request: Request<GatewayInfoStreamReqV2>,
316+
) -> GrpcResult<Self::info_stream_v2Stream> {
317+
let request = request.into_inner();
318+
telemetry::count_request("gateway", "info-stream");
319+
320+
let signer = verify_public_key(&request.signer)?;
321+
self.verify_request_signature(&signer, &request)?;
322+
323+
tracing::debug!("fetching all gateways' info");
324+
325+
let pool = self.pool.clone();
326+
let signing_key = self.signing_key.clone();
327+
let batch_size = request.batch_size;
328+
let min_last_changed_at = Utc
329+
.timestamp_opt(request.min_updated_at as i64, 0)
330+
.single()
331+
.ok_or(Status::invalid_argument(
332+
"Invalid min_refreshed_at argument",
333+
))?;
334+
335+
let min_location_changed_at = if request.min_location_changed_at == 0 {
336+
None
337+
} else {
338+
Some(
339+
Utc.timestamp_opt(request.min_location_changed_at as i64, 0)
340+
.single()
341+
.ok_or(Status::invalid_argument(
342+
"Invalid min_location_changed_at argument",
343+
))?,
344+
)
345+
};
346+
let region_map = self.region_map.clone();
347+
348+
let (tx, rx) = tokio::sync::mpsc::channel(20);
349+
350+
tokio::spawn(async move {
351+
tokio::select! {
352+
_ = stream_all_gateways_info(
353+
&pool,
354+
tx.clone(),
355+
&signing_key,
356+
region_map.clone(),
357+
batch_size,
358+
min_last_changed_at,
359+
min_location_changed_at,
302360
) => (),
303361
}
304362
});
@@ -313,13 +371,14 @@ async fn stream_all_gateways_info(
313371
signing_key: &Keypair,
314372
region_map: RegionMapReader,
315373
batch_size: u32,
374+
min_last_changed_at: DateTime<Utc>,
375+
min_location_changed_at: Option<DateTime<Utc>>,
316376
) -> anyhow::Result<()> {
317377
let timestamp = Utc::now().encode_timestamp();
318378
let signer: Vec<u8> = signing_key.public_key().into();
319379

320-
let epoch: DateTime<Utc> = "1970-01-01T00:00:00Z".parse().unwrap();
321-
322-
let mut stream = info::stream(pool, epoch, None).chunks(batch_size as usize);
380+
let mut stream = info::stream(pool, min_last_changed_at, min_location_changed_at)
381+
.chunks(batch_size as usize);
323382
while let Some(infos) = stream.next().await {
324383
let gateway_infos = infos
325384
.into_iter()

0 commit comments

Comments
 (0)