handle JSON RPC cursor breaking change
gegaowp committed Feb 5, 2025
1 parent c02e845 commit 8636f5c
Showing 3 changed files with 298 additions and 17 deletions.
2 changes: 1 addition & 1 deletion crates/sui-rpc-benchmark/Cargo.toml
@@ -20,9 +20,9 @@ sui-indexer-alt-framework.workspace = true
telemetry-subscribers.workspace = true
tracing.workspace = true
tokio = { workspace = true, features = ["full"] }
bb8 = "0.9.0"
bb8-postgres = "0.9.0"
tokio-postgres = "0.7.12"

[[bin]]
name = "sui-rpc-benchmark"
246 changes: 246 additions & 0 deletions crates/sui-rpc-benchmark/src/bin/process_jsonl.rs
@@ -0,0 +1,246 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

/// This script processes JSONL logs of timestamps, hosts, methods and bodies.
/// It aggregates specific RPC calls within a sliding time window and writes
/// the results (with an iteration count) to an output JSONL file.
/// This matters for suix_getOwnedObjects, suix_getAllCoins and suix_getCoins,
/// because their `cursor` format has changed to encode more information, so
/// cursors recorded from the old stack are not valid on the `alt` stack.
///
/// Usage Example:
/// cargo run --bin process_jsonl -- \
///     --input /path/to/input.jsonl \
///     --output /path/to/output.jsonl \
///     --window_minutes 60
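///
/// Example input line (hypothetical values):
/// {"timestamp":"2025-02-05T00:00:00Z","host":"wallet-01","method":"suix_getCoins","body":{"method":"suix_getCoins","params":["0xabc"]}}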
use chrono::{DateTime, Duration, Utc};
use clap::Parser;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::error::Error;
use std::fs::File;
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::path::PathBuf;
use tracing::{debug, info};

const METHODS_TO_AGGREGATE: [&str; 3] =
    ["suix_getOwnedObjects", "suix_getAllCoins", "suix_getCoins"];
const WALLET_HOST_PREFIX: &str = "wallet";

#[derive(Debug, Serialize, Deserialize)]
struct LogEntry {
    timestamp: String,
    host: String,
    method: String,
    body: Value,
}

#[derive(Debug, Serialize)]
struct OutputLogEntry {
    timestamp: String,
    host: String,
    method: String,
    body: Value,
    iteration: usize,
}

/// Process JSONL files with sliding-window aggregation
#[derive(Parser, Debug)]
#[command(name = "process_jsonl")]
struct ProcessJsonlOpt {
    /// Input JSONL file path
    #[arg(value_parser)]
    input: PathBuf,

    /// Output JSONL file path
    #[arg(value_parser)]
    output: PathBuf,

    /// Window size in minutes
    #[arg(long, default_value = "60")]
    window_minutes: i64,
}

fn parse_timestamp(ts: &str) -> Result<DateTime<Utc>, Box<dyn Error>> {
    Ok(DateTime::parse_from_rfc3339(ts)?.with_timezone(&Utc))
}

/// Builds the key used to group repeated calls: the method name plus its
/// identifying parameters, deliberately excluding the pagination cursor.
fn generate_grouping_key(method: &str, body: &Value) -> Result<String, Box<dyn Error>> {
    let params = body
        .get("params")
        .and_then(|p| p.as_array())
        .ok_or(format!(
            "Invalid request body: missing params array for method {}",
            method
        ))?;

    match method {
        "suix_getOwnedObjects" => {
            let address = params
                .first()
                .and_then(|a| a.as_str())
                .ok_or("Missing or invalid address parameter for getOwnedObjects")?;
            let query = params
                .get(1)
                .and_then(|q| q.as_object())
                .map(serde_json::to_string)
                .transpose()?
                .unwrap_or_else(|| "null".to_string());
            Ok(format!("{}:{}:{}", method, address, query))
        }
        "suix_getCoins" => {
            let owner = params
                .first()
                .and_then(|a| a.as_str())
                .ok_or("Missing or invalid owner parameter for getCoins")?;
            let coin_type = params
                .get(1)
                .and_then(|c| c.as_str())
                .unwrap_or("0x2::sui::SUI");
            Ok(format!("{}:{}:{}", method, owner, coin_type))
        }
        "suix_getAllCoins" => {
            let owner = params
                .first()
                .and_then(|a| a.as_str())
                .ok_or("Missing or invalid owner parameter for getAllCoins")?;
            Ok(format!("{}:{}", method, owner))
        }
        _ => Ok(method.to_string()),
    }
}

struct PendingWindow {
    entries: Vec<LogEntry>,
    window_end: DateTime<Utc>,
}

struct StreamProcessor {
    method_windows: HashMap<String, PendingWindow>,
    window_duration: Duration,
    writer: BufWriter<File>,
}

impl StreamProcessor {
    fn new(output_file: File, window_minutes: i64) -> Self {
        Self {
            method_windows: HashMap::new(),
            window_duration: Duration::minutes(window_minutes),
            writer: BufWriter::new(output_file),
        }
    }

    fn process_entry(&mut self, entry: LogEntry) -> Result<(), Box<dyn Error>> {
        if !entry.host.starts_with(WALLET_HOST_PREFIX) {
            return Ok(());
        }

        if !METHODS_TO_AGGREGATE.contains(&entry.method.as_str()) {
            return self.write_entry(OutputLogEntry {
                timestamp: entry.timestamp,
                host: entry.host,
                method: entry.method,
                body: entry.body,
                iteration: 1,
            });
        }

        let ts = parse_timestamp(&entry.timestamp)?;
        self.flush_completed_windows(ts)?;

        let key = generate_grouping_key(&entry.method, &entry.body)
            .map_err(|e| format!("Failed to generate key: {}", e))?;

        if let Some(window) = self.method_windows.get_mut(&key) {
            window.entries.push(entry);
        } else {
            self.method_windows.insert(
                key,
                PendingWindow {
                    entries: vec![entry],
                    window_end: ts + self.window_duration,
                },
            );
        }
        Ok(())
    }

    fn write_entries_from_window(&mut self, window: PendingWindow) -> Result<(), Box<dyn Error>> {
        let base_entry = window
            .entries
            .first()
            .ok_or("Attempted to write from empty window")?;

        self.write_entry(OutputLogEntry {
            timestamp: base_entry.timestamp.clone(),
            host: base_entry.host.clone(),
            method: base_entry.method.clone(),
            body: base_entry.body.clone(),
            iteration: window.entries.len(),
        })
    }

    fn write_entry(&mut self, entry: OutputLogEntry) -> Result<(), Box<dyn Error>> {
        if entry.iteration > 1 {
            debug!(
                method = entry.method,
                iteration = entry.iteration,
                "Aggregated entries"
            );
        }
        serde_json::to_writer(&mut self.writer, &entry)?;
        self.writer.write_all(b"\n")?;
        Ok(())
    }

    fn finish(mut self) -> Result<(), Box<dyn Error>> {
        let windows: Vec<_> = self.method_windows.drain().map(|(_, w)| w).collect();
        for window in windows {
            self.write_entries_from_window(window)?;
        }
        self.writer.flush()?;
        Ok(())
    }

    fn flush_completed_windows(&mut self, current_ts: DateTime<Utc>) -> Result<(), Box<dyn Error>> {
        let mut completed_keys = Vec::new();
        for (key, window) in &self.method_windows {
            if current_ts > window.window_end {
                completed_keys.push(key.clone());
            }
        }

        for key in completed_keys {
            debug!(key = %key, "Flushing completed window");
            if let Some(window) = self.method_windows.remove(&key) {
                self.write_entries_from_window(window)?;
            }
        }
        Ok(())
    }
}
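// A sketch of the aggregation semantics (hypothetical entries, 60-minute window):
//   00:00 suix_getCoins for owner 0xabc -> opens a window ending at 01:00
//   00:10 same owner and coin type      -> buffered into the same window
//   01:05 next aggregated entry arrives -> the 0xabc window has expired and is
//         flushed as a single output line with iteration = 2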

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let _guard = telemetry_subscribers::TelemetryConfig::new()
        .with_env()
        .init();
    info!("Starting process_jsonl");
    let opt = ProcessJsonlOpt::parse();
    let input_file = File::open(&opt.input)?;
    let reader = BufReader::new(input_file);
    let output_file = File::create(&opt.output)?;
    let mut processor = StreamProcessor::new(output_file, opt.window_minutes);

    for line in reader.lines() {
        let line = line?;
        if let Ok(entry) = serde_json::from_str(&line) {
            processor.process_entry(entry)?;
        }
    }
    processor.finish()?;
    info!("Processing complete. Output written to {:?}", opt.output);
    Ok(())
}
67 changes: 51 additions & 16 deletions crates/sui-rpc-benchmark/src/bin/pull_grafana_logs.rs
@@ -38,14 +38,50 @@ struct GrafanaLog {
    message: String,
}

#[derive(Debug)]
struct LogEntry {
    timestamp: String,
    host: String,
    method: String,
    body: String,
}

fn extract_from_message(message: &str) -> Option<LogEntry> {
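    // Grafana log messages are expected to look roughly like this hypothetical example:
    //   2025-02-05T00:00:00Z ... headers={"host":"wallet-01", ...} body=b"{\"method\":\"suix_getCoins\", ...}" peer_type=...
    // The first whitespace-separated token is the timestamp; the host comes from the
    // `headers=` JSON and the request body from the `body=` segment.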
    let timestamp = message.split_whitespace().next()?.to_string();

    let headers_start = message.find("headers=")?;
    let headers_str = &message[headers_start..];
    let headers_json_str = headers_str
        .trim_start_matches("headers=")
        .split_once(" body=")
        .map(|(json_str, _)| json_str)?;
    let headers: Value = serde_json::from_str(headers_json_str).ok()?;
    let host = headers
        .get("host")
        .and_then(|h| h.as_str())
        .expect("No host found in headers - erroring out")
        .to_string();
    if let Some(body_start) = message.find("body=") {
        if let Some(peer_type_start) = message.find(" peer_type=") {
            let raw_body = &message[(body_start + 5)..peer_type_start].trim();
            if raw_body.starts_with('b') {
                let trimmed = raw_body.trim_start_matches('b').trim_matches('"');
                let unescaped = trimmed.replace("\\\"", "\"");
                if let Ok(parsed) = serde_json::from_str::<Value>(&unescaped) {
                    let method = parsed
                        .get("method")
                        .and_then(|m| m.as_str())
                        .expect("No method found in parsed request body")
                        .to_string();
                    return Some(LogEntry {
                        timestamp,
                        host,
                        method,
                        body: unescaped,
                    });
                }
            }
        }
    }
@@ -170,27 +206,26 @@ async fn run() -> Result<(), Box<dyn Error>> {

    info!("Found {} logs.", all_logs.len());

    let mut method_map: HashMap<String, Vec<LogEntry>> = HashMap::new();
    for log_entry in all_logs {
        if let Some(entry) = extract_from_message(&log_entry.message) {
            method_map
                .entry(entry.method.clone())
                .or_default()
                .push(entry);
        }
    }

    let file = File::create("sampled_read_requests.jsonl")?;
    let mut writer = BufWriter::new(file);

    // Each output line matches the input schema expected by process_jsonl,
    // e.g. (hypothetical values):
    // {"timestamp":"2025-02-05T00:00:00Z", "host":"wallet-01", "method":"suix_getCoins", "body":{...}}
    for (method, entries) in method_map {
        info!("Writing {} logs for method: {}", entries.len(), method);
        for entry in entries {
            let line = format!(
                r#"{{"timestamp":"{}", "host":"{}", "method":"{}", "body":{}}}"#,
                entry.timestamp, entry.host, entry.method, entry.body
            );
            writer.write_all(line.as_bytes())?;
            writer.write_all(b"\n")?;
        }
