refactor: adjust performance metrics
vicanso committed Jan 8, 2025
1 parent b08a5c0 commit 6c8a7e5
Showing 7 changed files with 117 additions and 41 deletions.
56 changes: 49 additions & 7 deletions src/cache/mod.rs
@@ -12,10 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::util;
use crate::{config::get_current_config, util};
use bytesize::ByteSize;
use memory_stats::memory_stats;
use once_cell::sync::OnceCell;
use snafu::Snafu;
use std::sync::Arc;

use tracing::info;
mod file;
mod http_cache;
mod tiny;
@@ -39,21 +42,60 @@ impl From<Error> for pingora::BError {
}
}

pub fn new_tiny_ufo_cache(size: usize) -> HttpCache {
fn new_tiny_ufo_cache(size: usize) -> HttpCache {
HttpCache {
directory: None,
cached: Arc::new(tiny::new_tiny_ufo_cache(size / PAGE_SIZE, size)),
cache: Arc::new(tiny::new_tiny_ufo_cache(size / PAGE_SIZE, size)),
}
}
pub fn new_file_cache(dir: &str) -> Result<HttpCache> {
fn new_file_cache(dir: &str) -> Result<HttpCache> {
let cache = file::new_file_cache(dir)?;
Ok(HttpCache {
directory: Some(cache.directory.clone()),
cached: Arc::new(cache),
cache: Arc::new(cache),
})
}

static CACHE_BACKEND: OnceCell<HttpCache> = OnceCell::new();
const MAX_MEMORY_SIZE: usize = 100 * 1024 * 1024;
pub fn get_cache_backend() -> Result<&'static HttpCache> {
// get global cache backend
CACHE_BACKEND.get_or_try_init(|| {
let basic_conf = &get_current_config().basic;
let mut size = if let Some(cache_max_size) = basic_conf.cache_max_size {
cache_max_size.as_u64() as usize
} else {
MAX_MEMORY_SIZE
};
let mut cache_type = "memory";
// file cache
let cache = if let Some(dir) = &basic_conf.cache_directory {
cache_type = "file";
new_file_cache(dir.as_str()).map_err(|e| Error::Invalid {
message: e.to_string(),
})?
} else {
// max memory
let max_memory = if let Some(value) = memory_stats() {
value.physical_mem / 2
} else {
ByteSize::gb(4).as_u64() as usize
};
size = size.min(max_memory);
// tiny ufo cache
new_tiny_ufo_cache(size)
};
info!(
category = "cache",
size = ByteSize::b(size as u64).to_string(),
cache_type,
"init cache backend success"
);
Ok(cache)
})
}

pub use http_cache::{new_file_storage_clear_service, HttpCache};
pub use http_cache::{new_storage_clear_service, HttpCache};

#[cfg(test)]
mod tests {
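Note: `get_cache_backend` above builds the backend lazily via `OnceCell::get_or_try_init`, so the first caller initializes it, every later caller gets the same instance, and a failed initialization is retried on the next call. A minimal self-contained sketch of that pattern, with a simplified stand-in `Backend` type (only the `once_cell` crate already imported in the diff is assumed):

```rust
use once_cell::sync::OnceCell;

#[derive(Debug)]
struct Backend {
    size: usize,
}

static BACKEND: OnceCell<Backend> = OnceCell::new();

fn get_backend(size: usize) -> Result<&'static Backend, String> {
    BACKEND.get_or_try_init(|| {
        if size == 0 {
            return Err("cache size must be greater than zero".to_string());
        }
        // The first successful call stores the value; later calls return it as-is.
        Ok(Backend { size })
    })
}

fn main() {
    let first = get_backend(1024).unwrap();
    let second = get_backend(4096).unwrap();
    // Both calls return the instance created by the first successful init,
    // so the 4096 argument was ignored.
    assert!(std::ptr::eq(first, second));
    println!("shared cache backend: {first:?}");
}
```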
4 changes: 2 additions & 2 deletions src/main.rs
@@ -13,7 +13,7 @@
// limitations under the License.

use acme::new_lets_encrypt_service;
use cache::new_file_storage_clear_service;
use cache::new_storage_clear_service;
use certificate::{
new_certificate_validity_service,
new_self_signed_certificate_validity_service,
@@ -481,7 +481,7 @@ fn run() -> Result<(), Box<dyn Error>> {
new_self_signed_certificate_validity_service(),
new_performance_metrics_log_service(),
];
if let Some(task) = new_file_storage_clear_service() {
if let Some(task) = new_storage_clear_service() {
simple_tasks.push(task);
}
if let Some(compression_task) = compression_task {
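The rename reflects that the clear service covers cache storage generally, while the constructor still returns `Option` because clearing only applies when a file-backed cache is configured. A sketch of that optional-service registration pattern; the `Task` type and the constructor body here are simplified stand-ins for the real `SimpleServiceTaskFuture`:

```rust
type Task = Box<dyn Fn()>;

// Returns None when no cache directory is configured, so main() skips the task.
fn new_storage_clear_service(cache_dir: Option<&str>) -> Option<(String, Task)> {
    let dir = cache_dir?.to_string();
    Some((
        "storageClear".to_string(),
        Box::new(move || println!("clearing expired cache files under {dir}")),
    ))
}

fn main() {
    let mut simple_tasks: Vec<(String, Task)> = vec![
        // always-on services would be listed here
    ];
    if let Some(task) = new_storage_clear_service(Some("/tmp/pingap-cache")) {
        simple_tasks.push(task);
    }
    for (name, task) in &simple_tasks {
        println!("starting service: {name}");
        task();
    }
}
```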
4 changes: 2 additions & 2 deletions src/proxy/mod.rs
@@ -31,6 +31,6 @@ pub use logger::Parser;
pub use server::*;
pub use server_conf::ServerConf;
pub use upstream::{
get_upstreams_processing, new_upstream_health_check_task,
try_init_upstreams, try_update_upstreams,
get_upstreams_processing_connected, new_upstream_health_check_task,
try_init_upstreams, try_update_upstreams, Upstream,
};
25 changes: 15 additions & 10 deletions src/proxy/server.rs
@@ -366,7 +366,7 @@ impl Server {
}

#[derive(Debug, Default)]
struct DigestDeailt {
struct DigestDetail {
connection_reused: bool,
connection_time: u64,
tcp_established: u64,
@@ -376,7 +376,7 @@ struct DigestDeailt {
}

#[inline]
fn get_digest_detail(digest: &Digest) -> DigestDeailt {
fn get_digest_detail(digest: &Digest) -> DigestDetail {
let get_established = |value: Option<&Option<TimingDigest>>| -> u64 {
value
.map(|item| {
@@ -400,15 +400,15 @@ fn get_digest_detail(digest: &Digest) -> DigestDeailt {
let connection_reused = connection_time > 100;

let Some(ssl_digest) = &digest.ssl_digest else {
return DigestDeailt {
return DigestDetail {
connection_reused,
tcp_established,
connection_time,
..Default::default()
};
};

DigestDeailt {
DigestDetail {
connection_reused,
tcp_established,
connection_time,
@@ -682,7 +682,12 @@ impl ProxyHttp for Server {
));
ctx.upstream_span = Some(span);
}
up.new_http_peer(session, ctx)
let peer = up.new_http_peer(session, ctx).map(|peer| {
ctx.upstream_address = peer.address().to_string();
peer
});
ctx.upstream = Some(up);
peer
} else {
None
}
@@ -705,7 +710,7 @@
&self,
_session: &mut Session,
reused: bool,
peer: &HttpPeer,
_peer: &HttpPeer,
#[cfg(unix)] _fd: std::os::unix::io::RawFd,
#[cfg(windows)] _sock: std::os::windows::io::RawSocket,
digest: Option<&Digest>,
@@ -735,7 +740,6 @@
}

ctx.upstream_reused = reused;
ctx.upstream_address = peer.address().to_string();
ctx.upstream_connect_time =
util::get_latency(&ctx.upstream_connect_time);
ctx.upstream_processing_time =
@@ -1103,9 +1107,10 @@ impl ProxyHttp for Server {
self.processing.fetch_sub(1, Ordering::Relaxed);
if let Some(location) = &ctx.location {
location.sub_processing();
if let Some(up) = get_upstream(&location.upstream) {
ctx.upstream_processing = Some(up.completed());
}
}
// get from cache does not connect to upstream
if let Some(up) = &ctx.upstream {
ctx.upstream_processing = Some(up.completed());
}
if ctx.status.is_none() {
if let Some(header) = session.response_written() {
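The net effect of the server.rs changes: the selected upstream is stashed on the request context when the peer is created, so the `logging` phase can decrement that upstream's in-flight counter directly instead of looking it up by name again, and a request answered from cache (which never picks an upstream) no longer decrements anything. A simplified sketch with hypothetical `Ctx`/`Upstream` types; `completed` here is assumed to just decrement and return the in-flight count:

```rust
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;

#[derive(Debug, Default)]
struct Upstream {
    processing: AtomicI32,
}

impl Upstream {
    // Decrement the in-flight counter and return the new value.
    fn completed(&self) -> i32 {
        self.processing.fetch_sub(1, Ordering::Relaxed) - 1
    }
}

#[derive(Default)]
struct Ctx {
    upstream: Option<Arc<Upstream>>,
}

// upstream_peer phase: remember which upstream serves this request.
fn upstream_peer(ctx: &mut Ctx, up: Arc<Upstream>) {
    up.processing.fetch_add(1, Ordering::Relaxed);
    ctx.upstream = Some(up);
}

// logging phase: a cache hit never set ctx.upstream, so nothing is decremented.
fn logging(ctx: &Ctx) -> Option<i32> {
    ctx.upstream.as_ref().map(|up| up.completed())
}

fn main() {
    let up = Arc::new(Upstream::default());
    let mut ctx = Ctx::default();
    upstream_peer(&mut ctx, up.clone());
    assert_eq!(Some(0), logging(&ctx));

    let cache_hit = Ctx::default();
    assert_eq!(None, logging(&cache_hit));
}
```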
27 changes: 17 additions & 10 deletions src/proxy/upstream.rs
@@ -64,22 +64,26 @@ enum SelectionLb {
// UpstreamPeerTracer tracks active connections to upstream servers
#[derive(Clone, Debug)]
struct UpstreamPeerTracer {
connected: Arc<AtomicU32>, // Number of active connections
name: String,
connected: Arc<AtomicI32>, // Number of active connections
}

impl UpstreamPeerTracer {
fn new() -> Self {
fn new(name: &str) -> Self {
Self {
connected: Arc::new(AtomicU32::new(0)),
name: name.to_string(),
connected: Arc::new(AtomicI32::new(0)),
}
}
}

impl Tracing for UpstreamPeerTracer {
fn on_connected(&self) {
debug!(name = self.name, "upstream peer connected");
self.connected.fetch_add(1, Ordering::Relaxed);
}
fn on_disconnected(&self) {
debug!(name = self.name, "upstream peer disconnected");
self.connected.fetch_sub(1, Ordering::Relaxed);
}
fn boxed_clone(&self) -> Box<dyn Tracing> {
Expand Down Expand Up @@ -371,7 +375,7 @@ impl Upstream {
};

let peer_tracer = if conf.enable_tracer.unwrap_or_default() {
Some(UpstreamPeerTracer::new())
Some(UpstreamPeerTracer::new(name))
} else {
None
};
@@ -460,7 +464,7 @@ impl Upstream {

/// Get the connected count of upstream
#[inline]
pub fn connected(&self) -> Option<u32> {
pub fn connected(&self) -> Option<i32> {
self.peer_tracer
.as_ref()
.map(|tracer| tracer.connected.load(Ordering::Relaxed))
Expand Down Expand Up @@ -494,12 +498,15 @@ pub fn get_upstream(name: &str) -> Option<Arc<Upstream>> {
UPSTREAM_MAP.load().get(name).cloned()
}

pub fn get_upstreams_processing() -> HashMap<String, i32> {
let mut processing = HashMap::new();
pub fn get_upstreams_processing_connected(
) -> HashMap<String, (i32, Option<i32>)> {
let mut processing_connected = HashMap::new();
UPSTREAM_MAP.load().iter().for_each(|(k, v)| {
processing.insert(k.to_string(), v.processing.load(Ordering::Relaxed));
let count = v.processing.load(Ordering::Relaxed);
let connected = v.connected();
processing_connected.insert(k.to_string(), (count, connected));
});
processing
processing_connected
}

fn new_ahash_upstreams(
@@ -860,7 +867,7 @@ mod tests {
}
#[test]
fn test_upstream_peer_tracer() {
let tracer = UpstreamPeerTracer::new();
let tracer = UpstreamPeerTracer::new("upstreamname");
tracer.on_connected();
assert_eq!(1, tracer.connected.load(Ordering::Relaxed));
tracer.on_disconnected();
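A standalone sketch of the tracer above, with pingora's `Tracing` trait stubbed out. The switch from `AtomicU32` to `AtomicI32` presumably makes an unbalanced `on_disconnected` show up as a negative count rather than wrapping a decrement around to `u32::MAX`:

```rust
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;

#[derive(Clone, Debug)]
struct PeerTracer {
    name: String,
    connected: Arc<AtomicI32>, // number of active connections
}

impl PeerTracer {
    fn new(name: &str) -> Self {
        Self {
            name: name.to_string(),
            connected: Arc::new(AtomicI32::new(0)),
        }
    }
    // Called once per established upstream connection.
    fn on_connected(&self) {
        self.connected.fetch_add(1, Ordering::Relaxed);
    }
    // Called once per closed upstream connection.
    fn on_disconnected(&self) {
        self.connected.fetch_sub(1, Ordering::Relaxed);
    }
}

fn main() {
    let tracer = PeerTracer::new("charts");
    tracer.on_connected();
    tracer.on_connected();
    tracer.on_disconnected();
    assert_eq!(1, tracer.connected.load(Ordering::Relaxed));
    println!("{}: {} active", tracer.name, tracer.connected.load(Ordering::Relaxed));
}
```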
36 changes: 28 additions & 8 deletions src/service/performance_metrics.rs
@@ -12,7 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::proxy::{get_locations_processing, get_upstreams_processing};
use crate::cache::get_cache_backend;
use crate::proxy::{
get_locations_processing, get_upstreams_processing_connected,
};
use crate::service::SimpleServiceTaskFuture;
use crate::state::{get_process_system_info, get_processing_accepted};
use tracing::info;
@@ -24,29 +27,46 @@ pub fn new_performance_metrics_log_service() -> (String, SimpleServiceTaskFuture
let task: SimpleServiceTaskFuture = Box::new(move |_count: u32| {
Box::pin({
async move {
let mut cache_reading: i64 = -1;
let mut cache_writing: i64 = -1;
if let Ok(cache) = get_cache_backend() {
if let Some(stats) = cache.stats() {
cache_reading = stats.reading as i64;
cache_writing = stats.writing as i64;
}
}
let locations_processing = get_locations_processing()
.iter()
.map(|(name, count)| format!("{name}:{count}"))
.collect::<Vec<String>>()
.join(",");
let upstreams_processing = get_upstreams_processing()
.iter()
.map(|(name, count)| format!("{name}:{count}"))
.collect::<Vec<String>>()
.join(",");
.join(", ");
let mut upstreams_processing = vec![];
let mut upstreams_connected = vec![];
for (name, (processing, connected)) in
get_upstreams_processing_connected()
{
upstreams_processing.push(format!("{name}:{processing}"));
if let Some(connected) = connected {
upstreams_connected.push(format!("{name}:{connected}"));
}
}

let system_info = get_process_system_info();
let (processing, accepted) = get_processing_accepted();
info!(
category = PERFORMANCE_METRICS_LOG_SERVICE,
threads = system_info.threads,
locations_processing,
upstreams_processing,
upstreams_processing = upstreams_processing.join(", "),
upstreams_connected = upstreams_connected.join(", "),
accepted,
processing,
used_memory = system_info.memory,
fd_count = system_info.fd_count,
tcp_count = system_info.tcp_count,
tcp6_count = system_info.tcp6_count,
cache_reading,
cache_writing,
"performance metrics"
);
Ok(true)
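For reference, a small sketch of how the `(processing, connected)` pairs returned by `get_upstreams_processing_connected` are flattened into the two comma-separated strings that appear in the log line; upstreams without a tracer report `None` and contribute no `connected` entry:

```rust
use std::collections::HashMap;

fn format_upstream_metrics(
    stats: HashMap<String, (i32, Option<i32>)>,
) -> (String, String) {
    let mut processing = vec![];
    let mut connected = vec![];
    for (name, (count, conn)) in stats {
        processing.push(format!("{name}:{count}"));
        // Upstreams with the tracer disabled have no connection count.
        if let Some(conn) = conn {
            connected.push(format!("{name}:{conn}"));
        }
    }
    (processing.join(", "), connected.join(", "))
}

fn main() {
    let mut stats = HashMap::new();
    stats.insert("charts".to_string(), (2, Some(5)));
    stats.insert("api".to_string(), (0, None));
    let (processing, connected) = format_upstream_metrics(stats);
    println!("upstreams_processing = {processing}");
    println!("upstreams_connected = {connected}");
}
```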
6 changes: 4 additions & 2 deletions src/state/ctx.rs
@@ -12,8 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::proxy::{Location, Upstream};
use crate::util;
use crate::util::format_duration;
use crate::{proxy::Location, util};
use ahash::AHashMap;
use bytes::{Bytes, BytesMut};
use http::StatusCode;
@@ -130,14 +131,15 @@ pub struct State {
pub cache_lock_time: Option<u64>,
/// Maximum time-to-live for cache entries
pub cache_max_ttl: Option<Duration>,
pub upstream: Option<Arc<Upstream>>,
/// Indicates if the upstream connection is being reused
pub upstream_reused: bool,
/// Number of requests being processed by upstream
pub upstream_processing: Option<i32>,
/// Time taken to establish/reuse upstream connection (in milliseconds)
pub upstream_connect_time: Option<u64>,
/// Current number of active upstream connections
pub upstream_connected: Option<u32>,
pub upstream_connected: Option<i32>,
/// Time taken for TCP connection to upstream (in milliseconds)
pub upstream_tcp_connect_time: Option<u64>,
/// Time taken for TLS handshake with upstream (in milliseconds)
