
Commit dd45ef5

Use the generation time, not last seen time, for update timestamps
When we write an update, we have to include a timestamp that serves two purposes: it is the time the client will use to fetch the next diff, and it is the reference point used when adding updates to the network graph (the client actually uses the included timestamp minus two weeks). Because of the second use, updates were being generated with the highest timestamp of all included updates, rounded down to the nearest update-interval multiple (because of the first use). In practice, because we expect to see many updates in any hour window, this meant we were calculating the generation reference timestamp in a very indirect way. Here we simply switch to using the reference timestamp directly. This changes the regtest results when generating an initial sync against an empty graph to include a real timestamp rather than 0, though this shouldn't impact clients' correctness.
1 parent 8493c61 commit dd45ef5
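To illustrate the change described in the commit message, here is a minimal, self-contained sketch (hypothetical helper name and made-up numbers, not the crate's actual code) of the old and new ways the serialized timestamp is derived:

// Hypothetical helper; only the rounding arithmetic mirrors the change.
fn round_down_to_interval(timestamp: u32, snapshot_interval: u32) -> u32 {
	timestamp - (timestamp % snapshot_interval)
}

fn main() {
	let snapshot_interval = 3_600; // assumed interval in seconds

	// Old behavior: round down the highest "seen" time among the included updates.
	let latest_seen_of_included_updates = 7_322;
	let old_serialized = round_down_to_interval(latest_seen_of_included_updates, snapshot_interval);

	// New behavior: round down the snapshot generation (reference) time directly.
	let generation_time = 7_450;
	let new_serialized = round_down_to_interval(generation_time, snapshot_interval);

	assert_eq!(old_serialized, 7_200);
	assert_eq!(new_serialized, 7_200);
	// With many updates arriving in each interval, both paths usually land on the
	// same value, which is why the old, indirect calculation worked in practice.
}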

File tree

4 files changed: +32 -39 lines changed

src/lib.rs
Lines changed: 7 additions & 7 deletions

@@ -203,7 +203,9 @@ async fn calculate_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>,
 	serialization::serialize_delta_set(delta_set, node_delta_set, last_sync_timestamp)
 }
 
-fn serialize_delta<L: Deref + Clone>(serialization_details: &SerializationSet, serialization_version: u8, logger: L) -> SerializedResponse where L::Target: Logger {
+fn serialize_delta<L: Deref + Clone>(
+	serialization_details: &SerializationSet, serialization_time: u32, serialization_version: u8, logger: L,
+) -> SerializedResponse where L::Target: Logger {
 	let mut output: Vec<u8> = vec![];
 	let snapshot_interval = config::snapshot_generation_interval();
 
@@ -277,11 +279,9 @@ fn serialize_delta<L: Deref + Clone>(serialization_details: &SerializationSet, s
 
 	// always write the chain hash
 	serialization_details.chain_hash.write(&mut prefixed_output).unwrap();
-	// always write the latest seen timestamp
-	let latest_seen_timestamp = serialization_details.latest_seen;
-	let overflow_seconds = latest_seen_timestamp % snapshot_interval;
-	let serialized_seen_timestamp = latest_seen_timestamp.saturating_sub(overflow_seconds);
-	serialized_seen_timestamp.write(&mut prefixed_output).unwrap();
+	// always write the time we're generating this object
+	assert_eq!(serialization_time % snapshot_interval, 0);
+	serialization_time.write(&mut prefixed_output).unwrap();
 
 	if serialization_version >= 2 { // serialize the most common node features
 		for mutated_node_id in serialization_details.node_mutations.keys() {
@@ -389,7 +389,7 @@ fn serialize_delta<L: Deref + Clone>(serialization_details: &SerializationSet, s
 	prefixed_output.append(&mut output);
 
 	log_info!(logger, "duplicated node ids: {}", duplicate_node_ids);
-	log_info!(logger, "latest seen timestamp: {:?}", serialization_details.latest_seen);
+	log_info!(logger, "generated at: {}", serialization_time);
 
 	SerializedResponse {
 		data: prefixed_output,
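For orientation, a rough sketch of the framing that the changed block in serialize_delta writes, using plain std byte handling instead of the crate's Writeable trait and assuming big-endian integer encoding as in LDK serialization; the function name and signature are illustrative only:

// Sketch only: the helper is illustrative, not the crate's API.
fn write_prefix(chain_hash: &[u8; 32], serialization_time: u32, snapshot_interval: u32) -> Vec<u8> {
	// Mirrors the new assert: the timestamp must arrive pre-rounded to an interval multiple.
	assert_eq!(serialization_time % snapshot_interval, 0);
	let mut prefixed_output = Vec::new();
	prefixed_output.extend_from_slice(chain_hash);                        // always write the chain hash
	prefixed_output.extend_from_slice(&serialization_time.to_be_bytes()); // then the generation time
	prefixed_output
}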

src/serialization.rs
Lines changed: 0 additions & 8 deletions

@@ -1,4 +1,3 @@
-use std::cmp::max;
 use std::collections::HashMap;
 use std::time::{SystemTime, UNIX_EPOCH};
 
@@ -16,7 +15,6 @@ pub(super) struct SerializationSet {
 	pub(super) full_update_defaults: DefaultUpdateValues,
 	pub(super) node_announcement_feature_defaults: Vec<NodeFeatures>,
 	pub(super) node_mutations: NodeDeltaSet,
-	pub(super) latest_seen: u32,
 	pub(super) chain_hash: ChainHash,
 }
 
@@ -130,7 +128,6 @@ pub(super) fn serialize_delta_set(channel_delta_set: DeltaSet, node_delta_set: N
 		node_announcement_feature_defaults: vec![],
 		node_mutations: Default::default(),
 		chain_hash: ChainHash::using_genesis_block(config::network()),
-		latest_seen: 0,
 	};
 
 	let mut full_update_histograms = FullUpdateValueHistograms {
@@ -165,7 +162,6 @@ pub(super) fn serialize_delta_set(channel_delta_set: DeltaSet, node_delta_set: N
 		};
 		let send_announcement = is_new_announcement || is_newly_included_announcement;
 		if send_announcement {
-			serialization_set.latest_seen = max(serialization_set.latest_seen, current_announcement_seen);
 			serialization_set.announcements.push(channel_delta.announcement.unwrap().announcement);
 		}
 
@@ -178,10 +174,6 @@ pub(super) fn serialize_delta_set(channel_delta_set: DeltaSet, node_delta_set: N
 		let latest_update = latest_update_delta.update;
 		assert_eq!(latest_update.short_channel_id, scid, "Update in DB had wrong SCID column");
 
-		// the returned seen timestamp should be the latest of all the returned
-		// announcements and latest updates
-		serialization_set.latest_seen = max(serialization_set.latest_seen, latest_update_delta.seen);
-
 		if let Some(update_delta) = updates.last_update_before_seen {
 			let mutated_properties = updates.mutated_properties;
 			if send_announcement || mutated_properties.len() == 5 || update_delta.seen <= non_incremental_previous_update_threshold_timestamp {
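To make the removed bookkeeping concrete: the deleted latest_seen field was effectively the maximum "seen" time across the included announcements and latest updates, roughly as in this illustrative sketch (stand-in inputs, not the crate's types), which is no longer needed once the caller supplies the generation time:

use std::cmp::max;

// Illustrative only: stand-in slices for the announcement/update "seen" times.
fn compute_latest_seen(announcement_seen_times: &[u32], update_seen_times: &[u32]) -> u32 {
	let mut latest_seen = 0u32;
	for &seen in announcement_seen_times {
		latest_seen = max(latest_seen, seen);
	}
	for &seen in update_seen_times {
		latest_seen = max(latest_seen, seen);
	}
	latest_seen
}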

src/snapshot.rs
Lines changed: 3 additions & 2 deletions

@@ -73,6 +73,7 @@ impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
 		let snapshot_generation_time = SystemTime::now();
 		let snapshot_generation_timestamp = snapshot_generation_time.duration_since(UNIX_EPOCH).unwrap().as_secs();
 		let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, snapshot_interval);
+		let serialized_timestamp: u32 = reference_timestamp.try_into().unwrap();
 		log_info!(self.logger, "Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp);
 
 		// 2. sleep until the next round interval
@@ -121,8 +122,8 @@ impl<L: Deref + Clone> Snapshotter<L> where L::Target: Logger {
 				log_info!(self.logger, "Calculating {}-second snapshot", current_scope);
 				// calculate the snapshot
 				let delta = super::calculate_delta(network_graph_clone.clone(), current_last_sync_timestamp.clone() as u32, Some(reference_timestamp), self.logger.clone()).await;
-				let snapshot_v1 = super::serialize_delta(&delta, 1, self.logger.clone());
-				let snapshot_v2 = super::serialize_delta(&delta, 2, self.logger.clone());
+				let snapshot_v1 = super::serialize_delta(&delta, serialized_timestamp, 1, self.logger.clone());
+				let snapshot_v2 = super::serialize_delta(&delta, serialized_timestamp, 2, self.logger.clone());
 
 				// persist the snapshot and update the symlink
 				let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-scope__previous-sync:{}.lngossip", reference_timestamp, current_scope, current_last_sync_timestamp);
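A hedged sketch of what the snapshotter now computes before calling serialize_delta; round_down_to_nearest_multiple is assumed to behave like the helper below, and the u32 narrowing mirrors the new serialized_timestamp binding:

use std::time::{SystemTime, UNIX_EPOCH};

// Assumed behavior of the snapshotter's helper; not copied from the crate.
fn round_down_to_nearest_multiple(value: u64, multiple: u64) -> u64 {
	value - (value % multiple)
}

fn serialized_timestamp_now(snapshot_interval: u64) -> u32 {
	let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
	let reference_timestamp = round_down_to_nearest_multiple(now, snapshot_interval);
	// Narrow to u32 for serialization, as the diff's serialized_timestamp binding does.
	reference_timestamp.try_into().unwrap()
}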

src/tests/mod.rs
Lines changed: 22 additions & 22 deletions

@@ -263,7 +263,8 @@ async fn test_trivial_setup() {
 	}
 
 	let delta = calculate_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
-	let serialization = serialize_delta(&delta, 1, logger.clone());
+	let delta_time = timestamp % config::snapshot_generation_interval();
+	let serialization = serialize_delta(&delta, delta_time, 1, logger.clone());
 	logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
 	clean_test_db().await;
 
@@ -280,8 +281,7 @@ async fn test_trivial_setup() {
 	let update_result = rgs.update_network_graph(&serialization.data).unwrap();
 	println!("update result: {}", update_result);
 	// the update result must be a multiple of our snapshot granularity
-	assert_eq!(update_result % config::snapshot_generation_interval(), 0);
-	assert!(update_result < timestamp);
+	assert_eq!(update_result, delta_time);
 
 	let timestamp_delta = timestamp - update_result;
 	println!("timestamp delta: {}", timestamp_delta);
@@ -428,7 +428,7 @@ async fn test_node_announcement_delta_detection() {
 	}
 
 	let delta = calculate_delta(network_graph_arc.clone(), timestamp - 5, None, logger.clone()).await;
-	let serialization = serialize_delta(&delta, 2, logger.clone());
+	let serialization = serialize_delta(&delta, timestamp, 2, logger.clone());
 	clean_test_db().await;
 
 	assert_eq!(serialization.message_count, 3);
@@ -479,7 +479,7 @@ async fn test_unidirectional_intermediate_update_consideration() {
 	let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
 
 	let delta = calculate_delta(network_graph_arc.clone(), timestamp + 1, None, logger.clone()).await;
-	let serialization = serialize_delta(&delta, 1, logger.clone());
+	let serialization = serialize_delta(&delta, current_time(), 1, logger.clone());
 
 	logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 1 update rows of the first update in a new direction", 1);
 	logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 1 reference rows", 1);
@@ -547,7 +547,7 @@ async fn test_bidirectional_intermediate_update_consideration() {
 	assert_eq!(channel_count, 1);
 
 	let delta = calculate_delta(network_graph_arc.clone(), timestamp + 1, None, logger.clone()).await;
-	let serialization = serialize_delta(&delta, 1, logger.clone());
+	let serialization = serialize_delta(&delta, current_time(), 1, logger.clone());
 
 	logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 0 update rows of the first update in a new direction", 1);
 	logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 2 reference rows", 1);
@@ -631,7 +631,7 @@ async fn test_channel_reminders() {
 	assert_eq!(channel_count, 2);
 
 	let delta = calculate_delta(network_graph_arc.clone(), timestamp - channel_reminder_delta + 15, None, logger.clone()).await;
-	let serialization = serialize_delta(&delta, 1, logger.clone());
+	let serialization = serialize_delta(&delta, current_time(), 1, logger.clone());
 
 	logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 0 update rows of the first update in a new direction", 1);
 	logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 4 update rows of the latest update in the less recently updated direction", 1);
@@ -701,7 +701,8 @@ async fn test_full_snapshot_recency() {
 
 	{ // sync after initial seed
 		let delta = calculate_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
-		let serialization = serialize_delta(&delta, 1, logger.clone());
+		let delta_time = current_time() % config::snapshot_generation_interval();
+		let serialization = serialize_delta(&delta, current_time(), 1, logger.clone());
 		logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
 
 		let channel_count = network_graph_arc.read_only().channels().len();
@@ -714,8 +715,7 @@ async fn test_full_snapshot_recency() {
 		let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
 		let update_result = rgs.update_network_graph(&serialization.data).unwrap();
 		// the update result must be a multiple of our snapshot granularity
-		assert_eq!(update_result % config::snapshot_generation_interval(), 0);
-		assert!(update_result < timestamp);
+		assert_eq!(update_result, delta_time);
 
 		let readonly_graph = client_graph_arc.read_only();
 		let channels = readonly_graph.channels();
@@ -782,7 +782,8 @@ async fn test_full_snapshot_recency_with_wrong_seen_order() {
 
 	{ // sync after initial seed
 		let delta = calculate_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
-		let serialization = serialize_delta(&delta, 1, logger.clone());
+		let delta_time = current_time() % config::snapshot_generation_interval();
+		let serialization = serialize_delta(&delta, delta_time, 1, logger.clone());
 		logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
 
 		let channel_count = network_graph_arc.read_only().channels().len();
@@ -795,8 +796,7 @@ async fn test_full_snapshot_recency_with_wrong_seen_order() {
 		let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
 		let update_result = rgs.update_network_graph(&serialization.data).unwrap();
 		// the update result must be a multiple of our snapshot granularity
-		assert_eq!(update_result % config::snapshot_generation_interval(), 0);
-		assert!(update_result < timestamp);
+		assert_eq!(update_result, delta_time);
 
 		let readonly_graph = client_graph_arc.read_only();
 		let channels = readonly_graph.channels();
@@ -862,7 +862,8 @@ async fn test_full_snapshot_recency_with_wrong_propagation_order() {
 
 	{ // sync after initial seed
 		let delta = calculate_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
-		let serialization = serialize_delta(&delta, 1, logger.clone());
+		let delta_time = current_time() % config::snapshot_generation_interval();
+		let serialization = serialize_delta(&delta, delta_time, 1, logger.clone());
 		logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
 
 		let channel_count = network_graph_arc.read_only().channels().len();
@@ -875,8 +876,7 @@ async fn test_full_snapshot_recency_with_wrong_propagation_order() {
 		let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
 		let update_result = rgs.update_network_graph(&serialization.data).unwrap();
 		// the update result must be a multiple of our snapshot granularity
-		assert_eq!(update_result % config::snapshot_generation_interval(), 0);
-		assert!(update_result < timestamp);
+		assert_eq!(update_result, delta_time);
 
 		let readonly_graph = client_graph_arc.read_only();
 		let channels = readonly_graph.channels();
@@ -996,7 +996,8 @@ async fn test_full_snapshot_mutiny_scenario() {
 
 	{ // sync after initial seed
 		let delta = calculate_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
-		let serialization = serialize_delta(&delta, 1, logger.clone());
+		let delta_time = current_time() % config::snapshot_generation_interval();
+		let serialization = serialize_delta(&delta, delta_time, 1, logger.clone());
 		logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
 
 		let channel_count = network_graph_arc.read_only().channels().len();
@@ -1010,8 +1011,7 @@ async fn test_full_snapshot_mutiny_scenario() {
 		let update_result = rgs.update_network_graph(&serialization.data).unwrap();
 		println!("update result: {}", update_result);
 		// the update result must be a multiple of our snapshot granularity
-		assert_eq!(update_result % config::snapshot_generation_interval(), 0);
-		assert!(update_result < timestamp);
+		assert_eq!(update_result, delta_time);
 
 		let timestamp_delta = timestamp - update_result;
 		println!("timestamp delta: {}", timestamp_delta);
@@ -1110,7 +1110,8 @@ async fn test_full_snapshot_interlaced_channel_timestamps() {
 
 	{ // sync after initial seed
 		let delta = calculate_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
-		let serialization = serialize_delta(&delta, 1, logger.clone());
+		let delta_time = current_time() % config::snapshot_generation_interval();
+		let serialization = serialize_delta(&delta, delta_time, 1, logger.clone());
 		logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 2", 1);
 
 		let channel_count = network_graph_arc.read_only().channels().len();
@@ -1123,8 +1124,7 @@ async fn test_full_snapshot_interlaced_channel_timestamps() {
 		let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
 		let update_result = rgs.update_network_graph(&serialization.data).unwrap();
 		// the update result must be a multiple of our snapshot granularity
-		assert_eq!(update_result % config::snapshot_generation_interval(), 0);
-		assert!(update_result < timestamp);
+		assert_eq!(update_result, delta_time);
 
 		let readonly_graph = client_graph_arc.read_only();
 		let channels = readonly_graph.channels();
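For context on the tests' new update_result assertions: update_network_graph returns the timestamp serialized above, and, per the commit message, the client effectively uses that value minus two weeks as its reference point when adding updates. A hypothetical sketch of that relationship (the two-week figure comes from the commit message; the helper name is made up):

// Hypothetical illustration of how a client might derive its gossip cutoff
// from the timestamp returned by update_network_graph (the "update_result"
// asserted on in the tests above).
const TWO_WEEKS_SECS: u32 = 14 * 24 * 60 * 60;

fn gossip_acceptance_cutoff(update_result: u32) -> u32 {
	// The client's effective reference point: the included timestamp minus two weeks.
	update_result.saturating_sub(TWO_WEEKS_SECS)
}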
