Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit 8d878f8

Browse files
authored
prom metrics (#760)
* wip * add prometheus metrics probes * expose metrics admin endpoint * move metric route to /metrics * use histogram macro
1 parent 146df90 commit 8d878f8

File tree

9 files changed

+537
-336
lines changed

9 files changed

+537
-336
lines changed

Cargo.lock

Lines changed: 406 additions & 319 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sqld/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ rustls-pemfile = "1.0.3"
7171
rustls = "0.21.7"
7272
async-stream = "0.3.5"
7373
libsql = { git = "https://github.com/tursodatabase/libsql.git", rev = "bea8863", optional = true }
74+
metrics = "0.21.1"
75+
metrics-exporter-prometheus = "0.12.1"
7476

7577
[dev-dependencies]
7678
proptest = "1.0.0"

sqld/src/connection/libsql.rs

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::path::{Path, PathBuf};
33
use std::sync::atomic::{AtomicBool, Ordering};
44
use std::sync::Arc;
55

6+
use metrics::histogram;
67
use parking_lot::{Mutex, RwLock};
78
use rusqlite::{DatabaseName, ErrorCode, OpenFlags, StatementStatus};
89
use sqld_libsql_bindings::wal_hook::{TransparentMethods, WalMethodsHook};
@@ -12,6 +13,7 @@ use tokio::time::{Duration, Instant};
1213
use crate::auth::{Authenticated, Authorized, Permission};
1314
use crate::error::Error;
1415
use crate::libsql_bindings::wal_hook::WalHook;
16+
use crate::metrics::{READ_QUERY_COUNT, WRITE_QUERY_COUNT};
1517
use crate::query::Query;
1618
use crate::query_analysis::{State, StmtKind};
1719
use crate::query_result_builder::{QueryBuilderConfig, QueryResultBuilder};
@@ -247,21 +249,34 @@ struct TxnSlot<T: WalHook> {
247249
/// is stolen.
248250
conn: Arc<Mutex<Connection<T>>>,
249251
/// Time at which the transaction can be stolen
250-
timeout_at: tokio::time::Instant,
252+
created_at: tokio::time::Instant,
251253
/// The transaction lock was stolen
252254
is_stolen: AtomicBool,
253255
}
254256

257+
impl<T: WalHook> TxnSlot<T> {
258+
#[inline]
259+
fn expires_at(&self) -> Instant {
260+
self.created_at + TXN_TIMEOUT
261+
}
262+
263+
fn abort(&self) {
264+
let conn = self.conn.lock();
265+
// we have a lock on the connection, we don't need mode than a
266+
// Relaxed store.
267+
conn.rollback();
268+
histogram!("write_txn_duration", self.created_at.elapsed())
269+
// WRITE_TXN_DURATION.record(self.created_at.elapsed());
270+
}
271+
}
272+
255273
impl<T: WalHook> std::fmt::Debug for TxnSlot<T> {
256274
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
257275
let stolen = self.is_stolen.load(Ordering::Relaxed);
258-
let time_left = self
259-
.timeout_at
260-
.duration_since(tokio::time::Instant::now())
261-
.as_millis();
276+
let time_left = self.expires_at().duration_since(Instant::now());
262277
write!(
263278
f,
264-
"(conn: {:?}, timeout_ms: {time_left}, stolen: {stolen})",
279+
"(conn: {:?}, timeout: {time_left:?}, stolen: {stolen})",
265280
self.conn
266281
)
267282
}
@@ -315,7 +330,7 @@ unsafe extern "C" fn busy_handler<W: WalHook>(state: *mut c_void, _retries: c_in
315330
tokio::runtime::Handle::current().block_on(async move {
316331
let timeout = {
317332
let slot = lock.as_ref().unwrap();
318-
let timeout_at = slot.timeout_at;
333+
let timeout_at = slot.expires_at();
319334
drop(lock);
320335
tokio::time::sleep_until(timeout_at)
321336
};
@@ -337,11 +352,7 @@ unsafe extern "C" fn busy_handler<W: WalHook>(state: *mut c_void, _retries: c_in
337352
// steal.
338353
assert!(lock.take().is_some());
339354

340-
let conn = slot.conn.lock();
341-
// we have a lock on the connection, we don't need mode than a
342-
// Relaxed store.
343-
conn.rollback();
344-
355+
slot.abort();
345356
tracing::info!("stole transaction lock");
346357
}
347358
}
@@ -422,7 +433,7 @@ impl<W: WalHook> Connection<W> {
422433
let mut lock = this.lock();
423434

424435
if let Some(slot) = &lock.slot {
425-
if slot.is_stolen.load(Ordering::Relaxed) || Instant::now() > slot.timeout_at {
436+
if slot.is_stolen.load(Ordering::Relaxed) || Instant::now() > slot.expires_at() {
426437
// we mark ourselves as stolen to notify any waiting lock thief.
427438
slot.is_stolen.store(true, Ordering::Relaxed);
428439
lock.rollback();
@@ -447,7 +458,7 @@ impl<W: WalHook> Connection<W> {
447458
(Tx::None | Tx::Read, Tx::Write) => {
448459
let slot = Arc::new(TxnSlot {
449460
conn: this.clone(),
450-
timeout_at: Instant::now() + TXN_TIMEOUT,
461+
created_at: Instant::now(),
451462
is_stolen: AtomicBool::new(false),
452463
});
453464

@@ -542,6 +553,11 @@ impl<W: WalHook> Connection<W> {
542553
}
543554

544555
let mut stmt = self.conn.prepare(&query.stmt.stmt)?;
556+
if stmt.readonly() {
557+
READ_QUERY_COUNT.increment(1);
558+
} else {
559+
WRITE_QUERY_COUNT.increment(1);
560+
}
545561

546562
let cols = stmt.columns();
547563
let cols_count = cols.len();

sqld/src/connection/mod.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
1+
use metrics::histogram;
12
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
23
use std::sync::Arc;
3-
use tokio::time::Duration;
4+
use tokio::time::{Duration, Instant};
45

56
use futures::Future;
67
use tokio::{sync::Semaphore, time::timeout};
78

89
use crate::auth::Authenticated;
910
use crate::error::Error;
11+
use crate::metrics::CONCCURENT_CONNECTIONS_COUNT;
1012
use crate::query::{Params, Query};
1113
use crate::query_analysis::{State, Statement};
1214
use crate::query_result_builder::{IgnoreResult, QueryResultBuilder};
@@ -249,6 +251,7 @@ impl<F: MakeConnection> MakeConnection for MakeThrottledConnection<F> {
249251
type Connection = TrackedConnection<F::Connection>;
250252

251253
async fn create(&self) -> Result<Self::Connection, Error> {
254+
let before_create = Instant::now();
252255
// If the memory pressure is high, request more units to reduce concurrency.
253256
tracing::trace!(
254257
"Available semaphore units: {}",
@@ -279,10 +282,16 @@ impl<F: MakeConnection> MakeConnection for MakeThrottledConnection<F> {
279282
}
280283

281284
let inner = self.connection_maker.create().await?;
285+
286+
CONCCURENT_CONNECTIONS_COUNT.increment(1.0);
287+
// CONNECTION_CREATE_TIME.record(before_create.elapsed());
288+
histogram!("connection_create_time", before_create.elapsed());
289+
282290
Ok(TrackedConnection {
283291
permit,
284292
inner,
285293
atime: AtomicU64::new(now_millis()),
294+
created_at: Instant::now(),
286295
})
287296
}
288297
}
@@ -293,6 +302,15 @@ pub struct TrackedConnection<DB> {
293302
#[allow(dead_code)] // just hold on to it
294303
permit: tokio::sync::OwnedSemaphorePermit,
295304
atime: AtomicU64,
305+
created_at: Instant,
306+
}
307+
308+
impl<T> Drop for TrackedConnection<T> {
309+
fn drop(&mut self) {
310+
CONCCURENT_CONNECTIONS_COUNT.decrement(1.0);
311+
histogram!("connection_create_time", self.created_at.elapsed());
312+
// CONNECTION_ALIVE_DURATION.record();
313+
}
296314
}
297315

298316
impl<DB: Connection> TrackedConnection<DB> {

sqld/src/connection/write_proxy.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use uuid::Uuid;
1212

1313
use crate::auth::Authenticated;
1414
use crate::error::Error;
15+
use crate::metrics::REQUESTS_PROXIED;
1516
use crate::namespace::NamespaceName;
1617
use crate::query::Value;
1718
use crate::query_analysis::State;
@@ -197,6 +198,8 @@ impl WriteProxyConnection {
197198
auth: Authenticated,
198199
builder: B,
199200
) -> Result<(B, State)> {
201+
REQUESTS_PROXIED.increment(1);
202+
200203
self.stats.inc_write_requests_delegated();
201204
let mut client = self.write_proxy.clone();
202205

sqld/src/http/admin/mod.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
use anyhow::Context as _;
2-
use axum::extract::{Path, State};
2+
use axum::extract::{FromRef, Path, State};
33
use axum::routing::delete;
44
use axum::Json;
55
use chrono::NaiveDateTime;
66
use futures::TryStreamExt;
77
use hyper::Body;
8+
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
89
use serde::{Deserialize, Serialize};
910
use std::io::ErrorKind;
1011
use std::path::PathBuf;
@@ -24,10 +25,24 @@ pub mod stats;
2425
type UserHttpServer<M> =
2526
Arc<hrana::http::Server<<<M as MakeNamespace>::Database as Database>::Connection>>;
2627

28+
#[derive(Clone)]
29+
struct Metrics {
30+
handle: PrometheusHandle,
31+
}
32+
2733
struct AppState<M: MakeNamespace, C> {
2834
namespaces: NamespaceStore<M>,
2935
user_http_server: UserHttpServer<M>,
3036
connector: C,
37+
metrics: PrometheusHandle,
38+
}
39+
40+
impl<M: MakeNamespace, C> FromRef<Arc<AppState<M, C>>> for Metrics {
    /// Builds the `Metrics` sub-state by cloning the Prometheus handle stored
    /// in the shared application state.
    fn from_ref(input: &Arc<AppState<M, C>>) -> Self {
        let handle = input.metrics.clone();
        Self { handle }
    }
}
3247

3348
pub async fn run<M, A, C>(
@@ -59,10 +74,12 @@ where
5974
.route("/v1/namespaces/:namespace", delete(handle_delete_namespace))
6075
.route("/v1/namespaces/:namespace/stats", get(stats::handle_stats))
6176
.route("/v1/diagnostics", get(handle_diagnostics))
77+
.route("/metrics", get(handle_metrics))
6278
.with_state(Arc::new(AppState {
6379
namespaces,
6480
connector,
6581
user_http_server,
82+
metrics: PrometheusBuilder::new().install_recorder().unwrap(),
6683
}));
6784

6885
hyper::server::Server::builder(acceptor)
@@ -76,6 +93,10 @@ async fn handle_get_index() -> &'static str {
7693
"Welcome to the sqld admin API"
7794
}
7895

96+
async fn handle_metrics(State(metrics): State<Metrics>) -> String {
97+
metrics.handle.render()
98+
}
99+
79100
async fn handle_get_config<M: MakeNamespace, C: Connector>(
80101
State(app_state): State<Arc<AppState<M, C>>>,
81102
Path(namespace): Path<String>,

sqld/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ mod h2c;
5555
mod heartbeat;
5656
mod hrana;
5757
mod http;
58+
mod metrics;
5859
mod migration;
5960
mod namespace;
6061
mod query;

sqld/src/metrics.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
#![allow(dead_code)]
2+
use metrics::{
3+
describe_counter, describe_gauge, describe_histogram, register_counter, register_gauge,
4+
register_histogram, Counter, Gauge, Histogram,
5+
};
6+
use once_cell::sync::Lazy;
7+
8+
pub static WRITE_QUERY_COUNT: Lazy<Counter> = Lazy::new(|| {
9+
const NAME: &str = "writes_count";
10+
describe_counter!(NAME, "number of write statements");
11+
register_counter!(NAME)
12+
});
13+
pub static READ_QUERY_COUNT: Lazy<Counter> = Lazy::new(|| {
14+
const NAME: &str = "read_count";
15+
describe_counter!(NAME, "number of read statements");
16+
register_counter!(NAME)
17+
});
18+
pub static REQUESTS_PROXIED: Lazy<Counter> = Lazy::new(|| {
19+
const NAME: &str = "requests_proxied";
20+
describe_counter!(NAME, "number of proxied requests");
21+
register_counter!(NAME)
22+
});
23+
pub static CONCCURENT_CONNECTIONS_COUNT: Lazy<Gauge> = Lazy::new(|| {
24+
const NAME: &str = "conccurent_connections";
25+
describe_gauge!(NAME, "mumber of conccurent connections");
26+
register_gauge!(NAME)
27+
});
28+
pub static NAMESPACE_LOAD_LATENCY: Lazy<Histogram> = Lazy::new(|| {
29+
const NAME: &str = "namespace_load_latency";
30+
describe_histogram!(NAME, "latency is us when loading a namespace");
31+
register_histogram!(NAME)
32+
});
33+
pub static CONNECTION_CREATE_TIME: Lazy<Histogram> = Lazy::new(|| {
34+
const NAME: &str = "connection_create_time";
35+
describe_histogram!(NAME, "time to create a connection");
36+
register_histogram!(NAME)
37+
});
38+
pub static CONNECTION_ALIVE_DURATION: Lazy<Histogram> = Lazy::new(|| {
39+
const NAME: &str = "connection_alive_duration";
40+
describe_histogram!(NAME, "duration for which a connection was kept alive");
41+
register_histogram!(NAME)
42+
});
43+
pub static WRITE_TXN_DURATION: Lazy<Histogram> = Lazy::new(|| {
44+
const NAME: &str = "write_txn_duration";
45+
describe_histogram!(NAME, "duration for which a write transaction was kept open");
46+
register_histogram!(NAME)
47+
});

sqld/src/namespace/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,14 @@ use chrono::NaiveDateTime;
1212
use enclose::enclose;
1313
use futures_core::Stream;
1414
use hyper::Uri;
15+
use metrics::histogram;
1516
use parking_lot::Mutex;
1617
use rusqlite::ErrorCode;
1718
use sqld_libsql_bindings::wal_hook::TRANSPARENT_METHODS;
1819
use tokio::io::AsyncBufReadExt;
1920
use tokio::sync::watch;
2021
use tokio::task::JoinSet;
21-
use tokio::time::Duration;
22+
use tokio::time::{Duration, Instant};
2223
use tokio_util::io::StreamReader;
2324
use tonic::transport::Channel;
2425
use uuid::Uuid;
@@ -433,6 +434,7 @@ impl<M: MakeNamespace> NamespaceStore<M> {
433434
where
434435
Fun: FnOnce(&Namespace<M::Database>) -> R,
435436
{
437+
let before_load = Instant::now();
436438
let lock = self.inner.store.upgradable_read().await;
437439
if let Some(ns) = lock.get(&namespace) {
438440
Ok(f(ns))
@@ -451,6 +453,10 @@ impl<M: MakeNamespace> NamespaceStore<M> {
451453
let ret = f(&ns);
452454
tracing::info!("loaded namespace: `{namespace}`");
453455
lock.insert(namespace, ns);
456+
457+
// NAMESPACE_LOAD_LATENCY.record(before_load.elapsed());
458+
histogram!("namespace_load_latency", before_load.elapsed());
459+
454460
Ok(ret)
455461
}
456462
}

0 commit comments

Comments
 (0)