Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Replace Instant with SystemTime for Records and related code #5856

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/ipfs-kad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ futures = { workspace = true }
anyhow = "1.0.86"
libp2p = { path = "../../libp2p", features = [ "tokio", "dns", "kad", "noise", "tcp", "yamux", "rsa"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
web-time = "1.1.0"

[lints]
workspace = true
9 changes: 3 additions & 6 deletions examples/ipfs-kad/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,7 @@

#![doc = include_str!("../README.md")]

use std::{
num::NonZeroUsize,
ops::Add,
time::{Duration, Instant},
};
use std::{num::NonZeroUsize, ops::Add, time::Duration};

use anyhow::{bail, Result};
use clap::Parser;
Expand All @@ -36,6 +32,7 @@ use libp2p::{
tcp, yamux, PeerId,
};
use tracing_subscriber::EnvFilter;
use web_time::SystemTime;

const BOOTNODES: [&str; 4] = [
"QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
Expand Down Expand Up @@ -99,7 +96,7 @@ async fn main() -> Result<()> {
let mut pk_record =
kad::Record::new(pk_record_key, local_key.public().encode_protobuf());
pk_record.publisher = Some(*swarm.local_peer_id());
pk_record.expires = Some(Instant::now().add(Duration::from_secs(60)));
pk_record.expires = Some(SystemTime::now().add(Duration::from_secs(60)));

swarm
.behaviour_mut()
Expand Down
18 changes: 9 additions & 9 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use libp2p_swarm::{
};
use thiserror::Error;
use tracing::Level;
use web_time::Instant;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To my knowledge, SystemTime is not available on wasm targets (it would panic when being used at runtime). In this case, we should use web_time::SystemTime in its place, no?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeh that sounds like a better idea.

use web_time::SystemTime;

pub use crate::query::QueryStats;
use crate::{
Expand Down Expand Up @@ -788,7 +788,7 @@ where
/// [`Event::OutboundQueryProgressed{QueryResult::GetRecord}`].
pub fn get_record(&mut self, key: record::Key) -> QueryId {
let record = if let Some(record) = self.store.get(&key) {
if record.is_expired(Instant::now()) {
if record.is_expired(SystemTime::now()) {
self.store.remove(&key);
None
} else {
Expand Down Expand Up @@ -865,7 +865,7 @@ where
self.store.put(record.clone())?;
record.expires = record
.expires
.or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
.or_else(|| self.record_ttl.map(|ttl| SystemTime::now() + ttl));
let quorum = quorum.eval(self.queries.config().replication_factor);
let target = kbucket::Key::new(record.key.clone());
let peers = self.kbuckets.closest_keys(&target);
Expand Down Expand Up @@ -910,7 +910,7 @@ where
};
record.expires = record
.expires
.or_else(|| self.record_ttl.map(|ttl| Instant::now() + ttl));
.or_else(|| self.record_ttl.map(|ttl| SystemTime::now() + ttl));
let context = PutRecordContext::Custom;
let info = QueryInfo::PutRecord {
context,
Expand Down Expand Up @@ -1053,7 +1053,7 @@ where
.store
.providers(&key)
.into_iter()
.filter(|p| !p.is_expired(Instant::now()))
.filter(|p| !p.is_expired(SystemTime::now()))
.map(|p| p.provider)
.collect();

Expand Down Expand Up @@ -1832,7 +1832,7 @@ where
return;
}

let now = Instant::now();
let now = SystemTime::now();

// Calculate the expiration exponentially inversely proportional to the
// number of nodes between the local node and the closest node to the key
Expand Down Expand Up @@ -1937,7 +1937,7 @@ where
let record = ProviderRecord {
key,
provider: provider.node_id,
expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl),
expires: self.provider_record_ttl.map(|ttl| SystemTime::now() + ttl),
addresses: provider.multiaddrs,
};
match self.record_filtering {
Expand Down Expand Up @@ -2407,7 +2407,7 @@ where
// Lookup the record locally.
let record = match self.store.get(&key) {
Some(record) => {
if record.is_expired(Instant::now()) {
if record.is_expired(SystemTime::now()) {
self.store.remove(&key);
None
} else {
Expand Down Expand Up @@ -2536,7 +2536,7 @@ where
&mut self,
cx: &mut Context<'_>,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
let now = Instant::now();
let now = SystemTime::now();

// Calculate the available capacity for queries triggered by background jobs.
let mut jobs_query_capacity = JOBS_MAX_QUERIES.saturating_sub(self.queries.size());
Expand Down
30 changes: 18 additions & 12 deletions protocols/kad/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ use std::{
use futures::prelude::*;
use futures_timer::Delay;
use libp2p_identity::PeerId;
use web_time::Instant;
use web_time::SystemTime;

use crate::record::{self, store::RecordStore, ProviderRecord, Record};

Expand Down Expand Up @@ -102,15 +102,17 @@ impl<T> PeriodicJob<T> {
#[cfg(test)]
fn asap(&mut self) {
if let PeriodicJobState::Waiting(delay, deadline) = &mut self.state {
let new_deadline = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
let new_deadline = SystemTime::now()
.checked_sub(Duration::from_secs(1))
.unwrap();
*deadline = new_deadline;
delay.reset(Duration::from_secs(1));
}
}

/// Returns `true` if the job is currently not running but ready
/// to be run, `false` otherwise.
fn check_ready(&mut self, cx: &mut Context<'_>, now: Instant) -> bool {
fn check_ready(&mut self, cx: &mut Context<'_>, now: SystemTime) -> bool {
if let PeriodicJobState::Waiting(delay, deadline) = &mut self.state {
if now >= *deadline || !Future::poll(Pin::new(delay), cx).is_pending() {
return true;
Expand All @@ -124,7 +126,7 @@ impl<T> PeriodicJob<T> {
#[derive(Debug)]
enum PeriodicJobState<T> {
Running(T),
Waiting(Delay, Instant),
Waiting(Delay, SystemTime),
}

//////////////////////////////////////////////////////////////////////////////
Expand All @@ -133,7 +135,7 @@ enum PeriodicJobState<T> {
/// Periodic job for replicating / publishing records.
pub(crate) struct PutRecordJob {
local_id: PeerId,
next_publish: Option<Instant>,
next_publish: Option<SystemTime>,
publish_interval: Option<Duration>,
record_ttl: Option<Duration>,
skipped: HashSet<record::Key>,
Expand All @@ -149,7 +151,7 @@ impl PutRecordJob {
publish_interval: Option<Duration>,
record_ttl: Option<Duration>,
) -> Self {
let now = Instant::now();
let now = SystemTime::now();
let deadline = now + replicate_interval;
let delay = Delay::new(replicate_interval);
let next_publish = publish_interval.map(|i| now + i);
Expand Down Expand Up @@ -185,7 +187,11 @@ impl PutRecordJob {
#[cfg(test)]
pub(crate) fn asap(&mut self, publish: bool) {
if publish {
self.next_publish = Some(Instant::now().checked_sub(Duration::from_secs(1)).unwrap())
self.next_publish = Some(
SystemTime::now()
.checked_sub(Duration::from_secs(1))
.unwrap(),
)
}
self.inner.asap()
}
Expand All @@ -199,7 +205,7 @@ impl PutRecordJob {
&mut self,
cx: &mut Context<'_>,
store: &mut T,
now: Instant,
now: SystemTime,
) -> Poll<Record>
where
T: RecordStore,
Expand Down Expand Up @@ -266,7 +272,7 @@ pub(crate) struct AddProviderJob {
impl AddProviderJob {
/// Creates a new periodic job for provider announcements.
pub(crate) fn new(interval: Duration) -> Self {
let now = Instant::now();
let now = SystemTime::now();
Self {
inner: PeriodicJob {
interval,
Expand Down Expand Up @@ -302,7 +308,7 @@ impl AddProviderJob {
&mut self,
cx: &mut Context<'_>,
store: &mut T,
now: Instant,
now: SystemTime,
) -> Poll<ProviderRecord>
where
T: RecordStore,
Expand Down Expand Up @@ -378,7 +384,7 @@ mod tests {
}

block_on(poll_fn(|ctx| {
let now = Instant::now() + job.inner.interval;
let now = SystemTime::now() + job.inner.interval;
// All (non-expired) records in the store must be yielded by the job.
for r in store.records().map(|r| r.into_owned()).collect::<Vec<_>>() {
if !r.is_expired(now) {
Expand Down Expand Up @@ -408,7 +414,7 @@ mod tests {
}

block_on(poll_fn(|ctx| {
let now = Instant::now() + job.inner.interval;
let now = SystemTime::now() + job.inner.interval;
// All (non-expired) records in the store must be yielded by the job.
for r in store.provided().map(|r| r.into_owned()).collect::<Vec<_>>() {
if !r.is_expired(now) {
Expand Down
19 changes: 15 additions & 4 deletions protocols/kad/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use libp2p_core::{
use libp2p_identity::PeerId;
use libp2p_swarm::StreamProtocol;
use tracing::debug;
use web_time::Instant;
use web_time::SystemTime;

use crate::{
proto,
Expand Down Expand Up @@ -550,7 +550,7 @@ fn record_from_proto(record: proto::Record) -> Result<Record, io::Error> {
};

let expires = if record.ttl > 0 {
Some(Instant::now() + Duration::from_secs(record.ttl as u64))
Some(SystemTime::now() + Duration::from_secs(record.ttl as u64))
} else {
None
};
Expand All @@ -571,9 +571,20 @@ fn record_to_proto(record: Record) -> proto::Record {
ttl: record
.expires
.map(|t| {
let now = Instant::now();
let now = SystemTime::now();
if t > now {
(t - now).as_secs() as u32
now.duration_since(SystemTime::UNIX_EPOCH)
.map(|now_duration| {
let target_duration = t
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or(now_duration);
if target_duration > now_duration {
(target_duration - now_duration).as_secs() as u32
} else {
1
}
})
.unwrap_or(1)
} else {
1 // because 0 means "does not expire"
}
Expand Down
18 changes: 10 additions & 8 deletions protocols/kad/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use peers::{
PeersIterState,
};
use smallvec::SmallVec;
use web_time::Instant;
use web_time::SystemTime;

use crate::{
behaviour::PeerInfo,
Expand Down Expand Up @@ -184,7 +184,7 @@ impl QueryPool {
}

/// Polls the pool to advance the queries.
pub(crate) fn poll(&mut self, now: Instant) -> QueryPoolState<'_> {
pub(crate) fn poll(&mut self, now: SystemTime) -> QueryPoolState<'_> {
let mut finished = None;
let mut timeout = None;
let mut waiting = None;
Expand All @@ -202,7 +202,9 @@ impl QueryPool {
break;
}
PeersIterState::Waiting(None) | PeersIterState::WaitingAtCapacity => {
let elapsed = now - query.stats.start.unwrap_or(now);
let elapsed = now
.duration_since(query.stats.start.unwrap_or(now))
.unwrap_or_default();
if elapsed >= self.config.timeout {
timeout = Some(query_id);
break;
Expand Down Expand Up @@ -390,7 +392,7 @@ impl Query {
}

/// Advances the state of the underlying peer iterator.
fn next(&mut self, now: Instant) -> PeersIterState<'_> {
fn next(&mut self, now: SystemTime) -> PeersIterState<'_> {
let state = match &mut self.peers.peer_iter {
QueryPeerIter::Closest(iter) => iter.next(now),
QueryPeerIter::ClosestDisjoint(iter) => iter.next(now),
Expand Down Expand Up @@ -469,8 +471,8 @@ pub struct QueryStats {
requests: u32,
success: u32,
failure: u32,
start: Option<Instant>,
end: Option<Instant>,
start: Option<SystemTime>,
end: Option<SystemTime>,
}

impl QueryStats {
Expand Down Expand Up @@ -517,9 +519,9 @@ impl QueryStats {
pub fn duration(&self) -> Option<Duration> {
if let Some(s) = self.start {
if let Some(e) = self.end {
Some(e - s)
e.duration_since(s).ok()
} else {
Some(Instant::now() - s)
SystemTime::now().duration_since(s).ok()
}
} else {
None
Expand Down
Loading
Loading