Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit 90c96ba

Browse files
bors[bot]MarinPostmapsarna
authored
Merge #315
315: introduce stats route r=MarinPostma a=MarinPostma This PR introduces the `GET /v1/stats` route that return the total amount of rows read/written by an instance: ``` { "rows_read_count": 0, "rows_written_count": 0 } ``` The number of rows read/written persists accross restarts, thanks to a sync loop that flushes the information to disk about every 5s. Co-authored-by: ad hoc <[email protected]> Co-authored-by: Piotr Sarna <[email protected]>
2 parents 46e4d88 + 7b55732 commit 90c96ba

File tree

9 files changed

+575
-373
lines changed

9 files changed

+575
-373
lines changed

Cargo.lock

Lines changed: 425 additions & 357 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sqld-libsql-bindings/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ edition = "2021"
99
anyhow = "1.0.66"
1010
mvfs = { git = "https://github.com/psarna/mvsqlite", branch = "mwal", optional = true }
1111
mwal = { git = "https://github.com/psarna/mvsqlite", branch = "mwal", optional = true }
12-
rusqlite = { version = "0.29.0", git = "https://github.com/psarna/rusqlite", rev = "63b7aabfccbc21738", default-features = false, features = [
12+
rusqlite = { version = "0.29.0", git = "https://github.com/psarna/rusqlite", rev = "a6332e530f30dc2d47110", default-features = false, features = [
1313
"buildtime_bindgen",
1414
"bundled-libsql-wasm-experimental",
1515
"column_decltype"

sqld/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ pin-project-lite = "0.2.9"
3333
postgres-protocol = "0.6.4"
3434
prost = "0.11.3"
3535
regex = "1.7.0"
36-
rusqlite = { version = "0.29.0", git = "https://github.com/psarna/rusqlite", rev = "63b7aabfccbc21738", default-features = false, features = [
36+
rusqlite = { version = "0.29.0", git = "https://github.com/psarna/rusqlite", rev = "a6332e530f30dc2d47110", default-features = false, features = [
3737
"buildtime_bindgen",
3838
"bundled-libsql-wasm-experimental",
3939
"column_decltype"

sqld/src/database/libsql.rs

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@ use std::str::FromStr;
33
use std::time::{Duration, Instant};
44

55
use crossbeam::channel::RecvTimeoutError;
6-
use rusqlite::OpenFlags;
6+
use rusqlite::{OpenFlags, StatementStatus};
77
use tokio::sync::oneshot;
88
use tracing::warn;
99

1010
use crate::error::Error;
1111
use crate::libsql::wal_hook::WalHook;
1212
use crate::query::{Column, Query, QueryResponse, QueryResult, ResultSet, Row};
1313
use crate::query_analysis::{State, Statement};
14+
use crate::stats::Stats;
1415
use crate::Result;
1516

1617
use super::{Cond, Database, Program, Step, TXN_TIMEOUT_SECS};
@@ -143,11 +144,13 @@ impl LibSqlDb {
143144
path: impl AsRef<Path> + Send + 'static,
144145
wal_hook: impl WalHook + Send + Clone + 'static,
145146
with_bottomless: bool,
147+
stats: Stats,
146148
) -> crate::Result<Self> {
147149
let (sender, receiver) = crossbeam::channel::unbounded::<Message>();
148150

149151
tokio::task::spawn_blocking(move || {
150-
let mut connection = Connection::new(path.as_ref(), wal_hook, with_bottomless).unwrap();
152+
let mut connection =
153+
Connection::new(path.as_ref(), wal_hook, with_bottomless, stats).unwrap();
151154
loop {
152155
let Message { pgm, resp } = match connection.state.deadline() {
153156
Some(deadline) => match receiver.recv_deadline(deadline) {
@@ -189,18 +192,21 @@ struct Connection {
189192
state: ConnectionState,
190193
conn: rusqlite::Connection,
191194
timed_out: bool,
195+
stats: Stats,
192196
}
193197

194198
impl Connection {
195199
fn new(
196200
path: &Path,
197201
wal_hook: impl WalHook + Send + Clone + 'static,
198202
with_bottomless: bool,
203+
stats: Stats,
199204
) -> anyhow::Result<Self> {
200205
Ok(Self {
201206
conn: open_db(path, wal_hook, with_bottomless)?,
202207
state: ConnectionState::initial(),
203208
timed_out: false,
209+
stats,
204210
})
205211
}
206212

@@ -245,8 +251,8 @@ impl Connection {
245251

246252
fn execute_query_inner(&self, query: &Query) -> QueryResult {
247253
let mut rows = vec![];
248-
let mut prepared = self.conn.prepare(&query.stmt.stmt)?;
249-
let columns = prepared
254+
let mut stmt = self.conn.prepare(&query.stmt.stmt)?;
255+
let columns = stmt
250256
.columns()
251257
.iter()
252258
.map(|col| Column {
@@ -262,10 +268,10 @@ impl Connection {
262268

263269
query
264270
.params
265-
.bind(&mut prepared)
271+
.bind(&mut stmt)
266272
.map_err(Error::LibSqlInvalidQueryParams)?;
267273

268-
let mut qresult = prepared.raw_query();
274+
let mut qresult = stmt.raw_query();
269275
while let Some(row) = qresult.next()? {
270276
let mut values = vec![];
271277
for (i, _) in columns.iter().enumerate() {
@@ -288,6 +294,10 @@ impl Connection {
288294
false => None,
289295
};
290296

297+
drop(qresult);
298+
299+
self.update_stats(&stmt);
300+
291301
Ok(QueryResponse::ResultSet(ResultSet {
292302
columns,
293303
rows,
@@ -302,6 +312,13 @@ impl Connection {
302312
.execute("rollback transaction;", ())
303313
.expect("failed to rollback");
304314
}
315+
316+
fn update_stats(&self, stmt: &rusqlite::Statement) {
317+
self.stats
318+
.inc_rows_read(stmt.get_status(StatementStatus::RowsRead) as usize);
319+
self.stats
320+
.inc_rows_written(stmt.get_status(StatementStatus::RowsWritten) as usize);
321+
}
305322
}
306323

307324
fn eval_cond(cond: &Cond, results: &[Option<QueryResult>]) -> Result<bool> {

sqld/src/database/write_proxy.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use crate::rpc::proxy::rpc::proxy_client::ProxyClient;
1616
use crate::rpc::proxy::rpc::query_result::RowResult;
1717
use crate::rpc::proxy::rpc::DisconnectMessage;
1818
use crate::rpc::replication_log::rpc::replication_log_client::ReplicationLogClient;
19+
use crate::stats::Stats;
1920
use crate::Result;
2021

2122
use super::Program;
@@ -25,6 +26,7 @@ use super::{libsql::LibSqlDb, service::DbFactory, Database};
2526
pub struct WriteProxyDbFactory {
2627
write_proxy: ProxyClient<Channel>,
2728
db_path: PathBuf,
29+
stats: Stats,
2830
/// abort handle: abort db update loop on drop
2931
_abort_handle: crossbeam::channel::Sender<()>,
3032
}
@@ -37,6 +39,7 @@ impl WriteProxyDbFactory {
3739
key_path: Option<PathBuf>,
3840
ca_cert_path: Option<PathBuf>,
3941
db_path: PathBuf,
42+
stats: Stats,
4043
) -> anyhow::Result<(Self, JoinHandle<anyhow::Result<()>>)> {
4144
let mut endpoint = Channel::from_shared(addr.to_string())?;
4245
if tls {
@@ -84,6 +87,7 @@ impl WriteProxyDbFactory {
8487
let this = Self {
8588
write_proxy,
8689
db_path,
90+
stats,
8791
_abort_handle,
8892
};
8993
Ok((this, handle))
@@ -93,7 +97,11 @@ impl WriteProxyDbFactory {
9397
#[async_trait::async_trait]
9498
impl DbFactory for WriteProxyDbFactory {
9599
async fn create(&self) -> Result<Arc<dyn Database>> {
96-
let db = WriteProxyDatabase::new(self.write_proxy.clone(), self.db_path.clone())?;
100+
let db = WriteProxyDatabase::new(
101+
self.write_proxy.clone(),
102+
self.db_path.clone(),
103+
self.stats.clone(),
104+
)?;
97105
Ok(Arc::new(db))
98106
}
99107
}
@@ -106,8 +114,8 @@ pub struct WriteProxyDatabase {
106114
}
107115

108116
impl WriteProxyDatabase {
109-
fn new(write_proxy: ProxyClient<Channel>, path: PathBuf) -> Result<Self> {
110-
let read_db = LibSqlDb::new(path, (), false)?;
117+
fn new(write_proxy: ProxyClient<Channel>, path: PathBuf, stats: Stats) -> Result<Self> {
118+
let read_db = LibSqlDb::new(path, (), false, stats)?;
111119
Ok(Self {
112120
read_db,
113121
write_proxy,

sqld/src/http/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
mod hrana_over_http;
2+
mod stats;
23
mod types;
34

45
use std::future::poll_fn;
@@ -29,6 +30,7 @@ use crate::hrana;
2930
use crate::http::types::HttpQuery;
3031
use crate::query::{self, Query, QueryResult, ResultSet};
3132
use crate::query_analysis::{predict_final_state, State, Statement};
33+
use crate::stats::Stats;
3234
use crate::utils::services::idle_shutdown::IdleShutdownLayer;
3335

3436
use self::types::QueryObject;
@@ -236,6 +238,7 @@ async fn handle_request(
236238
upgrade_tx: mpsc::Sender<hrana::Upgrade>,
237239
db_factory: Arc<dyn DbFactory>,
238240
enable_console: bool,
241+
stats: Stats,
239242
) -> anyhow::Result<Response<Body>> {
240243
if hyper_tungstenite::is_upgrade_request(&req) {
241244
return Ok(handle_upgrade(&upgrade_tx, req).await);
@@ -257,6 +260,7 @@ async fn handle_request(
257260
(&Method::GET, "/v1") => hrana_over_http::handle_index(req).await,
258261
(&Method::POST, "/v1/execute") => hrana_over_http::handle_execute(db_factory, req).await,
259262
(&Method::POST, "/v1/batch") => hrana_over_http::handle_batch(db_factory, req).await,
263+
(&Method::GET, "/v1/stats") => Ok(stats::handle_stats(&stats)),
260264
_ => Ok(Response::builder().status(404).body(Body::empty()).unwrap()),
261265
}
262266
}
@@ -266,6 +270,8 @@ fn handle_version() -> Response<Body> {
266270
Response::new(Body::from(version.as_bytes()))
267271
}
268272

273+
// TODO: refactor
274+
#[allow(clippy::too_many_arguments)]
269275
pub async fn run_http<F>(
270276
addr: SocketAddr,
271277
auth: Arc<Auth>,
@@ -274,6 +280,7 @@ pub async fn run_http<F>(
274280
upgrade_tx: mpsc::Sender<hrana::Upgrade>,
275281
enable_console: bool,
276282
idle_shutdown_layer: Option<IdleShutdownLayer>,
283+
stats: Stats,
277284
) -> anyhow::Result<()>
278285
where
279286
F: MakeService<(), Vec<Query>> + Send + 'static,
@@ -317,6 +324,7 @@ where
317324
upgrade_tx.clone(),
318325
db_factory.clone(),
319326
enable_console,
327+
stats.clone(),
320328
)
321329
});
322330

sqld/src/http/stats.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
use hyper::{Body, Response};
2+
use serde::Serialize;
3+
4+
use crate::stats::Stats;
5+
6+
#[derive(Serialize)]
7+
struct StatsResponse {
8+
rows_read_count: usize,
9+
rows_written_count: usize,
10+
}
11+
12+
pub fn handle_stats(stats: &Stats) -> Response<Body> {
13+
let resp = StatsResponse {
14+
rows_read_count: stats.rows_read(),
15+
rows_written_count: stats.rows_written(),
16+
};
17+
18+
let payload = serde_json::to_vec(&resp).unwrap();
19+
Response::builder()
20+
.header("Content-Type", "application/json")
21+
.body(Body::from(payload))
22+
.unwrap()
23+
}

sqld/src/lib.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::auth::Auth;
2525
use crate::error::Error;
2626
use crate::postgres::service::PgConnectionFactory;
2727
use crate::server::Server;
28+
use crate::stats::Stats;
2829

2930
pub use sqld_libsql_bindings as libsql;
3031

@@ -39,6 +40,7 @@ mod query_analysis;
3940
mod replication;
4041
pub mod rpc;
4142
mod server;
43+
mod stats;
4244
mod utils;
4345

4446
#[derive(clap::ValueEnum, Clone, Debug, PartialEq)]
@@ -97,6 +99,7 @@ async fn run_service(
9799
config: &Config,
98100
join_set: &mut JoinSet<anyhow::Result<()>>,
99101
idle_shutdown_layer: Option<IdleShutdownLayer>,
102+
stats: Stats,
100103
) -> anyhow::Result<()> {
101104
let auth = get_auth(config)?;
102105

@@ -122,6 +125,7 @@ async fn run_service(
122125
upgrade_tx,
123126
config.enable_http_console,
124127
idle_shutdown_layer.clone(),
128+
stats,
125129
));
126130
}
127131

@@ -183,6 +187,7 @@ async fn start_replica(
183187
join_set: &mut JoinSet<anyhow::Result<()>>,
184188
addr: &str,
185189
idle_shutdown_layer: Option<IdleShutdownLayer>,
190+
stats: Stats,
186191
) -> anyhow::Result<()> {
187192
let (factory, handle) = WriteProxyDbFactory::new(
188193
addr,
@@ -191,6 +196,7 @@ async fn start_replica(
191196
config.writer_rpc_key.clone(),
192197
config.writer_rpc_ca_cert.clone(),
193198
config.db_path.clone(),
199+
stats.clone(),
194200
)
195201
.await
196202
.context("failed to start WriteProxy DB")?;
@@ -201,7 +207,7 @@ async fn start_replica(
201207
join_set.spawn(async move { handle.await.expect("WriteProxy DB task failed") });
202208

203209
let service = DbFactoryService::new(Arc::new(factory));
204-
run_service(service, config, join_set, idle_shutdown_layer).await?;
210+
run_service(service, config, join_set, idle_shutdown_layer, stats).await?;
205211

206212
Ok(())
207213
}
@@ -214,6 +220,7 @@ async fn start_primary(
214220
config: &Config,
215221
join_set: &mut JoinSet<anyhow::Result<()>>,
216222
idle_shutdown_layer: Option<IdleShutdownLayer>,
223+
stats: Stats,
217224
) -> anyhow::Result<()> {
218225
let is_fresh_db = check_fresh_db(&config.db_path);
219226
let logger = Arc::new(ReplicationLogger::open(
@@ -237,10 +244,12 @@ async fn start_primary(
237244
dump_loader.load_dump(path.into()).await?;
238245
}
239246

247+
let stats_clone = stats.clone();
240248
let db_factory = Arc::new(move || {
241249
let db_path = path_clone.clone();
242250
let hook = hook.clone();
243-
async move { LibSqlDb::new(db_path, hook, enable_bottomless) }
251+
let stats_clone = stats_clone.clone();
252+
async move { LibSqlDb::new(db_path, hook, enable_bottomless, stats_clone) }
244253
});
245254
let service = DbFactoryService::new(db_factory.clone());
246255
if let Some(ref addr) = config.rpc_server_addr {
@@ -256,7 +265,7 @@ async fn start_primary(
256265
));
257266
}
258267

259-
run_service(service, config, join_set, idle_shutdown_layer).await?;
268+
run_service(service, config, join_set, idle_shutdown_layer, stats).await?;
260269

261270
Ok(())
262271
}
@@ -309,11 +318,13 @@ pub async fn run_server(config: Config) -> anyhow::Result<()> {
309318
.idle_shutdown_timeout
310319
.map(|d| IdleShutdownLayer::new(d, shutdown_notify.clone()));
311320

321+
let stats = Stats::new(&config.db_path)?;
322+
312323
match config.writer_rpc_addr {
313324
Some(ref addr) => {
314-
start_replica(&config, &mut join_set, addr, idle_shutdown_layer).await?
325+
start_replica(&config, &mut join_set, addr, idle_shutdown_layer, stats).await?
315326
}
316-
None => start_primary(&config, &mut join_set, idle_shutdown_layer).await?,
327+
None => start_primary(&config, &mut join_set, idle_shutdown_layer, stats).await?,
317328
}
318329

319330
let reset = HARD_RESET.clone();

0 commit comments

Comments
 (0)