diff --git a/components/chainhook-cli/src/config/mod.rs b/components/chainhook-cli/src/config/mod.rs index 0f896e305..b22831373 100644 --- a/components/chainhook-cli/src/config/mod.rs +++ b/components/chainhook-cli/src/config/mod.rs @@ -1,7 +1,7 @@ pub mod file; pub mod generator; -use chainhook_sdk::chainhooks::types::{ChainhookStore, PoxConfig}; +use chainhook_sdk::chainhooks::types::PoxConfig; pub use chainhook_sdk::indexer::IndexerConfig; use chainhook_sdk::observer::{ EventObserverConfig, PredicatesConfig, DEFAULT_PAYLOAD_HTTP_REQUEST_ATTEMPTS_INTERVAL_MS, DEFAULT_PAYLOAD_HTTP_REQUEST_ATTEMPTS_MAX, DEFAULT_PAYLOAD_HTTP_REQUEST_CONCURRENCY @@ -119,7 +119,6 @@ impl Config { pub fn get_event_observer_config(&self) -> EventObserverConfig { EventObserverConfig { bitcoin_rpc_proxy_enabled: true, - registered_chainhooks: ChainhookStore::new(), predicates_config: PredicatesConfig { payload_http_request_timeout_ms: self.predicates.payload_http_request_timeout_ms, payload_http_request_concurrency: self.predicates.payload_http_request_concurrency, diff --git a/components/chainhook-cli/src/scan/bitcoin.rs b/components/chainhook-cli/src/scan/bitcoin.rs index 2a4a5d80e..99d53f07d 100644 --- a/components/chainhook-cli/src/scan/bitcoin.rs +++ b/components/chainhook-cli/src/scan/bitcoin.rs @@ -1,9 +1,7 @@ use crate::config::{Config, PredicatesApi}; use crate::scan::common::get_block_heights_to_scan; -use crate::service::{ - open_readwrite_predicates_db_conn_or_panic, set_confirmed_expiration_status, - set_predicate_scanning_status, set_unconfirmed_expiration_status, ScanningData, -}; +use crate::storage::predicates_db::{open_readwrite_predicates_db_conn_or_panic, set_confirmed_expiration_status, set_predicate_scanning_status, set_unconfirmed_expiration_status}; +use chainhook_sdk::chainhooks::types::{PredicateStatus, ScanningData}; use chainhook_sdk::bitcoincore_rpc::RpcApi; use chainhook_sdk::bitcoincore_rpc::{Auth, Client}; use chainhook_sdk::chainhooks::bitcoin::{ @@ -186,7 +184,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate( let res = match process_block_with_predicates( block, - &vec![&predicate_spec], + &vec![&(predicate_spec.clone(), PredicateStatus::New)], &event_observer_config, ctx, ) @@ -262,7 +260,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate( pub async fn process_block_with_predicates( block: BitcoinBlockData, - predicates: &Vec<&BitcoinChainhookInstance>, + predicates: &Vec<&(BitcoinChainhookInstance, PredicateStatus)>, event_observer_config: &EventObserverConfig, ctx: &Context, ) -> Result { diff --git a/components/chainhook-cli/src/scan/common.rs b/components/chainhook-cli/src/scan/common.rs index ee5ddef61..2bcddeaaa 100644 --- a/components/chainhook-cli/src/scan/common.rs +++ b/components/chainhook-cli/src/scan/common.rs @@ -1,4 +1,4 @@ -use crate::service::ScanningData; +use chainhook_sdk::chainhooks::types::ScanningData; use chainhook_sdk::utils::{BlockHeights, BlockHeightsError}; use std::collections::VecDeque; diff --git a/components/chainhook-cli/src/scan/stacks.rs b/components/chainhook-cli/src/scan/stacks.rs index e7f3124de..6b3e19f2c 100644 --- a/components/chainhook-cli/src/scan/stacks.rs +++ b/components/chainhook-cli/src/scan/stacks.rs @@ -5,19 +5,14 @@ use std::{ sync::{Arc, RwLock}, }; +use chainhook_sdk::chainhooks::types::ScanningData; + use crate::{ archive::download_stacks_dataset_if_required, config::{Config, PredicatesApi}, scan::common::get_block_heights_to_scan, - service::{ - open_readwrite_predicates_db_conn_or_panic, set_confirmed_expiration_status, - set_predicate_scanning_status, set_unconfirmed_expiration_status, ScanningData, - }, storage::{ - get_last_block_height_inserted, get_last_unconfirmed_block_height_inserted, - get_stacks_block_at_block_height, insert_entry_in_stacks_blocks, is_stacks_block_present, - open_readonly_stacks_db_conn_with_retry, open_readwrite_stacks_db_conn, - signers::get_signer_db_messages_received_at_block, StacksDbConnections, + get_last_block_height_inserted, get_last_unconfirmed_block_height_inserted, get_stacks_block_at_block_height, insert_entry_in_stacks_blocks, is_stacks_block_present, open_readonly_stacks_db_conn_with_retry, open_readwrite_stacks_db_conn, predicates_db::{open_readwrite_predicates_db_conn_or_panic, set_confirmed_expiration_status, set_predicate_scanning_status, set_unconfirmed_expiration_status}, signers::get_signer_db_messages_received_at_block, StacksDbConnections }, }; use chainhook_sdk::{ diff --git a/components/chainhook-cli/src/scan/tests/mod.rs b/components/chainhook-cli/src/scan/tests/mod.rs index 4afd6fe1e..7fcc37bfe 100644 --- a/components/chainhook-cli/src/scan/tests/mod.rs +++ b/components/chainhook-cli/src/scan/tests/mod.rs @@ -3,8 +3,6 @@ use std::collections::VecDeque; use chainhook_sdk::utils::MAX_BLOCK_HEIGHTS_ENTRIES; use test_case::test_case; -use crate::service::ScanningData; - use super::common::get_block_heights_to_scan; fn expect_exceeded_max_entries_error( @@ -90,7 +88,7 @@ fn get_huge_vec() -> Vec { #[test_case(None, None, Some(3), 2, None, Some(VecDeque::from([0,1,2,3])) => using expect_entries; "end_block > chain_tip, no start_block yields vec from 0 to end")] #[test_case(None, None, Some(2), 3, None, Some(VecDeque::from([0,1,2])) => using expect_entries; "chain_tip > end_block, no yields vec from 0 to end_block")] #[test_case(None, Some(0), Some(MAX_BLOCK_HEIGHTS_ENTRIES + 1), 0, None, None => using expect_exceeded_max_entries_error; "limits max number of entries")] -#[test_case(None, Some(0), Some(3), 0, Some(ScanningData { number_of_blocks_to_scan: 0, number_of_blocks_evaluated: 0, number_of_times_triggered: 0, last_occurrence: None, last_evaluated_block_height: 2}), Some(VecDeque::from([2,3])) => using expect_entries; "uses previous scan data for start_block if available")] +#[test_case(None, Some(0), Some(3), 0, Some(chainhook_sdk::chainhooks::types::ScanningData { number_of_blocks_to_scan: 0, number_of_blocks_evaluated: 0, number_of_times_triggered: 0, last_occurrence: None, last_evaluated_block_height: 2}), Some(VecDeque::from([2,3])) => using expect_entries; "uses previous scan data for start_block if available")] #[test_case(Some(vec![0,1,2]), None, None, 0, None, Some(VecDeque::from([0,1,2])) => using expect_entries; "providing blocks returns the same blocks as vec")] #[test_case(Some(get_huge_vec()), None, None, 0, None, None => using expect_exceeded_max_entries_error; "providing too many blocks errors")] fn test_get_block_heights_to_scan( @@ -98,7 +96,7 @@ fn test_get_block_heights_to_scan( start_block: Option, end_block: Option, chain_tip: u64, - unfinished_scan_data: Option, + unfinished_scan_data: Option, expected: Option>, ) -> (Result>, String>, Option>) { ( diff --git a/components/chainhook-cli/src/service/http_api.rs b/components/chainhook-cli/src/service/http_api.rs index 7aaeb3294..7c715cc8b 100644 --- a/components/chainhook-cli/src/service/http_api.rs +++ b/components/chainhook-cli/src/service/http_api.rs @@ -1,32 +1,39 @@ use std::{ - collections::HashMap, net::{IpAddr, Ipv4Addr}, sync::{mpsc::Sender, Arc, Mutex}, }; use chainhook_sdk::{ - chainhooks::types::{ChainhookInstance, ChainhookSpecificationNetworkMap}, + chainhooks::{ + database::PredicatesDatabaseAccess, + types::{ChainhookInstance, ChainhookSpecificationNetworkMap}, + }, observer::ObserverCommand, + try_error, try_info, try_warn, utils::Context, }; use hiro_system_kit::slog; -use redis::{Commands, Connection}; -use rocket::serde::json::{json, Json, Value as JsonValue}; use rocket::State; use rocket::{ config::{self, Config, LogLevel}, Shutdown, }; +use rocket::{ + http::Status, + response::status::Custom, + serde::json::{json, Json, Value as JsonValue}, +}; use rocket_okapi::{okapi::openapi3::OpenApi, openapi, openapi_get_routes_spec}; use std::error::Error; -use crate::config::PredicatesApiConfig; +use crate::{config::PredicatesApiConfig, storage::predicates_db::RedisPredicatesDatabaseAccess}; -use super::{open_readwrite_predicates_db_conn, PredicateStatus}; +use super::PredicateStatus; pub async fn start_predicate_api_server( api_config: PredicatesApiConfig, observer_commands_tx: Sender, + predicates_database_access: RedisPredicatesDatabaseAccess, ctx: Context, ) -> Result> { let log_level = LogLevel::Off; @@ -57,6 +64,7 @@ pub async fn start_predicate_api_server( let ignite = rocket::custom(control_config) .manage(background_job_tx_mutex) .manage(api_config) + .manage(predicates_database_access) .manage(ctx_cloned) .mount("/", routes) .ignite() @@ -70,147 +78,131 @@ pub async fn start_predicate_api_server( Ok(predicate_api_shutdown) } +fn success_response(result: JsonValue) -> Result, Custom>> { + Ok(Json(json!({ + "status": 200, + "result": result, + }))) +} + +fn error_response( + message: String, + ctx: &State, +) -> Result, Custom>> { + try_error!(ctx, "{message}"); + Err(Custom( + Status::InternalServerError, + Json(json!({ + "status": Status::InternalServerError.code, + "result": message, + })), + )) +} + +fn user_error_response( + message: String, + ctx: &State, +) -> Result, Custom>> { + try_warn!(ctx, "{message}"); + Err(Custom( + Status::UnprocessableEntity, + Json(json!({ + "status": Status::UnprocessableEntity.code, + "result": message, + })), + )) +} + +fn not_found_response() -> Result, Custom>> { + Err(Custom( + Status::NotFound, + Json(json!({ + "status": Status::NotFound.code, + "result": "Not Found", + })), + )) +} + #[openapi(tag = "Health Check")] #[get("/ping")] -fn handle_ping(ctx: &State) -> Json { - ctx.try_log(|logger| slog::info!(logger, "Handling HTTP GET /ping")); - Json(json!({ - "status": 200, - "result": "chainhook service up and running", - })) +fn handle_ping(ctx: &State) -> Result, Custom>> { + try_info!(ctx, "Handling HTTP GET /ping"); + success_response(json!("chainhook service up and running")) } #[openapi(tag = "Managing Predicates")] #[get("/v1/chainhooks", format = "application/json")] fn handle_get_predicates( - api_config: &State, + predicates_database_access: &State, ctx: &State, -) -> Json { - ctx.try_log(|logger| slog::info!(logger, "Handling HTTP GET /v1/chainhooks")); - match open_readwrite_predicates_db_conn(api_config) { - Ok(mut predicates_db_conn) => { - let predicates = match get_entries_from_predicates_db(&mut predicates_db_conn, ctx) { - Ok(predicates) => predicates, - Err(e) => { - ctx.try_log(|logger| slog::warn!(logger, "unable to retrieve predicates: {e}")); - return Json(json!({ - "status": 500, - "message": "unable to retrieve predicates", - })); - } - }; - +) -> Result, Custom>> { + try_info!(ctx, "Handling HTTP GET /v1/chainhooks"); + match predicates_database_access.inner().get_all_predicates(ctx) { + Ok(predicates) => { let serialized_predicates = predicates .iter() .map(|(p, s)| serialized_predicate_with_status(p, s)) - .collect::>(); - - Json(json!({ - "status": 200, - "result": serialized_predicates - })) + .collect::>(); + return success_response(serialized_predicates.into()); } - Err(e) => Json(json!({ - "status": 500, - "message": e, - })), - } + Err(e) => { + return error_response(format!("unable to retrieve predicates: {e}"), ctx); + } + }; } #[openapi(tag = "Managing Predicates")] #[post("/v1/chainhooks", format = "application/json", data = "")] fn handle_create_predicate( predicate: Result, rocket::serde::json::Error>, - api_config: &State, + predicates_database_access: &State, background_job_tx: &State>>>, ctx: &State, -) -> Json { - ctx.try_log(|logger| slog::info!(logger, "Handling HTTP POST /v1/chainhooks")); +) -> Result, Custom>> { + try_info!(ctx, "Handling HTTP POST /v1/chainhooks"); let predicate = match predicate { Err(e) => { - return Json(json!({ - "status": 422, - "error": e.to_string(), - })) + return user_error_response(e.to_string(), ctx); } Ok(predicate) => { let predicate = predicate.into_inner(); if let Err(e) = predicate.validate() { - return Json(json!({ - "status": 422, - "error": e, - })); + return user_error_response(e.to_string(), ctx); } predicate } }; - let predicate_uuid = predicate.get_uuid().to_string(); - - if let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn(api_config) { - if let Ok(Some(_)) = get_entry_from_predicates_db( - &ChainhookInstance::either_stx_or_btc_key(&predicate_uuid), - &mut predicates_db_conn, - ctx, - ) { - return Json(json!({ - "status": 409, - "error": "Predicate uuid already in use", - })) - } + if let Ok(Some(_)) = predicates_database_access + .inner() + .get_predicate(&predicate_uuid, ctx) + { + return user_error_response("Predicate uuid already in use".to_string(), ctx); } let background_job_tx = background_job_tx.inner(); if let Ok(tx) = background_job_tx.lock() { let _ = tx.send(ObserverCommand::RegisterPredicate(predicate)); }; - - Json(json!({ - "status": 200, - "result": predicate_uuid, - })) + success_response(predicate_uuid.into()) } #[openapi(tag = "Managing Predicates")] #[get("/v1/chainhooks/", format = "application/json")] fn handle_get_predicate( predicate_uuid: String, - api_config: &State, + predicates_database_access: &State, ctx: &State, -) -> Json { - ctx.try_log(|logger| { - slog::info!( - logger, - "Handling HTTP GET /v1/chainhooks/{}", - predicate_uuid - ) - }); - - match open_readwrite_predicates_db_conn(api_config) { - Ok(mut predicates_db_conn) => { - let (predicate, status) = match get_entry_from_predicates_db( - &ChainhookInstance::either_stx_or_btc_key(&predicate_uuid), - &mut predicates_db_conn, - ctx, - ) { - Ok(Some(predicate_with_status)) => predicate_with_status, - _ => { - return Json(json!({ - "status": 404, - })) - } - }; - let result = serialized_predicate_with_status(&predicate, &status); - Json(json!({ - "status": 200, - "result": result - })) - } - Err(e) => Json(json!({ - "status": 500, - "message": e, - })), - } +) -> Result, Custom>> { + try_info!(ctx, "Handling HTTP GET /v1/chainhooks/{predicate_uuid}"); + let (predicate, status) = match predicates_database_access + .inner() + .get_predicate(&predicate_uuid, ctx) + { + Ok(Some(predicate_with_status)) => predicate_with_status, + _ => return not_found_response(), + }; + success_response(serialized_predicate_with_status(&predicate, &status)) } #[openapi(tag = "Managing Predicates")] @@ -218,132 +210,49 @@ fn handle_get_predicate( fn handle_delete_stacks_predicate( predicate_uuid: String, background_job_tx: &State>>>, + predicates_database_access: &State, ctx: &State, -) -> Json { - ctx.try_log(|logger| { - slog::info!( - logger, - "Handling HTTP DELETE /v1/chainhooks/stacks/{}", - predicate_uuid - ) - }); - +) -> Result, Custom>> { + try_info!( + ctx, + "Handling HTTP DELETE /v1/chainhooks/stacks/{predicate_uuid}" + ); + if let Ok(Some(_)) = predicates_database_access + .inner() + .get_predicate(&predicate_uuid, ctx) + { + return not_found_response(); + } let background_job_tx = background_job_tx.inner(); if let Ok(tx) = background_job_tx.lock() { let _ = tx.send(ObserverCommand::DeregisterStacksPredicate(predicate_uuid)); }; - - Json(json!({ - "status": 200, - "result": "Ok", - })) + success_response("Ok".into()) } #[openapi(tag = "Managing Predicates")] #[delete("/v1/chainhooks/bitcoin/", format = "application/json")] fn handle_delete_bitcoin_predicate( predicate_uuid: String, + predicates_database_access: &State, background_job_tx: &State>>>, ctx: &State, -) -> Json { - ctx.try_log(|logger| { - slog::info!( - logger, - "Handling HTTP DELETE /v1/chainhooks/bitcoin/{}", - predicate_uuid - ) - }); - +) -> Result, Custom>> { + try_info!( + ctx, + "Handling HTTP DELETE /v1/chainhooks/bitcoin/{predicate_uuid}" + ); + if let Ok(Some(_)) = predicates_database_access + .inner() + .get_predicate(&predicate_uuid, ctx) + { + return not_found_response(); + } let background_job_tx = background_job_tx.inner(); if let Ok(tx) = background_job_tx.lock() { let _ = tx.send(ObserverCommand::DeregisterBitcoinPredicate(predicate_uuid)); }; - - Json(json!({ - "status": 200, - "result": "Ok", - })) -} - -pub fn get_entry_from_predicates_db( - predicate_key: &str, - predicate_db_conn: &mut Connection, - _ctx: &Context, -) -> Result, String> { - let entry: HashMap = predicate_db_conn.hgetall(predicate_key).map_err(|e| { - format!( - "unable to load chainhook associated with key {}: {}", - predicate_key, - e - ) - })?; - - let encoded_spec = match entry.get("specification") { - None => return Ok(None), - Some(payload) => payload, - }; - - let spec = ChainhookInstance::deserialize_specification(encoded_spec)?; - - let encoded_status = match entry.get("status") { - None => Err(format!( - "found predicate specification with no status for predicate {}", - predicate_key - )), - Some(payload) => Ok(payload), - }?; - - let status = serde_json::from_str(encoded_status).map_err(|e| format!("{}", e))?; - - Ok(Some((spec, status))) -} - -pub fn get_entries_from_predicates_db( - predicate_db_conn: &mut Connection, - ctx: &Context, -) -> Result, String> { - let chainhooks_to_load: Vec = predicate_db_conn - .scan_match(ChainhookInstance::either_stx_or_btc_key("*")) - .map_err(|e| format!("unable to connect to redis: {}", e))? - .collect(); - - let mut predicates = vec![]; - for predicate_key in chainhooks_to_load.iter() { - let chainhook = match get_entry_from_predicates_db(predicate_key, predicate_db_conn, ctx) { - Ok(Some((spec, status))) => (spec, status), - Ok(None) => { - warn!( - ctx.expect_logger(), - "unable to load chainhook associated with key {}", predicate_key, - ); - continue; - } - Err(e) => { - error!( - ctx.expect_logger(), - "unable to load chainhook associated with key {}: {}", - predicate_key, - e.to_string() - ); - continue; - } - }; - predicates.push(chainhook); - } - Ok(predicates) -} - -pub fn load_predicates_from_redis( - config: &crate::config::Config, - ctx: &Context, -) -> Result, String> { - let redis_uri: &str = config.expected_api_database_uri(); - let client = redis::Client::open(redis_uri) - .map_err(|e| format!("unable to connect to redis: {}", e))?; - let mut predicate_db_conn = client - .get_connection() - .map_err(|e| format!("unable to connect to redis: {}", e))?; - get_entries_from_predicates_db(&mut predicate_db_conn, ctx) + success_response("Ok".into()) } pub fn document_predicate_api_server() -> Result { diff --git a/components/chainhook-cli/src/service/mod.rs b/components/chainhook-cli/src/service/mod.rs index 5d55f4683..cbdda9e0c 100644 --- a/components/chainhook-cli/src/service/mod.rs +++ b/components/chainhook-cli/src/service/mod.rs @@ -1,10 +1,11 @@ pub(crate) mod http_api; mod runloops; -use crate::config::{Config, PredicatesApi, PredicatesApiConfig}; -use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server}; +use crate::config::{Config, PredicatesApi}; +use crate::service::http_api::start_predicate_api_server; use crate::service::runloops::{start_bitcoin_scan_runloop, start_stacks_scan_runloop}; use crate::storage::database_access::StacksDatabaseAccess; +use crate::storage::predicates_db::RedisPredicatesDatabaseAccess; use crate::storage::signers::{initialize_signers_db, store_signer_db_messages}; use crate::storage::{ confirm_entries_in_stacks_blocks, draft_entries_in_stacks_blocks, get_all_unconfirmed_blocks, @@ -12,26 +13,25 @@ use crate::storage::{ open_readwrite_stacks_db_conn, }; -use chainhook_sdk::chainhooks::types::{ChainhookSpecificationNetworkMap, ChainhookStore}; +use chainhook_sdk::chainhooks::database::PredicatesDatabaseAccess; +use chainhook_sdk::chainhooks::types::{ + ChainhookSpecificationNetworkMap, PredicateStatus, ScanningData, +}; use chainhook_sdk::chainhooks::types::ChainhookInstance; use chainhook_sdk::observer::{ - start_event_observer, HookExpirationData, ObserverCommand, ObserverEvent, - PredicateDeregisteredEvent, PredicateEvaluationReport, PredicateInterruptedData, + start_event_observer, ObserverCommand, ObserverEvent, PredicateDeregisteredEvent, StacksObserverStartupContext, }; use chainhook_sdk::types::{Chain, StacksBlockData, StacksChainEvent}; use chainhook_sdk::utils::Context; use chainhook_sdk::{try_error, try_info}; -use redis::{Commands, Connection}; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; -use self::http_api::get_entry_from_predicates_db; use self::runloops::{BitcoinScanOp, StacksScanOp}; pub struct Service { @@ -41,7 +41,11 @@ pub struct Service { } impl Service { - pub fn new(config: Config, ctx: Context, stacks_block_processing_flag: Arc) -> Self { + pub fn new( + config: Config, + ctx: Context, + stacks_block_processing_flag: Arc, + ) -> Self { Self { config, ctx, @@ -54,26 +58,35 @@ impl Service { predicates_from_startup: Vec, observer_commands_tx_rx: Option<(Sender, Receiver)>, ) -> Result<(), String> { - let mut chainhook_store = ChainhookStore::new(); + // Create database access for the observer + let stacks_database_access = + StacksDatabaseAccess::new(PathBuf::from(&self.config.storage.working_dir)); + + // Create Redis predicates database access + let redis_uri = match &self.config.http_api { + PredicatesApi::On(api_config) => api_config.database_uri.clone(), + PredicatesApi::Off => panic!("Predicates API is not enabled"), + }; + let predicates_database_access = RedisPredicatesDatabaseAccess::new(redis_uri); // store all predicates from Redis that were in the process of scanning when // chainhook was shutdown - we need to resume where we left off let mut leftover_scans = vec![]; // retrieve predicates from Redis, and register each in memory if self.config.is_http_api_enabled() { - let registered_predicates = match load_predicates_from_redis(&self.config, &self.ctx) { - Ok(predicates) => predicates, - Err(e) => { - error!( - self.ctx.expect_logger(), - "Failed loading predicate from storage: {}", - e.to_string() - ); - vec![] - } - }; + let registered_predicates = + match predicates_database_access.get_all_predicates(&self.ctx) { + Ok(predicates) => predicates, + Err(e) => { + error!( + self.ctx.expect_logger(), + "Failed loading predicate from storage: {}", + e.to_string() + ); + vec![] + } + }; for (predicate, status) in registered_predicates.into_iter() { - let predicate_uuid = predicate.uuid().to_string(); match status { PredicateStatus::Scanning(scanning_data) => { leftover_scans.push((predicate.clone(), Some(scanning_data))); @@ -99,67 +112,24 @@ impl Service { continue; } } - match chainhook_store.register_instance(predicate) { - Ok(_) => { - debug!( - self.ctx.expect_logger(), - "Predicate {} retrieved from storage and registered", predicate_uuid, - ); - } - Err(e) => { - warn!( - self.ctx.expect_logger(), - "Failed to register predicate {} after retrieving from storage: {}", - predicate_uuid, - e.to_string() - ); - } - } } } let mut newly_registered_predicates = vec![]; - // For each predicate found, register in memory. for predicate in predicates_from_startup.into_iter() { - if let PredicatesApi::On(api_config) = &self.config.http_api { - if let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn(api_config) { - let uuid = predicate.get_uuid(); - if let Ok(Some(_)) = get_entry_from_predicates_db( - &ChainhookInstance::either_stx_or_btc_key(uuid), - &mut predicates_db_conn, - &self.ctx, - ) { - warn!( - self.ctx.expect_logger(), - "Predicate uuid already in use: {uuid}", - ); - continue; - } - }; - } - match chainhook_store.register_instance_from_network_map( - ( - &self.config.network.bitcoin_network, - &self.config.network.stacks_network, - ), - predicate, - ) { - Ok(spec) => { - newly_registered_predicates.push(spec.clone()); - debug!( - self.ctx.expect_logger(), - "Predicate {} retrieved from config and loaded", - spec.uuid(), - ); + let spec = match predicate { + ChainhookSpecificationNetworkMap::Stacks(hook) => { + let spec = + hook.into_specification_for_network(&self.config.network.stacks_network)?; + ChainhookInstance::Stacks(spec) } - Err(e) => { - warn!( - self.ctx.expect_logger(), - "Failed to load predicate from config: {}", - e.to_string() - ); + ChainhookSpecificationNetworkMap::Bitcoin(hook) => { + let spec = + hook.into_specification_for_network(&self.config.network.bitcoin_network)?; + ChainhookInstance::Bitcoin(spec) } - } + }; + newly_registered_predicates.push(spec); } initialize_signers_db(&self.config.expected_cache_path(), &self.ctx) @@ -169,8 +139,7 @@ impl Service { observer_commands_tx_rx.unwrap_or(channel()); let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded(); - let mut event_observer_config = self.config.get_event_observer_config(); - event_observer_config.registered_chainhooks = chainhook_store; + let event_observer_config = self.config.get_event_observer_config(); // Stacks scan operation threadpool let (stacks_scan_op_tx, stacks_scan_op_rx) = crossbeam_channel::unbounded(); @@ -217,16 +186,18 @@ impl Service { self.ctx.expect_logger(), "Listening on port {} for chainhook predicate registrations", api_config.http_port ); - let ctx = self.ctx.clone(); - let api_config = api_config.clone(); + let moved_ctx = self.ctx.clone(); + let moved_api_config = api_config.clone(); let moved_observer_command_tx = observer_command_tx.clone(); + let moved_predicates_database_access = predicates_database_access.clone(); // Test and initialize a database connection let res = hiro_system_kit::thread_named("HTTP Predicate API") .spawn(move || { let future = start_predicate_api_server( - api_config, - moved_observer_command_tx.clone(), - ctx.clone(), + moved_api_config, + moved_observer_command_tx, + moved_predicates_database_access, + moved_ctx, ); hiro_system_kit::nestable_block_on(future) }) @@ -285,9 +256,6 @@ impl Service { let observer_event_tx_moved = observer_event_tx.clone(); let moved_observer_command_tx = observer_command_tx.clone(); - // Create database access for the observer - let database_access = - StacksDatabaseAccess::new(PathBuf::from(&self.config.storage.working_dir)); let _ = start_event_observer( event_observer_config.clone(), @@ -297,19 +265,11 @@ impl Service { None, Some(stacks_startup_context), self.stacks_block_processing_flag.clone(), - Some(database_access), + Some(stacks_database_access), + predicates_database_access, self.ctx.clone(), ); - let ctx = self.ctx.clone(); - match self.config.http_api { - PredicatesApi::On(ref api_config) => { - // Test redis connection - open_readwrite_predicates_db_conn(api_config)?; - } - PredicatesApi::Off => {} - }; - for predicate_with_last_scanned_block in leftover_scans { match predicate_with_last_scanned_block { (ChainhookInstance::Stacks(spec), last_scanned_block) => { @@ -344,174 +304,38 @@ impl Service { } }; match event { - ObserverEvent::PredicateRegistered(spec) => { - // If start block specified, use it. - // If no start block specified, depending on the nature the hook, we'd like to retrieve: - // - contract-id - if let PredicatesApi::On(ref config) = self.config.http_api { - let Ok(mut predicates_db_conn) = - open_readwrite_predicates_db_conn_verbose(config, &ctx) - else { - continue; - }; - update_predicate_spec( - &spec.key(), - &spec, - &mut predicates_db_conn, - &self.ctx, - ); - update_predicate_status( - &spec.key(), - PredicateStatus::New, - &mut predicates_db_conn, - &self.ctx, - ); + ObserverEvent::PredicateRegistered(spec) => match spec { + ChainhookInstance::Stacks(predicate_spec) => { + let _ = stacks_scan_op_tx.send(StacksScanOp::StartScan { + predicate_spec, + unfinished_scan_data: None, + }); } - match spec { - ChainhookInstance::Stacks(predicate_spec) => { - let _ = stacks_scan_op_tx.send(StacksScanOp::StartScan { - predicate_spec, - unfinished_scan_data: None, - }); - } - ChainhookInstance::Bitcoin(predicate_spec) => { - let _ = bitcoin_scan_op_tx.send(BitcoinScanOp::StartScan { - predicate_spec, - unfinished_scan_data: None, - }); - } + ChainhookInstance::Bitcoin(predicate_spec) => { + let _ = bitcoin_scan_op_tx.send(BitcoinScanOp::StartScan { + predicate_spec, + unfinished_scan_data: None, + }); } - } - ObserverEvent::PredicateEnabled(spec) => { - if let PredicatesApi::On(ref config) = self.config.http_api { - let Ok(mut predicates_db_conn) = - open_readwrite_predicates_db_conn_verbose(config, &ctx) - else { - continue; - }; - update_predicate_spec( - &spec.key(), - &spec, - &mut predicates_db_conn, - &self.ctx, - ); - set_predicate_streaming_status( - StreamingDataType::FinishedScanning, - &spec.key(), - &mut predicates_db_conn, - &ctx, - ); - } - } + }, + ObserverEvent::PredicateEnabled(_) => {} ObserverEvent::PredicateDeregistered(PredicateDeregisteredEvent { predicate_uuid, chain, }) => { - if let PredicatesApi::On(ref config) = self.config.http_api { - let Ok(mut predicates_db_conn) = - open_readwrite_predicates_db_conn_verbose(config, &ctx) - else { - continue; - }; - - match chain { - Chain::Bitcoin => { - let _ = bitcoin_scan_op_tx - .send(BitcoinScanOp::KillScan(predicate_uuid.clone())); - } - Chain::Stacks => { - let _ = stacks_scan_op_tx - .send(StacksScanOp::KillScan(predicate_uuid.clone())); - } - }; - - let predicate_key = - ChainhookInstance::either_stx_or_btc_key(&predicate_uuid); - let res: Result<(), redis::RedisError> = - predicates_db_conn.del(predicate_key.clone()); - if let Err(e) = res { - warn!( - self.ctx.expect_logger(), - "unable to delete predicate {predicate_key}: {}", - e.to_string() - ); + match chain { + Chain::Bitcoin => { + let _ = bitcoin_scan_op_tx + .send(BitcoinScanOp::KillScan(predicate_uuid.clone())); } - } - } - ObserverEvent::BitcoinChainEvent((chain_update, report)) => { - debug!(self.ctx.expect_logger(), "Bitcoin update not stored"); - if let PredicatesApi::On(ref config) = self.config.http_api { - let Ok(mut predicates_db_conn) = - open_readwrite_predicates_db_conn_verbose(config, &ctx) - else { - continue; - }; - - match chain_update { - chainhook_sdk::types::BitcoinChainEvent::ChainUpdatedWithBlocks( - data, - ) => { - for confirmed_block in &data.confirmed_blocks { - if let Some(expired_predicate_uuids) = - expire_predicates_for_block( - &Chain::Bitcoin, - confirmed_block.block_identifier.index, - &mut predicates_db_conn, - &ctx, - ) - { - for uuid in expired_predicate_uuids.into_iter() { - let _ = observer_command_tx.send( - ObserverCommand::ExpireBitcoinPredicate( - HookExpirationData { - hook_uuid: uuid, - block_height: confirmed_block - .block_identifier - .index, - }, - ), - ); - } - } - } - } - chainhook_sdk::types::BitcoinChainEvent::ChainUpdatedWithReorg( - data, - ) => { - for confirmed_block in &data.confirmed_blocks { - if let Some(expired_predicate_uuids) = - expire_predicates_for_block( - &Chain::Bitcoin, - confirmed_block.block_identifier.index, - &mut predicates_db_conn, - &ctx, - ) - { - for uuid in expired_predicate_uuids.into_iter() { - let _ = observer_command_tx.send( - ObserverCommand::ExpireBitcoinPredicate( - HookExpirationData { - hook_uuid: uuid, - block_height: confirmed_block - .block_identifier - .index, - }, - ), - ); - } - } - } - } + Chain::Stacks => { + let _ = stacks_scan_op_tx + .send(StacksScanOp::KillScan(predicate_uuid.clone())); } - update_status_from_report( - Chain::Bitcoin, - report, - &mut predicates_db_conn, - &ctx, - ); - } + }; } - ObserverEvent::StacksChainEvent((chain_event, report)) => { + ObserverEvent::BitcoinChainEvent(_) => {} + ObserverEvent::StacksChainEvent((chain_event, _)) => { match open_readwrite_stacks_db_conn( &self.config.expected_cache_path(), &self.ctx, @@ -587,99 +411,12 @@ impl Service { continue; } }; - - if let PredicatesApi::On(ref config) = self.config.http_api { - let Ok(mut predicates_db_conn) = - open_readwrite_predicates_db_conn_verbose(config, &ctx) - else { - continue; - }; - - match &chain_event { - StacksChainEvent::ChainUpdatedWithBlocks(data) => { - for confirmed_block in &data.confirmed_blocks { - if let Some(expired_predicate_uuids) = - expire_predicates_for_block( - &Chain::Stacks, - confirmed_block.block_identifier.index, - &mut predicates_db_conn, - &ctx, - ) - { - for uuid in expired_predicate_uuids.into_iter() { - let _ = observer_command_tx.send( - ObserverCommand::ExpireStacksPredicate( - HookExpirationData { - hook_uuid: uuid, - block_height: confirmed_block - .block_identifier - .index, - }, - ), - ); - } - } - } - } - StacksChainEvent::ChainUpdatedWithReorg(data) => { - for confirmed_block in &data.confirmed_blocks { - if let Some(expired_predicate_uuids) = - expire_predicates_for_block( - &Chain::Stacks, - confirmed_block.block_identifier.index, - &mut predicates_db_conn, - &ctx, - ) - { - for uuid in expired_predicate_uuids.into_iter() { - let _ = observer_command_tx.send( - ObserverCommand::ExpireStacksPredicate( - HookExpirationData { - hook_uuid: uuid, - block_height: confirmed_block - .block_identifier - .index, - }, - ), - ); - } - } - } - } - StacksChainEvent::ChainUpdatedWithMicroblocks(_) - | StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {} - StacksChainEvent::ChainUpdatedWithNonConsensusEvents(_) => { - // TODO(rafaelcr): Expire signer message predicates when appropriate - } - }; - update_status_from_report( - Chain::Stacks, - report, - &mut predicates_db_conn, - &ctx, - ); - }; - // Signal completion by setting block processing flag to false - self.stacks_block_processing_flag.store(false, Ordering::Relaxed); + self.stacks_block_processing_flag + .store(false, Ordering::Relaxed); } - ObserverEvent::PredicateInterrupted(PredicateInterruptedData { - predicate_key, - error, - }) => { - if let PredicatesApi::On(ref config) = self.config.http_api { - let Ok(mut predicates_db_conn) = - open_readwrite_predicates_db_conn_verbose(config, &ctx) - else { - continue; - }; - set_predicate_interrupted_status( - error, - &predicate_key, - &mut predicates_db_conn, - &ctx, - ); - } + ObserverEvent::PredicateInterrupted(_) => { + // Stop sync? } ObserverEvent::Terminate => { info!( @@ -702,604 +439,5 @@ impl Service { } } -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -#[serde(rename_all = "snake_case")] -#[serde(tag = "type", content = "info")] -/// A high-level view of how `PredicateStatus` is used/updated can be seen here: docs/images/predicate-status-flowchart/PredicateStatusFlowchart.png. -pub enum PredicateStatus { - Scanning(ScanningData), - Streaming(StreamingData), - UnconfirmedExpiration(ExpiredData), - ConfirmedExpiration(ExpiredData), - Interrupted(String), - New, -} - -#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)] -pub struct ScanningData { - pub number_of_blocks_to_scan: u64, - pub number_of_blocks_evaluated: u64, - pub number_of_times_triggered: u64, - pub last_occurrence: Option, - pub last_evaluated_block_height: u64, -} - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub struct StreamingData { - pub last_occurrence: Option, - pub last_evaluation: u64, - pub number_of_times_triggered: u64, - pub number_of_blocks_evaluated: u64, - pub last_evaluated_block_height: u64, -} - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub struct ExpiredData { - pub number_of_blocks_evaluated: u64, - pub number_of_times_triggered: u64, - pub last_occurrence: Option, - pub last_evaluated_block_height: u64, - pub expired_at_block_height: u64, -} - -fn update_status_from_report( - chain: Chain, - report: PredicateEvaluationReport, - predicates_db_conn: &mut Connection, - ctx: &Context, -) { - for (predicate_uuid, blocks_ids) in report.predicates_triggered.iter() { - if let Some(last_triggered_height) = blocks_ids.last().map(|b| b.index) { - let triggered_count = blocks_ids.len().try_into().unwrap_or(0); - set_predicate_streaming_status( - StreamingDataType::Occurrence { - last_triggered_height, - triggered_count, - }, - &(ChainhookInstance::either_stx_or_btc_key(predicate_uuid)), - predicates_db_conn, - ctx, - ); - } - } - - for (predicate_uuid, blocks_ids) in report.predicates_evaluated.iter() { - // clone so we don't actually update the report - let mut blocks_ids = blocks_ids.clone(); - // any triggered or expired predicate was also evaluated. But we already updated the status for that block, - // so remove those matching blocks from the list of evaluated predicates - if let Some(triggered_block_ids) = report.predicates_triggered.get(predicate_uuid) { - for triggered_id in triggered_block_ids { - blocks_ids.remove(triggered_id); - } - } - if let Some(expired_block_ids) = report.predicates_expired.get(predicate_uuid) { - for expired_id in expired_block_ids { - blocks_ids.remove(expired_id); - } - } - if let Some(last_evaluated_height) = blocks_ids.last().map(|b| b.index) { - let evaluated_count = blocks_ids.len().try_into().unwrap_or(0); - set_predicate_streaming_status( - StreamingDataType::Evaluation { - last_evaluated_height, - evaluated_count, - }, - &(ChainhookInstance::either_stx_or_btc_key(predicate_uuid)), - predicates_db_conn, - ctx, - ); - } - } - for (predicate_uuid, blocks_ids) in report.predicates_expired.iter() { - if let Some(last_evaluated_height) = blocks_ids.last().map(|b| b.index) { - let evaluated_count = blocks_ids.len().try_into().unwrap_or(0); - set_unconfirmed_expiration_status( - &chain, - evaluated_count, - last_evaluated_height, - &(ChainhookInstance::either_stx_or_btc_key(predicate_uuid)), - predicates_db_conn, - ctx, - ); - } - } -} - -fn set_predicate_interrupted_status( - error: String, - predicate_key: &str, - predicates_db_conn: &mut Connection, - ctx: &Context, -) { - let status = PredicateStatus::Interrupted(error); - update_predicate_status(predicate_key, status, predicates_db_conn, ctx); -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum StreamingDataType { - Occurrence { - last_triggered_height: u64, - triggered_count: u64, - }, - Evaluation { - last_evaluated_height: u64, - evaluated_count: u64, - }, - FinishedScanning, -} - -/// Updates a predicate's status to `Streaming` if `Scanning` is complete. -/// -/// If `StreamingStatusType` is `Occurrence`, sets the `last_occurrence` & `last_evaluation` fields to the current time. -/// -/// If `StreamingStatusType` is `Evaluation`, sets the `last_evaluation` field to the current time while leaving the `last_occurrence` field as it was. -fn set_predicate_streaming_status( - streaming_data_type: StreamingDataType, - predicate_key: &str, - predicates_db_conn: &mut Connection, - ctx: &Context, -) { - let now_secs = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Could not get current time in ms") - .as_secs(); - let ( - last_occurrence, - number_of_blocks_evaluated, - number_of_times_triggered, - last_evaluated_block_height, - ) = { - let current_status = retrieve_predicate_status(predicate_key, predicates_db_conn); - match current_status { - Some(status) => match status { - PredicateStatus::Streaming(StreamingData { - last_occurrence, - number_of_blocks_evaluated, - number_of_times_triggered, - last_evaluated_block_height, - last_evaluation: _, - }) => ( - last_occurrence, - number_of_blocks_evaluated, - number_of_times_triggered, - last_evaluated_block_height, - ), - PredicateStatus::Scanning(ScanningData { - number_of_blocks_to_scan: _, - number_of_blocks_evaluated, - number_of_times_triggered, - last_evaluated_block_height, - last_occurrence, - }) => ( - last_occurrence, - number_of_blocks_evaluated, - number_of_times_triggered, - last_evaluated_block_height, - ), - PredicateStatus::UnconfirmedExpiration(ExpiredData { - number_of_blocks_evaluated, - number_of_times_triggered, - last_occurrence, - last_evaluated_block_height, - expired_at_block_height: _, - }) => ( - last_occurrence, - number_of_blocks_evaluated, - number_of_times_triggered, - last_evaluated_block_height, - ), - PredicateStatus::New => (None, 0, 0, 0), - PredicateStatus::Interrupted(_) | PredicateStatus::ConfirmedExpiration(_) => { - warn!(ctx.expect_logger(), "Attempting to set Streaming status when previous status was {:?} for predicate {}", status, predicate_key); - return; - } - }, - None => (None, 0, 0, 0), - } - }; - let ( - last_occurrence, - number_of_times_triggered, - number_of_blocks_evaluated, - last_evaluated_block_height, - ) = match streaming_data_type { - StreamingDataType::Occurrence { - last_triggered_height, - triggered_count, - } => ( - Some(now_secs), - number_of_times_triggered + triggered_count, - number_of_blocks_evaluated + triggered_count, - last_triggered_height, - ), - StreamingDataType::Evaluation { - last_evaluated_height, - evaluated_count, - } => ( - last_occurrence, - number_of_times_triggered, - number_of_blocks_evaluated + evaluated_count, - last_evaluated_height, - ), - StreamingDataType::FinishedScanning => ( - last_occurrence, - number_of_times_triggered, - number_of_blocks_evaluated, - last_evaluated_block_height, - ), - }; - - update_predicate_status( - predicate_key, - PredicateStatus::Streaming(StreamingData { - last_occurrence, - last_evaluation: now_secs, - number_of_times_triggered, - last_evaluated_block_height, - number_of_blocks_evaluated, - }), - predicates_db_conn, - ctx, - ); -} - -/// Updates a predicate's status to `Scanning`. -/// -/// Sets the `last_occurrence` time to the current time if a new trigger has occurred since the last status update. -pub fn set_predicate_scanning_status( - predicate_key: &str, - number_of_blocks_to_scan: u64, - number_of_blocks_evaluated: u64, - number_of_times_triggered: u64, - current_block_height: u64, - predicates_db_conn: &mut Connection, - ctx: &Context, -) { - let now_secs = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Could not get current time in ms") - .as_secs(); - let current_status = retrieve_predicate_status(predicate_key, predicates_db_conn); - let last_occurrence = match current_status { - Some(status) => match status { - PredicateStatus::Scanning(scanning_data) => { - if number_of_times_triggered > scanning_data.number_of_times_triggered { - Some(now_secs) - } else { - scanning_data.last_occurrence - } - } - PredicateStatus::Streaming(streaming_data) => { - if number_of_times_triggered > streaming_data.number_of_times_triggered { - Some(now_secs) - } else { - streaming_data.last_occurrence - } - } - PredicateStatus::UnconfirmedExpiration(expired_data) => { - if number_of_times_triggered > expired_data.number_of_times_triggered { - Some(now_secs) - } else { - expired_data.last_occurrence - } - } - PredicateStatus::New => { - if number_of_times_triggered > 0 { - Some(now_secs) - } else { - None - } - } - PredicateStatus::ConfirmedExpiration(_) | PredicateStatus::Interrupted(_) => { - warn!(ctx.expect_logger(), "Attempting to set Scanning status when previous status was {:?} for predicate {}", status, predicate_key); - return; - } - }, - None => None, - }; - - update_predicate_status( - predicate_key, - PredicateStatus::Scanning(ScanningData { - number_of_blocks_to_scan, - number_of_blocks_evaluated, - number_of_times_triggered, - last_occurrence, - last_evaluated_block_height: current_block_height, - }), - predicates_db_conn, - ctx, - ); -} - -/// Updates a predicate's status to `UnconfirmedExpiration`. -pub fn set_unconfirmed_expiration_status( - chain: &Chain, - number_of_new_blocks_evaluated: u64, - last_evaluated_block_height: u64, - predicate_key: &str, - predicates_db_conn: &mut Connection, - ctx: &Context, -) { - let current_status = retrieve_predicate_status(predicate_key, predicates_db_conn); - let mut previously_was_unconfirmed = false; - let ( - number_of_blocks_evaluated, - number_of_times_triggered, - last_occurrence, - expired_at_block_height, - ) = match current_status { - Some(status) => match status { - PredicateStatus::Scanning(ScanningData { - number_of_blocks_to_scan: _, - number_of_blocks_evaluated: _, - number_of_times_triggered, - last_occurrence, - last_evaluated_block_height, - }) => ( - number_of_new_blocks_evaluated, - number_of_times_triggered, - last_occurrence, - last_evaluated_block_height, - ), - PredicateStatus::New => (0, 0, None, 0), - PredicateStatus::Streaming(StreamingData { - last_occurrence, - last_evaluation: _, - number_of_times_triggered, - number_of_blocks_evaluated, - last_evaluated_block_height, - }) => ( - number_of_blocks_evaluated + number_of_new_blocks_evaluated, - number_of_times_triggered, - last_occurrence, - last_evaluated_block_height, - ), - PredicateStatus::UnconfirmedExpiration(ExpiredData { - number_of_blocks_evaluated, - number_of_times_triggered, - last_occurrence, - last_evaluated_block_height: _, - expired_at_block_height, - }) => { - previously_was_unconfirmed = true; - ( - number_of_blocks_evaluated + number_of_new_blocks_evaluated, - number_of_times_triggered, - last_occurrence, - expired_at_block_height, - ) - } - PredicateStatus::ConfirmedExpiration(_) | PredicateStatus::Interrupted(_) => { - warn!(ctx.expect_logger(), "Attempting to set UnconfirmedExpiration status when previous status was {:?} for predicate {}", status, predicate_key); - return; - } - }, - None => (0, 0, None, 0), - }; - update_predicate_status( - predicate_key, - PredicateStatus::UnconfirmedExpiration(ExpiredData { - number_of_blocks_evaluated, - number_of_times_triggered, - last_occurrence, - last_evaluated_block_height, - expired_at_block_height, - }), - predicates_db_conn, - ctx, - ); - // don't insert this entry more than once - if !previously_was_unconfirmed { - insert_predicate_expiration( - chain, - expired_at_block_height, - predicate_key, - predicates_db_conn, - ctx, - ); - } -} - -pub fn set_confirmed_expiration_status( - predicate_key: &str, - predicates_db_conn: &mut Connection, - ctx: &Context, -) { - let current_status = retrieve_predicate_status(predicate_key, predicates_db_conn); - let expired_data = match current_status { - Some(status) => match status { - PredicateStatus::UnconfirmedExpiration(expired_data) => expired_data, - PredicateStatus::ConfirmedExpiration(_) - | PredicateStatus::Interrupted(_) - | PredicateStatus::New - | PredicateStatus::Scanning(_) - | PredicateStatus::Streaming(_) => { - warn!(ctx.expect_logger(), "Attempting to set ConfirmedExpiration status when previous status was {:?} for predicate {}", status, predicate_key); - return; - } - }, - None => { - // None means the predicate was deleted, so we can just ignore this predicate expiring - return; - } - }; - update_predicate_status( - predicate_key, - PredicateStatus::ConfirmedExpiration(expired_data), - predicates_db_conn, - ctx, - ); -} - -fn get_predicate_expiration_key(chain: &Chain, block_height: u64) -> String { - match chain { - Chain::Bitcoin => format!("expires_at:bitcoin_block:{}", block_height), - Chain::Stacks => format!("expires_at:stacks_block:{}", block_height), - } -} -fn expire_predicates_for_block( - chain: &Chain, - confirmed_block_index: u64, - predicates_db_conn: &mut Connection, - ctx: &Context, -) -> Option> { - match get_predicates_expiring_at_block(chain, confirmed_block_index, predicates_db_conn, ctx) { - Some(predicates_to_expire) => { - for predicate_key in predicates_to_expire.iter() { - set_confirmed_expiration_status(predicate_key, predicates_db_conn, ctx); - } - Some(predicates_to_expire) - } - None => None, - } -} - -fn insert_predicate_expiration( - chain: &Chain, - expired_at_block_height: u64, - predicate_key: &str, - predicates_db_conn: &mut Connection, - ctx: &Context, -) { - let key = get_predicate_expiration_key(chain, expired_at_block_height); - let mut predicates_expiring_at_block = - get_predicates_expiring_at_block(chain, expired_at_block_height, predicates_db_conn, ctx) - .unwrap_or_default(); - predicates_expiring_at_block.push(predicate_key.to_owned()); - let serialized_expiring_predicates = json!(predicates_expiring_at_block).to_string(); - if let Err(e) = - predicates_db_conn.hset::<_, _, _, ()>(&key, "predicates", &serialized_expiring_predicates) - { - warn!( - ctx.expect_logger(), - "Error updating expired predicates index: {}", - e.to_string() - ); - } else { - debug!( - ctx.expect_logger(), - "Updating expired predicates at block height {expired_at_block_height} with predicate: {predicate_key}" - ); - } -} - -fn get_predicates_expiring_at_block( - chain: &Chain, - block_index: u64, - predicates_db_conn: &mut Connection, - ctx: &Context, -) -> Option> { - let key = get_predicate_expiration_key(chain, block_index); - match predicates_db_conn.hget::<_, _, String>(key.to_string(), "predicates") { - Ok(ref payload) => match serde_json::from_str(payload) { - Ok(data) => { - if let Err(e) = predicates_db_conn.hdel::<_, _, u64>(key.to_string(), "predicates") - { - warn!( - ctx.expect_logger(), - "Error removing expired predicates index: {}", - e.to_string() - ); - } - Some(data) - } - Err(_) => None, - }, - Err(_) => None, - } -} - -pub fn update_predicate_status( - predicate_key: &str, - status: PredicateStatus, - predicates_db_conn: &mut Connection, - ctx: &Context, -) { - let serialized_status = json!(status).to_string(); - if let Err(e) = - predicates_db_conn.hset::<_, _, _, ()>(&predicate_key, "status", &serialized_status) - { - warn!( - ctx.expect_logger(), - "Error updating status for {}: {}", - predicate_key, - e.to_string() - ); - } else { - debug!( - ctx.expect_logger(), - "Updating predicate {predicate_key} status: {serialized_status}" - ); - } -} - -fn update_predicate_spec( - predicate_key: &str, - spec: &ChainhookInstance, - predicates_db_conn: &mut Connection, - ctx: &Context, -) { - let serialized_spec = json!(spec).to_string(); - if let Err(e) = - predicates_db_conn.hset::<_, _, _, ()>(&predicate_key, "specification", &serialized_spec) - { - warn!( - ctx.expect_logger(), - "Error updating status for {}: {}", - predicate_key, - e.to_string() - ); - } else { - debug!( - ctx.expect_logger(), - "Updating predicate {predicate_key} with spec: {serialized_spec}" - ); - } -} - -fn retrieve_predicate_status( - predicate_key: &str, - predicates_db_conn: &mut Connection, -) -> Option { - match predicates_db_conn.hget::<_, _, String>(predicate_key.to_string(), "status") { - Ok(ref payload) => match serde_json::from_str(payload) { - Ok(data) => Some(data), - Err(_) => None, - }, - Err(_) => None, - } -} - -pub fn open_readwrite_predicates_db_conn( - config: &PredicatesApiConfig, -) -> Result { - let redis_uri = &config.database_uri; - let client = redis::Client::open(redis_uri.clone()).unwrap(); - client - .get_connection() - .map_err(|e| format!("unable to connect to db: {}", e)) -} - -pub fn open_readwrite_predicates_db_conn_verbose( - config: &PredicatesApiConfig, - ctx: &Context, -) -> Result { - let res = open_readwrite_predicates_db_conn(config); - if let Err(ref e) = res { - error!(ctx.expect_logger(), "{}", e.to_string()); - } - res -} - -// todo: evaluate expects -pub fn open_readwrite_predicates_db_conn_or_panic( - config: &PredicatesApiConfig, - ctx: &Context, -) -> Connection { - open_readwrite_predicates_db_conn_verbose(config, ctx).expect("unable to open redis conn") -} - #[cfg(test)] pub mod tests; diff --git a/components/chainhook-cli/src/service/runloops.rs b/components/chainhook-cli/src/service/runloops.rs index c177b5636..6c1e526b9 100644 --- a/components/chainhook-cli/src/service/runloops.rs +++ b/components/chainhook-cli/src/service/runloops.rs @@ -19,10 +19,10 @@ use crate::{ bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate, common::PredicateScanResult, stacks::scan_stacks_chainstate_via_rocksdb_using_predicate, }, - service::{open_readwrite_predicates_db_conn_or_panic, set_predicate_interrupted_status}, storage::StacksDbConnections, + storage::{predicates_db::{open_readwrite_predicates_db_conn_or_panic, set_predicate_interrupted_status}, StacksDbConnections}, }; -use super::ScanningData; +use chainhook_sdk::chainhooks::types::ScanningData; pub enum StacksScanOp { StartScan { diff --git a/components/chainhook-cli/src/service/tests/helpers/mock_service.rs b/components/chainhook-cli/src/service/tests/helpers/mock_service.rs index 4fa0b51a6..3536043e0 100644 --- a/components/chainhook-cli/src/service/tests/helpers/mock_service.rs +++ b/components/chainhook-cli/src/service/tests/helpers/mock_service.rs @@ -3,10 +3,8 @@ use crate::config::{ PredicatesApiConfig, StorageConfig, DEFAULT_REDIS_URI, }; use crate::scan::stacks::import_stacks_chainstate_from_remote_tsv; -use crate::service::{ - http_api::start_predicate_api_server, update_predicate_spec, update_predicate_status, - PredicateStatus, Service, -}; +use crate::service::{http_api::start_predicate_api_server, PredicateStatus, Service}; +use crate::storage::predicates_db::{update_predicate_spec, update_predicate_status, RedisPredicatesDatabaseAccess}; use chainhook_sdk::chainhooks::types::PoxConfig; use chainhook_sdk::observer::PredicatesConfig; use chainhook_sdk::{ @@ -23,11 +21,11 @@ use rocket::serde::json::Value as JsonValue; use rocket::Shutdown; use std::path::PathBuf; use std::sync::atomic::AtomicBool; -use std::sync::Arc; use std::sync::mpsc; use std::sync::mpsc::channel; use std::sync::mpsc::Receiver; use std::sync::mpsc::Sender; +use std::sync::Arc; use super::get_free_port; use super::mock_bitcoin_rpc::mock_bitcoin_rpc; @@ -210,9 +208,14 @@ pub async fn build_predicate_api_server(port: u16) -> (Receiver }; let (tx, rx) = channel(); - let shutdown = start_predicate_api_server(api_config, tx, ctx) - .await - .unwrap(); + let shutdown = start_predicate_api_server( + api_config, + tx, + RedisPredicatesDatabaseAccess::new(DEFAULT_REDIS_URI.to_string()), + ctx, + ) + .await + .unwrap(); // Loop to check if the server is ready let mut attempts = 0; diff --git a/components/chainhook-cli/src/service/tests/mod.rs b/components/chainhook-cli/src/service/tests/mod.rs index 2321499af..c247ab1b8 100644 --- a/components/chainhook-cli/src/service/tests/mod.rs +++ b/components/chainhook-cli/src/service/tests/mod.rs @@ -1,4 +1,5 @@ use chainhook_sdk::chainhooks::types::ChainhookSpecificationNetworkMap; +use chainhook_sdk::chainhooks::types::StreamingData; use chainhook_sdk::types::Chain; use chainhook_sdk::utils::Context; use rocket::serde::json::Value as JsonValue; @@ -26,7 +27,7 @@ use crate::service::tests::helpers::mock_service::{ setup_stacks_chainhook_test, TestSetupResult, }; use crate::service::tests::helpers::mock_stacks_node::create_burn_fork_at; -use crate::service::{PredicateStatus, PredicateStatus::*, ScanningData, StreamingData}; +use crate::service::{PredicateStatus, PredicateStatus::*, ScanningData}; use crate::storage::{get_all_unconfirmed_blocks, open_readonly_stacks_db_conn}; use super::http_api::document_predicate_api_server; diff --git a/components/chainhook-cli/src/service/tests/observer_tests.rs b/components/chainhook-cli/src/service/tests/observer_tests.rs index 6245a8a82..3717a8c37 100644 --- a/components/chainhook-cli/src/service/tests/observer_tests.rs +++ b/components/chainhook-cli/src/service/tests/observer_tests.rs @@ -1,7 +1,10 @@ -use std::{sync::{atomic::AtomicBool, mpsc::channel, Arc}, thread::sleep, time::Duration}; +use std::{ + sync::{atomic::AtomicBool, mpsc::channel, Arc}, + thread::sleep, + time::Duration, +}; use chainhook_sdk::{ - chainhooks::types::ChainhookStore, observer::{start_event_observer, EventObserverConfig, PredicatesConfig}, types::{BitcoinNetwork, StacksNodeConfig}, utils::Context, @@ -10,17 +13,22 @@ use reqwest::Method; use serde_json::Value; use test_case::test_case; -use crate::{service::tests::{ - cleanup, cleanup_err, - helpers::{ - build_predicates::build_stacks_payload, - mock_service::{ - call_observer_svc, call_ping, call_prometheus, call_register_predicate, flush_redis, - TestSetupResult, +use crate::{ + service::tests::{ + cleanup, cleanup_err, + helpers::{ + build_predicates::build_stacks_payload, + mock_service::{ + call_observer_svc, call_ping, call_prometheus, call_register_predicate, + flush_redis, TestSetupResult, + }, }, + setup_bitcoin_chainhook_test, setup_stacks_chainhook_test, }, - setup_bitcoin_chainhook_test, setup_stacks_chainhook_test, -}, storage::database_access::StacksDatabaseAccess}; + storage::{ + database_access::StacksDatabaseAccess, predicates_db::RedisPredicatesDatabaseAccess, + }, +}; use super::helpers::{ build_predicates::get_random_uuid, get_free_port, mock_stacks_node::create_tmp_working_dir, @@ -158,7 +166,7 @@ async fn start_and_ping_event_observer(config: EventObserverConfig, ingestion_po logger: Some(logger), tracer: false, }; - start_event_observer::( + start_event_observer::( config, observer_commands_tx, observer_commands_rx, @@ -167,6 +175,7 @@ async fn start_and_ping_event_observer(config: EventObserverConfig, ingestion_po None, Arc::new(AtomicBool::new(false)), None, + RedisPredicatesDatabaseAccess::new("localhost:6379".to_string()), ctx, ) .unwrap(); @@ -187,7 +196,6 @@ async fn it_responds_200_for_unimplemented_endpoints( panic!("test failed with error: {e}"); }); let config = EventObserverConfig { - registered_chainhooks: ChainhookStore::new(), predicates_config: PredicatesConfig::default(), bitcoin_rpc_proxy_enabled: false, bitcoind_rpc_username: String::new(), diff --git a/components/chainhook-cli/src/storage/mod.rs b/components/chainhook-cli/src/storage/mod.rs index f86143f63..253f37831 100644 --- a/components/chainhook-cli/src/storage/mod.rs +++ b/components/chainhook-cli/src/storage/mod.rs @@ -1,4 +1,5 @@ pub mod database_access; +pub mod predicates_db; pub mod signers; pub mod sqlite; diff --git a/components/chainhook-cli/src/storage/predicates_db.rs b/components/chainhook-cli/src/storage/predicates_db.rs new file mode 100644 index 000000000..404f41eb1 --- /dev/null +++ b/components/chainhook-cli/src/storage/predicates_db.rs @@ -0,0 +1,807 @@ +use std::collections::HashMap; +use std::time::{SystemTime, UNIX_EPOCH}; + +use chainhook_sdk::chainhooks::database::PredicatesDatabaseAccess; +use chainhook_sdk::chainhooks::stacks::StacksChainhookInstance; +use chainhook_sdk::chainhooks::types::{ExpiredData, PredicateStatus, ScanningData, StreamingData}; +use chainhook_sdk::chainhooks::{bitcoin::BitcoinChainhookInstance, types::ChainhookInstance}; +use chainhook_sdk::observer::PredicateEvaluationReport; +use chainhook_sdk::types::Chain; +use chainhook_sdk::utils::Context; +use redis::{Commands, Connection}; + +use crate::config::PredicatesApiConfig; + +#[derive(Clone)] +pub struct RedisPredicatesDatabaseAccess { + redis_uri: String, +} + +impl RedisPredicatesDatabaseAccess { + pub fn new(redis_uri: String) -> Self { + Self { redis_uri } + } + + fn get_predicate_db_conn(&self) -> Result { + let client = redis::Client::open(self.redis_uri.as_str()) + .map_err(|e| format!("unable to connect to redis: {}", e))?; + let conn = client + .get_connection() + .map_err(|e| format!("unable to connect to redis: {}", e))?; + Ok(conn) + } +} + +impl PredicatesDatabaseAccess for RedisPredicatesDatabaseAccess { + fn get_predicate(&self, uuid: &String, ctx: &Context) -> Result, String> { + let mut conn = self.get_predicate_db_conn()?; + let predicate_key = ChainhookInstance::either_stx_or_btc_key(uuid); + get_entry_from_predicates_db(&predicate_key, &mut conn, ctx) + } + + fn insert_predicate(&self, predicate: ChainhookInstance, ctx: &Context) -> Result<(), String> { + let mut conn = self.get_predicate_db_conn()?; + + update_predicate_spec(&predicate.key(), &predicate, &mut conn, ctx); + update_predicate_status(&predicate.key(), PredicateStatus::New, &mut conn, ctx); + + Ok(()) + } + + fn enable_predicate(&self, predicate: ChainhookInstance, ctx: &Context) -> Result<(), String> { + let mut conn = self.get_predicate_db_conn()?; + + update_predicate_spec(&predicate.key(), &predicate, &mut conn, ctx); + set_predicate_streaming_status( + StreamingDataType::FinishedScanning, + &predicate.key(), + &mut conn, + ctx, + ); + + Ok(()) + } + + fn delete_predicate(&self, uuid: &String, _ctx: &Context) -> Result<(), String> { + let mut conn = self.get_predicate_db_conn()?; + let predicate_key = ChainhookInstance::either_stx_or_btc_key(uuid); + conn.del::<_, ()>(predicate_key.clone()) + .map_err(|e| format!("unable to delete predicate: {e}"))?; + Ok(()) + } + + fn expire_stacks_predicates_for_block( + &self, + block_height: u64, + ctx: &Context, + ) -> Result<(), String> { + let mut conn = self.get_predicate_db_conn()?; + expire_predicates_for_block(&Chain::Stacks, block_height, &mut conn, ctx); + Ok(()) + } + + fn expire_bitcoin_predicates_for_block( + &self, + block_height: u64, + ctx: &Context, + ) -> Result<(), String> { + let mut conn = self.get_predicate_db_conn()?; + expire_predicates_for_block(&Chain::Bitcoin, block_height, &mut conn, ctx); + Ok(()) + } + + fn interrupt_predicate( + &self, + uuid: &String, + error: String, + ctx: &Context, + ) -> Result<(), String> { + let mut conn = self.get_predicate_db_conn()?; + let predicate_key = ChainhookInstance::either_stx_or_btc_key(uuid); + set_predicate_interrupted_status(error, &predicate_key, &mut conn, ctx); + Ok(()) + } + + fn update_stacks_predicates_from_report(&self, report: PredicateEvaluationReport, ctx: &Context) -> Result<(), String> { + let mut conn = self.get_predicate_db_conn()?; + update_status_from_report(Chain::Stacks, report, &mut conn, ctx); + Ok(()) + } + + fn update_bitcoin_predicates_from_report(&self, report: PredicateEvaluationReport, ctx: &Context) -> Result<(), String> { + let mut conn = self.get_predicate_db_conn()?; + update_status_from_report(Chain::Bitcoin, report, &mut conn, ctx); + Ok(()) + } + + fn get_all_predicates(&self, ctx: &Context) -> Result, String> { + let mut conn = self.get_predicate_db_conn()?; + let entries = get_entries_from_predicates_db(&mut conn, ctx)?; + Ok(entries) + } + + fn get_active_stacks_predicates( + &self, + ctx: &Context, + ) -> Result, String> { + let mut conn = self.get_predicate_db_conn()?; + let entries = get_entries_from_predicates_db(&mut conn, ctx)?; + let stacks_predicates = entries + .into_iter() + .filter_map(|(spec, status)| match spec { + ChainhookInstance::Stacks(stacks_spec) => { + if stacks_spec.enabled + && !matches!( + status, + PredicateStatus::ConfirmedExpiration(_) + | PredicateStatus::UnconfirmedExpiration(_) + | PredicateStatus::Interrupted(_) + ) + { + Some((stacks_spec, status)) + } else { + None + } + } + _ => None, + }) + .collect(); + Ok(stacks_predicates) + } + + fn get_active_bitcoin_predicates( + &self, + ctx: &Context, + ) -> Result, String> { + let mut conn = self.get_predicate_db_conn()?; + let entries = get_entries_from_predicates_db(&mut conn, ctx)?; + let bitcoin_predicates = entries + .into_iter() + .filter_map(|(spec, status)| match spec { + ChainhookInstance::Bitcoin(bitcoin_spec) => { + if bitcoin_spec.enabled + && !matches!( + status, + PredicateStatus::ConfirmedExpiration(_) + | PredicateStatus::UnconfirmedExpiration(_) + | PredicateStatus::Interrupted(_) + ) + { + Some((bitcoin_spec, status)) + } else { + None + } + } + _ => None, + }) + .collect(); + Ok(bitcoin_predicates) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +enum StreamingDataType { + Occurrence { + last_triggered_height: u64, + triggered_count: u64, + }, + Evaluation { + last_evaluated_height: u64, + evaluated_count: u64, + }, + FinishedScanning, +} + +/// Updates a predicate's status to `Streaming` if `Scanning` is complete. +/// +/// If `StreamingStatusType` is `Occurrence`, sets the `last_occurrence` & `last_evaluation` fields to the current time. +/// +/// If `StreamingStatusType` is `Evaluation`, sets the `last_evaluation` field to the current time while leaving the `last_occurrence` field as it was. +fn set_predicate_streaming_status( + streaming_data_type: StreamingDataType, + predicate_key: &str, + predicates_db_conn: &mut Connection, + ctx: &Context, +) { + let now_secs = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Could not get current time in ms") + .as_secs(); + let ( + last_occurrence, + number_of_blocks_evaluated, + number_of_times_triggered, + last_evaluated_block_height, + ) = { + let current_status = retrieve_predicate_status(predicate_key, predicates_db_conn); + match current_status { + Some(status) => match status { + PredicateStatus::Streaming(StreamingData { + last_occurrence, + number_of_blocks_evaluated, + number_of_times_triggered, + last_evaluated_block_height, + last_evaluation: _, + }) => ( + last_occurrence, + number_of_blocks_evaluated, + number_of_times_triggered, + last_evaluated_block_height, + ), + PredicateStatus::Scanning(ScanningData { + number_of_blocks_to_scan: _, + number_of_blocks_evaluated, + number_of_times_triggered, + last_evaluated_block_height, + last_occurrence, + }) => ( + last_occurrence, + number_of_blocks_evaluated, + number_of_times_triggered, + last_evaluated_block_height, + ), + PredicateStatus::UnconfirmedExpiration(ExpiredData { + number_of_blocks_evaluated, + number_of_times_triggered, + last_occurrence, + last_evaluated_block_height, + expired_at_block_height: _, + }) => ( + last_occurrence, + number_of_blocks_evaluated, + number_of_times_triggered, + last_evaluated_block_height, + ), + PredicateStatus::New => (None, 0, 0, 0), + PredicateStatus::Interrupted(_) | PredicateStatus::ConfirmedExpiration(_) => { + warn!(ctx.expect_logger(), "Attempting to set Streaming status when previous status was {:?} for predicate {}", status, predicate_key); + return; + } + }, + None => (None, 0, 0, 0), + } + }; + let ( + last_occurrence, + number_of_times_triggered, + number_of_blocks_evaluated, + last_evaluated_block_height, + ) = match streaming_data_type { + StreamingDataType::Occurrence { + last_triggered_height, + triggered_count, + } => ( + Some(now_secs), + number_of_times_triggered + triggered_count, + number_of_blocks_evaluated + triggered_count, + last_triggered_height, + ), + StreamingDataType::Evaluation { + last_evaluated_height, + evaluated_count, + } => ( + last_occurrence, + number_of_times_triggered, + number_of_blocks_evaluated + evaluated_count, + last_evaluated_height, + ), + StreamingDataType::FinishedScanning => ( + last_occurrence, + number_of_times_triggered, + number_of_blocks_evaluated, + last_evaluated_block_height, + ), + }; + + update_predicate_status( + predicate_key, + PredicateStatus::Streaming(StreamingData { + last_occurrence, + last_evaluation: now_secs, + number_of_times_triggered, + last_evaluated_block_height, + number_of_blocks_evaluated, + }), + predicates_db_conn, + ctx, + ); +} + +fn update_status_from_report( + chain: Chain, + report: PredicateEvaluationReport, + predicates_db_conn: &mut Connection, + ctx: &Context, +) { + for (predicate_uuid, blocks_ids) in report.predicates_triggered.iter() { + if let Some(last_triggered_height) = blocks_ids.last().map(|b| b.index) { + let triggered_count = blocks_ids.len().try_into().unwrap_or(0); + set_predicate_streaming_status( + StreamingDataType::Occurrence { + last_triggered_height, + triggered_count, + }, + &(ChainhookInstance::either_stx_or_btc_key(predicate_uuid)), + predicates_db_conn, + ctx, + ); + } + } + + for (predicate_uuid, blocks_ids) in report.predicates_evaluated.iter() { + // clone so we don't actually update the report + let mut blocks_ids = blocks_ids.clone(); + // any triggered or expired predicate was also evaluated. But we already updated the status for that block, + // so remove those matching blocks from the list of evaluated predicates + if let Some(triggered_block_ids) = report.predicates_triggered.get(predicate_uuid) { + for triggered_id in triggered_block_ids { + blocks_ids.remove(triggered_id); + } + } + if let Some(expired_block_ids) = report.predicates_expired.get(predicate_uuid) { + for expired_id in expired_block_ids { + blocks_ids.remove(expired_id); + } + } + if let Some(last_evaluated_height) = blocks_ids.last().map(|b| b.index) { + let evaluated_count = blocks_ids.len().try_into().unwrap_or(0); + set_predicate_streaming_status( + StreamingDataType::Evaluation { + last_evaluated_height, + evaluated_count, + }, + &(ChainhookInstance::either_stx_or_btc_key(predicate_uuid)), + predicates_db_conn, + ctx, + ); + } + } + for (predicate_uuid, blocks_ids) in report.predicates_expired.iter() { + if let Some(last_evaluated_height) = blocks_ids.last().map(|b| b.index) { + let evaluated_count = blocks_ids.len().try_into().unwrap_or(0); + set_unconfirmed_expiration_status( + &chain, + evaluated_count, + last_evaluated_height, + &(ChainhookInstance::either_stx_or_btc_key(predicate_uuid)), + predicates_db_conn, + ctx, + ); + } + } +} + +/// Updates a predicate's status to `Scanning`. +/// +/// Sets the `last_occurrence` time to the current time if a new trigger has occurred since the last status update. +pub fn set_predicate_scanning_status( + predicate_key: &str, + number_of_blocks_to_scan: u64, + number_of_blocks_evaluated: u64, + number_of_times_triggered: u64, + current_block_height: u64, + predicates_db_conn: &mut Connection, + ctx: &Context, +) { + let now_secs = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Could not get current time in ms") + .as_secs(); + let current_status = retrieve_predicate_status(predicate_key, predicates_db_conn); + let last_occurrence = match current_status { + Some(status) => match status { + PredicateStatus::Scanning(scanning_data) => { + if number_of_times_triggered > scanning_data.number_of_times_triggered { + Some(now_secs) + } else { + scanning_data.last_occurrence + } + } + PredicateStatus::Streaming(streaming_data) => { + if number_of_times_triggered > streaming_data.number_of_times_triggered { + Some(now_secs) + } else { + streaming_data.last_occurrence + } + } + PredicateStatus::UnconfirmedExpiration(expired_data) => { + if number_of_times_triggered > expired_data.number_of_times_triggered { + Some(now_secs) + } else { + expired_data.last_occurrence + } + } + PredicateStatus::New => { + if number_of_times_triggered > 0 { + Some(now_secs) + } else { + None + } + } + PredicateStatus::ConfirmedExpiration(_) | PredicateStatus::Interrupted(_) => { + warn!(ctx.expect_logger(), "Attempting to set Scanning status when previous status was {:?} for predicate {}", status, predicate_key); + return; + } + }, + None => None, + }; + + update_predicate_status( + predicate_key, + PredicateStatus::Scanning(ScanningData { + number_of_blocks_to_scan, + number_of_blocks_evaluated, + number_of_times_triggered, + last_occurrence, + last_evaluated_block_height: current_block_height, + }), + predicates_db_conn, + ctx, + ); +} + +fn expire_predicates_for_block( + chain: &Chain, + confirmed_block_index: u64, + predicates_db_conn: &mut Connection, + ctx: &Context, +) -> Option> { + match get_predicates_expiring_at_block(chain, confirmed_block_index, predicates_db_conn, ctx) { + Some(predicates_to_expire) => { + for predicate_key in predicates_to_expire.iter() { + set_confirmed_expiration_status(predicate_key, predicates_db_conn, ctx); + } + Some(predicates_to_expire) + } + None => None, + } +} + +/// Updates a predicate's status to `UnconfirmedExpiration`. +pub fn set_unconfirmed_expiration_status( + chain: &Chain, + number_of_new_blocks_evaluated: u64, + last_evaluated_block_height: u64, + predicate_key: &str, + predicates_db_conn: &mut Connection, + ctx: &Context, +) { + let current_status = retrieve_predicate_status(predicate_key, predicates_db_conn); + let mut previously_was_unconfirmed = false; + let ( + number_of_blocks_evaluated, + number_of_times_triggered, + last_occurrence, + expired_at_block_height, + ) = match current_status { + Some(status) => match status { + PredicateStatus::Scanning(ScanningData { + number_of_blocks_to_scan: _, + number_of_blocks_evaluated: _, + number_of_times_triggered, + last_occurrence, + last_evaluated_block_height, + }) => ( + number_of_new_blocks_evaluated, + number_of_times_triggered, + last_occurrence, + last_evaluated_block_height, + ), + PredicateStatus::New => (0, 0, None, 0), + PredicateStatus::Streaming(StreamingData { + last_occurrence, + last_evaluation: _, + number_of_times_triggered, + number_of_blocks_evaluated, + last_evaluated_block_height, + }) => ( + number_of_blocks_evaluated + number_of_new_blocks_evaluated, + number_of_times_triggered, + last_occurrence, + last_evaluated_block_height, + ), + PredicateStatus::UnconfirmedExpiration(ExpiredData { + number_of_blocks_evaluated, + number_of_times_triggered, + last_occurrence, + last_evaluated_block_height: _, + expired_at_block_height, + }) => { + previously_was_unconfirmed = true; + ( + number_of_blocks_evaluated + number_of_new_blocks_evaluated, + number_of_times_triggered, + last_occurrence, + expired_at_block_height, + ) + } + PredicateStatus::ConfirmedExpiration(_) | PredicateStatus::Interrupted(_) => { + warn!(ctx.expect_logger(), "Attempting to set UnconfirmedExpiration status when previous status was {:?} for predicate {}", status, predicate_key); + return; + } + }, + None => (0, 0, None, 0), + }; + update_predicate_status( + predicate_key, + PredicateStatus::UnconfirmedExpiration(ExpiredData { + number_of_blocks_evaluated, + number_of_times_triggered, + last_occurrence, + last_evaluated_block_height, + expired_at_block_height, + }), + predicates_db_conn, + ctx, + ); + // don't insert this entry more than once + if !previously_was_unconfirmed { + insert_predicate_expiration( + chain, + expired_at_block_height, + predicate_key, + predicates_db_conn, + ctx, + ); + } +} + +pub fn set_confirmed_expiration_status( + predicate_key: &str, + predicates_db_conn: &mut Connection, + ctx: &Context, +) { + let current_status = retrieve_predicate_status(predicate_key, predicates_db_conn); + let expired_data = match current_status { + Some(status) => match status { + PredicateStatus::UnconfirmedExpiration(expired_data) => expired_data, + PredicateStatus::ConfirmedExpiration(_) + | PredicateStatus::Interrupted(_) + | PredicateStatus::New + | PredicateStatus::Scanning(_) + | PredicateStatus::Streaming(_) => { + warn!(ctx.expect_logger(), "Attempting to set ConfirmedExpiration status when previous status was {:?} for predicate {}", status, predicate_key); + return; + } + }, + None => { + // None means the predicate was deleted, so we can just ignore this predicate expiring + return; + } + }; + update_predicate_status( + predicate_key, + PredicateStatus::ConfirmedExpiration(expired_data), + predicates_db_conn, + ctx, + ); +} + +fn get_predicate_expiration_key(chain: &Chain, block_height: u64) -> String { + match chain { + Chain::Bitcoin => format!("expires_at:bitcoin_block:{}", block_height), + Chain::Stacks => format!("expires_at:stacks_block:{}", block_height), + } +} + +fn insert_predicate_expiration( + chain: &Chain, + expired_at_block_height: u64, + predicate_key: &str, + predicates_db_conn: &mut Connection, + ctx: &Context, +) { + let key = get_predicate_expiration_key(chain, expired_at_block_height); + let mut predicates_expiring_at_block = + get_predicates_expiring_at_block(chain, expired_at_block_height, predicates_db_conn, ctx) + .unwrap_or_default(); + predicates_expiring_at_block.push(predicate_key.to_owned()); + let serialized_expiring_predicates = json!(predicates_expiring_at_block).to_string(); + if let Err(e) = + predicates_db_conn.hset::<_, _, _, ()>(&key, "predicates", &serialized_expiring_predicates) + { + warn!( + ctx.expect_logger(), + "Error updating expired predicates index: {}", + e.to_string() + ); + } else { + debug!( + ctx.expect_logger(), + "Updating expired predicates at block height {expired_at_block_height} with predicate: {predicate_key}" + ); + } +} + +fn get_predicates_expiring_at_block( + chain: &Chain, + block_index: u64, + predicates_db_conn: &mut Connection, + ctx: &Context, +) -> Option> { + let key = get_predicate_expiration_key(chain, block_index); + match predicates_db_conn.hget::<_, _, String>(key.to_string(), "predicates") { + Ok(ref payload) => match serde_json::from_str(payload) { + Ok(data) => { + if let Err(e) = predicates_db_conn.hdel::<_, _, u64>(key.to_string(), "predicates") + { + warn!( + ctx.expect_logger(), + "Error removing expired predicates index: {}", + e.to_string() + ); + } + Some(data) + } + Err(_) => None, + }, + Err(_) => None, + } +} + +pub fn update_predicate_status( + predicate_key: &str, + status: PredicateStatus, + predicates_db_conn: &mut Connection, + ctx: &Context, +) { + let serialized_status = json!(status).to_string(); + if let Err(e) = + predicates_db_conn.hset::<_, _, _, ()>(&predicate_key, "status", &serialized_status) + { + warn!( + ctx.expect_logger(), + "Error updating status for {}: {}", + predicate_key, + e.to_string() + ); + } else { + debug!( + ctx.expect_logger(), + "Updating predicate {predicate_key} status: {serialized_status}" + ); + } +} + +pub fn update_predicate_spec( + predicate_key: &str, + spec: &ChainhookInstance, + predicates_db_conn: &mut Connection, + ctx: &Context, +) { + let serialized_spec = json!(spec).to_string(); + if let Err(e) = + predicates_db_conn.hset::<_, _, _, ()>(&predicate_key, "specification", &serialized_spec) + { + warn!( + ctx.expect_logger(), + "Error updating status for {}: {}", + predicate_key, + e.to_string() + ); + } else { + debug!( + ctx.expect_logger(), + "Updating predicate {predicate_key} with spec: {serialized_spec}" + ); + } +} + +fn retrieve_predicate_status( + predicate_key: &str, + predicates_db_conn: &mut Connection, +) -> Option { + match predicates_db_conn.hget::<_, _, String>(predicate_key.to_string(), "status") { + Ok(ref payload) => match serde_json::from_str(payload) { + Ok(data) => Some(data), + Err(_) => None, + }, + Err(_) => None, + } +} + +pub fn set_predicate_interrupted_status( + error: String, + predicate_key: &str, + predicates_db_conn: &mut Connection, + ctx: &Context, +) { + let status = PredicateStatus::Interrupted(error); + update_predicate_status(predicate_key, status, predicates_db_conn, ctx); +} + +fn get_entry_from_predicates_db( + predicate_key: &str, + predicate_db_conn: &mut Connection, + _ctx: &Context, +) -> Result, String> { + let entry: HashMap = predicate_db_conn.hgetall(predicate_key).map_err(|e| { + format!( + "unable to load chainhook associated with key {}: {}", + predicate_key, e + ) + })?; + + let encoded_spec = match entry.get("specification") { + None => return Ok(None), + Some(payload) => payload, + }; + + let spec = ChainhookInstance::deserialize_specification(encoded_spec)?; + + let encoded_status = match entry.get("status") { + None => Err(format!( + "found predicate specification with no status for predicate {}", + predicate_key + )), + Some(payload) => Ok(payload), + }?; + + let status = serde_json::from_str(encoded_status).map_err(|e| format!("{}", e))?; + + Ok(Some((spec, status))) +} + +fn get_entries_from_predicates_db( + predicate_db_conn: &mut Connection, + ctx: &Context, +) -> Result, String> { + let chainhooks_to_load: Vec = predicate_db_conn + .scan_match(ChainhookInstance::either_stx_or_btc_key("*")) + .map_err(|e| format!("unable to connect to redis: {}", e))? + .collect(); + + let mut predicates = vec![]; + for predicate_key in chainhooks_to_load.iter() { + let chainhook = match get_entry_from_predicates_db(predicate_key, predicate_db_conn, ctx) { + Ok(Some((spec, status))) => (spec, status), + Ok(None) => { + warn!( + ctx.expect_logger(), + "unable to load chainhook associated with key {}", predicate_key, + ); + continue; + } + Err(e) => { + error!( + ctx.expect_logger(), + "unable to load chainhook associated with key {}: {}", + predicate_key, + e.to_string() + ); + continue; + } + }; + predicates.push(chainhook); + } + Ok(predicates) +} + +pub fn open_readwrite_predicates_db_conn( + config: &PredicatesApiConfig, +) -> Result { + let redis_uri = &config.database_uri; + let client = redis::Client::open(redis_uri.clone()).unwrap(); + client + .get_connection() + .map_err(|e| format!("unable to connect to db: {}", e)) +} + +pub fn open_readwrite_predicates_db_conn_verbose( + config: &PredicatesApiConfig, + ctx: &Context, +) -> Result { + let res = open_readwrite_predicates_db_conn(config); + if let Err(ref e) = res { + error!(ctx.expect_logger(), "{}", e.to_string()); + } + res +} + +// todo: evaluate expects +pub fn open_readwrite_predicates_db_conn_or_panic( + config: &PredicatesApiConfig, + ctx: &Context, +) -> Connection { + open_readwrite_predicates_db_conn_verbose(config, ctx).expect("unable to open redis conn") +} diff --git a/components/chainhook-sdk/src/chainhooks/bitcoin/mod.rs b/components/chainhook-sdk/src/chainhooks/bitcoin/mod.rs index e239521c2..fd2be312c 100644 --- a/components/chainhook-sdk/src/chainhooks/bitcoin/mod.rs +++ b/components/chainhook-sdk/src/chainhooks/bitcoin/mod.rs @@ -2,7 +2,7 @@ use super::types::{ append_error_context, validate_txid, ChainhookInstance, ExactMatchingRule, HookAction, MatchingRule, PoxConfig, TxinPredicate, }; -use crate::{observer::EventObserverConfig, utils::{Context, MAX_BLOCK_HEIGHTS_ENTRIES}}; +use crate::{chainhooks::types::PredicateStatus, observer::EventObserverConfig, utils::{Context, MAX_BLOCK_HEIGHTS_ENTRIES}}; use bitcoincore_rpc_json::bitcoin::{address::Payload, Address}; use chainhook_types::{ @@ -547,7 +547,7 @@ pub enum BitcoinChainhookOccurrence { pub fn evaluate_bitcoin_chainhooks_on_chain_event<'a>( chain_event: &'a BitcoinChainEvent, - active_chainhooks: &Vec<&'a BitcoinChainhookInstance>, + active_chainhooks: &Vec<&'a (BitcoinChainhookInstance, PredicateStatus)>, ctx: &Context, ) -> ( Vec>, @@ -560,7 +560,7 @@ pub fn evaluate_bitcoin_chainhooks_on_chain_event<'a>( match chain_event { BitcoinChainEvent::ChainUpdatedWithBlocks(event) => { - for chainhook in active_chainhooks.iter() { + for (chainhook, _) in active_chainhooks.iter() { let mut apply = vec![]; let rollback = vec![]; let end_block = chainhook.end_block.unwrap_or(u64::MAX); @@ -592,7 +592,7 @@ pub fn evaluate_bitcoin_chainhooks_on_chain_event<'a>( } } BitcoinChainEvent::ChainUpdatedWithReorg(event) => { - for chainhook in active_chainhooks.iter() { + for (chainhook, _) in active_chainhooks.iter() { let mut apply = vec![]; let mut rollback = vec![]; let end_block = chainhook.end_block.unwrap_or(u64::MAX); diff --git a/components/chainhook-sdk/src/chainhooks/database.rs b/components/chainhook-sdk/src/chainhooks/database.rs new file mode 100644 index 000000000..baaaed331 --- /dev/null +++ b/components/chainhook-sdk/src/chainhooks/database.rs @@ -0,0 +1,106 @@ +use crate::chainhooks::bitcoin::BitcoinChainhookInstance; +use crate::chainhooks::stacks::StacksChainhookInstance; +use crate::chainhooks::types::{ChainhookInstance, PredicateStatus}; +use crate::observer::PredicateEvaluationReport; +use crate::utils::Context; + +/// Trait for predicate storage operations needed by the SDK +/// This allows the SDK to work with different storage implementations +/// without creating circular dependencies +pub trait PredicatesDatabaseAccess { + fn get_predicate(&self, uuid: &String, ctx: &Context) -> Result, String>; + + fn insert_predicate(&self, predicate: ChainhookInstance, ctx: &Context) -> Result<(), String>; + + fn enable_predicate(&self, predicate: ChainhookInstance, ctx: &Context) -> Result<(), String>; + + fn delete_predicate(&self, uuid: &String, ctx: &Context) -> Result<(), String>; + + fn expire_stacks_predicates_for_block(&self, block_height: u64, ctx: &Context) -> Result<(), String>; + + fn expire_bitcoin_predicates_for_block(&self, block_height: u64, ctx: &Context) -> Result<(), String>; + + fn interrupt_predicate(&self, uuid: &String, error: String, ctx: &Context) -> Result<(), String>; + + fn update_stacks_predicates_from_report(&self, report: PredicateEvaluationReport, ctx: &Context) -> Result<(), String>; + + fn update_bitcoin_predicates_from_report(&self, report: PredicateEvaluationReport, ctx: &Context) -> Result<(), String>; + + fn get_all_predicates(&self, ctx: &Context) -> Result, String>; + + /// Get all active Stacks predicates from storage + fn get_active_stacks_predicates( + &self, + ctx: &Context, + ) -> Result, String>; + + /// Get all active Bitcoin predicates from storage + fn get_active_bitcoin_predicates( + &self, + ctx: &Context, + ) -> Result, String>; +} + +impl PredicatesDatabaseAccess for () { + fn get_predicate(&self, _uuid: &String, _ctx: &Context) -> Result, String> { + Ok(None) + } + + fn insert_predicate( + &self, + _predicate: ChainhookInstance, + _ctx: &Context, + ) -> Result<(), String> { + Ok(()) + } + + fn enable_predicate( + &self, + _predicate: ChainhookInstance, + _ctx: &Context, + ) -> Result<(), String> { + Ok(()) + } + + fn delete_predicate(&self, _uuid: &String, _ctx: &Context) -> Result<(), String> { + Ok(()) + } + + fn expire_stacks_predicates_for_block(&self, _block_height: u64, _ctx: &Context) -> Result<(), String> { + Ok(()) + } + + fn expire_bitcoin_predicates_for_block(&self, _block_height: u64, _ctx: &Context) -> Result<(), String> { + Ok(()) + } + + fn interrupt_predicate(&self, _uuid: &String, _error: String, _ctx: &Context) -> Result<(), String> { + Ok(()) + } + + fn update_stacks_predicates_from_report(&self, _report: PredicateEvaluationReport, _ctx: &Context) -> Result<(), String> { + Ok(()) + } + + fn update_bitcoin_predicates_from_report(&self, _report: PredicateEvaluationReport, _ctx: &Context) -> Result<(), String> { + Ok(()) + } + + fn get_all_predicates(&self, _ctx: &Context) -> Result, String> { + Ok(vec![]) + } + + fn get_active_stacks_predicates( + &self, + _ctx: &Context, + ) -> Result, String> { + Ok(vec![]) + } + + fn get_active_bitcoin_predicates( + &self, + _ctx: &Context, + ) -> Result, String> { + Ok(vec![]) + } +} diff --git a/components/chainhook-sdk/src/chainhooks/mod.rs b/components/chainhook-sdk/src/chainhooks/mod.rs index 701fd7ebd..6f5085b7e 100644 --- a/components/chainhook-sdk/src/chainhooks/mod.rs +++ b/components/chainhook-sdk/src/chainhooks/mod.rs @@ -1,4 +1,5 @@ pub mod bitcoin; +pub mod database; pub mod stacks; pub mod types; diff --git a/components/chainhook-sdk/src/chainhooks/stacks/mod.rs b/components/chainhook-sdk/src/chainhooks/stacks/mod.rs index 36995f1a4..b6527b7ff 100644 --- a/components/chainhook-sdk/src/chainhooks/stacks/mod.rs +++ b/components/chainhook-sdk/src/chainhooks/stacks/mod.rs @@ -1,3 +1,4 @@ +use crate::chainhooks::types::PredicateStatus; use crate::observer::EventObserverConfig; use crate::utils::{AbstractStacksBlock, Context, MAX_BLOCK_HEIGHTS_ENTRIES}; @@ -550,7 +551,7 @@ impl<'a> StacksTriggerChainhook<'a> { pub fn evaluate_stacks_chainhooks_on_chain_event<'a>( chain_event: &'a StacksChainEvent, - active_chainhooks: Vec<&'a StacksChainhookInstance>, + active_chainhooks: Vec<&'a (StacksChainhookInstance, PredicateStatus)>, ctx: &Context, ) -> ( Vec>, @@ -562,7 +563,7 @@ pub fn evaluate_stacks_chainhooks_on_chain_event<'a>( let mut expired_predicates = BTreeMap::new(); match chain_event { StacksChainEvent::ChainUpdatedWithBlocks(update) => { - for chainhook in active_chainhooks.iter() { + for (chainhook, _) in active_chainhooks.iter() { let mut apply = vec![]; let mut rollback = vec![]; for block_update in update.new_blocks.iter() { @@ -615,7 +616,7 @@ pub fn evaluate_stacks_chainhooks_on_chain_event<'a>( } } StacksChainEvent::ChainUpdatedWithMicroblocks(update) => { - for chainhook in active_chainhooks.iter() { + for (chainhook, _) in active_chainhooks.iter() { let mut apply = vec![]; let rollback = vec![]; @@ -644,7 +645,7 @@ pub fn evaluate_stacks_chainhooks_on_chain_event<'a>( } } StacksChainEvent::ChainUpdatedWithMicroblocksReorg(update) => { - for chainhook in active_chainhooks.iter() { + for (chainhook, _) in active_chainhooks.iter() { let mut apply = vec![]; let mut rollback = vec![]; @@ -681,7 +682,7 @@ pub fn evaluate_stacks_chainhooks_on_chain_event<'a>( } } StacksChainEvent::ChainUpdatedWithReorg(update) => { - for chainhook in active_chainhooks.iter() { + for (chainhook, _) in active_chainhooks.iter() { let mut apply = vec![]; let mut rollback = vec![]; @@ -745,7 +746,7 @@ pub fn evaluate_stacks_chainhooks_on_chain_event<'a>( #[cfg(feature = "stacks-signers")] StacksChainEvent::ChainUpdatedWithNonConsensusEvents(data) => { if let Some(first_event) = data.events.first() { - for chainhook in active_chainhooks.iter() { + for (chainhook, _) in active_chainhooks.iter() { evaluated_predicates .insert(chainhook.uuid.as_str(), &first_event.received_at_block); let (occurrences, mut expirations) = diff --git a/components/chainhook-sdk/src/chainhooks/tests/mod.rs b/components/chainhook-sdk/src/chainhooks/tests/mod.rs index 7f8b7a7fa..c75d0ea82 100644 --- a/components/chainhook-sdk/src/chainhooks/tests/mod.rs +++ b/components/chainhook-sdk/src/chainhooks/tests/mod.rs @@ -13,7 +13,7 @@ use super::{ types::{ExactMatchingRule, FileHook}, }; use crate::{ - chainhooks::stacks::serialize_stacks_payload_to_json, + chainhooks::{stacks::serialize_stacks_payload_to_json, types::PredicateStatus}, observer::EventObserverConfig, utils::Context, }; @@ -410,8 +410,9 @@ fn test_stacks_predicates( enabled: true, expired_at: None, }; + let entry = (chainhook, PredicateStatus::New); - let predicates = vec![&chainhook]; + let predicates = vec![&entry]; let (triggered, _predicates_evaluated, _expired) = evaluate_stacks_chainhooks_on_chain_event(&event, predicates, &Context::empty()); @@ -490,8 +491,8 @@ fn test_stacks_predicate_contract_deploy(predicate: StacksPredicate, expected_ap enabled: true, expired_at: None, }; - - let predicates = vec![&chainhook]; + let entry = (chainhook, PredicateStatus::New); + let predicates = vec![&entry]; let (triggered, _predicates_evaluated, _predicates_expired) = evaluate_stacks_chainhooks_on_chain_event(&event, predicates, &Context::empty()); @@ -527,7 +528,7 @@ fn verify_optional_addition_of_contract_abi() { new_blocks, confirmed_blocks: vec![], }); - let mut contract_deploy_chainhook = StacksChainhookInstance { + let contract_deploy_chainhook = StacksChainhookInstance { uuid: "contract-deploy".to_string(), owner_uuid: None, name: "".to_string(), @@ -547,6 +548,7 @@ fn verify_optional_addition_of_contract_abi() { enabled: true, expired_at: None, }; + let mut entry1 = (contract_deploy_chainhook, PredicateStatus::New); let contract_call_chainhook = StacksChainhookInstance { uuid: "contract-call".to_string(), owner_uuid: None, @@ -568,8 +570,9 @@ fn verify_optional_addition_of_contract_abi() { enabled: true, expired_at: None, }; + let entry2 = (contract_call_chainhook, PredicateStatus::New); - let predicates = vec![&contract_deploy_chainhook, &contract_call_chainhook]; + let predicates = vec![&entry1, &entry2]; let (triggered, _blocks, _) = evaluate_stacks_chainhooks_on_chain_event(&event, predicates, &Context::empty()); assert_eq!(triggered.len(), 2); @@ -593,8 +596,8 @@ fn verify_optional_addition_of_contract_abi() { } } } - contract_deploy_chainhook.include_contract_abi = Some(false); - let predicates = vec![&contract_deploy_chainhook, &contract_call_chainhook]; + entry1.0.include_contract_abi = Some(false); + let predicates = vec![&entry1, &entry2]; let (triggered, _blocks, _) = evaluate_stacks_chainhooks_on_chain_event(&event, predicates, &Context::empty()); assert_eq!(triggered.len(), 2); @@ -684,8 +687,9 @@ fn test_stacks_predicate_contract_call(predicate: StacksPredicate, expected_appl enabled: true, expired_at: None, }; + let entry = (chainhook, PredicateStatus::New); - let predicates = vec![&chainhook]; + let predicates = vec![&entry]; let (triggered, _predicates_evaluated, _predicates_expired) = evaluate_stacks_chainhooks_on_chain_event(&event, predicates, &Context::empty()); diff --git a/components/chainhook-sdk/src/chainhooks/types.rs b/components/chainhook-sdk/src/chainhooks/types.rs index c925a8da0..dcd9a1c47 100644 --- a/components/chainhook-sdk/src/chainhooks/types.rs +++ b/components/chainhook-sdk/src/chainhooks/types.rs @@ -1,7 +1,6 @@ use std::str::FromStr; -use chainhook_types::{BitcoinNetwork, StacksNetwork}; -use serde::ser::{SerializeSeq, Serializer}; +use chainhook_types::BitcoinNetwork; use serde::{Deserialize, Serialize}; use schemars::JsonSchema; @@ -11,155 +10,6 @@ use crate::chainhooks::bitcoin::BitcoinChainhookSpecificationNetworkMap; use crate::chainhooks::stacks::StacksChainhookInstance; use crate::chainhooks::stacks::StacksChainhookSpecificationNetworkMap; -#[derive(Deserialize, Debug, Clone)] -pub struct ChainhookStore { - pub stacks_chainhooks: Vec, - pub bitcoin_chainhooks: Vec, -} - -impl Default for ChainhookStore { - fn default() -> Self { - Self::new() - } -} - -impl ChainhookStore { - pub fn new() -> ChainhookStore { - ChainhookStore { - stacks_chainhooks: vec![], - bitcoin_chainhooks: vec![], - } - } - - pub fn register_instance_from_network_map( - &mut self, - networks: (&BitcoinNetwork, &StacksNetwork), - hook: ChainhookSpecificationNetworkMap, - ) -> Result { - let spec = match hook { - ChainhookSpecificationNetworkMap::Stacks(hook) => { - let spec = hook.into_specification_for_network(networks.1)?; - self.stacks_chainhooks.push(spec.clone()); - ChainhookInstance::Stacks(spec) - } - ChainhookSpecificationNetworkMap::Bitcoin(hook) => { - let spec = hook.into_specification_for_network(networks.0)?; - self.bitcoin_chainhooks.push(spec.clone()); - ChainhookInstance::Bitcoin(spec) - } - }; - Ok(spec) - } - - pub fn enable_instance(&mut self, predicate_spec: &mut ChainhookInstance) { - match predicate_spec { - ChainhookInstance::Stacks(spec_to_enable) => { - for spec in self.stacks_chainhooks.iter_mut() { - if spec.uuid.eq(&spec_to_enable.uuid) { - spec.enabled = true; - spec_to_enable.enabled = true; - break; - } - } - } - ChainhookInstance::Bitcoin(spec_to_enable) => { - for spec in self.bitcoin_chainhooks.iter_mut() { - if spec.uuid.eq(&spec_to_enable.uuid) { - spec.enabled = true; - spec_to_enable.enabled = true; - break; - } - } - } - }; - } - - pub fn register_instance(&mut self, spec: ChainhookInstance) -> Result<(), String> { - match spec { - ChainhookInstance::Stacks(spec) => { - let spec = spec.clone(); - self.stacks_chainhooks.push(spec); - } - ChainhookInstance::Bitcoin(spec) => { - let spec = spec.clone(); - self.bitcoin_chainhooks.push(spec); - } - }; - Ok(()) - } - - pub fn deregister_stacks_hook(&mut self, hook_uuid: String) -> Option { - let mut i = 0; - while i < self.stacks_chainhooks.len() { - if self.stacks_chainhooks[i].uuid == hook_uuid { - let hook = self.stacks_chainhooks.remove(i); - return Some(hook); - } else { - i += 1; - } - } - None - } - - pub fn deregister_bitcoin_hook( - &mut self, - hook_uuid: String, - ) -> Option { - let mut i = 0; - while i < self.bitcoin_chainhooks.len() { - if self.bitcoin_chainhooks[i].uuid == hook_uuid { - let hook = self.bitcoin_chainhooks.remove(i); - return Some(hook); - } else { - i += 1; - } - } - None - } - - pub fn expire_stacks_hook(&mut self, hook_uuid: String, block_height: u64) { - let mut i = 0; - while i < self.stacks_chainhooks.len() { - if ChainhookInstance::stacks_key(&self.stacks_chainhooks[i].uuid) == hook_uuid { - self.stacks_chainhooks[i].expired_at = Some(block_height); - break; - } else { - i += 1; - } - } - } - - pub fn expire_bitcoin_hook(&mut self, hook_uuid: String, block_height: u64) { - let mut i = 0; - while i < self.bitcoin_chainhooks.len() { - if ChainhookInstance::bitcoin_key(&self.bitcoin_chainhooks[i].uuid) == hook_uuid { - self.bitcoin_chainhooks[i].expired_at = Some(block_height); - break; - } else { - i += 1; - } - } - } -} - -impl Serialize for ChainhookStore { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - let mut seq = serializer.serialize_seq(Some( - self.bitcoin_chainhooks.len() + self.stacks_chainhooks.len(), - ))?; - for chainhook in self.bitcoin_chainhooks.iter() { - seq.serialize_element(chainhook)?; - } - for chainhook in self.stacks_chainhooks.iter() { - seq.serialize_element(chainhook)?; - } - seq.end() - } -} - #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[serde(rename_all = "snake_case")] pub enum ChainhookInstance { @@ -199,6 +49,13 @@ impl ChainhookInstance { Self::Stacks(data) => &data.uuid, } } + + pub fn is_enabled(&self) -> bool { + match &self { + Self::Bitcoin(data) => data.enabled, + Self::Stacks(data) => data.enabled, + } + } } #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, JsonSchema)] @@ -1078,3 +935,40 @@ pub fn validate_txid(txid: &String) -> Result<(), String> { } Ok(()) } + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum PredicateStatus { + Scanning(ScanningData), + Streaming(StreamingData), + UnconfirmedExpiration(ExpiredData), + ConfirmedExpiration(ExpiredData), + Interrupted(String), + New, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)] +pub struct ScanningData { + pub number_of_blocks_to_scan: u64, + pub number_of_blocks_evaluated: u64, + pub number_of_times_triggered: u64, + pub last_occurrence: Option, + pub last_evaluated_block_height: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct StreamingData { + pub last_occurrence: Option, + pub last_evaluation: u64, + pub number_of_times_triggered: u64, + pub number_of_blocks_evaluated: u64, + pub last_evaluated_block_height: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct ExpiredData { + pub number_of_blocks_evaluated: u64, + pub number_of_times_triggered: u64, + pub last_occurrence: Option, + pub last_evaluated_block_height: u64, + pub expired_at_block_height: u64, +} diff --git a/components/chainhook-sdk/src/observer/mod.rs b/components/chainhook-sdk/src/observer/mod.rs index cb27094b0..3a8420037 100644 --- a/components/chainhook-sdk/src/observer/mod.rs +++ b/components/chainhook-sdk/src/observer/mod.rs @@ -7,13 +7,12 @@ use crate::chainhooks::bitcoin::{ BitcoinChainhookInstance, BitcoinChainhookOccurrence, BitcoinChainhookOccurrencePayload, BitcoinTriggerChainhook, }; +use crate::chainhooks::database::PredicatesDatabaseAccess; use crate::chainhooks::stacks::{ evaluate_stacks_chainhooks_on_chain_event, handle_stacks_hook_action, StacksChainhookInstance, StacksChainhookOccurrence, StacksChainhookOccurrencePayload, }; -use crate::chainhooks::types::{ - ChainhookInstance, ChainhookSpecificationNetworkMap, ChainhookStore, -}; +use crate::chainhooks::types::{ChainhookInstance, ChainhookSpecificationNetworkMap}; use crate::indexer::bitcoin::{ build_http_client, download_and_parse_block_with_retry, standardize_bitcoin_block, @@ -22,6 +21,7 @@ use crate::indexer::bitcoin::{ use crate::indexer::database::BlocksDatabaseAccess; use crate::indexer::{Indexer, IndexerConfig}; use crate::monitoring::{start_serving_prometheus_metrics, PrometheusMonitoring}; +use crate::try_info; use crate::utils::{send_concurrent_http_requests, Context}; use bitcoincore_rpc::bitcoin::{BlockHash, Txid}; @@ -99,7 +99,6 @@ impl Default for PredicatesConfig { #[derive(Debug, Clone)] pub struct EventObserverConfig { - pub registered_chainhooks: ChainhookStore, pub predicates_config: PredicatesConfig, pub bitcoin_rpc_proxy_enabled: bool, pub bitcoind_rpc_username: String, @@ -321,7 +320,6 @@ impl BitcoinEventObserverConfigBuilder { BitcoinNetwork::Regtest }; Ok(EventObserverConfig { - registered_chainhooks: ChainhookStore::new(), predicates_config: PredicatesConfig { payload_http_request_timeout_ms: None, payload_http_request_concurrency: DEFAULT_PAYLOAD_HTTP_REQUEST_CONCURRENCY, @@ -358,7 +356,6 @@ impl BitcoinEventObserverConfigBuilder { impl EventObserverConfig { pub fn default() -> Self { EventObserverConfig { - registered_chainhooks: ChainhookStore::new(), predicates_config: PredicatesConfig { payload_http_request_timeout_ms: None, payload_http_request_concurrency: DEFAULT_PAYLOAD_HTTP_REQUEST_CONCURRENCY, @@ -382,14 +379,12 @@ impl EventObserverConfig { } /// Adds a [ChainhookInstance] to config's the registered chainhook store, returning the updated config. + /// Note: This method is deprecated as predicates are now managed through the PredicatesDatabaseAccess trait. pub fn register_chainhook_instance( &mut self, - spec: ChainhookInstance, + _spec: ChainhookInstance, ) -> Result<&mut Self, String> { - let mut chainhook_config = ChainhookStore::new(); - chainhook_config.register_instance(spec)?; - self.registered_chainhooks = chainhook_config; - + // No-op: predicates are now managed through Redis Ok(self) } @@ -447,7 +442,6 @@ impl EventObserverConfig { let config = EventObserverConfig { bitcoin_rpc_proxy_enabled: false, - registered_chainhooks: ChainhookStore::new(), predicates_config: PredicatesConfig { payload_http_request_timeout_ms: None, payload_http_request_concurrency: DEFAULT_PAYLOAD_HTTP_REQUEST_CONCURRENCY, @@ -505,8 +499,6 @@ pub enum ObserverCommand { EnablePredicate(ChainhookInstance), DeregisterBitcoinPredicate(String), DeregisterStacksPredicate(String), - ExpireBitcoinPredicate(HookExpirationData), - ExpireStacksPredicate(HookExpirationData), NotifyBitcoinTransactionProxied, Terminate, } @@ -747,7 +739,10 @@ impl ObserverSidecar { /// .start() /// } /// ``` -pub struct EventObserverBuilder { +pub struct EventObserverBuilder< + D: BlocksDatabaseAccess + Send + Sync + 'static, + P: PredicatesDatabaseAccess + Send + Sync + 'static, +> { config: EventObserverConfig, observer_commands_tx: Sender, observer_commands_rx: Receiver, @@ -757,13 +752,19 @@ pub struct EventObserverBuilder stacks_startup_context: Option, stacks_block_processing_flag: Arc, stacks_database_access: Option, + predicates_database_access: P, } -impl EventObserverBuilder { +impl< + D: BlocksDatabaseAccess + Send + Sync + 'static, + P: PredicatesDatabaseAccess + Send + Sync + 'static, + > EventObserverBuilder +{ pub fn new( config: EventObserverConfig, observer_commands_tx: &Sender, observer_commands_rx: Receiver, + predicates_database_access: P, ctx: &Context, ) -> Self { EventObserverBuilder { @@ -776,6 +777,7 @@ impl EventObserverBuilder { stacks_startup_context: None, stacks_block_processing_flag: Arc::new(AtomicBool::new(false)), stacks_database_access: None, + predicates_database_access, } } @@ -819,13 +821,17 @@ impl EventObserverBuilder { self.stacks_startup_context, self.stacks_block_processing_flag, self.stacks_database_access, + self.predicates_database_access, self.ctx, ) } } /// Spawns a thread to observe blockchain events. Use [EventObserverBuilder] to configure easily. -pub fn start_event_observer( +pub fn start_event_observer< + D: BlocksDatabaseAccess + Send + Sync + 'static, + P: PredicatesDatabaseAccess + Send + Sync + 'static, +>( config: EventObserverConfig, observer_commands_tx: Sender, observer_commands_rx: Receiver, @@ -834,6 +840,7 @@ pub fn start_event_observer( stacks_startup_context: Option, stacks_block_processing_flag: Arc, stacks_database_access: Option, + predicates_database_access: P, ctx: Context, ) -> Result<(), Box> { match config.bitcoin_block_signaling { @@ -844,6 +851,7 @@ pub fn start_event_observer( let context_cloned = ctx.clone(); let event_observer_config_moved = config.clone(); let observer_commands_tx_moved = observer_commands_tx.clone(); + let predicates_database_access_moved = predicates_database_access; let _ = hiro_system_kit::thread_named("Chainhook event observer") .spawn(move || { let future = start_bitcoin_event_observer( @@ -852,6 +860,7 @@ pub fn start_event_observer( observer_commands_rx, observer_events_tx.clone(), observer_sidecar, + predicates_database_access_moved, context_cloned.clone(), ); match hiro_system_kit::nestable_block_on(future) { @@ -876,6 +885,7 @@ pub fn start_event_observer( let context_cloned = ctx.clone(); let event_observer_config_moved = config.clone(); let observer_commands_tx_moved = observer_commands_tx.clone(); + let predicates_database_access_moved = predicates_database_access; let _ = hiro_system_kit::thread_named("Chainhook event observer") .spawn(move || { @@ -888,6 +898,7 @@ pub fn start_event_observer( stacks_startup_context.unwrap_or_default(), stacks_block_processing_flag, stacks_database_access, + predicates_database_access_moved, context_cloned.clone(), ); match hiro_system_kit::nestable_block_on(future) { @@ -923,15 +934,15 @@ pub fn start_event_observer( Ok(()) } -pub async fn start_bitcoin_event_observer( +pub async fn start_bitcoin_event_observer( config: EventObserverConfig, _observer_commands_tx: Sender, observer_commands_rx: Receiver, observer_events_tx: Option>, observer_sidecar: Option, + predicates_database_access: P, ctx: Context, ) -> Result<(), Box> { - let chainhook_store = config.registered_chainhooks.clone(); #[cfg(feature = "zeromq")] { let ctx_moved = ctx.clone(); @@ -944,11 +955,8 @@ pub async fn start_bitcoin_event_observer( } let prometheus_monitoring = PrometheusMonitoring::new(); - prometheus_monitoring.initialize( - chainhook_store.stacks_chainhooks.len() as u64, - chainhook_store.bitcoin_chainhooks.len() as u64, - None, - ); + // Initialize with 0 counts - will be updated dynamically as predicates are registered + prometheus_monitoring.initialize(0, 0, None); if let Some(port) = config.prometheus_monitoring_port { let registry_moved = prometheus_monitoring.registry.clone(); @@ -965,7 +973,7 @@ pub async fn start_bitcoin_event_observer( // This loop is used for handling background jobs, emitted by HTTP calls. start_observer_commands_handler( config, - chainhook_store, + predicates_database_access, observer_commands_rx, observer_events_tx, None, @@ -977,7 +985,10 @@ pub async fn start_bitcoin_event_observer( .await } -pub async fn start_stacks_event_observer( +pub async fn start_stacks_event_observer< + D: BlocksDatabaseAccess + Send + Sync + 'static, + P: PredicatesDatabaseAccess + Send + Sync + 'static, +>( config: EventObserverConfig, observer_commands_tx: Sender, observer_commands_rx: Receiver, @@ -986,6 +997,7 @@ pub async fn start_stacks_event_observer, stacks_database_access: Option, + predicates_database_access: P, ctx: Context, ) -> Result<(), Box> { let indexer_config = IndexerConfig { @@ -1015,16 +1027,15 @@ pub async fn start_stacks_event_observer( config: EventObserverConfig, - mut chainhook_store: ChainhookStore, + predicates_database_access: P, observer_commands_rx: Receiver, observer_events_tx: Option>, ingestion_shutdown: Option, @@ -1469,12 +1482,8 @@ pub async fn start_observer_commands_handler( let mut requests = vec![]; let mut report = PredicateEvaluationReport::new(); - let bitcoin_chainhooks = chainhook_store - .bitcoin_chainhooks - .iter() - .filter(|p| p.enabled) - .filter(|p| p.expired_at.is_none()) - .collect::>(); + let bitcoin_chainhooks = + predicates_database_access.get_active_bitcoin_predicates(&ctx)?; ctx.try_log(|logger| { slog::info!( logger, @@ -1483,10 +1492,11 @@ pub async fn start_observer_commands_handler( ) }); + let bitcoin_chainhooks_refs: Vec<_> = bitcoin_chainhooks.iter().collect(); let (predicates_triggered, predicates_evaluated, predicates_expired) = evaluate_bitcoin_chainhooks_on_chain_event( &chain_event, - &bitcoin_chainhooks, + &bitcoin_chainhooks_refs, &ctx, ); @@ -1551,6 +1561,9 @@ pub async fn start_observer_commands_handler( ) }); + predicates_database_access + .update_bitcoin_predicates_from_report(report.clone(), &ctx)?; + if let Some(ref tx) = observer_events_tx { let _ = tx.send(ObserverEvent::PredicatesTriggered( chainhooks_to_trigger.len(), @@ -1594,14 +1607,9 @@ pub async fn start_observer_commands_handler( ) }); - for hook_uuid in hooks_ids_to_deregister.iter() { - if chainhook_store - .deregister_bitcoin_hook(hook_uuid.clone()) - .is_some() - { + if let Some(ref tx) = observer_events_tx { + for hook_uuid in hooks_ids_to_deregister.iter() { prometheus_monitoring.btc_metrics_deregister_predicate(); - } - if let Some(ref tx) = observer_events_tx { let _ = tx.send(ObserverEvent::PredicateDeregistered( PredicateDeregisteredEvent { predicate_uuid: hook_uuid.clone(), @@ -1622,17 +1630,47 @@ pub async fn start_observer_commands_handler( } } Err(e) => { - chainhook_store.deregister_bitcoin_hook(data.chainhook.uuid.clone()); + let error = + format!("Unable to evaluate predicate on Bitcoin chainstate: {e}"); + predicates_database_access.interrupt_predicate( + &data.chainhook.uuid, + error.clone(), + &ctx, + )?; if let Some(ref tx) = observer_events_tx { - let _ = tx.send(ObserverEvent::PredicateInterrupted(PredicateInterruptedData { - predicate_key: ChainhookInstance::bitcoin_key(&data.chainhook.uuid), - error: format!("Unable to evaluate predicate on Bitcoin chainstate: {}", e) - })); + let _ = tx.send(ObserverEvent::PredicateInterrupted( + PredicateInterruptedData { + predicate_key: ChainhookInstance::bitcoin_key( + &data.chainhook.uuid, + ), + error: error, + }, + )); } } }; } + // Expire predicates based on block height + match &chain_event { + BitcoinChainEvent::ChainUpdatedWithBlocks(update) => { + for confirmed_block in &update.confirmed_blocks { + predicates_database_access.expire_bitcoin_predicates_for_block( + confirmed_block.block_identifier.index, + &ctx, + )?; + } + } + BitcoinChainEvent::ChainUpdatedWithReorg(update) => { + for confirmed_block in &update.confirmed_blocks { + predicates_database_access.expire_bitcoin_predicates_for_block( + confirmed_block.block_identifier.index, + &ctx, + )?; + } + } + } + prometheus_monitoring.btc_metrics_block_evaluated(new_tip); if let Some(ref tx) = observer_events_tx { @@ -1640,26 +1678,17 @@ pub async fn start_observer_commands_handler( } } ObserverCommand::PropagateStacksChainEvent(chain_event) => { - ctx.try_log(|logger| { - slog::info!(logger, "Handling PropagateStacksChainEvent command") - }); let mut hooks_ids_to_deregister = vec![]; let mut requests = vec![]; let mut report = PredicateEvaluationReport::new(); - let stacks_chainhooks = chainhook_store - .stacks_chainhooks - .iter() - .filter(|p| p.enabled) - .filter(|p| p.expired_at.is_none()) - .collect::>(); - ctx.try_log(|logger| { - slog::info!( - logger, - "Evaluating {} stacks chainhooks registered", - stacks_chainhooks.len() - ) - }); + let stacks_chainhooks = + predicates_database_access.get_active_stacks_predicates(&ctx)?; + try_info!( + ctx, + "Evaluating {} stacks chainhooks registered", + stacks_chainhooks.len() + ); // track stacks chain metrics let new_tip = match &chain_event { @@ -1697,10 +1726,11 @@ pub async fn start_observer_commands_handler( // process hooks // TODO: use thread pool to evaluate predicates + let stacks_chainhooks_refs: Vec<_> = stacks_chainhooks.iter().collect(); let (predicates_triggered, predicates_evaluated, predicates_expired) = evaluate_stacks_chainhooks_on_chain_event( &chain_event, - stacks_chainhooks, + stacks_chainhooks_refs, &ctx, ); for (uuid, block_identifier) in predicates_evaluated.into_iter() { @@ -1725,13 +1755,11 @@ pub async fn start_observer_commands_handler( } report.track_trigger(&entry.chainhook.uuid, &block_ids); } - ctx.try_log(|logger| { - slog::info!( - logger, - "{} stacks chainhooks positive evaluations", - predicates_triggered.len() - ) - }); + try_info!( + ctx, + "{} stacks chainhooks positive evaluations", + predicates_triggered.len() + ); let mut chainhooks_to_trigger = vec![]; @@ -1751,6 +1779,9 @@ pub async fn start_observer_commands_handler( } } + predicates_database_access + .update_stacks_predicates_from_report(report.clone(), &ctx)?; + if let Some(ref tx) = observer_events_tx { let _ = tx.send(ObserverEvent::PredicatesTriggered( chainhooks_to_trigger.len(), @@ -1788,14 +1819,9 @@ pub async fn start_observer_commands_handler( } } - for hook_uuid in hooks_ids_to_deregister.iter() { - if chainhook_store - .deregister_stacks_hook(hook_uuid.clone()) - .is_some() - { + if let Some(ref tx) = observer_events_tx { + for hook_uuid in hooks_ids_to_deregister.iter() { prometheus_monitoring.stx_metrics_deregister_predicate(); - } - if let Some(ref tx) = observer_events_tx { let _ = tx.send(ObserverEvent::PredicateDeregistered( PredicateDeregisteredEvent { predicate_uuid: hook_uuid.clone(), @@ -1816,12 +1842,22 @@ pub async fn start_observer_commands_handler( } } Err(e) => { - chainhook_store.deregister_stacks_hook(data.chainhook.uuid.clone()); + let error = + format!("Unable to evaluate predicate on Stacks chainstate: {e}"); + predicates_database_access.interrupt_predicate( + &data.chainhook.uuid, + error.clone(), + &ctx, + )?; if let Some(ref tx) = observer_events_tx { - let _ = tx.send(ObserverEvent::PredicateInterrupted(PredicateInterruptedData { - predicate_key: ChainhookInstance::stacks_key(&data.chainhook.uuid), - error: format!("Unable to evaluate predicate on Bitcoin chainstate: {}", e) - })); + let _ = tx.send(ObserverEvent::PredicateInterrupted( + PredicateInterruptedData { + predicate_key: ChainhookInstance::stacks_key( + &data.chainhook.uuid, + ), + error: error, + }, + )); } } }; @@ -1829,6 +1865,27 @@ pub async fn start_observer_commands_handler( prometheus_monitoring.stx_metrics_block_evaluated(new_tip); + // Expire predicates based on block height + match &chain_event { + StacksChainEvent::ChainUpdatedWithBlocks(update) => { + for confirmed_block in &update.confirmed_blocks { + predicates_database_access.expire_stacks_predicates_for_block( + confirmed_block.block_identifier.index, + &ctx, + )?; + } + } + StacksChainEvent::ChainUpdatedWithReorg(update) => { + for confirmed_block in &update.confirmed_blocks { + predicates_database_access.expire_stacks_predicates_for_block( + confirmed_block.block_identifier.index, + &ctx, + )?; + } + } + _ => {} + } + if let Some(ref tx) = observer_events_tx { let _ = tx.send(ObserverEvent::StacksChainEvent((chain_event, report))); } else if let Some(ref flag) = stacks_block_processing_flag { @@ -1854,45 +1911,30 @@ pub async fn start_observer_commands_handler( ObserverCommand::RegisterPredicate(spec) => { ctx.try_log(|logger| slog::info!(logger, "Handling RegisterPredicate command")); - let mut spec = - match chainhook_store.register_instance_from_network_map(networks, spec) { - Ok(spec) => spec, - Err(e) => { - ctx.try_log(|logger| { - slog::warn!( - logger, - "Unable to register new chainhook spec: {}", - e.to_string() - ) - }); - continue; - } - }; - - match spec { - ChainhookInstance::Bitcoin(_) => { - prometheus_monitoring.btc_metrics_register_predicate() + // Convert network map to instance + let spec = match spec { + ChainhookSpecificationNetworkMap::Stacks(hook) => { + let spec = hook.into_specification_for_network(networks.1)?; + prometheus_monitoring.stx_metrics_register_predicate(); + ChainhookInstance::Stacks(spec) } - ChainhookInstance::Stacks(_) => { - prometheus_monitoring.stx_metrics_register_predicate() + ChainhookSpecificationNetworkMap::Bitcoin(hook) => { + let spec = hook.into_specification_for_network(networks.0)?; + prometheus_monitoring.btc_metrics_register_predicate(); + ChainhookInstance::Bitcoin(spec) } }; - ctx.try_log( |logger| slog::debug!(logger, "Registering chainhook {}", spec.uuid(),), ); + predicates_database_access.insert_predicate(spec.clone(), &ctx)?; if let Some(ref tx) = observer_events_tx { let _ = tx.send(ObserverEvent::PredicateRegistered(spec.clone())); - } else { - ctx.try_log(|logger| { - slog::debug!(logger, "Enabling Predicate {}", spec.uuid()) - }); - chainhook_store.enable_instance(&mut spec); } } - ObserverCommand::EnablePredicate(mut spec) => { + ObserverCommand::EnablePredicate(spec) => { ctx.try_log(|logger| slog::info!(logger, "Enabling Predicate {}", spec.uuid())); - chainhook_store.enable_instance(&mut spec); + predicates_database_access.enable_predicate(spec.clone(), &ctx)?; if let Some(ref tx) = observer_events_tx { let _ = tx.send(ObserverEvent::PredicateEnabled(spec)); } @@ -1901,14 +1943,9 @@ pub async fn start_observer_commands_handler( ctx.try_log(|logger| { slog::info!(logger, "Handling DeregisterStacksPredicate command") }); - let hook = chainhook_store.deregister_stacks_hook(hook_uuid.clone()); + predicates_database_access.delete_predicate(&hook_uuid, &ctx)?; + prometheus_monitoring.stx_metrics_deregister_predicate(); - if hook.is_some() { - // on startup, only the predicates in the `chainhook_store` are added to the monitoring count, - // so only those that we find in the store should be removed - prometheus_monitoring.stx_metrics_deregister_predicate(); - }; - // event if the predicate wasn't in the `chainhook_store`, propogate this event to delete from redis if let Some(tx) = &observer_events_tx { let _ = tx.send(ObserverEvent::PredicateDeregistered( PredicateDeregisteredEvent { @@ -1922,39 +1959,18 @@ pub async fn start_observer_commands_handler( ctx.try_log(|logger| { slog::info!(logger, "Handling DeregisterBitcoinPredicate command") }); - let hook = chainhook_store.deregister_bitcoin_hook(hook_uuid.clone()); + predicates_database_access.delete_predicate(&hook_uuid, &ctx)?; + prometheus_monitoring.btc_metrics_deregister_predicate(); - if hook.is_some() { - // on startup, only the predicates in the `chainhook_store` are added to the monitoring count, - // so only those that we find in the store should be removed - prometheus_monitoring.btc_metrics_deregister_predicate(); - }; - // even if the predicate wasn't in the `chainhook_store`, propogate this event to delete from redis if let Some(tx) = &observer_events_tx { let _ = tx.send(ObserverEvent::PredicateDeregistered( PredicateDeregisteredEvent { - predicate_uuid: hook_uuid.clone(), + predicate_uuid: hook_uuid, chain: Chain::Bitcoin, }, )); }; } - ObserverCommand::ExpireStacksPredicate(HookExpirationData { - hook_uuid, - block_height, - }) => { - ctx.try_log(|logger| slog::info!(logger, "Handling ExpireStacksPredicate command")); - chainhook_store.expire_stacks_hook(hook_uuid, block_height); - } - ObserverCommand::ExpireBitcoinPredicate(HookExpirationData { - hook_uuid, - block_height, - }) => { - ctx.try_log(|logger| { - slog::info!(logger, "Handling ExpireBitcoinPredicate command") - }); - chainhook_store.expire_bitcoin_hook(hook_uuid, block_height); - } } } terminate(ingestion_shutdown, observer_events_tx, &ctx); diff --git a/components/chainhook-sdk/src/observer/tests/mod.rs b/components/chainhook-sdk/src/observer/tests/mod.rs index ffba75427..e3a636785 100644 --- a/components/chainhook-sdk/src/observer/tests/mod.rs +++ b/components/chainhook-sdk/src/observer/tests/mod.rs @@ -5,14 +5,15 @@ use crate::chainhooks::bitcoin::BitcoinPredicateType; use crate::chainhooks::bitcoin::InscriptionFeedData; use crate::chainhooks::bitcoin::OrdinalOperations; use crate::chainhooks::bitcoin::OutputPredicate; +use crate::chainhooks::database::PredicatesDatabaseAccess; use crate::chainhooks::stacks::StacksChainhookInstance; use crate::chainhooks::stacks::StacksChainhookSpecification; use crate::chainhooks::stacks::StacksChainhookSpecificationNetworkMap; use crate::chainhooks::stacks::StacksContractCallBasedPredicate; use crate::chainhooks::stacks::StacksPredicate; +use crate::chainhooks::types::PredicateStatus; use crate::chainhooks::types::{ - ChainhookInstance, ChainhookSpecificationNetworkMap, ChainhookStore, ExactMatchingRule, - HookAction, + ChainhookInstance, ChainhookSpecificationNetworkMap, ExactMatchingRule, HookAction, }; use crate::indexer::fork_scratch_pad::ForkScratchPad; use crate::indexer::tests::helpers::transactions::generate_test_tx_bitcoin_p2pkh_transfer; @@ -34,14 +35,175 @@ use chainhook_types::{ }; use hiro_system_kit; use std::collections::BTreeMap; -use std::sync::mpsc::{channel, Sender}; +use std::collections::HashMap; +use std::sync::{ + mpsc::{channel, Sender}, + Arc, Mutex, +}; use super::PredicatesConfig; use super::{ObserverEvent, DEFAULT_INGESTION_PORT}; -fn generate_test_config() -> (EventObserverConfig, ChainhookStore) { +struct TestPredicatesDatabaseAccess { + stacks: Arc>>, + bitcoin: Arc>>, +} + +impl TestPredicatesDatabaseAccess { + fn new() -> Self { + Self { + stacks: Arc::new(Mutex::new(HashMap::new())), + bitcoin: Arc::new(Mutex::new(HashMap::new())), + } + } +} + +impl PredicatesDatabaseAccess for TestPredicatesDatabaseAccess { + fn insert_predicate(&self, predicate: ChainhookInstance, _ctx: &Context) -> Result<(), String> { + match predicate { + ChainhookInstance::Stacks(predicate) => { + self.stacks + .lock() + .unwrap() + .insert(predicate.uuid.clone(), (predicate, PredicateStatus::New)); + } + ChainhookInstance::Bitcoin(predicate) => { + self.bitcoin + .lock() + .unwrap() + .insert(predicate.uuid.clone(), (predicate, PredicateStatus::New)); + } + } + Ok(()) + } + + fn get_active_stacks_predicates( + &self, + _ctx: &Context, + ) -> Result, String> { + Ok(self.stacks.lock().unwrap().values().cloned().collect()) + } + + fn get_active_bitcoin_predicates( + &self, + _ctx: &Context, + ) -> Result, String> { + Ok(self.bitcoin.lock().unwrap().values().cloned().collect()) + } + + fn enable_predicate(&self, predicate: ChainhookInstance, _ctx: &Context) -> Result<(), String> { + match predicate { + ChainhookInstance::Stacks(predicate) => { + self.stacks + .lock() + .unwrap() + .insert(predicate.uuid.clone(), (predicate, PredicateStatus::New)); + } + ChainhookInstance::Bitcoin(predicate) => { + self.bitcoin + .lock() + .unwrap() + .insert(predicate.uuid.clone(), (predicate, PredicateStatus::New)); + } + } + Ok(()) + } + + fn delete_predicate(&self, uuid: &String, _ctx: &Context) -> Result<(), String> { + self.stacks.lock().unwrap().remove(uuid); + self.bitcoin.lock().unwrap().remove(uuid); + Ok(()) + } + + fn expire_stacks_predicates_for_block( + &self, + _block_height: u64, + _ctx: &Context, + ) -> Result<(), String> { + todo!() + } + + fn expire_bitcoin_predicates_for_block( + &self, + _block_height: u64, + _ctx: &Context, + ) -> Result<(), String> { + todo!() + } + + fn get_predicate( + &self, + uuid: &String, + _ctx: &Context, + ) -> Result, String> { + match self.stacks.lock().unwrap().get(uuid) { + Some(predicate) => Ok(Some(( + ChainhookInstance::Stacks(predicate.0.clone()), + predicate.1.clone(), + ))), + None => match self.bitcoin.lock().unwrap().get(uuid) { + Some(predicate) => Ok(Some(( + ChainhookInstance::Bitcoin(predicate.0.clone()), + predicate.1.clone(), + ))), + None => Ok(None), + }, + } + } + + fn interrupt_predicate( + &self, + uuid: &String, + _error: String, + _ctx: &Context, + ) -> Result<(), String> { + self.stacks.lock().unwrap().remove(uuid); + self.bitcoin.lock().unwrap().remove(uuid); + Ok(()) + } + + fn update_stacks_predicates_from_report( + &self, + _report: super::PredicateEvaluationReport, + _ctx: &Context, + ) -> Result<(), String> { + Ok(()) + } + + fn update_bitcoin_predicates_from_report( + &self, + _report: super::PredicateEvaluationReport, + _ctx: &Context, + ) -> Result<(), String> { + Ok(()) + } + + fn get_all_predicates( + &self, + _ctx: &Context, + ) -> Result, String> { + let mut all: Vec<(ChainhookInstance, PredicateStatus)> = self + .stacks + .lock() + .unwrap() + .values() + .cloned() + .map(|(predicate, status)| (ChainhookInstance::Stacks(predicate), status)) + .collect(); + all.extend( + self.bitcoin + .lock() + .unwrap() + .values() + .cloned() + .map(|(predicate, status)| (ChainhookInstance::Bitcoin(predicate), status)), + ); + Ok(all) + } +} + +fn generate_test_config() -> (EventObserverConfig, TestPredicatesDatabaseAccess) { let config: EventObserverConfig = EventObserverConfig { - registered_chainhooks: ChainhookStore::new(), predicates_config: PredicatesConfig::default(), bitcoin_rpc_proxy_enabled: false, bitcoind_rpc_username: "user".into(), @@ -55,7 +217,8 @@ fn generate_test_config() -> (EventObserverConfig, ChainhookStore) { stacks_network: StacksNetwork::Devnet, prometheus_monitoring_port: None, }; - (config, ChainhookStore::new()) + let test_db = TestPredicatesDatabaseAccess::new(); + (config, test_db) } fn stacks_chainhook_contract_call( @@ -83,7 +246,6 @@ fn stacks_chainhook_contract_call( }, ); - StacksChainhookSpecificationNetworkMap { uuid: format!("{}", id), name: format!("Chainhook {}", id), @@ -117,7 +279,6 @@ fn bitcoin_chainhook_p2pkh( }, ); - BitcoinChainhookSpecificationNetworkMap { uuid: format!("{}", id), name: format!("Chainhook {}", id), @@ -149,7 +310,6 @@ fn bitcoin_chainhook_ordinals(id: u8) -> BitcoinChainhookSpecificationNetworkMap }, ); - BitcoinChainhookSpecificationNetworkMap { uuid: format!("{}", id), name: format!("Chainhook {}", id),