Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc benchmark: log additional info for cursor handling #21099

Open
wants to merge 1 commit into
base: grafana-pull
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 58 additions & 23 deletions crates/sui-rpc-benchmark/src/bin/pull_grafana_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,52 @@ struct GrafanaLog {
message: String,
}

fn extract_body_from_message(message: &str) -> Option<String> {
#[derive(Debug)]
struct LogEntry {
timestamp: String,
host: String,
method: String,
body: String,
}

/// One example message is:
/// 2025-02-11T23:15:17.944697206Z stderr F 2025-02-11T23:15:17.944501Z INFO sui_edge_proxy::handlers: Sampled read request headers={"host": "wallet-rpc.mainnet.sui.io", "client-sdk-type": "typescript", "client-sdk-version": "1.17.0", "client-target-api-version": "1.40.0", "client-request-method": "suix_getBalance", "content-type": "application/json", "content-length": "152", "accept-encoding": "gzip", "user-agent": "okhttp/4.9.2", "x-cloud-trace-context": "31caa7db658044d850a002ccf4ff15b1/8018737809747708392", "cookie": "_cfuvid=h0GD1bYot45Ln6kVCdL4qsFCCyw3h2cLw3caDNmhWNw-1739262948231-0.0.1.1-604800000", "via": "1.1 google", "x-forwarded-for": "171.236.184.3, 34.8.28.138", "x-forwarded-proto": "https", "connection": "Keep-Alive"} body=b"{\"jsonrpc\":\"2.0\",\"id\":189393,\"method\":\"suix_getBalance\",\"params\":[\"0x23cad599a375b9c2cedd62fa20112526c90a71764230425cb7f557c0c0b3b150\",\"0x2::sui::SUI\"]}" peer_type=Read
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very helpful for understanding what's going on in this function, thank you -- I'm slightly horrified now that I understand the format being parsed, but we can leave that as a problem for another day.

fn extract_from_message(message: &str) -> Option<LogEntry> {
gegaowp marked this conversation as resolved.
Show resolved Hide resolved
let timestamp = message.split_whitespace().next()?.to_string();

let headers_start = message.find("headers=")?;
let headers_str = &message[headers_start..];
let headers_json_str = headers_str
.trim_start_matches("headers=")
.split_once(" body=")?
.0;
let headers: Value = serde_json::from_str(headers_json_str).ok()?;
let host = headers
.get("host")
.and_then(|h| h.as_str())
.unwrap_or("unknown_host")
.to_string();

if let Some(body_start) = message.find("body=") {
if let Some(peer_type_start) = message.find(" peer_type=") {
let raw_body = &message[(body_start + 5)..peer_type_start].trim();
if raw_body.starts_with('b') {
let trimmed = raw_body.trim_start_matches('b').trim_matches('"');
let unescaped = trimmed.replace("\\\"", "\"");
return Some(unescaped);

if let Ok(parsed) = serde_json::from_str::<Value>(&unescaped) {
let method = parsed
.get("method")
.and_then(|m| m.as_str())
.unwrap_or("unknown_method")
.to_string();
return Some(LogEntry {
timestamp,
host,
method,
body: unescaped,
});
}
}
}
}
Expand Down Expand Up @@ -167,35 +205,32 @@ async fn run() -> Result<(), Box<dyn Error>> {
all_logs.extend(batch);
offset = Some(offset.unwrap_or(0) + batch_len as u64);
}

info!("Found {} logs.", all_logs.len());

let mut method_map: HashMap<String, Vec<String>> = HashMap::new();
for log_entry in all_logs {
if let Some(body_content) = extract_body_from_message(&log_entry.message) {
if let Ok(parsed) = serde_json::from_str::<Value>(&body_content) {
let method = parsed
.get("method")
.and_then(|m| m.as_str())
.unwrap_or("unknown_method")
.to_string();
method_map.entry(method).or_default().push(body_content);
}
// Gather method statistics
let mut method_map: HashMap<String, usize> = HashMap::new();
let mut asc_log_entries = Vec::new();
for log_entry in all_logs.into_iter().rev() {
if let Some(entry) = extract_from_message(&log_entry.message) {
*method_map.entry(entry.method.clone()).or_default() += 1;
asc_log_entries.push(entry);
}
}
for (method, count) in &method_map {
info!("Found {} logs for method: {}", count, method);
}

// Write logs to file in ascending timestamp order
let file = File::create("sampled_read_requests.jsonl")?;
let mut writer = BufWriter::new(file);

for (method, bodies) in method_map {
info!("Writing {} logs for method: {}", bodies.len(), method);
for body in bodies {
let line = format!(r#"{{"method":"{}", "body":{}}}"#, method, body);
writer.write_all(line.as_bytes())?;
writer.write_all(b"\n")?;
}
for entry in asc_log_entries {
let line = format!(
r#"{{"timestamp":"{}", "host":"{}", "method":"{}", "body":{}}}"#,
entry.timestamp, entry.host, entry.method, entry.body
);
writer.write_all(line.as_bytes())?;
writer.write_all(b"\n")?;
}

writer.flush()?;
info!("Done! Wrote grouped logs to sampled_read_requests.jsonl");
Ok(())
Expand Down
Loading