From ece9ada5d99f1493e67162236440083b44848a6c Mon Sep 17 00:00:00 2001 From: Jayanring <956165026@qq.com> Date: Wed, 26 Jul 2023 16:27:03 +0800 Subject: [PATCH 01/20] optim crypto_check --- src/core/chain.rs | 5 ++++- src/core/controller.rs | 4 ++-- src/crypto.rs | 14 ++++++++------ 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/core/chain.rs b/src/core/chain.rs index eacf414..83e743d 100644 --- a/src/core/chain.rs +++ b/src/core/chain.rs @@ -513,7 +513,10 @@ impl Chain { .auditor_check_batch(block.body.as_ref().ok_or(StatusCodeEnum::NoneBlockBody)?)? } - crypto_check_batch_async(block.body.clone().ok_or(StatusCodeEnum::NoneBlockBody)?).await?; + crypto_check_batch_async(Arc::new( + block.body.clone().ok_or(StatusCodeEnum::NoneBlockBody)?, + )) + .await?; self.finalize_block(block, block_hash.clone()).await?; diff --git a/src/core/controller.rs b/src/core/controller.rs index 61acfe9..2fc6a53 100644 --- a/src/core/controller.rs +++ b/src/core/controller.rs @@ -238,7 +238,7 @@ impl Controller { auditor.auditor_check(&raw_tx)?; } - crypto_check_async(raw_tx.clone()).await?; + crypto_check_async(Arc::new(raw_tx.clone())).await?; let res = { let mut pool = self.pool.write().await; @@ -268,7 +268,7 @@ impl Controller { raw_txs: RawTransactions, broadcast: bool, ) -> Result { - crypto_check_batch_async(raw_txs.clone()).await?; + crypto_check_batch_async(Arc::new(raw_txs.clone())).await?; let mut hashes = Vec::new(); { diff --git a/src/crypto.rs b/src/crypto.rs index 199c36f..df3e27e 100644 --- a/src/crypto.rs +++ b/src/crypto.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use cita_cloud_proto::{ blockchain::{RawTransaction, RawTransactions}, status_code::StatusCodeEnum, @@ -64,14 +66,14 @@ pub async fn recover_signature_async( } } -pub async fn crypto_check_async(tx: RawTransaction) -> Result<(), StatusCodeEnum> { +pub async fn crypto_check_async(tx: Arc) -> Result<(), StatusCodeEnum> { let (send, recv) = tokio::sync::oneshot::channel(); rayon::spawn(move || { cfg_if::cfg_if! { if #[cfg(feature = "sm")] { - let _ = send.send(crypto_sm::sm::crypto_check(tx)); + let _ = send.send(crypto_sm::sm::crypto_check(&tx)); } else if #[cfg(feature = "eth")] { - let _ = send.send(crypto_eth::eth::crypto_check(tx)); + let _ = send.send(crypto_eth::eth::crypto_check(&tx)); } } }); @@ -84,14 +86,14 @@ pub async fn crypto_check_async(tx: RawTransaction) -> Result<(), StatusCodeEnum } } -pub async fn crypto_check_batch_async(txs: RawTransactions) -> Result<(), StatusCodeEnum> { +pub async fn crypto_check_batch_async(txs: Arc) -> Result<(), StatusCodeEnum> { let (send, recv) = tokio::sync::oneshot::channel(); std::thread::spawn(move || { cfg_if::cfg_if! { if #[cfg(feature = "sm")] { - let _ = send.send(crypto_sm::sm::crypto_check_batch(txs)); + let _ = send.send(crypto_sm::sm::crypto_check_batch(&txs)); } else if #[cfg(feature = "eth")] { - let _ = send.send(crypto_eth::eth::crypto_check_batch(txs)); + let _ = send.send(crypto_eth::eth::crypto_check_batch(&txs)); } } }); From d87e8b6bcf1d4597345c475bac795bd6e722cb74 Mon Sep 17 00:00:00 2001 From: JLer Date: Thu, 3 Aug 2023 14:34:34 +0800 Subject: [PATCH 02/20] fix typo --- src/core/chain.rs | 6 +-- src/core/controller.rs | 62 +++++++++++++------------- src/grpc_client/storage.rs | 4 +- src/grpc_server/health_check_server.rs | 6 +-- 4 files changed, 39 insertions(+), 39 deletions(-) diff --git a/src/core/chain.rs b/src/core/chain.rs index 83e743d..6b6d02f 100644 --- a/src/core/chain.rs +++ b/src/core/chain.rs @@ -298,7 +298,7 @@ impl Chain { ); } - // update auditor pool and systemconfig + // update auditor pool and system_config // even empty block, we also need update current height of auditor { if let Some(raw_txs) = block.body.clone() { @@ -396,7 +396,7 @@ impl Chain { if prev_hash != self.block_hash { warn!( - "commit block({}) failed: get prehash: 0x{}, correct prehash: 0x{}. hash: 0x{}", + "commit block({}) failed: get prevhash: 0x{}, correct prevhash: 0x{}. hash: 0x{}", height, hex::encode(&prev_hash), hex::encode(&self.block_hash), @@ -491,7 +491,7 @@ impl Chain { if header.prevhash != self.block_hash { warn!( - "process block({}) failed: get prehash: 0x{}, correct prehash: 0x{}. hash: 0x{}", + "process block({}) failed: get prevhash: 0x{}, correct prevhash: 0x{}. hash: 0x{}", height, hex::encode(&header.prevhash), hex::encode(&self.block_hash), diff --git a/src/core/controller.rs b/src/core/controller.rs index 2fc6a53..5fc1776 100644 --- a/src/core/controller.rs +++ b/src/core/controller.rs @@ -417,9 +417,9 @@ impl Controller { pub async fn rpc_get_node_status(&self, state: &State) -> Result { let peers_count = get_network_status().await?.peer_count; - let peers_netinfo = get_peers_info().await?; + let peers_net_info = get_peers_info().await?; let mut peers_status = vec![]; - for p in peers_netinfo.nodes { + for p in peers_net_info.nodes { let na = NodeAddress(p.origin); let (address, height) = self .node_manager @@ -620,7 +620,7 @@ impl Controller { .ok_or(StatusCodeEnum::NoneBlockBody)? .tx_hashes; let tx_count = tx_hashes.len(); - let mut transantion_data = Vec::new(); + let mut transaction_data = Vec::new(); let mut miss_tx_hash_list = Vec::new(); for tx_hash in tx_hashes { if let Some(tx) = self.pool.read().await.pool_get_tx(tx_hash) { @@ -628,7 +628,7 @@ impl Controller { if total_quota > sys_config.quota_limit { return Err(StatusCodeEnum::QuotaUsedExceed); } - transantion_data.extend_from_slice(tx_hash); + transaction_data.extend_from_slice(tx_hash); } else { miss_tx_hash_list.push(tx_hash); } @@ -651,7 +651,7 @@ impl Controller { return Err(StatusCodeEnum::NoneRawTx); } - let transactions_root = hash_data(&transantion_data); + let transactions_root = hash_data(&transaction_data); if transactions_root != header.transactions_root { warn!( "check proposal({}) failed: header transactions_root: {}, controller calculate: {}", @@ -781,11 +781,11 @@ impl Controller { .address .clone() .ok_or(StatusCodeEnum::NoProvideAddress)?; - let node_orign = NodeAddress::from(&node); + let node_origin = NodeAddress::from(&node); match self .node_manager - .set_node(&node_orign, status.clone()) + .set_node(&node_origin, status.clone()) .await { Ok(None) => { @@ -810,14 +810,14 @@ impl Controller { self.unicast_chain_status_init(msg.origin, chain_status_init) .await; self.event_sender - .send(Event::TryUpdateGlobalStatus(node_orign, status)) + .send(Event::TryUpdateGlobalStatus(node_origin, status)) .unwrap(); } return Err(status_code); } } self.event_sender - .send(Event::TryUpdateGlobalStatus(node_orign, status)) + .send(Event::TryUpdateGlobalStatus(node_origin, status)) .unwrap(); } ControllerMsgType::ChainStatusInitRequestType => { @@ -834,7 +834,7 @@ impl Controller { let own_status = self.get_status().await; let node = chain_status.address.clone().unwrap(); - let node_orign = NodeAddress::from(&node); + let node_origin = NodeAddress::from(&node); match chain_status.check(&own_status).await { Ok(()) => {} Err(e) => { @@ -850,8 +850,8 @@ impl Controller { }, ) .await; - self.delete_global_status(&node_orign).await; - self.node_manager.set_ban_node(&node_orign).await?; + self.delete_global_status(&node_origin).await; + self.node_manager.set_ban_node(&node_origin).await?; } _ => {} } @@ -861,16 +861,16 @@ impl Controller { match self .node_manager - .check_address_origin(&node_orign, NodeAddress(msg.origin)) + .check_address_origin(&node_origin, NodeAddress(msg.origin)) .await { Ok(true) => { self.node_manager - .set_node(&node_orign, chain_status.clone()) + .set_node(&node_origin, chain_status.clone()) .await?; self.event_sender - .send(Event::TryUpdateGlobalStatus(node_orign, chain_status)) + .send(Event::TryUpdateGlobalStatus(node_origin, chain_status)) .unwrap(); } // give Ok or Err for process_network_msg is same @@ -893,12 +893,12 @@ impl Controller { match respond { Respond::NotSameChain(node) => { h160_address_check(Some(&node))?; - let node_orign = NodeAddress::from(&node); + let node_origin = NodeAddress::from(&node); warn!( - "process ChainStatusRespondType failed: remote check chain_status failed: NotSameChain. ban remote node. origin: {}", node_orign + "process ChainStatusRespondType failed: remote check chain_status failed: NotSameChain. ban remote node. origin: {}", node_origin ); - self.delete_global_status(&node_orign).await; - self.node_manager.set_ban_node(&node_orign).await?; + self.delete_global_status(&node_origin).await; + self.node_manager.set_ban_node(&node_origin).await?; } } } @@ -935,12 +935,12 @@ impl Controller { match sync_block_respond.respond { // todo check origin Some(Respond::MissBlock(node)) => { - let node_orign = NodeAddress::from(&node); - warn!("misbehavior: MissBlock({})", node_orign); - controller_clone.delete_global_status(&node_orign).await; + let node_origin = NodeAddress::from(&node); + warn!("misbehavior: MissBlock({})", node_origin); + controller_clone.delete_global_status(&node_origin).await; controller_clone .node_manager - .set_misbehavior_node(&node_orign) + .set_misbehavior_node(&node_origin) .await .unwrap(); } @@ -978,13 +978,13 @@ impl Controller { msg.origin ); let node = sync_blocks.address.as_ref().unwrap(); - let node_orign = NodeAddress::from(node); + let node_origin = NodeAddress::from(node); controller_clone .node_manager - .set_misbehavior_node(&node_orign) + .set_misbehavior_node(&node_origin) .await .unwrap(); - controller_clone.delete_global_status(&node_orign).await; + controller_clone.delete_global_status(&node_origin).await; } } } @@ -1028,10 +1028,10 @@ impl Controller { use crate::protocol::sync_manager::sync_tx_respond::Respond; match sync_tx_respond.respond { Some(Respond::MissTx(node)) => { - let node_orign = NodeAddress::from(&node); - warn!("misbehavior: MissTx({})", node_orign); - self.node_manager.set_misbehavior_node(&node_orign).await?; - self.delete_global_status(&node_orign).await; + let node_origin = NodeAddress::from(&node); + warn!("misbehavior: MissTx({})", node_origin); + self.node_manager.set_misbehavior_node(&node_origin).await?; + self.delete_global_status(&node_origin).await; } Some(Respond::Ok(raw_tx)) => { self.rpc_send_raw_transaction(raw_tx, false).await?; @@ -1256,7 +1256,7 @@ impl Controller { match old_cs.height.cmp(¤t_cs.height) { Ordering::Greater => { error!( - "node status rollbacked: old height: {}, current height: {}. set it misbehavior. origin: {}", + "node status rollback: old height: {}, current height: {}. set it misbehavior. origin: {}", old_cs.height, current_cs.height, &na diff --git a/src/grpc_client/storage.rs b/src/grpc_client/storage.rs index ea91a25..2ea010a 100644 --- a/src/grpc_client/storage.rs +++ b/src/grpc_client/storage.rs @@ -36,7 +36,7 @@ pub async fn assemble_proposal( block: &CompactBlock, height: u64, ) -> Result, StatusCodeEnum> { - let pre_state_root = get_last_stateroot(height).await?; + let pre_state_root = get_last_state_root(height).await?; let proposal = ProposalInner { proposal: Some(block.clone()), @@ -52,7 +52,7 @@ pub async fn assemble_proposal( Ok(proposal_bytes) } -pub async fn get_last_stateroot(h: u64) -> Result, StatusCodeEnum> { +pub async fn get_last_state_root(h: u64) -> Result, StatusCodeEnum> { let pre_h = h - 1; let pre_height_bytes = pre_h.to_be_bytes().to_vec(); diff --git a/src/grpc_server/health_check_server.rs b/src/grpc_server/health_check_server.rs index 58e90d0..4fbb9b6 100755 --- a/src/grpc_server/health_check_server.rs +++ b/src/grpc_server/health_check_server.rs @@ -48,7 +48,7 @@ impl Health for HealthCheckServer { &self, _request: Request, ) -> Result, Status> { - info!("healthcheck entry!"); + info!("health_check entry!"); let height = self.controller.rpc_get_block_number(true).await.unwrap(); let timestamp = unix_now(); let old_height = self.height.load(Ordering::Relaxed); @@ -58,14 +58,14 @@ impl Health for HealthCheckServer { self.height.store(height, Ordering::Relaxed); self.timestamp.store(timestamp, Ordering::Relaxed); info!( - "healthcheck: block increase: {} - {}, timestamp: {}", + "health_check: block increase: {} - {}, timestamp: {}", old_height, height, timestamp ); ServingStatus::Serving.into() } else { // height not increase for a long time info!( - "healthcheck: block not increase: {}, timestamp: {} - {}", + "health_check: block not increase: {}, timestamp: {} - {}", height, old_timestamp, timestamp ); if timestamp - old_timestamp > self.timeout * 1000 { From 72811ec6ae40a6a11921f00401aeab78afabbf6f Mon Sep 17 00:00:00 2001 From: JLer Date: Tue, 8 Aug 2023 15:13:03 +0800 Subject: [PATCH 03/20] refactor tx pool --- Cargo.toml | 6 +++++ src/core/auditor.rs | 32 +++++++++++++++++++++---- src/core/controller.rs | 49 +++++++++++++++++++++++++++++++++++--- src/core/pool.rs | 34 +++++++++++++++++++++++++- src/grpc_client/storage.rs | 24 ++++++++++++++++++- 5 files changed, 135 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d1ccdfb..e94e345 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ statig = { version = "0.3", features = ["async"] } flume = "0.10" rayon = "1.0" cfg-if = "1.0" +futures = "0.3" cloud-util = { package = "cloud-util", git = "https://github.com/cita-cloud/cloud-common-rs" } cita_cloud_proto = { package = "cita_cloud_proto", git = "https://github.com/cita-cloud/cloud-common-rs" } @@ -32,6 +33,11 @@ cita_cloud_proto = { package = "cita_cloud_proto", git = "https://github.com/cit crypto_sm = { git = "https://github.com/cita-cloud/crypto_sm", package = "crypto", optional = true } crypto_eth = { git = "https://github.com/cita-cloud/crypto_eth", package = "crypto", optional = true } +[patch."https://github.com/cita-cloud/cloud-common-rs"] +cloud-util = { package = "cloud-util", path = "../runner/cloud-common-rs/cloud-util" } +cita_cloud_proto = { package = "cita_cloud_proto", path = "../runner/cloud-common-rs/cloud-proto" } + + [build-dependencies] tonic-build = "0.9" diff --git a/src/core/auditor.rs b/src/core/auditor.rs index eee3d91..7f2c059 100644 --- a/src/core/auditor.rs +++ b/src/core/auditor.rs @@ -24,6 +24,7 @@ use cita_cloud_proto::{ }, status_code::StatusCodeEnum, }; +use futures::{stream::FuturesUnordered, StreamExt}; use crate::{ core::system_config::{SystemConfig, LOCK_ID_BUTTON, LOCK_ID_VERSION}, @@ -32,7 +33,7 @@ use crate::{ #[derive(Clone)] pub struct Auditor { - history_hashes: HashMap>>, + pub history_hashes: HashMap>>, current_block_number: u64, sys_config: SystemConfig, } @@ -62,13 +63,34 @@ impl Auditor { 1u64 }; + let mut history_hashes_num = 0; + let mut futures = FuturesUnordered::new(); + for h in begin_block_number..=init_block_number { - let block = get_compact_block(h).await.unwrap(); - let block_body = block.body.unwrap(); - self.history_hashes - .insert(h, HashSet::from_iter(block_body.tx_hashes)); + futures.push(get_compact_block(h)); + } + + while let Some(result) = futures.next().await { + match result { + Ok(block) => { + let block_body = block.body.unwrap(); + history_hashes_num += block_body.tx_hashes.len(); + self.history_hashes.insert( + block.header.unwrap().height, + HashSet::from_iter(block_body.tx_hashes), + ); + } + Err(e) => { + error!("Failed to get compact block: {}", e); + } + } } + self.current_block_number = init_block_number; + info!( + "Auditor init finished: history_hashes({})", + history_hashes_num + ); } pub fn insert_tx_hash(&mut self, h: u64, hash_list: Vec>) { diff --git a/src/core/controller.rs b/src/core/controller.rs index 5fc1776..3e0655d 100644 --- a/src/core/controller.rs +++ b/src/core/controller.rs @@ -49,7 +49,7 @@ use crate::{ network_client, storage::{ db_get_tx, get_compact_block, get_full_block, get_height_by_block_hash, get_proof, - get_state_root, load_tx_info, + get_state_root, load_tx_info, store_data, }, storage_client, }, @@ -197,6 +197,7 @@ impl Controller { .await .unwrap(); self.set_status(status.clone()).await; + self.pool.write().await.init(self.auditor.clone()).await; } // send configuration to consensus let mut server_retry_interval = @@ -247,12 +248,33 @@ impl Controller { if res { if broadcast { let mut f_pool = self.forward_pool.write().await; - f_pool.body.push(raw_tx); + f_pool.body.push(raw_tx.clone()); if f_pool.body.len() > self.config.count_per_batch { self.broadcast_send_txs(f_pool.clone()).await; f_pool.body.clear(); } } + // send to storage + tokio::spawn(async move { + let raw_txs = RawTransactions { body: vec![raw_tx] }; + let mut raw_tx_bytes = Vec::new(); + match raw_txs.encode(&mut raw_tx_bytes) { + Ok(_) => { + if store_data( + i32::from(Regions::TransactionsPool) as u32, + vec![0; 8], + raw_tx_bytes, + ) + .await + .is_success() + .is_err() + { + warn!("store raw tx failed"); + } + } + Err(_) => warn!("encode raw tx failed"), + } + }); Ok(tx_hash) } else { warn!( @@ -283,8 +305,29 @@ impl Controller { } } if broadcast { - self.broadcast_send_txs(raw_txs).await; + self.broadcast_send_txs(raw_txs.clone()).await; } + // send to storage + tokio::spawn(async move { + let mut raw_tx_bytes = Vec::new(); + match raw_txs.encode(&mut raw_tx_bytes) { + Ok(_) => { + if store_data( + i32::from(Regions::TransactionsPool) as u32, + vec![0; 8], + raw_tx_bytes, + ) + .await + .is_success() + .is_err() + { + warn!("store raw tx failed"); + } + } + Err(_) => warn!("encode raw tx failed"), + } + }); + Ok(Hashes { hashes }) } diff --git a/src/core/pool.rs b/src/core/pool.rs index f09f6f5..aa3f36d 100644 --- a/src/core/pool.rs +++ b/src/core/pool.rs @@ -17,11 +17,15 @@ use std::{ cmp::{Eq, PartialEq}, collections::HashSet, hash::{Hash, Hasher}, + sync::Arc, }; use cita_cloud_proto::blockchain::{raw_transaction::Tx, RawTransaction}; +use tokio::sync::RwLock; -use crate::util::get_tx_quota; +use crate::{grpc_client::storage::reload_transactions_pool, util::get_tx_quota}; + +use super::auditor::Auditor; // wrapper type for Hash #[derive(Clone)] @@ -72,6 +76,34 @@ impl Pool { } } + pub async fn init(&mut self, auditor: Arc>) { + let mut txns = reload_transactions_pool() + .await + .map_or_else(|_| vec![], |txns| txns.body); + info!("pool init start: txns({})", txns.len()); + { + let auditor = auditor.read().await; + let history_hashes_set: HashSet<_> = auditor + .history_hashes + .iter() + .flat_map(|(_, hashes)| hashes) + .collect(); + + txns.retain(|txn| !history_hashes_set.contains(&get_raw_tx_hash(txn).to_vec())); + } + for raw_tx in txns { + let tx_quota = get_tx_quota(&raw_tx).unwrap(); + self.txns.insert(Txn(raw_tx)); + self.pool_quota += tx_quota; + } + + info!( + "pool init finished: txns({}), pool_quota({})", + self.txns.len(), + self.pool_quota + ); + } + pub fn insert(&mut self, raw_tx: RawTransaction) -> bool { let tx_quota = get_tx_quota(&raw_tx).unwrap(); let ret = self.txns.insert(Txn(raw_tx)); diff --git a/src/grpc_client/storage.rs b/src/grpc_client/storage.rs index 2ea010a..d066343 100644 --- a/src/grpc_client/storage.rs +++ b/src/grpc_client/storage.rs @@ -15,7 +15,9 @@ use prost::Message; use cita_cloud_proto::{ - blockchain::{raw_transaction::Tx::UtxoTx, Block, CompactBlock, RawTransaction}, + blockchain::{ + raw_transaction::Tx::UtxoTx, Block, CompactBlock, RawTransaction, RawTransactions, + }, client::StorageClientTrait, common::{Proof, ProposalInner, StateRoot}, controller::BlockNumber, @@ -302,3 +304,23 @@ pub async fn get_hash_in_range(mut hash: Vec, height: u64) -> Result } Ok(hash) } + +pub async fn reload_transactions_pool() -> Result { + let raw_txs_bytes = load_data( + storage_client(), + i32::from(Regions::TransactionsPool) as u32, + vec![0; 8], + ) + .await + .map_err(|e| { + warn!("reload transactions pool failed: {}", e.to_string()); + StatusCodeEnum::LoadError + })?; + + let raw_txs = RawTransactions::decode(raw_txs_bytes.as_slice()).map_err(|_| { + warn!("reload transactions pool failed: decode RawTransactions failed"); + StatusCodeEnum::DecodeError + })?; + + Ok(raw_txs) +} From 72920c951c029436052570a7c1d53c14ac1d672b Mon Sep 17 00:00:00 2001 From: JLer Date: Wed, 9 Aug 2023 14:45:43 +0800 Subject: [PATCH 04/20] add switch for tx persistence --- src/config.rs | 3 ++ src/core/controller.rs | 79 +++++++++++++++++++++++------------------- 2 files changed, 46 insertions(+), 36 deletions(-) diff --git a/src/config.rs b/src/config.rs index 45686a1..af63557 100644 --- a/src/config.rs +++ b/src/config.rs @@ -92,6 +92,8 @@ pub struct ControllerConfig { pub is_danger: bool, /// log config pub log_config: LogConfig, + /// tx pool persistence + pub tx_persistence: bool, } impl Default for ControllerConfig { @@ -129,6 +131,7 @@ impl Default for ControllerConfig { buffer_duration: 300, is_danger: false, log_config: Default::default(), + tx_persistence: false, } } } diff --git a/src/core/controller.rs b/src/core/controller.rs index 3e0655d..8898d31 100644 --- a/src/core/controller.rs +++ b/src/core/controller.rs @@ -197,7 +197,10 @@ impl Controller { .await .unwrap(); self.set_status(status.clone()).await; - self.pool.write().await.init(self.auditor.clone()).await; + + if self.config.tx_persistence { + self.pool.write().await.init(self.auditor.clone()).await; + } } // send configuration to consensus let mut server_retry_interval = @@ -255,26 +258,28 @@ impl Controller { } } // send to storage - tokio::spawn(async move { - let raw_txs = RawTransactions { body: vec![raw_tx] }; - let mut raw_tx_bytes = Vec::new(); - match raw_txs.encode(&mut raw_tx_bytes) { - Ok(_) => { - if store_data( - i32::from(Regions::TransactionsPool) as u32, - vec![0; 8], - raw_tx_bytes, - ) - .await - .is_success() - .is_err() - { - warn!("store raw tx failed"); + if self.config.tx_persistence { + tokio::spawn(async move { + let raw_txs = RawTransactions { body: vec![raw_tx] }; + let mut raw_tx_bytes = Vec::new(); + match raw_txs.encode(&mut raw_tx_bytes) { + Ok(_) => { + if store_data( + i32::from(Regions::TransactionsPool) as u32, + vec![0; 8], + raw_tx_bytes, + ) + .await + .is_success() + .is_err() + { + warn!("store raw tx failed"); + } } + Err(_) => warn!("encode raw tx failed"), } - Err(_) => warn!("encode raw tx failed"), - } - }); + }); + } Ok(tx_hash) } else { warn!( @@ -308,25 +313,27 @@ impl Controller { self.broadcast_send_txs(raw_txs.clone()).await; } // send to storage - tokio::spawn(async move { - let mut raw_tx_bytes = Vec::new(); - match raw_txs.encode(&mut raw_tx_bytes) { - Ok(_) => { - if store_data( - i32::from(Regions::TransactionsPool) as u32, - vec![0; 8], - raw_tx_bytes, - ) - .await - .is_success() - .is_err() - { - warn!("store raw tx failed"); + if self.config.tx_persistence { + tokio::spawn(async move { + let mut raw_tx_bytes = Vec::new(); + match raw_txs.encode(&mut raw_tx_bytes) { + Ok(_) => { + if store_data( + i32::from(Regions::TransactionsPool) as u32, + vec![0; 8], + raw_tx_bytes, + ) + .await + .is_success() + .is_err() + { + warn!("store raw tx failed"); + } } + Err(_) => warn!("encode raw tx failed"), } - Err(_) => warn!("encode raw tx failed"), - } - }); + }); + } Ok(Hashes { hashes }) } From 8b64b3e1998d7c4e8280e20514c5f97f42023dfc Mon Sep 17 00:00:00 2001 From: JLer Date: Fri, 11 Aug 2023 14:37:06 +0800 Subject: [PATCH 05/20] optim: handle send event err --- src/core/controller.rs | 133 ++++++++++++++++++++++++++----------- src/core/state_machine.rs | 10 +-- src/grpc_client/storage.rs | 3 +- src/main.rs | 18 +++-- 4 files changed, 114 insertions(+), 50 deletions(-) diff --git a/src/core/controller.rs b/src/core/controller.rs index 8898d31..ba61143 100644 --- a/src/core/controller.rs +++ b/src/core/controller.rs @@ -456,10 +456,10 @@ impl Controller { controller_for_add.config.server_retry_interval, )) .await; - controller_for_add + let _ = controller_for_add .event_sender .send(Event::BroadcastCSI) - .unwrap(); + .map_err(|e| warn!("rpc_add_node: send broadcast csi event failed: {}", e)); }); } res @@ -729,7 +729,12 @@ impl Controller { let mut wr = self.chain.write().await; wr.clear_candidate(); } - self.event_sender.send(Event::TrySyncBlock).unwrap(); + let _ = self.event_sender.send(Event::TrySyncBlock).map_err(|e| { + warn!( + "rpc_get_cross_chain_proof: send Event::TrySyncBlock failed: {:?}", + e + ) + }); } _ => {} } @@ -765,7 +770,12 @@ impl Controller { status.address = Some(self.local_address.clone()); self.set_status(status.clone()).await; self.broadcast_chain_status(status).await; - self.event_sender.send(Event::TrySyncBlock).unwrap(); + let _ = self.event_sender.send(Event::TrySyncBlock).map_err(|e| { + warn!( + "chain_commit_block: send Event::TrySyncBlock failed: {:?}", + e + ) + }); Ok(config) } Err(StatusCodeEnum::ProposalTooHigh) => { @@ -774,7 +784,12 @@ impl Controller { let mut wr = self.chain.write().await; wr.clear_candidate(); } - self.event_sender.send(Event::TrySyncBlock).unwrap(); + let _ = self.event_sender.send(Event::TrySyncBlock).map_err(|e| { + warn!( + "chain_commit_block: send Event::TrySyncBlock failed: {:?}", + e + ) + }); Err(StatusCodeEnum::ProposalTooHigh) } Err(e) => Err(e), @@ -859,16 +874,25 @@ impl Controller { let chain_status_init = self.make_csi(own_status).await?; self.unicast_chain_status_init(msg.origin, chain_status_init) .await; - self.event_sender + let _ = self + .event_sender .send(Event::TryUpdateGlobalStatus(node_origin, status)) - .unwrap(); + .map_err(|e| + warn!("process_network_msg: send Event::TryUpdateGlobalStatus failed: {:?}", e) + ); } return Err(status_code); } } - self.event_sender + let _ = self + .event_sender .send(Event::TryUpdateGlobalStatus(node_origin, status)) - .unwrap(); + .map_err(|e| { + warn!( + "process_network_msg: send Event::TryUpdateGlobalStatus failed: {:?}", + e + ) + }); } ControllerMsgType::ChainStatusInitRequestType => { let chain_status_init = self.make_csi(self.get_status().await).await?; @@ -918,10 +942,12 @@ impl Controller { self.node_manager .set_node(&node_origin, chain_status.clone()) .await?; - - self.event_sender + let _ = self + .event_sender .send(Event::TryUpdateGlobalStatus(node_origin, chain_status)) - .unwrap(); + .map_err(|e| + warn!("process_network_msg: send Event::TryUpdateGlobalStatus failed: {:?}", e) + ); } // give Ok or Err for process_network_msg is same Err(StatusCodeEnum::AddressOriginCheckError) | Ok(false) => { @@ -965,9 +991,15 @@ impl Controller { "get SyncBlockRequest: from origin: {:x}, height: {} - {}", msg.origin, sync_block_request.start_height, sync_block_request.end_height ); - self.event_sender + let _ = self + .event_sender .send(Event::SyncBlockReq(sync_block_request, msg.origin)) - .unwrap(); + .map_err(|e| { + warn!( + "process_network_msg: send Event::SyncBlockReq failed: {:?}", + e + ) + }); } ControllerMsgType::SyncBlockRespondType => { @@ -988,11 +1020,10 @@ impl Controller { let node_origin = NodeAddress::from(&node); warn!("misbehavior: MissBlock({})", node_origin); controller_clone.delete_global_status(&node_origin).await; - controller_clone + let _ = controller_clone .node_manager .set_misbehavior_node(&node_origin) - .await - .unwrap(); + .await; } Some(Respond::Ok(sync_blocks)) => { // todo handle error @@ -1008,10 +1039,12 @@ impl Controller { ) .await { - controller_clone + let _ = controller_clone .event_sender .send(Event::SyncBlock) - .unwrap(); + .map_err(|e| + warn!("process_network_msg: send Event::SyncBlock failed: {:?}", e) + ); } } Err(StatusCodeEnum::ProvideAddressError) @@ -1029,11 +1062,10 @@ impl Controller { ); let node = sync_blocks.address.as_ref().unwrap(); let node_origin = NodeAddress::from(node); - controller_clone + let _ = controller_clone .node_manager .set_misbehavior_node(&node_origin) - .await - .unwrap(); + .await; controller_clone.delete_global_status(&node_origin).await; } } @@ -1202,7 +1234,12 @@ impl Controller { ); self.update_global_status(node.to_owned(), status).await; if global_height > own_status.height { - self.event_sender.send(Event::TrySyncBlock).unwrap(); + let _ = self.event_sender.send(Event::TrySyncBlock).map_err(|e| { + warn!( + "try_update_global_status: send TrySyncBlock event failed: {}", + e + ) + }); } if (!in_sync || global_height % self.config.force_sync_epoch == 0) && self @@ -1210,7 +1247,12 @@ impl Controller { .contains_block(own_status.height + 1) .await { - self.event_sender.send(Event::SyncBlock).unwrap(); + let _ = self.event_sender.send(Event::SyncBlock).map_err(|e| { + warn!( + "try_update_global_status: send SyncBlock event failed: {}", + e + ) + }); } return Ok(true); @@ -1218,7 +1260,12 @@ impl Controller { // request block if own height behind remote's if global_height > own_status.height { - self.event_sender.send(Event::TrySyncBlock).unwrap(); + let _ = self.event_sender.send(Event::TrySyncBlock).map_err(|e| { + warn!( + "try_update_global_status: send TrySyncBlock event failed: {}", + e + ) + }); } Ok(false) @@ -1341,18 +1388,15 @@ impl Controller { } } - pub async fn handle_broadcast_csi(&self) { + pub async fn handle_broadcast_csi(&self) -> Result<(), StatusCodeEnum> { info!("receive BroadCastCSI event"); let status = self.get_status().await; let mut chain_status_bytes = Vec::new(); - status - .encode(&mut chain_status_bytes) - .map_err(|_| { - warn!("process BroadCastCSI failed: encode ChainStatus failed"); - StatusCodeEnum::EncodeError - }) - .unwrap(); + status.encode(&mut chain_status_bytes).map_err(|_| { + warn!("handle_broadcast_csi failed: encode ChainStatus failed"); + StatusCodeEnum::EncodeError + })?; let msg_hash = hash_data(&chain_status_bytes); cfg_if::cfg_if! { @@ -1363,7 +1407,7 @@ impl Controller { } } - let signature = crypto.sign_message(&msg_hash).unwrap(); + let signature = crypto.sign_message(&msg_hash)?; self.broadcast_chain_status_init(ChainStatusInit { chain_status: Some(status), @@ -1371,10 +1415,14 @@ impl Controller { }) .await .await - .unwrap(); + .map_err(|_| { + warn!("handle_broadcast_csi: broadcast ChainStatusInit failed"); + StatusCodeEnum::FatalError + })?; + Ok(()) } - pub async fn syncing_block(&self) { + pub async fn syncing_block(&self) -> Result<(), StatusCodeEnum> { let (global_address, _) = self.get_global_status().await; let mut own_status = self.get_status().await; let mut syncing = false; @@ -1386,7 +1434,7 @@ impl Controller { chain.clear_candidate(); match chain.process_block(block).await { Ok((consensus_config, mut status)) => { - reconfigure(consensus_config).await.is_success().unwrap(); + reconfigure(consensus_config).await.is_success()?; status.address = Some(self.local_address.clone()); self.set_status(status.clone()).await; own_status = status.clone(); @@ -1435,8 +1483,12 @@ impl Controller { } } if syncing { - self.event_sender.send(Event::TrySyncBlock).unwrap(); + let _ = self + .event_sender + .send(Event::TrySyncBlock) + .map_err(|_| warn!("syncing_block: send TrySyncBlock event failed")); } + Ok(()) } pub async fn handle_sync_block_req(&self, req: &SyncBlockRequest, origin: &u64) { @@ -1492,7 +1544,10 @@ impl Controller { if self.get_global_status().await.1.height > inner_health_check.current_height { self.chain.write().await.clear_candidate(); } - self.event_sender.send(Event::BroadcastCSI).unwrap(); + let _ = self + .event_sender + .send(Event::BroadcastCSI) + .map_err(|_| warn!("inner_health_check: send BroadcastCSI event failed")); inner_health_check.retry_limit += inner_health_check.tick; inner_health_check.tick = 0; } else if self.get_status().await.height < inner_health_check.current_height { diff --git a/src/core/state_machine.rs b/src/core/state_machine.rs index 5aef4be..6003607 100644 --- a/src/core/state_machine.rs +++ b/src/core/state_machine.rs @@ -68,7 +68,7 @@ impl ControllerStateMachine { } Event::SyncBlock => handle_sync_block(context).await, Event::BroadcastCSI => { - context.handle_broadcast_csi().await; + let _ = context.handle_broadcast_csi().await; Handled } Event::RecordAllNode => { @@ -136,7 +136,9 @@ impl ControllerStateMachine { #[action] async fn enter_syncing(&self, context: &mut Controller) { - context.syncing_block().await; + if let Err(e) = context.syncing_block().await { + warn!("syncing block error: {:?}", e) + } } #[action] @@ -197,11 +199,11 @@ async fn try_sync_block(context: &Controller, in_prepare_sync: bool) -> statig:: .get_sync_block_req(current_height, &global_status) .await { - controller_clone + let _ = controller_clone .unicast_sync_block(global_address.0, sync_req.clone()) .await .await - .unwrap(); + .map_err(|e| warn!("try_sync_block: unicast_sync_block error: {:?}", e)); } Transition(State::prepare_sync()) } diff --git a/src/grpc_client/storage.rs b/src/grpc_client/storage.rs index d066343..5505a3e 100644 --- a/src/grpc_client/storage.rs +++ b/src/grpc_client/storage.rs @@ -255,8 +255,7 @@ pub async fn get_hash_in_range(mut hash: Vec, height: u64) -> Result i32::from(Regions::TransactionHash2blockHeight) as u32, hash.clone(), ) - .await - .unwrap(); + .await?; let mut tx_height = u64_decode(height_bytes); while tx_height >= height { hash = match load_data( diff --git a/src/main.rs b/src/main.rs index 51e7a9b..ba32cd0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -165,8 +165,7 @@ async fn run(opts: RunOpts) -> Result<(), StatusCodeEnum> { i32::from(Regions::Global) as u32, 1u64.to_be_bytes().to_vec(), ) - .await - .unwrap(); + .await?; } break; } @@ -229,15 +228,24 @@ async fn run(opts: RunOpts) -> Result<(), StatusCodeEnum> { _ = reconnect_interval.tick() => { event_sender .send(Event::BroadcastCSI) - .unwrap(); + .map_err(|e| { + warn!("send broadcast csi event failed: {}", e); + StatusCodeEnum::FatalError + })?; event_sender .send(Event::RecordAllNode) - .unwrap(); + .map_err(|e| { + warn!("send record all node event failed: {}", e); + StatusCodeEnum::FatalError + })?; }, _ = inner_health_check_interval.tick() => { event_sender .send(Event::InnerHealthCheck) - .unwrap(); + .map_err(|e| { + warn!("send inner health check event failed: {}", e); + StatusCodeEnum::FatalError + })?; }, _ = forward_interval.tick() => { controller From 07a82e69abb4d589dfe78ef680c365189efe09fd Mon Sep 17 00:00:00 2001 From: JLer Date: Fri, 18 Aug 2023 10:21:07 +0800 Subject: [PATCH 06/20] update dependencies --- Cargo.toml | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e94e345..a6439bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,10 +8,10 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -clap = { version = "4.2", features = ["derive"] } +clap = { version = "4.3", features = ["derive"] } tonic = "0.9" prost = "0.11" -tokio = { version = "1.27", features = ["full"] } +tokio = { version = "1.32", features = ["full"] } rand = "0.8" toml = "0.7" serde = "1.0" @@ -22,8 +22,8 @@ tracing = "0.1" tonic-reflection = "0.9" tonic-web = "0.9" statig = { version = "0.3", features = ["async"] } -flume = "0.10" -rayon = "1.0" +flume = "0.11" +rayon = "1.7" cfg-if = "1.0" futures = "0.3" @@ -33,11 +33,6 @@ cita_cloud_proto = { package = "cita_cloud_proto", git = "https://github.com/cit crypto_sm = { git = "https://github.com/cita-cloud/crypto_sm", package = "crypto", optional = true } crypto_eth = { git = "https://github.com/cita-cloud/crypto_eth", package = "crypto", optional = true } -[patch."https://github.com/cita-cloud/cloud-common-rs"] -cloud-util = { package = "cloud-util", path = "../runner/cloud-common-rs/cloud-util" } -cita_cloud_proto = { package = "cita_cloud_proto", path = "../runner/cloud-common-rs/cloud-proto" } - - [build-dependencies] tonic-build = "0.9" From 5898abce694f4d8c6f3843f3a619e1558477aa21 Mon Sep 17 00:00:00 2001 From: JLer Date: Fri, 18 Aug 2023 16:42:14 +0800 Subject: [PATCH 07/20] optim regions --- src/core/chain.rs | 4 +-- src/core/controller.rs | 22 ++++++------- src/core/system_config.rs | 20 ++++++------ src/grpc_client/storage.rs | 65 ++++++++++++++------------------------ src/main.rs | 9 ++---- 5 files changed, 46 insertions(+), 74 deletions(-) diff --git a/src/core/chain.rs b/src/core/chain.rs index 6b6d02f..78eb02a 100644 --- a/src/core/chain.rs +++ b/src/core/chain.rs @@ -285,7 +285,7 @@ impl Chain { }; store_data( - i32::from(Regions::AllBlockData) as u32, + Regions::AllBlockData as u32, block_height.to_be_bytes().to_vec(), block_bytes, ) @@ -312,7 +312,7 @@ impl Chain { // if sys_config changed, store utxo tx hash into global region let lock_id = utxo_tx.transaction.as_ref().unwrap().lock_id; store_data( - i32::from(Regions::Global) as u32, + Regions::Global as u32, lock_id.to_be_bytes().to_vec(), utxo_tx.transaction_hash, ) diff --git a/src/core/controller.rs b/src/core/controller.rs index ba61143..1c4a2e6 100644 --- a/src/core/controller.rs +++ b/src/core/controller.rs @@ -265,7 +265,7 @@ impl Controller { match raw_txs.encode(&mut raw_tx_bytes) { Ok(_) => { if store_data( - i32::from(Regions::TransactionsPool) as u32, + Regions::TransactionsPool as u32, vec![0; 8], raw_tx_bytes, ) @@ -318,14 +318,10 @@ impl Controller { let mut raw_tx_bytes = Vec::new(); match raw_txs.encode(&mut raw_tx_bytes) { Ok(_) => { - if store_data( - i32::from(Regions::TransactionsPool) as u32, - vec![0; 8], - raw_tx_bytes, - ) - .await - .is_success() - .is_err() + if store_data(Regions::TransactionsPool as u32, vec![0; 8], raw_tx_bytes) + .await + .is_success() + .is_err() { warn!("store raw tx failed"); } @@ -341,7 +337,7 @@ impl Controller { pub async fn rpc_get_block_hash(&self, block_number: u64) -> Result, StatusCodeEnum> { load_data( storage_client(), - i32::from(Regions::BlockHash) as u32, + Regions::BlockHash as u32, block_number.to_be_bytes().to_vec(), ) .await @@ -383,7 +379,7 @@ impl Controller { ) -> Result { let block_number = load_data( storage_client(), - i32::from(Regions::BlockHash2blockHeight) as u32, + Regions::BlockHash2blockHeight as u32, hash.clone(), ) .await @@ -605,7 +601,7 @@ impl Controller { //check pre_state_root in proposal let pre_state_root = load_data( storage_client(), - i32::from(Regions::Result) as u32, + Regions::Result as u32, pre_height_bytes.clone(), ) .await?; @@ -637,7 +633,7 @@ impl Controller { //check timestamp in block header let pre_compact_block_bytes = load_data( storage_client(), - i32::from(Regions::CompactBlock) as u32, + Regions::CompactBlock as u32, pre_height_bytes.clone(), ) .await?; diff --git a/src/core/system_config.rs b/src/core/system_config.rs index 8bcd34c..234a4a2 100644 --- a/src/core/system_config.rs +++ b/src/core/system_config.rs @@ -119,7 +119,7 @@ impl SystemConfig { // region 0 global match load_data( storage_client(), - i32::from(Regions::Global) as u32, + Regions::Global as u32, lock_id.to_be_bytes().to_vec(), ) .await @@ -156,7 +156,7 @@ impl SystemConfig { LOCK_ID_VERSION => { store_data( storage_client(), - i32::from(Regions::Global) as u32, + Regions::Global as u32, lock_id.to_be_bytes().to_vec(), self.version.to_be_bytes().to_vec(), ) @@ -166,7 +166,7 @@ impl SystemConfig { LOCK_ID_CHAIN_ID => { store_data( storage_client(), - i32::from(Regions::Global) as u32, + Regions::Global as u32, lock_id.to_be_bytes().to_vec(), self.chain_id.clone(), ) @@ -176,7 +176,7 @@ impl SystemConfig { LOCK_ID_ADMIN => { store_data( storage_client(), - i32::from(Regions::Global) as u32, + Regions::Global as u32, lock_id.to_be_bytes().to_vec(), self.admin.clone(), ) @@ -186,7 +186,7 @@ impl SystemConfig { LOCK_ID_BLOCK_INTERVAL => { store_data( storage_client(), - i32::from(Regions::Global) as u32, + Regions::Global as u32, lock_id.to_be_bytes().to_vec(), self.block_interval.to_be_bytes().to_vec(), ) @@ -200,7 +200,7 @@ impl SystemConfig { } store_data( storage_client(), - i32::from(Regions::Global) as u32, + Regions::Global as u32, lock_id.to_be_bytes().to_vec(), validators, ) @@ -210,7 +210,7 @@ impl SystemConfig { LOCK_ID_EMERGENCY_BRAKE => { store_data( storage_client(), - i32::from(Regions::Global) as u32, + Regions::Global as u32, lock_id.to_be_bytes().to_vec(), vec![], ) @@ -220,7 +220,7 @@ impl SystemConfig { LOCK_ID_BLOCK_LIMIT => { store_data( storage_client(), - i32::from(Regions::Global) as u32, + Regions::Global as u32, lock_id.to_be_bytes().to_vec(), self.block_limit.to_be_bytes().to_vec(), ) @@ -230,7 +230,7 @@ impl SystemConfig { LOCK_ID_QUOTA_LIMIT => { store_data( storage_client(), - i32::from(Regions::Global) as u32, + Regions::Global as u32, lock_id.to_be_bytes().to_vec(), self.quota_limit.to_be_bytes().to_vec(), ) @@ -357,7 +357,7 @@ impl SystemConfig { pub async fn modify_sys_config_by_utxotx_hash(&mut self, utxo_hash: Vec) -> StatusCodeEnum { match load_data( storage_client(), - i32::from(Regions::Transactions) as u32, + Regions::Transactions as u32, utxo_hash.clone(), ) .await diff --git a/src/grpc_client/storage.rs b/src/grpc_client/storage.rs index 5505a3e..2159fa6 100644 --- a/src/grpc_client/storage.rs +++ b/src/grpc_client/storage.rs @@ -60,7 +60,7 @@ pub async fn get_last_state_root(h: u64) -> Result, StatusCodeEnum> { let state_root = load_data( storage_client(), - i32::from(Regions::Result) as u32, + Regions::Result as u32, pre_height_bytes.clone(), ) .await?; @@ -91,12 +91,7 @@ pub async fn load_data_maybe_empty(region: u32, key: Vec) -> Result, pub async fn get_full_block(height: u64) -> Result { let height_bytes = height.to_be_bytes().to_vec(); - let block_bytes = load_data( - storage_client(), - i32::from(Regions::FullBlock) as u32, - height_bytes, - ) - .await?; + let block_bytes = load_data(storage_client(), Regions::FullBlock as u32, height_bytes).await?; Block::decode(block_bytes.as_slice()).map_err(|_| { warn!("get full block failed: decode Block failed"); @@ -109,7 +104,7 @@ pub async fn db_get_tx(tx_hash: &[u8]) -> Result let tx_bytes = load_data( storage_client(), - i32::from(Regions::Transactions) as u32, + Regions::Transactions as u32, tx_hash_bytes, ) .await @@ -137,7 +132,7 @@ pub async fn load_tx_info(tx_hash: &[u8]) -> Result<(u64, u64), StatusCodeEnum> let height_bytes = load_data( storage_client(), - i32::from(Regions::TransactionHash2blockHeight) as u32, + Regions::TransactionHash2blockHeight as u32, tx_hash_bytes.clone(), ) .await @@ -152,7 +147,7 @@ pub async fn load_tx_info(tx_hash: &[u8]) -> Result<(u64, u64), StatusCodeEnum> let tx_index_bytes = load_data( storage_client(), - i32::from(Regions::TransactionIndex) as u32, + Regions::TransactionIndex as u32, tx_hash_bytes, ) .await @@ -174,7 +169,7 @@ pub async fn load_tx_info(tx_hash: &[u8]) -> Result<(u64, u64), StatusCodeEnum> pub async fn get_height_by_block_hash(hash: Vec) -> Result { let block_number = load_data( storage_client(), - i32::from(Regions::BlockHash2blockHeight) as u32, + Regions::BlockHash2blockHeight as u32, hash.clone(), ) .await @@ -195,7 +190,7 @@ pub async fn get_compact_block(height: u64) -> Result Result Result { let height_bytes = height.to_be_bytes().to_vec(); - let proof = load_data( - storage_client(), - i32::from(Regions::Proof) as u32, - height_bytes, - ) - .await - .map_err(|e| { - warn!("get proof({}) failed: {}", height, e.to_string()); - StatusCodeEnum::NoProof - })?; + let proof = load_data(storage_client(), Regions::Proof as u32, height_bytes) + .await + .map_err(|e| { + warn!("get proof({}) failed: {}", height, e.to_string()); + StatusCodeEnum::NoProof + })?; Ok(Proof { proof }) } @@ -235,16 +226,12 @@ pub async fn get_proof(height: u64) -> Result { pub async fn get_state_root(height: u64) -> Result { let height_bytes = height.to_be_bytes().to_vec(); - let state_root = load_data( - storage_client(), - i32::from(Regions::Result) as u32, - height_bytes, - ) - .await - .map_err(|e| { - warn!("get state_root({}) failed: {}", height, e.to_string()); - StatusCodeEnum::NoStateRoot - })?; + let state_root = load_data(storage_client(), Regions::Result as u32, height_bytes) + .await + .map_err(|e| { + warn!("get state_root({}) failed: {}", height, e.to_string()); + StatusCodeEnum::NoStateRoot + })?; Ok(StateRoot { state_root }) } @@ -252,19 +239,13 @@ pub async fn get_state_root(height: u64) -> Result { pub async fn get_hash_in_range(mut hash: Vec, height: u64) -> Result, StatusCodeEnum> { let height_bytes = load_data( storage_client(), - i32::from(Regions::TransactionHash2blockHeight) as u32, + Regions::TransactionHash2blockHeight as u32, hash.clone(), ) .await?; let mut tx_height = u64_decode(height_bytes); while tx_height >= height { - hash = match load_data( - storage_client(), - i32::from(Regions::Transactions) as u32, - hash.clone(), - ) - .await - { + hash = match load_data(storage_client(), Regions::Transactions as u32, hash.clone()).await { Ok(raw_tx_bytes) => { if let UtxoTx(tx) = RawTransaction::decode(raw_tx_bytes.as_slice()) .unwrap() @@ -294,7 +275,7 @@ pub async fn get_hash_in_range(mut hash: Vec, height: u64) -> Result } else { let height_bytes = load_data( storage_client(), - i32::from(Regions::TransactionHash2blockHeight) as u32, + Regions::TransactionHash2blockHeight as u32, hash.clone(), ) .await?; @@ -307,7 +288,7 @@ pub async fn get_hash_in_range(mut hash: Vec, height: u64) -> Result pub async fn reload_transactions_pool() -> Result { let raw_txs_bytes = load_data( storage_client(), - i32::from(Regions::TransactionsPool) as u32, + Regions::TransactionsPool as u32, vec![0; 8], ) .await diff --git a/src/main.rs b/src/main.rs index ba32cd0..3c80e9f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -145,12 +145,7 @@ async fn run(opts: RunOpts) -> Result<(), StatusCodeEnum> { loop { server_retry_interval.tick().await; { - match load_data_maybe_empty( - i32::from(Regions::Global) as u32, - 0u64.to_be_bytes().to_vec(), - ) - .await - { + match load_data_maybe_empty(Regions::Global as u32, 0u64.to_be_bytes().to_vec()).await { Ok(current_block_number_bytes) => { info!("storage service ready, get current height success"); if current_block_number_bytes.is_empty() { @@ -162,7 +157,7 @@ async fn run(opts: RunOpts) -> Result<(), StatusCodeEnum> { current_block_number = u64_decode(current_block_number_bytes); current_block_hash = load_data( storage_client(), - i32::from(Regions::Global) as u32, + Regions::Global as u32, 1u64.to_be_bytes().to_vec(), ) .await?; From 64c8469c91c0c60d2f1c4bfb033f5f87238c6dc5 Mon Sep 17 00:00:00 2001 From: JLer Date: Mon, 21 Aug 2023 09:44:53 +0800 Subject: [PATCH 08/20] optim: restart when grpc server err --- src/grpc_server/mod.rs | 10 +++++----- src/main.rs | 22 +++++++++++++++++++++- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/src/grpc_server/mod.rs b/src/grpc_server/mod.rs index fb718be..1df91ac 100644 --- a/src/grpc_server/mod.rs +++ b/src/grpc_server/mod.rs @@ -58,13 +58,12 @@ pub(crate) async fn grpc_serve( })?; let layer = if config.enable_metrics { - tokio::spawn(run_metrics_exporter(config.metrics_port)); - - Some( + Some(( tower::ServiceBuilder::new() .layer(MiddlewareLayer::new(config.metrics_buckets)) .into_inner(), - ) + tokio::spawn(run_metrics_exporter(config.metrics_port)), + )) } else { None }; @@ -81,7 +80,7 @@ pub(crate) async fn grpc_serve( let http2_keepalive_interval = config.http2_keepalive_interval; let http2_keepalive_timeout = config.http2_keepalive_timeout; let tcp_keepalive = config.tcp_keepalive; - if let Some(layer) = layer { + if let Some((layer, metrics_exporter_join_handle)) = layer { Server::builder() .accept_http1(true) .http2_keepalive_interval(Some(Duration::from_secs(http2_keepalive_interval))) @@ -121,6 +120,7 @@ pub(crate) async fn grpc_serve( .await .map_err(|e| { warn!("start controller grpc server failed: {:?} ", e); + metrics_exporter_join_handle.abort(); StatusCodeEnum::FatalError })?; } else { diff --git a/src/main.rs b/src/main.rs index 3c80e9f..c58477c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -203,12 +203,13 @@ async fn run(opts: RunOpts) -> Result<(), StatusCodeEnum> { .await, )); - tokio::spawn(grpc_serve( + let mut grpc_join_handle = tokio::spawn(grpc_serve( controller.clone(), controller_state_machine.clone(), config.clone(), rx_signal.clone(), )); + let mut restart_num = 0; let mut reconnect_interval = time::interval(Duration::from_secs(config.origin_node_reconnect_interval)); @@ -259,6 +260,25 @@ async fn run(opts: RunOpts) -> Result<(), StatusCodeEnum> { break; } } + if grpc_join_handle.is_finished() { + if restart_num < 10 { + info!( + "controller grpc server has exited, try to start again({})...", + restart_num + ); + tokio::time::sleep(Duration::from_secs(config.server_retry_interval)).await; + grpc_join_handle = tokio::spawn(grpc_serve( + controller.clone(), + controller_state_machine.clone(), + config.clone(), + rx_signal.clone(), + )); + restart_num += 1; + } else { + info!("controller grpc server has exited, and restart failed, exit!",); + break; + } + } } Ok(()) } From af259855e3fe9080795bf0c86a0f0657dd1537ff Mon Sep 17 00:00:00 2001 From: JLer Date: Mon, 28 Aug 2023 18:18:12 +0800 Subject: [PATCH 09/20] optim sync --- Cargo.toml | 2 +- src/core/controller.rs | 2 +- src/core/state_machine.rs | 22 ++++++++-------------- 3 files changed, 10 insertions(+), 16 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a6439bc..f2a4ae3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -clap = { version = "4.3", features = ["derive"] } +clap = { version = "4.4", features = ["derive"] } tonic = "0.9" prost = "0.11" tokio = { version = "1.32", features = ["full"] } diff --git a/src/core/controller.rs b/src/core/controller.rs index 1c4a2e6..2b00151 100644 --- a/src/core/controller.rs +++ b/src/core/controller.rs @@ -532,7 +532,7 @@ impl Controller { &self, state: &State, ) -> Result<(u64, Vec), StatusCodeEnum> { - if state.matches_super(&Superstate::Sync {}) { + if matches!(state, State::Syncing {}) { return Err(StatusCodeEnum::NodeInSyncMode); } diff --git a/src/core/state_machine.rs b/src/core/state_machine.rs index 6003607..667ffc4 100644 --- a/src/core/state_machine.rs +++ b/src/core/state_machine.rs @@ -97,7 +97,7 @@ impl ControllerStateMachine { .await; Handled } - Event::TrySyncBlock => try_sync_block(context, false).await, + Event::TrySyncBlock => try_sync_block(context).await, _ => Super, } } @@ -120,7 +120,7 @@ impl ControllerStateMachine { async fn prepare_sync(&self, context: &mut Controller, event: &Event) -> Response { debug!("sync: `{event:?}`"); match event { - Event::TrySyncBlock => try_sync_block(context, true).await, + Event::TrySyncBlock => try_sync_block(context).await, _ => Super, } } @@ -129,7 +129,7 @@ impl ControllerStateMachine { async fn syncing(&self, context: &mut Controller, event: &Event) -> Response { debug!("sync: `{event:?}`"); match event { - Event::TrySyncBlock => try_sync_block(context, false).await, + Event::TrySyncBlock => try_sync_block(context).await, _ => Super, } } @@ -172,20 +172,14 @@ async fn handle_sync_block(context: &Controller) -> statig::Response { } } -async fn try_sync_block(context: &Controller, in_prepare_sync: bool) -> statig::Response { - let (_, global_status) = context.get_global_status().await; - // sync mode will return exclude global_height % context.config.force_sync_epoch == 0 - if in_prepare_sync && global_status.height % context.config.force_sync_epoch != 0 { - return Handled; - } +async fn try_sync_block(context: &Controller) -> statig::Response { + let (global_address, global_status) = context.get_global_status().await; let current_height = context.get_status().await.height; - let controller_clone = context.clone(); - let (global_address, global_status) = controller_clone.get_global_status().await; // try read chain state, if can't get chain default online state let res = { - if let Ok(chain) = controller_clone.chain.try_read() { + if let Ok(chain) = context.chain.try_read() { chain.next_step(&global_status) } else { ChainStep::BusyState @@ -194,12 +188,12 @@ async fn try_sync_block(context: &Controller, in_prepare_sync: bool) -> statig:: match res { ChainStep::SyncStep => { - if let Some(sync_req) = controller_clone + if let Some(sync_req) = context .sync_manager .get_sync_block_req(current_height, &global_status) .await { - let _ = controller_clone + let _ = context .unicast_sync_block(global_address.0, sync_req.clone()) .await .await From 108a57c7a91133b0db9a3eb15579db7d738550a2 Mon Sep 17 00:00:00 2001 From: JLer Date: Tue, 29 Aug 2023 09:54:28 +0800 Subject: [PATCH 10/20] simplify the code --- src/core/state_machine.rs | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/src/core/state_machine.rs b/src/core/state_machine.rs index 667ffc4..ead90fe 100644 --- a/src/core/state_machine.rs +++ b/src/core/state_machine.rs @@ -79,6 +79,7 @@ impl ControllerStateMachine { context.inner_health_check().await; Handled } + Event::TrySyncBlock => try_sync_block(context).await, _ => Super, } } @@ -97,7 +98,6 @@ impl ControllerStateMachine { .await; Handled } - Event::TrySyncBlock => try_sync_block(context).await, _ => Super, } } @@ -117,21 +117,15 @@ impl ControllerStateMachine { } #[state(superstate = "sync")] - async fn prepare_sync(&self, context: &mut Controller, event: &Event) -> Response { - debug!("sync: `{event:?}`"); - match event { - Event::TrySyncBlock => try_sync_block(context).await, - _ => Super, - } + async fn prepare_sync(&self, event: &Event) -> Response { + debug!("prepare_sync: `{event:?}`"); + Super } #[state(superstate = "sync", entry_action = "enter_syncing")] - async fn syncing(&self, context: &mut Controller, event: &Event) -> Response { - debug!("sync: `{event:?}`"); - match event { - Event::TrySyncBlock => try_sync_block(context).await, - _ => Super, - } + async fn syncing(&self, event: &Event) -> Response { + debug!("syncing: `{event:?}`"); + Super } #[action] From 5c60de5d8fa88b5df316640bbd583df9b7d3bdd9 Mon Sep 17 00:00:00 2001 From: JLer Date: Mon, 4 Sep 2023 10:50:33 +0800 Subject: [PATCH 11/20] optim sync --- src/core/chain.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/core/chain.rs b/src/core/chain.rs index 78eb02a..e3d160f 100644 --- a/src/core/chain.rs +++ b/src/core/chain.rs @@ -549,7 +549,9 @@ impl Chain { } pub fn next_step(&self, global_status: &ChainStatus) -> ChainStep { - if global_status.height > self.block_number && self.candidates.is_empty() { + if global_status.height > self.block_number + 1 + || (global_status.height > self.block_number && self.candidates.is_empty()) + { debug!("sync mode"); ChainStep::SyncStep } else { From a8a52b9cccc5f3190df577214a4f5297c971872d Mon Sep 17 00:00:00 2001 From: JLer Date: Sat, 7 Oct 2023 11:04:09 +0800 Subject: [PATCH 12/20] bump dependencies --- Cargo.toml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f2a4ae3..142341a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,21 +9,21 @@ edition = "2021" [dependencies] clap = { version = "4.4", features = ["derive"] } -tonic = "0.9" -prost = "0.11" +tonic = "0.10" +prost = "0.12" tokio = { version = "1.32", features = ["full"] } rand = "0.8" -toml = "0.7" +toml = "0.8" serde = "1.0" serde_derive = "1.0" hex = "0.4" tower = "0.4" tracing = "0.1" -tonic-reflection = "0.9" -tonic-web = "0.9" +tonic-reflection = "0.10" +tonic-web = "0.10" statig = { version = "0.3", features = ["async"] } flume = "0.11" -rayon = "1.7" +rayon = "1.8" cfg-if = "1.0" futures = "0.3" @@ -34,7 +34,7 @@ crypto_sm = { git = "https://github.com/cita-cloud/crypto_sm", package = "crypto crypto_eth = { git = "https://github.com/cita-cloud/crypto_eth", package = "crypto", optional = true } [build-dependencies] -tonic-build = "0.9" +tonic-build = "0.10" [profile.release.package."*"] # Set the default for dependencies. From fc27668afe7a66a7765906c1fedf73392f82b646 Mon Sep 17 00:00:00 2001 From: yieazy Date: Tue, 29 Aug 2023 17:12:09 +0800 Subject: [PATCH 13/20] chore: bump to 6.7.2-beta.1 --- Cargo.toml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 142341a..d5b2f08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "controller" -version = "6.7.0" +version = "6.7.2-beta.1" authors = ["Rivtower Technologies "] license = "Apache-2.0" edition = "2021" @@ -27,11 +27,11 @@ rayon = "1.8" cfg-if = "1.0" futures = "0.3" -cloud-util = { package = "cloud-util", git = "https://github.com/cita-cloud/cloud-common-rs" } -cita_cloud_proto = { package = "cita_cloud_proto", git = "https://github.com/cita-cloud/cloud-common-rs" } +cloud-util = "0.8.0-beta.1" +cita_cloud_proto = "=6.7.2-beta.1" -crypto_sm = { git = "https://github.com/cita-cloud/crypto_sm", package = "crypto", optional = true } -crypto_eth = { git = "https://github.com/cita-cloud/crypto_eth", package = "crypto", optional = true } +crypto_sm = { git = "https://github.com/cita-cloud/crypto_sm", package = "crypto", optional = true, branch = "v6.7.2-beta.1" } +crypto_eth = { git = "https://github.com/cita-cloud/crypto_eth", package = "crypto", optional = true, branch = "v6.7.2-beta.1" } [build-dependencies] tonic-build = "0.10" From f59cc21ccbc31dbbf68e53c47a488f89f0e35308 Mon Sep 17 00:00:00 2001 From: yieazy Date: Fri, 13 Oct 2023 15:29:12 +0800 Subject: [PATCH 14/20] chore: bump to 6.7.2-beta.2 --- Cargo.toml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d5b2f08..9fcb53c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "controller" -version = "6.7.2-beta.1" +version = "6.7.2-beta.2" authors = ["Rivtower Technologies "] license = "Apache-2.0" edition = "2021" @@ -27,11 +27,11 @@ rayon = "1.8" cfg-if = "1.0" futures = "0.3" -cloud-util = "0.8.0-beta.1" -cita_cloud_proto = "=6.7.2-beta.1" +cloud-util = "=0.8.0-beta.2" +cita_cloud_proto = "=6.7.2-beta.2" -crypto_sm = { git = "https://github.com/cita-cloud/crypto_sm", package = "crypto", optional = true, branch = "v6.7.2-beta.1" } -crypto_eth = { git = "https://github.com/cita-cloud/crypto_eth", package = "crypto", optional = true, branch = "v6.7.2-beta.1" } +crypto_sm = { git = "https://github.com/cita-cloud/crypto_sm", package = "crypto", optional = true, branch = "v6.7.2-beta.2" } +crypto_eth = { git = "https://github.com/cita-cloud/crypto_eth", package = "crypto", optional = true, branch = "v6.7.2-beta.2" } [build-dependencies] tonic-build = "0.10" From 05c96160ed9a4a6bf388f910d627dd1c90b7b1f4 Mon Sep 17 00:00:00 2001 From: yieazy Date: Wed, 1 Nov 2023 14:34:56 +0800 Subject: [PATCH 15/20] chore: bump to v6.7.2-beta.3 --- Cargo.toml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9fcb53c..9d1db1e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "controller" -version = "6.7.2-beta.2" +version = "6.7.2-beta.3" authors = ["Rivtower Technologies "] license = "Apache-2.0" edition = "2021" @@ -27,11 +27,11 @@ rayon = "1.8" cfg-if = "1.0" futures = "0.3" -cloud-util = "=0.8.0-beta.2" -cita_cloud_proto = "=6.7.2-beta.2" +cloud-util = "=0.8.0-beta.3" +cita_cloud_proto = "=6.7.2-beta.3" -crypto_sm = { git = "https://github.com/cita-cloud/crypto_sm", package = "crypto", optional = true, branch = "v6.7.2-beta.2" } -crypto_eth = { git = "https://github.com/cita-cloud/crypto_eth", package = "crypto", optional = true, branch = "v6.7.2-beta.2" } +crypto_sm = { git = "https://github.com/cita-cloud/crypto_sm", package = "crypto", optional = true, branch = "v6.7.2-beta.3" } +crypto_eth = { git = "https://github.com/cita-cloud/crypto_eth", package = "crypto", optional = true, branch = "v6.7.2-beta.3" } [build-dependencies] tonic-build = "0.10" From 9e3ef0326009c458af899a388a3332ea61069b35 Mon Sep 17 00:00:00 2001 From: yieazy Date: Mon, 6 Nov 2023 17:42:02 +0800 Subject: [PATCH 16/20] chore: main branch bump to 6.7.2-beta.3 --- Cargo.toml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9d1db1e..085799d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,11 +27,11 @@ rayon = "1.8" cfg-if = "1.0" futures = "0.3" -cloud-util = "=0.8.0-beta.3" -cita_cloud_proto = "=6.7.2-beta.3" +cloud-util = { package = "cloud-util", git = "https://github.com/cita-cloud/cloud-common-rs" } +cita_cloud_proto = { package = "cita_cloud_proto", git = "https://github.com/cita-cloud/cloud-common-rs" } -crypto_sm = { git = "https://github.com/cita-cloud/crypto_sm", package = "crypto", optional = true, branch = "v6.7.2-beta.3" } -crypto_eth = { git = "https://github.com/cita-cloud/crypto_eth", package = "crypto", optional = true, branch = "v6.7.2-beta.3" } +crypto_sm = { git = "https://github.com/cita-cloud/crypto_sm", package = "crypto", optional = true } +crypto_eth = { git = "https://github.com/cita-cloud/crypto_eth", package = "crypto", optional = true } [build-dependencies] tonic-build = "0.10" From e01c419628c78f175d3b4f4b57d8aeebf8529581 Mon Sep 17 00:00:00 2001 From: yieazy Date: Tue, 21 Nov 2023 14:27:56 +0800 Subject: [PATCH 17/20] chore: bump to 6.7.2 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 085799d..b25f389 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "controller" -version = "6.7.2-beta.3" +version = "6.7.2" authors = ["Rivtower Technologies "] license = "Apache-2.0" edition = "2021" From 27bdb4a20185b2bd985b17a5e4fad6915a614acf Mon Sep 17 00:00:00 2001 From: JLer Date: Wed, 29 Nov 2023 17:54:06 +0800 Subject: [PATCH 18/20] broadcast txs --- src/core/controller.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/controller.rs b/src/core/controller.rs index 2b00151..aa68747 100644 --- a/src/core/controller.rs +++ b/src/core/controller.rs @@ -309,7 +309,7 @@ impl Controller { } } } - if broadcast { + if broadcast && !hashes.is_empty() { self.broadcast_send_txs(raw_txs.clone()).await; } // send to storage @@ -1133,7 +1133,7 @@ impl Controller { StatusCodeEnum::DecodeError })?; - self.batch_transactions(body, false).await?; + self.batch_transactions(body, true).await?; } ControllerMsgType::Noop => { From 30df222f91fe7530a76dab4d82a08e43fd7522df Mon Sep 17 00:00:00 2001 From: JLer Date: Fri, 12 Jan 2024 18:26:16 +0800 Subject: [PATCH 19/20] feat: too many requests --- src/core/controller.rs | 83 ++++++++++++++++++++++----------------- src/core/pool.rs | 51 ++++++++++++++++++++---- src/core/state_machine.rs | 5 ++- 3 files changed, 92 insertions(+), 47 deletions(-) diff --git a/src/core/controller.rs b/src/core/controller.rs index aa68747..5b759f1 100644 --- a/src/core/controller.rs +++ b/src/core/controller.rs @@ -38,7 +38,10 @@ use cloud_util::{ use crate::{ config::ControllerConfig, core::{ - auditor::Auditor, chain::Chain, genesis::GenesisBlock, pool::Pool, + auditor::Auditor, + chain::Chain, + genesis::GenesisBlock, + pool::{Pool, PoolError}, system_config::SystemConfig, }, crypto::{crypto_check_async, crypto_check_batch_async, hash_data}, @@ -248,45 +251,51 @@ impl Controller { let mut pool = self.pool.write().await; pool.insert(raw_tx.clone()) }; - if res { - if broadcast { - let mut f_pool = self.forward_pool.write().await; - f_pool.body.push(raw_tx.clone()); - if f_pool.body.len() > self.config.count_per_batch { - self.broadcast_send_txs(f_pool.clone()).await; - f_pool.body.clear(); + match res { + Ok(_) => { + if broadcast { + let mut f_pool = self.forward_pool.write().await; + f_pool.body.push(raw_tx.clone()); + if f_pool.body.len() > self.config.count_per_batch { + self.broadcast_send_txs(f_pool.clone()).await; + f_pool.body.clear(); + } } - } - // send to storage - if self.config.tx_persistence { - tokio::spawn(async move { - let raw_txs = RawTransactions { body: vec![raw_tx] }; - let mut raw_tx_bytes = Vec::new(); - match raw_txs.encode(&mut raw_tx_bytes) { - Ok(_) => { - if store_data( - Regions::TransactionsPool as u32, - vec![0; 8], - raw_tx_bytes, - ) - .await - .is_success() - .is_err() - { - warn!("store raw tx failed"); + // send to storage + if self.config.tx_persistence { + tokio::spawn(async move { + let raw_txs = RawTransactions { body: vec![raw_tx] }; + let mut raw_tx_bytes = Vec::new(); + match raw_txs.encode(&mut raw_tx_bytes) { + Ok(_) => { + if store_data( + Regions::TransactionsPool as u32, + vec![0; 8], + raw_tx_bytes, + ) + .await + .is_success() + .is_err() + { + warn!("store raw tx failed"); + } } + Err(_) => warn!("encode raw tx failed"), } - Err(_) => warn!("encode raw tx failed"), - } - }); + }); + } + Ok(tx_hash) + } + Err(e) => { + warn!( + "rpc send raw transaction failed: {e:?}. hash: 0x{}", + hex::encode(&tx_hash) + ); + match e { + PoolError::TooManyRequests => Err(StatusCodeEnum::TooManyRequests), + PoolError::DupTransaction => Err(StatusCodeEnum::DupTransaction), + } } - Ok(tx_hash) - } else { - warn!( - "rpc send raw transaction failed: tx already in pool. hash: 0x{}", - hex::encode(&tx_hash) - ); - Err(StatusCodeEnum::DupTransaction) } } @@ -304,7 +313,7 @@ impl Controller { auditor.auditor_check_batch(&raw_txs)?; for raw_tx in raw_txs.body.clone() { let hash = get_tx_hash(&raw_tx)?.to_vec(); - if pool.insert(raw_tx) { + if pool.insert(raw_tx).is_ok() { hashes.push(Hash { hash }) } } diff --git a/src/core/pool.rs b/src/core/pool.rs index aa3f36d..2a07516 100644 --- a/src/core/pool.rs +++ b/src/core/pool.rs @@ -64,6 +64,15 @@ pub struct Pool { pool_quota: u64, block_limit: u64, quota_limit: u64, + warn_quota: u64, + busy_quota: u64, + in_busy: bool, +} + +#[derive(Debug)] +pub enum PoolError { + DupTransaction, + TooManyRequests, } impl Pool { @@ -73,6 +82,9 @@ impl Pool { pool_quota: 0, block_limit, quota_limit, + warn_quota: quota_limit * 30, + busy_quota: quota_limit * 50, + in_busy: false, } } @@ -104,13 +116,24 @@ impl Pool { ); } - pub fn insert(&mut self, raw_tx: RawTransaction) -> bool { - let tx_quota = get_tx_quota(&raw_tx).unwrap(); - let ret = self.txns.insert(Txn(raw_tx)); - if ret { - self.pool_quota += tx_quota; + pub fn insert(&mut self, raw_tx: RawTransaction) -> Result<(), PoolError> { + if self.in_busy { + Err(PoolError::TooManyRequests) + } else { + let tx_quota = get_tx_quota(&raw_tx).unwrap(); + if self.pool_quota + tx_quota > self.busy_quota { + self.in_busy = true; + Err(PoolError::TooManyRequests) + } else { + let ret = self.txns.insert(Txn(raw_tx)); + if ret { + self.pool_quota += tx_quota; + Ok(()) + } else { + Err(PoolError::DupTransaction) + } + } } - ret } pub fn remove(&mut self, tx_hash_list: &[Vec]) { @@ -124,12 +147,24 @@ impl Pool { } self.txns.remove(tx_hash.as_slice()); } + if self.pool_quota < self.warn_quota { + self.in_busy = false; + } } pub fn package(&mut self, height: u64) -> (Vec>, u64) { let block_limit = self.block_limit; - self.txns - .retain(|txn| tx_is_valid(&txn.0, height, block_limit)); + self.txns.retain(|txn| { + let tx_is_valid = tx_is_valid(&txn.0, height, block_limit); + if !tx_is_valid { + let tx_quota = get_tx_quota(&txn.0).unwrap(); + self.pool_quota -= tx_quota; + } + tx_is_valid + }); + if self.pool_quota < self.warn_quota { + self.in_busy = false; + } let mut quota_limit = self.quota_limit; let mut pack_tx = vec![]; for txn in self.txns.iter().cloned() { diff --git a/src/core/state_machine.rs b/src/core/state_machine.rs index ead90fe..d66600f 100644 --- a/src/core/state_machine.rs +++ b/src/core/state_machine.rs @@ -156,10 +156,11 @@ impl ControllerStateMachine { async fn handle_sync_block(context: &Controller) -> statig::Response { debug!("receive SyncBlock event"); let (_, global_status) = context.get_global_status().await; - match { + let res = { let chain = context.chain.read().await; chain.next_step(&global_status) - } { + }; + match res { ChainStep::SyncStep => Transition(State::syncing()), ChainStep::OnlineStep => Transition(State::participate_in_consensus()), ChainStep::BusyState => Handled, From f1b09305140813f24cee070c462ed5bcb8285064 Mon Sep 17 00:00:00 2001 From: JLer Date: Mon, 15 Jan 2024 13:43:45 +0800 Subject: [PATCH 20/20] feat: tx_poor queue FIFO --- Cargo.toml | 1 + src/core/pool.rs | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b25f389..7f08e69 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ flume = "0.11" rayon = "1.8" cfg-if = "1.0" futures = "0.3" +indexmap = "2.1" cloud-util = { package = "cloud-util", git = "https://github.com/cita-cloud/cloud-common-rs" } cita_cloud_proto = { package = "cita_cloud_proto", git = "https://github.com/cita-cloud/cloud-common-rs" } diff --git a/src/core/pool.rs b/src/core/pool.rs index 2a07516..61c3afe 100644 --- a/src/core/pool.rs +++ b/src/core/pool.rs @@ -21,6 +21,7 @@ use std::{ }; use cita_cloud_proto::blockchain::{raw_transaction::Tx, RawTransaction}; +use indexmap::IndexSet; use tokio::sync::RwLock; use crate::{grpc_client::storage::reload_transactions_pool, util::get_tx_quota}; @@ -60,7 +61,7 @@ fn get_raw_tx_hash(raw_tx: &RawTransaction) -> &[u8] { } pub struct Pool { - txns: HashSet, + txns: IndexSet, pool_quota: u64, block_limit: u64, quota_limit: u64, @@ -78,7 +79,7 @@ pub enum PoolError { impl Pool { pub fn new(block_limit: u64, quota_limit: u64) -> Self { Pool { - txns: HashSet::new(), + txns: IndexSet::new(), pool_quota: 0, block_limit, quota_limit,