Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 77 additions & 37 deletions kindelia_core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,18 @@ pub fn miner_loop(
pub enum NodeError {
#[error(transparent)]
BlockStorage(#[from] BlockStorageError),

#[error(transparent)]
BlockLookup(#[from] BlockLookupError),

#[error(transparent)]
Epoch(#[from] EpochError),

#[error("Inconsistent data error:\n msg: {msg}\n context: {context}")]
Inconsistency { msg: String, context: String },

#[error("Bad Magic! Block's magic value {magic} does not match network_id {network_id}")]
BadMagic { magic: u32, network_id: u32 },
}

/// Errors associated with Blocks
Expand Down Expand Up @@ -914,7 +926,7 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
// - In case of a reorg, rollback to the block before it
// - Run that block's code, updating the HVM state
// - Updates the longest chain saved on disk
pub fn add_block(&mut self, block: &HashedBlock) {
pub fn add_block(&mut self, block: &HashedBlock) -> Result<(), NodeError> {
// Adding a block might trigger the addition of other blocks
// that were waiting for it. Because of that, we loop here.

Expand All @@ -924,7 +936,7 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
while let Some(block) = must_include.pop() {
let btime = block.time;
// If block is too far into the future, ignore it
if btime >= get_time() + DELAY_TOLERANCE {
if btime >= try_get_time()? + DELAY_TOLERANCE {
emit_event!(
self.event_emitter,
NodeEventType::too_late(&block),
Expand Down Expand Up @@ -1114,8 +1126,12 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
// This will cause the block to be moved from self.pending to self.block
if let Some(wait_list) = self.wait_list.get(&bhash) {
for waiting_for_me in wait_list {
must_include
.push(self.pending.remove(waiting_for_me).expect("block"));
must_include.push(self.pending.remove(waiting_for_me).ok_or(
NodeError::Inconsistency {
msg: format!("block {} not found", waiting_for_me),
context: "removing block from pending list".to_string(),
},
)?);
}
self.wait_list.remove(&bhash);
}
Expand All @@ -1132,6 +1148,7 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
);
}
}
Ok(())
}

pub fn compute_block(
Expand Down Expand Up @@ -1191,20 +1208,22 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
return longest;
}

pub fn receive_message(&mut self) {
pub fn receive_message(&mut self) -> Result<(), NodeError> {
let mut count = 0;
for (addr, msg) in self.comm.proto_recv() {
//if count < HANDLE_MESSAGE_LIMIT { TODO: ???
self.handle_message(addr, &msg);
self.handle_message(addr, &msg)?;
count = count + 1;
//}
}
Ok(())
}

fn receive_request(&mut self) {
fn receive_request(&mut self) -> Result<(), NodeError> {
if let Ok(request) = self.query_recv.try_recv() {
self.handle_request(request);
self.handle_request(request)?;
}
Ok(())
}

pub fn get_block_hash_by_index(&self, index: u64) -> Option<U256> {
Expand Down Expand Up @@ -1273,7 +1292,10 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
Some(RegInfo { ownr, stmt })
}

pub fn handle_request(&mut self, request: NodeRequest<C>) {
pub fn handle_request(
&mut self,
request: NodeRequest<C>,
) -> Result<(), NodeError> {
fn handle_ans_err<T>(req_txt: &str, res: Result<(), T>) {
if let Err(_) = res {
eprintln!("WARN: failed to send node request {} answer back", req_txt);
Expand Down Expand Up @@ -1326,19 +1348,16 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
let hashes = self.get_longest_chain(Some(num));
let infos = hashes
.iter()
.map(|h| {
self
.get_block_info(h)
.expect("should obtain block info result ok")
.expect("should find block")
})
.collect();
.map(|h| self.get_block_info(h))
.collect::<Result<Option<_>, _>>()?
.ok_or(NodeError::Inconsistency {
msg: format!("block not found in range [{}..{}]", start, end),
context: "processing GetBlocks request".to_string(),
})?;
handle_ans_err("GetBlocks", tx.send(infos));
}
NodeRequest::GetBlock { hash, tx } => {
let info = self
.get_block_info(&hash)
.expect("should obtain block info result ok");
let info = self.get_block_info(&hash)?;
handle_ans_err("GetBlock", tx.send(info));
}
NodeRequest::GetBlockHash { index, tx } => {
Expand Down Expand Up @@ -1423,6 +1442,7 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
handle_ans_err("Publish", tx.send(result));
}
}
Ok(())
}

// Sends a block to a target address; also share some random peers
Expand Down Expand Up @@ -1491,20 +1511,23 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
&mut self,
addr: C::Address,
msg: &Message<C::Address>,
) {
) -> Result<(), NodeError> {
if addr != self.addr {
match msg {
Message::GiveMeThatBlock { magic, .. }
| Message::NoticeTheseBlocks { magic, .. }
| Message::PleaseMineThisTransaction { magic, .. } => {
if magic != &self.network_id {
return;
return Err(NodeError::BadMagic {
magic: *magic,
network_id: self.network_id,
});
}
}
}

self.peers.see_peer(
Peer { address: addr, seen_at: get_time() },
Peer { address: addr, seen_at: try_get_time()? },
#[cfg(feature = "events")]
self.event_emitter.clone(),
);
Expand Down Expand Up @@ -1566,7 +1589,7 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {

// Adds the block to the database
for block in &blocks {
self.add_block(&block);
self.add_block(&block)?;
}

// Requests missing ancestors
Expand All @@ -1590,6 +1613,7 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
}
}
}
Ok(())
}

pub fn gossip(&mut self, peer_count: u128, message: &Message<C::Address>) {
Expand Down Expand Up @@ -1623,8 +1647,11 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
pub fn load_blocks(&mut self) -> Result<(), NodeError> {
self.storage.disable();
let storage = self.storage.clone();
storage
.read_blocks(|(block, _file_path)| self.add_block(&block.hashed()))?;
storage.read_blocks(|(block, _file_path)| {
self
.add_block(&block.hashed())
.map_err(|e| BlockStorageError::Read { source: Box::new(e) })
})?;
self.storage.enable();
Ok(())
}
Expand All @@ -1646,13 +1673,14 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
self.send_to_miner(MinerMessage::Request { prev: self.tip, body, targ });
}

fn do_handle_mined_block(&mut self) {
fn do_handle_mined_block(&mut self) -> Result<(), NodeError> {
if let Some(miner_comm) = &mut self.miner_comm {
if let MinerMessage::Answer { block } = miner_comm.read() {
self.add_block(&block);
self.add_block(&block)?;
self.broadcast_tip_block();
}
}
Ok(())
}

/// Builds the body to be mined.
Expand Down Expand Up @@ -1771,14 +1799,18 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
Task {
delay: HANDLE_MESSAGE_DELAY,
action: |node| {
node.receive_message();
if let Err(e) = node.receive_message() {
eprintln!("Error in task receive_message.\n{}", e);
}
},
},
// Receives and handles incoming API requests
Task {
delay: HANDLE_REQUEST_DELAY,
action: |node| {
node.receive_request();
if let Err(e) = node.receive_request() {
eprintln!("Error in task receive_request.\n{}", e);
}
},
},
// Forgets inactive peers
Expand All @@ -1797,7 +1829,7 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
delay: 5_000,
action: |node| {
if let Err(e) = node.log_heartbeat() {
eprintln!("Error logging heartbeat.\n{}", e);
eprintln!("Error in task log_heartbeat.\n{}", e);
}
},
},
Expand All @@ -1820,7 +1852,9 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
Task {
delay: 5,
action: |node| {
node.do_handle_mined_block();
if let Err(e) = node.do_handle_mined_block() {
eprintln!("Error in task handle_mined_block.\n{}", e);
}
},
},
];
Expand All @@ -1832,15 +1866,21 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {

loop {
let now = std::time::Instant::now();
let system_time = get_time(); // Measured in milliseconds
for (i, task) in tasks.iter().enumerate() {
if last_tick_time[i] + task.delay <= system_time {
(task.action)(&mut self);
last_tick_time[i] = system_time;
}
match try_get_time() { // Measured in milliseconds
Ok(system_time) => {
for (i, task) in tasks.iter().enumerate() {
if last_tick_time[i] + task.delay <= system_time {
(task.action)(&mut self);
last_tick_time[i] = system_time;
}
}
},
Err(e) => eprintln!("{}\ncontext: main task loop. (tasks paused until system time is fixed)", e),
}

let elapsed = now.elapsed();
let extra = std::time::Duration::from_millis(1).checked_sub(elapsed);

// If the elapsed time is less than 1ms, sleep for the remaining time
if let Some(extra) = extra {
std::thread::sleep(extra);
Expand Down
18 changes: 13 additions & 5 deletions kindelia_core/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,9 @@ where
height: u128,
block: HashedBlock,
) -> Result<(), BlockStorageError>;
fn read_blocks<F: FnMut((node::Block, PathBuf))>(
fn read_blocks<
F: FnMut((node::Block, PathBuf)) -> Result<(), BlockStorageError>,
>(
&self,
then: F,
) -> Result<(), BlockStorageError>;
Expand Down Expand Up @@ -423,9 +425,11 @@ impl BlockStorage for SimpleFileStorage {
}
Ok(())
}
fn read_blocks<F: FnMut((node::Block, PathBuf))>(
fn read_blocks<
F: FnMut((node::Block, PathBuf)) -> Result<(), BlockStorageError>,
>(
&self,
then: F,
mut then: F,
) -> Result<(), BlockStorageError> {
let file_paths = get_ordered_blocks_path(&self.path)
.map_err(|e| BlockStorageError::Read { source: Box::new(e) })?;
Expand All @@ -445,7 +449,9 @@ impl BlockStorage for SimpleFileStorage {
.ok_or(BlockStorageError::Serialization { source: None })?;
items.push((block, file_path));
}
items.into_iter().for_each(then);
for item in items.into_iter() {
then(item)?
}
Ok(())
}
fn disable(&mut self) {
Expand Down Expand Up @@ -525,7 +531,9 @@ pub struct EmptyStorage;
impl BlockStorage for EmptyStorage {
fn enable(&mut self) {}
fn disable(&mut self) {}
fn read_blocks<F: FnMut((node::Block, PathBuf))>(
fn read_blocks<
F: FnMut((node::Block, PathBuf)) -> Result<(), BlockStorageError>,
>(
&self,
_: F,
) -> Result<(), BlockStorageError> {
Expand Down
2 changes: 1 addition & 1 deletion kindelia_core/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ pub(crate) fn get_time_micro() -> u128 {
/// since epoch.
#[derive(Error, Debug)]
#[error("SystemTime precedes the unix epoch. {now:?} < {epoch:?}")]
pub(crate) struct EpochError {
pub struct EpochError {
pub now: SystemTime,
pub epoch: SystemTime,
pub source: SystemTimeError,
Expand Down