From 18e6fe333e871c521e30bc1fd9ba10089382df08 Mon Sep 17 00:00:00 2001
From: Ge Gao
Date: Fri, 31 Jan 2025 11:52:08 -0500
Subject: [PATCH] handle JSON RPC cursor breaking change

---
 crates/sui-rpc-benchmark/Cargo.toml           |   2 +-
 .../src/bin/process_jsonl.rs                  | 248 ++++++++++++++++++
 .../src/bin/pull_grafana_logs.rs              |  68 +++--
 3 files changed, 301 insertions(+), 17 deletions(-)
 create mode 100644 crates/sui-rpc-benchmark/src/bin/process_jsonl.rs

diff --git a/crates/sui-rpc-benchmark/Cargo.toml b/crates/sui-rpc-benchmark/Cargo.toml
index c21ed26bdb10e9..2d73b6a0776214 100644
--- a/crates/sui-rpc-benchmark/Cargo.toml
+++ b/crates/sui-rpc-benchmark/Cargo.toml
@@ -20,9 +20,9 @@ sui-indexer-alt-framework.workspace = true
 telemetry-subscribers.workspace = true
 tracing.workspace = true
 tokio = { workspace = true, features = ["full"] }
-tokio-postgres = "0.7.12"
 bb8 = "0.9.0"
 bb8-postgres = "0.9.0"
+tokio-postgres = "0.7.12"
 
 [[bin]]
 name = "sui-rpc-benchmark"
diff --git a/crates/sui-rpc-benchmark/src/bin/process_jsonl.rs b/crates/sui-rpc-benchmark/src/bin/process_jsonl.rs
new file mode 100644
index 00000000000000..74b3f1aaa0f689
--- /dev/null
+++ b/crates/sui-rpc-benchmark/src/bin/process_jsonl.rs
@@ -0,0 +1,248 @@
+// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+/// This script processes JSONL logs of timestamps, hosts, methods and bodies.
+/// It filters specific RPC calls within a sliding time window and writes
+/// aggregated results (with iteration counts) to an output JSONL file.
+/// This is useful for suix_getOwnedObjects, suix_getAllCoins and suix_getCoins,
+/// because their `cursor` has changed to encode more information, so old
+/// cursors are not valid on the `alt` stack.
+///
+/// Usage Example:
+/// cargo run --bin process_jsonl -- \
+///     --input /path/to/input.jsonl \
+///     --output /path/to/output.jsonl \
+///     --window_minutes 60
+use chrono::{DateTime, Duration, Utc};
+use clap::Parser;
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use std::collections::HashMap;
+use std::error::Error;
+use std::fs::File;
+use std::io::{BufRead, BufReader, BufWriter, Write};
+use std::path::PathBuf;
+use tracing::{debug, info};
+
+const METHODS_TO_AGGREGATE: [&str; 3] =
+    ["suix_getOwnedObjects", "suix_getAllCoins", "suix_getCoins"];
+const WALLET_HOST_PREFIX: &str = "wallet";
+
+#[derive(Debug, Serialize, Deserialize)]
+struct LogEntry {
+    timestamp: String,
+    host: String,
+    method: String,
+    body: Value,
+}
+
+#[derive(Debug, Serialize)]
+struct OutputLogEntry {
+    timestamp: String,
+    host: String,
+    method: String,
+    body: Value,
+    iteration: usize,
+}
+
+/// Process JSONL files with sliding window aggregation
+#[derive(Parser, Debug)]
+#[command(name = "process_jsonl")]
+struct ProcessJsonlOpt {
+    /// Input JSONL file path
+    #[arg(value_parser)]
+    input: PathBuf,
+
+    /// Output JSONL file path
+    #[arg(value_parser)]
+    output: PathBuf,
+
+    /// Window size in minutes
+    #[arg(long, default_value = "60")]
+    window_minutes: i64,
+}
+
+fn parse_timestamp(ts: &str) -> Result<DateTime<Utc>, Box<dyn Error>> {
+    Ok(DateTime::parse_from_rfc3339(ts)?.with_timezone(&Utc))
+}
+
+fn generate_grouping_key(method: &str, body: &Value) -> Result<String, Box<dyn Error>> {
+    let params = body
+        .get("params")
+        .and_then(|p| p.as_array())
+        .ok_or_else(|| {
+            format!(
+                "Invalid request body: missing params array for method {}",
+                method
+            )
+        })?;
+
+    match method {
+        "suix_getOwnedObjects" => {
+            let address = params
+                .first()
+                .and_then(|a| a.as_str())
+                .ok_or_else(|| "Missing or invalid address parameter for getOwnedObjects")?;
+            let query = params
+                .get(1)
+                .and_then(|q| q.as_object())
+                .map(|q| serde_json::to_string(q))
+                .transpose()?
+                .unwrap_or_else(|| "null".to_string());
+            Ok(format!("{}:{}:{}", method, address, query))
+        }
+        "suix_getCoins" => {
+            let owner = params
+                .first()
+                .and_then(|a| a.as_str())
+                .ok_or_else(|| "Missing or invalid owner parameter for getCoins")?;
+            let coin_type = params
+                .get(1)
+                .and_then(|c| c.as_str())
+                .unwrap_or("0x2::sui::SUI");
+            Ok(format!("{}:{}:{}", method, owner, coin_type))
+        }
+        "suix_getAllCoins" => {
+            let owner = params
+                .first()
+                .and_then(|a| a.as_str())
+                .ok_or_else(|| "Missing or invalid owner parameter for getAllCoins")?;
+            Ok(format!("{}:{}", method, owner))
+        }
+        _ => Ok(method.to_string()),
+    }
+}
+
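+// For example, a suix_getCoins request with params ["0xabc", "0x2::sui::SUI", <cursor>]
+// (owner and coin type here are hypothetical) groups under the key
+// "suix_getCoins:0xabc:0x2::sui::SUI". Cursors are deliberately left out of the
+// key, so repeated paginated calls that differ only in their (now-incompatible)
+// cursor collapse into a single aggregated entry.
+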
getOwnedObjects")?; + let query = params + .get(1) + .and_then(|q| q.as_object()) + .map(|q| serde_json::to_string(q)) + .transpose()? + .unwrap_or_else(|| "null".to_string()); + Ok(format!("{}:{}:{}", method, address, query)) + } + "suix_getCoins" => { + let owner = params + .first() + .and_then(|a| a.as_str()) + .ok_or_else(|| "Missing or invalid owner parameter for getCoins")?; + let coin_type = params + .get(1) + .and_then(|c| c.as_str()) + .unwrap_or("0x2::sui::SUI"); + Ok(format!("{}:{}:{}", method, owner, coin_type)) + } + "suix_getAllCoins" => { + let owner = params + .first() + .and_then(|a| a.as_str()) + .ok_or_else(|| "Missing or invalid owner parameter for getAllCoins")?; + Ok(format!("{}:{}", method, owner)) + } + _ => Ok(method.to_string()), + } +} + +struct PendingWindow { + entries: Vec, + window_end: DateTime, +} + +struct StreamProcessor { + method_windows: HashMap, + window_duration: Duration, + writer: BufWriter, +} + +impl StreamProcessor { + fn new(output_file: File, window_minutes: i64) -> Self { + Self { + method_windows: HashMap::new(), + window_duration: Duration::minutes(window_minutes), + writer: BufWriter::new(output_file), + } + } + + fn process_entry(&mut self, entry: LogEntry) -> Result<(), Box> { + if !entry.host.starts_with(WALLET_HOST_PREFIX) { + return Ok(()); + } + + if !METHODS_TO_AGGREGATE.contains(&entry.method.as_str()) { + return self.write_entry(OutputLogEntry { + timestamp: entry.timestamp, + host: entry.host, + method: entry.method, + body: entry.body, + iteration: 1, + }); + } + + let ts = parse_timestamp(&entry.timestamp)?; + self.flush_completed_windows(ts)?; + + let key = generate_grouping_key(&entry.method, &entry.body) + .map_err(|e| format!("Failed to generate key: {}", e))?; + + if let Some(window) = self.method_windows.get_mut(&key) { + window.entries.push(entry); + } else { + self.method_windows.insert( + key, + PendingWindow { + entries: vec![entry], + window_end: ts + self.window_duration, + }, + ); + } + Ok(()) + } + + fn write_entries_from_window(&mut self, window: PendingWindow) -> Result<(), Box> { + let base_entry = window + .entries + .first() + .ok_or_else(|| "Attempted to write from empty window".to_string())?; + + self.write_entry(OutputLogEntry { + timestamp: base_entry.timestamp.clone(), + host: base_entry.host.clone(), + method: base_entry.method.clone(), + body: base_entry.body.clone(), + iteration: window.entries.len(), + }) + } + + fn write_entry(&mut self, entry: OutputLogEntry) -> Result<(), Box> { + if entry.iteration > 1 { + debug!( + method = entry.method, + iteration = entry.iteration, + "Aggregated entries" + ); + } + serde_json::to_writer(&mut self.writer, &entry)?; + self.writer.write_all(b"\n")?; + Ok(()) + } + + fn finish(mut self) -> Result<(), Box> { + let windows: Vec<_> = self.method_windows.drain().map(|(_, w)| w).collect(); + for window in windows { + self.write_entries_from_window(window)?; + } + self.writer.flush()?; + Ok(()) + } + + fn flush_completed_windows(&mut self, current_ts: DateTime) -> Result<(), Box> { + let mut completed_keys = Vec::new(); + for (key, window) in &self.method_windows { + if current_ts > window.window_end { + completed_keys.push(key.clone()); + } + } + + for key in completed_keys { + debug!("Flushing completed windows"); + info!("Flushing completed windows"); + if let Some(window) = self.method_windows.remove(&key) { + self.write_entries_from_window(window)?; + } + } + Ok(()) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let _guard = 
+    fn flush_completed_windows(&mut self, current_ts: DateTime<Utc>) -> Result<(), Box<dyn Error>> {
+        let mut completed_keys = Vec::new();
+        for (key, window) in &self.method_windows {
+            if current_ts > window.window_end {
+                completed_keys.push(key.clone());
+            }
+        }
+
+        for key in completed_keys {
+            debug!("Flushing completed window for key: {}", key);
+            if let Some(window) = self.method_windows.remove(&key) {
+                self.write_entries_from_window(window)?;
+            }
+        }
+        Ok(())
+    }
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+    let _guard = telemetry_subscribers::TelemetryConfig::new()
+        .with_env()
+        .init();
+    info!("Starting process_jsonl");
+    let opt = ProcessJsonlOpt::parse();
+    let input_file = File::open(&opt.input)?;
+    let reader = BufReader::new(input_file);
+    let output_file = File::create(&opt.output)?;
+    let mut processor = StreamProcessor::new(output_file, opt.window_minutes);
+
+    for line in reader.lines() {
+        let line = line?;
+        if let Ok(entry) = serde_json::from_str(&line) {
+            processor.process_entry(entry)?;
+        }
+    }
+    processor.finish()?;
+    info!("Processing complete. Output written to {:?}", opt.output);
+    Ok(())
+}
diff --git a/crates/sui-rpc-benchmark/src/bin/pull_grafana_logs.rs b/crates/sui-rpc-benchmark/src/bin/pull_grafana_logs.rs
index 3c8eff22ac1102..347a16e5d56a1b 100644
--- a/crates/sui-rpc-benchmark/src/bin/pull_grafana_logs.rs
+++ b/crates/sui-rpc-benchmark/src/bin/pull_grafana_logs.rs
@@ -38,14 +38,51 @@ struct GrafanaLog {
     message: String,
 }
 
-fn extract_body_from_message(message: &str) -> Option<String> {
+#[derive(Debug)]
+struct LogEntry {
+    timestamp: String,
+    host: String,
+    method: String,
+    body: String,
+}
+
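+// A Grafana message is expected to look like
+//   <timestamp> ... headers={...} body=b"{\"method\":...}" peer_type=...
+// The host comes from the headers JSON and the method from the body JSON.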
writer.write_all(b"\n")?; }