-
Notifications
You must be signed in to change notification settings - Fork 11.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
handle JSON RPC cursor breaking change
- Loading branch information
Showing
3 changed files
with
301 additions
and
17 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,248 @@ | ||
// Copyright (c) Mysten Labs, Inc. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
/// This script processes JSONL logs of timestamps, hosts, methods and bodies. | ||
/// It filters specific RPC calls within a sliding time window and writes | ||
/// aggregated results (with iteration data) to an output JSONL file. | ||
/// This is useful for methods of suix_getOwnedObjects, suix_getAllCoins, suix_getCoins, | ||
/// b/c the `cursor` has changed to encode more information, thus old cursors are not valid | ||
/// on the `alt` stack. | ||
/// | ||
/// Usage Example: | ||
/// cargo run --bin process_jsonl -- \ | ||
/// --input /path/to/input.jsonl \ | ||
/// --output /path/to/output.jsonl \ | ||
/// --window_minutes 60 | ||
use chrono::{DateTime, Duration, Utc}; | ||
use clap::Parser; | ||
use serde::{Deserialize, Serialize}; | ||
use serde_json::Value; | ||
use std::collections::HashMap; | ||
use std::error::Error; | ||
use std::fs::File; | ||
use std::io::{BufRead, BufReader, BufWriter, Write}; | ||
use std::path::PathBuf; | ||
use tracing::{debug, info}; | ||
|
||
const METHODS_TO_AGGREGATE: [&str; 3] = | ||
["suix_getOwnedObjects", "suix_getAllCoins", "suix_getCoins"]; | ||
const WALLET_HOST_PREFIX: &str = "wallet"; | ||
|
||
#[derive(Debug, Serialize, Deserialize)] | ||
struct LogEntry { | ||
timestamp: String, | ||
host: String, | ||
method: String, | ||
body: Value, | ||
} | ||
|
||
#[derive(Debug, Serialize)] | ||
struct OutputLogEntry { | ||
timestamp: String, | ||
host: String, | ||
method: String, | ||
body: Value, | ||
iteration: usize, | ||
} | ||
|
||
/// Process JSONL files with sliding window aggregation | ||
#[derive(Parser, Debug)] | ||
#[command(name = "process_jsonl")] | ||
struct ProcessJsonlOpt { | ||
/// Input JSONL file path | ||
#[arg(value_parser)] | ||
input: PathBuf, | ||
|
||
/// Output JSONL file path | ||
#[arg(value_parser)] | ||
output: PathBuf, | ||
|
||
/// Window size in minutes | ||
#[arg(long, default_value = "60")] | ||
window_minutes: i64, | ||
} | ||
|
||
fn parse_timestamp(ts: &str) -> Result<DateTime<Utc>, Box<dyn Error>> { | ||
Ok(DateTime::parse_from_rfc3339(ts)?.with_timezone(&Utc)) | ||
} | ||
|
||
fn generate_grouping_key(method: &str, body: &Value) -> Result<String, Box<dyn Error>> { | ||
let params = body | ||
.get("params") | ||
.and_then(|p| p.as_array()) | ||
.ok_or_else(|| { | ||
format!( | ||
"Invalid request body: missing params array for method {}", | ||
method | ||
) | ||
})?; | ||
|
||
match method { | ||
"suix_getOwnedObjects" => { | ||
let address = params | ||
.first() | ||
.and_then(|a| a.as_str()) | ||
.ok_or_else(|| "Missing or invalid address parameter for getOwnedObjects")?; | ||
let query = params | ||
.get(1) | ||
.and_then(|q| q.as_object()) | ||
.map(|q| serde_json::to_string(q)) | ||
.transpose()? | ||
.unwrap_or_else(|| "null".to_string()); | ||
Ok(format!("{}:{}:{}", method, address, query)) | ||
} | ||
"suix_getCoins" => { | ||
let owner = params | ||
.first() | ||
.and_then(|a| a.as_str()) | ||
.ok_or_else(|| "Missing or invalid owner parameter for getCoins")?; | ||
let coin_type = params | ||
.get(1) | ||
.and_then(|c| c.as_str()) | ||
.unwrap_or("0x2::sui::SUI"); | ||
Ok(format!("{}:{}:{}", method, owner, coin_type)) | ||
} | ||
"suix_getAllCoins" => { | ||
let owner = params | ||
.first() | ||
.and_then(|a| a.as_str()) | ||
.ok_or_else(|| "Missing or invalid owner parameter for getAllCoins")?; | ||
Ok(format!("{}:{}", method, owner)) | ||
} | ||
_ => Ok(method.to_string()), | ||
} | ||
} | ||
|
||
struct PendingWindow { | ||
entries: Vec<LogEntry>, | ||
window_end: DateTime<Utc>, | ||
} | ||
|
||
struct StreamProcessor { | ||
method_windows: HashMap<String, PendingWindow>, | ||
window_duration: Duration, | ||
writer: BufWriter<File>, | ||
} | ||
|
||
impl StreamProcessor { | ||
fn new(output_file: File, window_minutes: i64) -> Self { | ||
Self { | ||
method_windows: HashMap::new(), | ||
window_duration: Duration::minutes(window_minutes), | ||
writer: BufWriter::new(output_file), | ||
} | ||
} | ||
|
||
fn process_entry(&mut self, entry: LogEntry) -> Result<(), Box<dyn Error>> { | ||
if !entry.host.starts_with(WALLET_HOST_PREFIX) { | ||
return Ok(()); | ||
} | ||
|
||
if !METHODS_TO_AGGREGATE.contains(&entry.method.as_str()) { | ||
return self.write_entry(OutputLogEntry { | ||
timestamp: entry.timestamp, | ||
host: entry.host, | ||
method: entry.method, | ||
body: entry.body, | ||
iteration: 1, | ||
}); | ||
} | ||
|
||
let ts = parse_timestamp(&entry.timestamp)?; | ||
self.flush_completed_windows(ts)?; | ||
|
||
let key = generate_grouping_key(&entry.method, &entry.body) | ||
.map_err(|e| format!("Failed to generate key: {}", e))?; | ||
|
||
if let Some(window) = self.method_windows.get_mut(&key) { | ||
window.entries.push(entry); | ||
} else { | ||
self.method_windows.insert( | ||
key, | ||
PendingWindow { | ||
entries: vec![entry], | ||
window_end: ts + self.window_duration, | ||
}, | ||
); | ||
} | ||
Ok(()) | ||
} | ||
|
||
fn write_entries_from_window(&mut self, window: PendingWindow) -> Result<(), Box<dyn Error>> { | ||
let base_entry = window | ||
.entries | ||
.first() | ||
.ok_or_else(|| "Attempted to write from empty window".to_string())?; | ||
|
||
self.write_entry(OutputLogEntry { | ||
timestamp: base_entry.timestamp.clone(), | ||
host: base_entry.host.clone(), | ||
method: base_entry.method.clone(), | ||
body: base_entry.body.clone(), | ||
iteration: window.entries.len(), | ||
}) | ||
} | ||
|
||
fn write_entry(&mut self, entry: OutputLogEntry) -> Result<(), Box<dyn Error>> { | ||
if entry.iteration > 1 { | ||
debug!( | ||
method = entry.method, | ||
iteration = entry.iteration, | ||
"Aggregated entries" | ||
); | ||
} | ||
serde_json::to_writer(&mut self.writer, &entry)?; | ||
self.writer.write_all(b"\n")?; | ||
Ok(()) | ||
} | ||
|
||
fn finish(mut self) -> Result<(), Box<dyn Error>> { | ||
let windows: Vec<_> = self.method_windows.drain().map(|(_, w)| w).collect(); | ||
for window in windows { | ||
self.write_entries_from_window(window)?; | ||
} | ||
self.writer.flush()?; | ||
Ok(()) | ||
} | ||
|
||
fn flush_completed_windows(&mut self, current_ts: DateTime<Utc>) -> Result<(), Box<dyn Error>> { | ||
let mut completed_keys = Vec::new(); | ||
for (key, window) in &self.method_windows { | ||
if current_ts > window.window_end { | ||
completed_keys.push(key.clone()); | ||
} | ||
} | ||
|
||
for key in completed_keys { | ||
debug!("Flushing completed windows"); | ||
info!("Flushing completed windows"); | ||
if let Some(window) = self.method_windows.remove(&key) { | ||
self.write_entries_from_window(window)?; | ||
} | ||
} | ||
Ok(()) | ||
} | ||
} | ||
|
||
#[tokio::main] | ||
async fn main() -> Result<(), Box<dyn Error>> { | ||
let _guard = telemetry_subscribers::TelemetryConfig::new() | ||
.with_env() | ||
.init(); | ||
info!("Starting process_jsonl"); | ||
let opt = ProcessJsonlOpt::parse(); | ||
let input_file = File::open(&opt.input)?; | ||
let reader = BufReader::new(input_file); | ||
let output_file = File::create(&opt.output)?; | ||
let mut processor = StreamProcessor::new(output_file, opt.window_minutes); | ||
|
||
for line in reader.lines() { | ||
let line = line?; | ||
if let Ok(entry) = serde_json::from_str(&line) { | ||
processor.process_entry(entry)?; | ||
} | ||
} | ||
processor.finish()?; | ||
info!("Processing complete. Output written to {:?}", opt.output); | ||
Ok(()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters