diff --git a/README.md b/README.md index 960df8b..27575eb 100644 --- a/README.md +++ b/README.md @@ -17,15 +17,16 @@ This leaderboard uses the `data.json.gz` dataset included in the repo. Use this | 1 | [kjcao](src/kjcao.rs) | 5,188,464 | | 2 | [fulmicoton](src/fulmicoton.rs) | 5,677,291 | | 3 | [xinyuzeng](src/xinyuzeng.rs) | 5,784,827 | -| 4 | [natebrennand](src/natebrennand.rs)| 5,996,236 | -| 5 | [jakedgy](src/jakedgy.rs) | 6,402,499 | -| 6 | [hachikuji](src/hachikuji.rs) | 6,524,516 | -| 7 | [XiangpengHao](src/xiangpenghao.rs)| 6,847,283 | -| 8 | [agavra](src/agavra.rs) | 7,273,680 | -| 9 | [fabinout](src/fabinout.rs) | 7,283,778 | -| 10 | [samsond](src/samsond.rs) | 7,564,554 | -| 11 | *[Zstd(22)](src/zstd.rs)* | 11,917,798 | -| 12 | *[Zstd(9)](src/zstd.rs)* | 17,869,403 | +| 4 | [cometkim](src/cometkim.rs) | 5,797,365 | +| 5 | [natebrennand](src/natebrennand.rs)| 5,996,236 | +| 6 | [jakedgy](src/jakedgy.rs) | 6,402,499 | +| 7 | [hachikuji](src/hachikuji.rs) | 6,524,516 | +| 8 | [XiangpengHao](src/xiangpenghao.rs)| 6,847,283 | +| 9 | [agavra](src/agavra.rs) | 7,273,680 | +| 10 | [fabinout](src/fabinout.rs) | 7,283,778 | +| 11 | [samsond](src/samsond.rs) | 7,564,554 | +| 12 | *[Zstd(22)](src/zstd.rs)* | 11,917,798 | +| 13 | *[Zstd(9)](src/zstd.rs)* | 17,869,403 | | | *[Naive (baseline)](src/naive.rs)* | 210,727,389 | ### Evaluation Dataset Leaderboard diff --git a/src/cometkim.rs b/src/cometkim.rs new file mode 100644 index 0000000..0124468 --- /dev/null +++ b/src/cometkim.rs @@ -0,0 +1,848 @@ +//! # cometkim Codec +//! +//! **Core Strategy:** +//! Assuming all data is correctly ordered by `created_at` field. (It mostly is) +//! There are exceptions in data but its rate is pretty low. (0.1218% in the sample data) +//! +//! Push abnormal (out-of-order) items into separate block. +//! The decoder uses these block to perform "error-correction" phase. +//! +//! **Additional optimizations:** +//! 
//! - pcodec for all numeric columns (IDs, types, counts, repo indices)
//! - Split repo names into owner/suffix with dictionary encoding for owners
//! - Better owner preprocessing inspired by kjcao's approach
//!
//! **Binary Format** (see `encode` for the authoritative layout):
//!
//! Header (uncompressed):
//!   base_id: u64 (little-endian)
//!   base_timestamp: u32 (little-endian, seconds since epoch)
//!   total_clean_events: varint
//!   interval_count: varint (seconds span of the clean stream)
//!
//! Repo Dictionary (zstd-compressed, varint length prefix):
//!   count: varint
//!   repo IDs: pco-compressed u64 column
//!   owner dictionary: null-terminated strings, then pco-compressed owner indices
//!   suffixes: one flag byte each (0x01 = same as owner, 0x02 = owner + separator
//!   remainder, 0x00 = raw), followed by a null-terminated string when needed
//!
//! Clean Timeseries (four varint-length-prefixed pco columns):
//!   per-second event counts (u8), type indices (u8), event IDs (u64),
//!   repo indices (u32)
//!
//! Error Batches (zstd-compressed, varint length prefix):
//!   batch_count: varint, then each batch in a varint-encoded columnar layout
//!
//! Set COMETKIM_DEBUG=1 to see column size statistics and compression experiments.

use bytes::Bytes;
use chrono::{DateTime, TimeZone, Utc};
use pco::standalone::{simple_compress, simple_decompress};
use pco::ChunkConfig;
use std::collections::HashMap;
use std::error::Error;

use crate::codec::EventCodec;
use crate::{EventKey, EventValue, Repo};

// Maximum zstd level: this codec optimizes for size, not speed.
const ZSTD_LEVEL: i32 = 22;

/// Returns true when the COMETKIM_DEBUG environment variable is set,
/// enabling the per-column size breakdown printed after encoding.
fn debug_enabled() -> bool {
    std::env::var("COMETKIM_DEBUG").is_ok()
}

// GitHub events are well-known.
+// All available events are listed on https://docs.github.com/en/rest/using-the-rest-api/github-event-types +#[derive(Clone, Copy, Debug)] +#[repr(u8)] +enum GitHubEventType { + CommitComment = 0, + Create = 1, + Delete = 2, + Discussion = 3, + Fork = 4, + Gollum = 5, + IssueComment = 6, + Issues = 7, + Member = 8, + Public = 9, + PullRequest = 10, + PullRequestReview = 11, + PullRequestReviewComment = 12, + Push = 13, + Release = 14, + Watch = 15, +} + +impl From<&str> for GitHubEventType { + fn from(s: &str) -> Self { + match s { + "CommitCommentEvent" => GitHubEventType::CommitComment, + "CreateEvent" => GitHubEventType::Create, + "DeleteEvent" => GitHubEventType::Delete, + "DiscussionEvent" => GitHubEventType::Discussion, + "ForkEvent" => GitHubEventType::Fork, + "GollumEvent" => GitHubEventType::Gollum, + "IssueCommentEvent" => GitHubEventType::IssueComment, + "IssuesEvent" => GitHubEventType::Issues, + "MemberEvent" => GitHubEventType::Member, + "PublicEvent" => GitHubEventType::Public, + "PullRequestEvent" => GitHubEventType::PullRequest, + "PullRequestReviewEvent" => GitHubEventType::PullRequestReview, + "PullRequestReviewCommentEvent" => GitHubEventType::PullRequestReviewComment, + "PushEvent" => GitHubEventType::Push, + "ReleaseEvent" => GitHubEventType::Release, + "WatchEvent" => GitHubEventType::Watch, + _ => panic!("Unknown event type: {}", s), + } + } +} + +impl From for &'static str { + fn from(val: GitHubEventType) -> Self { + match val { + GitHubEventType::CommitComment => "CommitCommentEvent", + GitHubEventType::Create => "CreateEvent", + GitHubEventType::Delete => "DeleteEvent", + GitHubEventType::Discussion => "DiscussionEvent", + GitHubEventType::Fork => "ForkEvent", + GitHubEventType::Gollum => "GollumEvent", + GitHubEventType::IssueComment => "IssueCommentEvent", + GitHubEventType::Issues => "IssuesEvent", + GitHubEventType::Member => "MemberEvent", + GitHubEventType::Public => "PublicEvent", + GitHubEventType::PullRequest => 
"PullRequestEvent", + GitHubEventType::PullRequestReview => "PullRequestReviewEvent", + GitHubEventType::PullRequestReviewComment => "PullRequestReviewCommentEvent", + GitHubEventType::Push => "PushEvent", + GitHubEventType::Release => "ReleaseEvent", + GitHubEventType::Watch => "WatchEvent", + } + } +} + +impl From for GitHubEventType { + fn from(val: u8) -> Self { + match val { + 0 => GitHubEventType::CommitComment, + 1 => GitHubEventType::Create, + 2 => GitHubEventType::Delete, + 3 => GitHubEventType::Discussion, + 4 => GitHubEventType::Fork, + 5 => GitHubEventType::Gollum, + 6 => GitHubEventType::IssueComment, + 7 => GitHubEventType::Issues, + 8 => GitHubEventType::Member, + 9 => GitHubEventType::Public, + 10 => GitHubEventType::PullRequest, + 11 => GitHubEventType::PullRequestReview, + 12 => GitHubEventType::PullRequestReviewComment, + 13 => GitHubEventType::Push, + 14 => GitHubEventType::Release, + 15 => GitHubEventType::Watch, + _ => panic!("Unknown event type value: {}", val), + } + } +} + +// Varint encoding/decoding helpers +fn encode_varint(mut value: u64, buf: &mut Vec) { + while value >= 0x80 { + buf.push((value as u8) | 0x80); + value >>= 7; + } + buf.push(value as u8); +} + +fn decode_varint(bytes: &[u8], pos: &mut usize) -> u64 { + let mut result: u64 = 0; + let mut shift = 0; + loop { + let byte = bytes[*pos]; + *pos += 1; + result |= ((byte & 0x7F) as u64) << shift; + if byte & 0x80 == 0 { + break; + } + shift += 7; + } + result +} + +fn encode_signed_varint(value: i64, buf: &mut Vec) { + let encoded = ((value << 1) ^ (value >> 63)) as u64; + encode_varint(encoded, buf); +} + +fn decode_signed_varint(bytes: &[u8], pos: &mut usize) -> i64 { + let encoded = decode_varint(bytes, pos); + ((encoded >> 1) as i64) ^ (-((encoded & 1) as i64)) +} + +fn parse_timestamp(ts: &str) -> Option { + DateTime::parse_from_rfc3339(ts) + .ok() + .map(|dt| dt.timestamp() as u32) +} + +fn format_timestamp(ts: u32) -> String { + Utc.timestamp_opt(ts as i64, 0) + 
.single() + .map(|dt| dt.to_rfc3339_opts(chrono::SecondsFormat::Secs, true)) + .unwrap_or_default() +} + +/// A parsed event for internal processing +#[derive(Clone, Debug)] +struct ParsedEvent { + id: u64, + event_type: u8, + repo_idx: u32, + timestamp: u32, // seconds since epoch +} + +/// Error batch containing events that are out of timestamp order +#[derive(Clone, Debug)] +struct ErrorBatch { + insert_position: usize, // position in original input array + events: Vec, // events in this batch +} + +/// Grouped events by timestamp within an error batch +struct ErrorBatchInterval { + timestamp_offset: u32, // offset from base_timestamp + event_count: usize, +} + +/// Repo entry with owner/suffix split +#[derive(Clone, Debug)] +struct RepoEntry { + id: u64, + owner: String, + suffix: String, +} + +pub struct CometkimCodec; + +impl CometkimCodec { + pub fn new() -> Self { + Self + } +} + +impl EventCodec for CometkimCodec { + fn name(&self) -> &str { + "cometkim" + } + + fn encode(&self, events: &[(EventKey, EventValue)]) -> Result> { + if events.is_empty() { + return Ok(Bytes::new()); + } + + // Step 1: Build repo dictionary with owner/suffix split + let (repo_to_idx, repo_list) = build_repo_dictionary(events); + + // Step 2: Parse all events + let parsed_events: Vec = events + .iter() + .map(|(key, value)| { + let id: u64 = key.id.parse().unwrap(); + let event_type = GitHubEventType::from(key.event_type.as_str()) as u8; + let repo_key = (value.repo.id, value.repo.name.clone()); + let repo_idx = *repo_to_idx.get(&repo_key).unwrap(); + let timestamp = parse_timestamp(&value.created_at).unwrap(); + ParsedEvent { + id, + event_type, + repo_idx, + timestamp, + } + }) + .collect(); + + // Step 3: Separate clean events and error batches + let (clean_events, error_batches) = separate_errors(&parsed_events); + + // Step 4: Calculate base values + let base_id = clean_events.iter().map(|e| e.id).min().unwrap_or(0); + let base_timestamp = clean_events.iter().map(|e| 
e.timestamp).min().unwrap_or(0); + let max_timestamp = clean_events.iter().map(|e| e.timestamp).max().unwrap_or(0); + let interval_count = if max_timestamp >= base_timestamp { + (max_timestamp - base_timestamp + 1) as usize + } else { + 0 + }; + + // Step 5: Group clean events by second + let mut events_by_second: Vec> = vec![Vec::new(); interval_count]; + for event in &clean_events { + let offset = (event.timestamp - base_timestamp) as usize; + events_by_second[offset].push(event); + } + + // Step 6: Encode everything with per-column compression + let mut buf = Vec::new(); + + // Header (uncompressed - small fixed size) + buf.extend_from_slice(&base_id.to_le_bytes()); + buf.extend_from_slice(&base_timestamp.to_le_bytes()); + encode_varint(clean_events.len() as u64, &mut buf); + encode_varint(interval_count as u64, &mut buf); + + // Repo dictionary (compressed with zstd) + let mut repo_buf = Vec::new(); + encode_repo_dictionary(&repo_list, &mut repo_buf)?; + let repo_compressed = zstd::encode_all(&repo_buf[..], ZSTD_LEVEL)?; + encode_varint(repo_compressed.len() as u64, &mut buf); + buf.extend_from_slice(&repo_compressed); + + // Clean timeseries columns + // Column 1: Event counts per second + let counts: Vec = events_by_second.iter().map(|s| s.len() as u8).collect(); + + // Column 2: Type indices + let mut all_types: Vec = Vec::new(); + for second_events in &events_by_second { + for event in second_events { + all_types.push(event.event_type); + } + } + + // Column 3: Event IDs + let mut all_ids: Vec = Vec::new(); + for second_events in &events_by_second { + for event in second_events { + all_ids.push(event.id); + } + } + + // Column 4: Repo indices + let mut all_repo_indices: Vec = Vec::new(); + for second_events in &events_by_second { + for event in second_events { + all_repo_indices.push(event.repo_idx); + } + } + + // Use pcodec for numeric columns with max compression + let mut pco_config = ChunkConfig::default(); + pco_config.compression_level = 12; + 
pco_config.enable_8_bit = true; + + let counts_compressed = simple_compress(&counts, &pco_config)?; + let types_compressed = simple_compress(&all_types, &pco_config)?; + let ids_compressed = simple_compress(&all_ids, &pco_config)?; + let repos_compressed = simple_compress(&all_repo_indices, &pco_config)?; + + // Write columns with lengths + encode_varint(counts_compressed.len() as u64, &mut buf); + buf.extend_from_slice(&counts_compressed); + encode_varint(types_compressed.len() as u64, &mut buf); + buf.extend_from_slice(&types_compressed); + encode_varint(ids_compressed.len() as u64, &mut buf); + buf.extend_from_slice(&ids_compressed); + encode_varint(repos_compressed.len() as u64, &mut buf); + buf.extend_from_slice(&repos_compressed); + + // Error batches (compressed with zstd) + let mut err_buf = Vec::new(); + encode_varint(error_batches.len() as u64, &mut err_buf); + for batch in &error_batches { + encode_error_batch(batch, base_id, base_timestamp, &mut err_buf); + } + let err_compressed = zstd::encode_all(&err_buf[..], ZSTD_LEVEL)?; + encode_varint(err_compressed.len() as u64, &mut buf); + buf.extend_from_slice(&err_compressed); + + if debug_enabled() { + // Debug size breakdown + eprintln!("=== Size Breakdown ==="); + eprintln!("Repo dict: {} bytes", repo_compressed.len()); + eprintln!("Counts: {} bytes", counts_compressed.len()); + eprintln!("Types: {} bytes", types_compressed.len()); + eprintln!("IDs: {} bytes", ids_compressed.len()); + eprintln!("Repo idx: {} bytes", repos_compressed.len()); + eprintln!("Errors: {} bytes", err_compressed.len()); + } + + Ok(Bytes::from(buf)) + } + + fn decode(&self, bytes: &[u8]) -> Result, Box> { + if bytes.is_empty() { + return Ok(Vec::new()); + } + + let mut pos = 0; + + // Read header + let base_id = u64::from_le_bytes(bytes[pos..pos + 8].try_into()?); + pos += 8; + let base_timestamp = u32::from_le_bytes(bytes[pos..pos + 4].try_into()?); + pos += 4; + let total_clean_events = decode_varint(bytes, &mut pos) as usize; + 
let interval_count = decode_varint(bytes, &mut pos) as usize; + + // Read and decompress repo dictionary + let repo_len = decode_varint(bytes, &mut pos) as usize; + let repo_bytes = zstd::decode_all(&bytes[pos..pos + repo_len])?; + pos += repo_len; + let mut repo_pos = 0; + let repo_list = decode_repo_dictionary(&repo_bytes, &mut repo_pos)?; + + // Read column data (pcodec compressed) + let counts_len = decode_varint(bytes, &mut pos) as usize; + let counts: Vec = simple_decompress(&bytes[pos..pos + counts_len])?; + pos += counts_len; + + let types_len = decode_varint(bytes, &mut pos) as usize; + let all_types: Vec = simple_decompress(&bytes[pos..pos + types_len])?; + pos += types_len; + + let ids_len = decode_varint(bytes, &mut pos) as usize; + let all_ids: Vec = simple_decompress(&bytes[pos..pos + ids_len])?; + pos += ids_len; + + let repos_len = decode_varint(bytes, &mut pos) as usize; + let all_repo_indices: Vec = simple_decompress(&bytes[pos..pos + repos_len])?; + pos += repos_len; + + // Parse clean timeseries from columns + let mut clean_events: Vec = Vec::with_capacity(total_clean_events); + let mut event_pos = 0; + + for (second_offset, &count) in counts.iter().enumerate().take(interval_count) { + let count = count as usize; + + if count > 0 { + let timestamp = base_timestamp + second_offset as u32; + + for _ in 0..count { + let event_type = all_types[event_pos]; + let id = all_ids[event_pos]; + let repo_idx = all_repo_indices[event_pos]; + event_pos += 1; + + clean_events.push(ParsedEvent { + id, + event_type, + repo_idx, + timestamp, + }); + } + } + } + + // Read and decompress error batches + let err_len = decode_varint(bytes, &mut pos) as usize; + let err_bytes = zstd::decode_all(&bytes[pos..pos + err_len])?; + let mut err_pos = 0; + + let batch_count = decode_varint(&err_bytes, &mut err_pos) as usize; + let mut error_batches: Vec = Vec::with_capacity(batch_count); + + for _ in 0..batch_count { + let batch = decode_error_batch(&err_bytes, &mut err_pos, 
base_id, base_timestamp)?; + error_batches.push(batch); + } + + // Reconstruct final events by inserting error batches + let mut final_events = clean_events; + + // Process error batches in reverse order to maintain correct positions + for batch in error_batches.into_iter().rev() { + let insert_pos = batch.insert_position.min(final_events.len()); + for (i, event) in batch.events.into_iter().enumerate() { + final_events.insert(insert_pos + i, event); + } + } + + // Convert to output format + let result: Vec<(EventKey, EventValue)> = final_events + .into_iter() + .map(|event| { + let repo_entry = &repo_list[event.repo_idx as usize]; + let repo_name = if repo_entry.suffix.is_empty() { + repo_entry.owner.clone() + } else { + format!("{}/{}", repo_entry.owner, repo_entry.suffix) + }; + let key = EventKey { + id: event.id.to_string(), + event_type: <&str>::from(GitHubEventType::from(event.event_type)).to_string(), + }; + let value = EventValue { + repo: Repo { + id: repo_entry.id, + name: repo_name.clone(), + url: format!("https://api.github.com/repos/{}", repo_name), + }, + created_at: format_timestamp(event.timestamp), + }; + (key, value) + }) + .collect(); + + Ok(result) + } +} + +type RepoDict = (HashMap<(u64, String), u32>, Vec); + +/// Build repo dictionary sorted by frequency for better index compression +/// with owner/suffix split for better compression +fn build_repo_dictionary(events: &[(EventKey, EventValue)]) -> RepoDict { + // Count frequency of each repo + let mut repo_counts: HashMap<(u64, String), usize> = HashMap::new(); + for (_, value) in events { + let key = (value.repo.id, value.repo.name.clone()); + *repo_counts.entry(key).or_insert(0) += 1; + } + + // Build repo entries + let mut repo_entries: Vec<((u64, String), RepoEntry, usize)> = repo_counts + .into_iter() + .map(|(key, count)| { + let parts: Vec<&str> = key.1.splitn(2, '/').collect(); + let (owner, suffix) = if parts.len() == 2 { + (parts[0].to_string(), parts[1].to_string()) + } else { + 
(key.1.clone(), String::new()) + }; + let entry = RepoEntry { + id: key.0, + owner, + suffix, + }; + (key, entry, count) + }) + .collect(); + + // Sort by frequency (descending) so frequent repos get small indices + repo_entries.sort_by(|a, b| b.2.cmp(&a.2).then_with(|| a.0.cmp(&b.0))); + + // Build index map and repo list + let mut repo_to_idx: HashMap<(u64, String), u32> = HashMap::new(); + let mut repo_list: Vec = Vec::with_capacity(repo_entries.len()); + + for (idx, (key, entry, _)) in repo_entries.into_iter().enumerate() { + repo_to_idx.insert(key, idx as u32); + repo_list.push(entry); + } + + (repo_to_idx, repo_list) +} + +/// Separate clean events from error batches +fn separate_errors(events: &[ParsedEvent]) -> (Vec, Vec) { + let mut clean_events: Vec = Vec::new(); + let mut error_batches: Vec = Vec::new(); + + let mut checkpoint: u32 = 0; + let mut current_error_batch: Option = None; + + for event in events.iter() { + if event.timestamp < checkpoint { + // This is an error event + if let Some(ref mut batch) = current_error_batch { + batch.events.push(event.clone()); + } else { + // insert_position = number of clean events so far + current_error_batch = Some(ErrorBatch { + insert_position: clean_events.len(), + events: vec![event.clone()], + }); + } + } else { + // This is a clean event + if let Some(batch) = current_error_batch.take() { + error_batches.push(batch); + } + checkpoint = event.timestamp; + clean_events.push(event.clone()); + } + } + + // Don't forget the last error batch + if let Some(batch) = current_error_batch { + error_batches.push(batch); + } + + (clean_events, error_batches) +} + +/// Encode repo dictionary with improved owner/suffix split and preprocessing +fn encode_repo_dictionary( + repo_list: &[RepoEntry], + buf: &mut Vec, +) -> Result<(), Box> { + encode_varint(repo_list.len() as u64, buf); + + // Repo IDs - use pcodec directly + let ids: Vec = repo_list.iter().map(|e| e.id).collect(); + let mut pco_config = ChunkConfig::default(); 
+ pco_config.compression_level = 12; + let ids_compressed = simple_compress(&ids, &pco_config)?; + encode_varint(ids_compressed.len() as u64, buf); + buf.extend_from_slice(&ids_compressed); + + // Build owner dictionary (frequency-sorted for better compression) + let mut owner_counts: HashMap<&str, usize> = HashMap::new(); + for entry in repo_list { + *owner_counts.entry(&entry.owner).or_insert(0) += 1; + } + let mut owner_list: Vec<&str> = owner_counts.keys().copied().collect(); + owner_list.sort_by(|a, b| owner_counts[b].cmp(&owner_counts[a]).then_with(|| a.cmp(b))); + + let owner_to_idx: HashMap<&str, u32> = owner_list + .iter() + .enumerate() + .map(|(i, &s)| (s, i as u32)) + .collect(); + + // Write owner count + encode_varint(owner_list.len() as u64, buf); + + // Write owner strings (null-terminated for efficient parsing) + for owner in &owner_list { + buf.extend_from_slice(owner.as_bytes()); + buf.push(0); // null terminator + } + + // Write owner indices for each repo (pcodec compressed) + let owner_indices: Vec = repo_list + .iter() + .map(|e| owner_to_idx[e.owner.as_str()]) + .collect(); + let owner_indices_compressed = simple_compress(&owner_indices, &pco_config)?; + encode_varint(owner_indices_compressed.len() as u64, buf); + buf.extend_from_slice(&owner_indices_compressed); + + // Write suffixes with transformations (like kjcao's approach) + for entry in repo_list { + let owner = &entry.owner; + let suffix = &entry.suffix; + + if suffix == owner { + // Same as owner + buf.push(0x01); + } else if suffix.starts_with(owner) { + // Starts with owner + let remainder = &suffix[owner.len()..]; + if let Some(first_char) = remainder.chars().next() { + if matches!(first_char, '-' | '_' | '.') { + buf.push(0x02); + buf.extend_from_slice(remainder.as_bytes()); + buf.push(0); + continue; + } + } + // Fall through to raw + buf.push(0x00); + buf.extend_from_slice(suffix.as_bytes()); + buf.push(0); + } else { + // Raw string + buf.push(0x00); + 
buf.extend_from_slice(suffix.as_bytes()); + buf.push(0); + } + } + Ok(()) +} + +/// Decode repo dictionary +fn decode_repo_dictionary(bytes: &[u8], pos: &mut usize) -> Result, Box> { + let count = decode_varint(bytes, pos) as usize; + + // Decode repo IDs with pcodec + let ids_len = decode_varint(bytes, pos) as usize; + let repo_ids: Vec = simple_decompress(&bytes[*pos..*pos + ids_len])?; + *pos += ids_len; + + // Read owner dictionary + let owner_count = decode_varint(bytes, pos) as usize; + let mut owner_list: Vec = Vec::with_capacity(owner_count); + for _ in 0..owner_count { + let null_pos = bytes[*pos..] + .iter() + .position(|&b| b == 0) + .unwrap_or(bytes.len() - *pos); + let owner = String::from_utf8(bytes[*pos..*pos + null_pos].to_vec())?; + *pos += null_pos + 1; // Skip null terminator + owner_list.push(owner); + } + + // Read owner indices (pcodec compressed) + let owner_indices_len = decode_varint(bytes, pos) as usize; + let owner_indices_u32: Vec = simple_decompress(&bytes[*pos..*pos + owner_indices_len])?; + *pos += owner_indices_len; + let owner_indices: Vec = owner_indices_u32.iter().map(|&i| i as usize).collect(); + + // Read suffixes with transformations + let mut repo_list: Vec = Vec::with_capacity(count); + for i in 0..count { + let owner_idx = owner_indices[i]; + let owner = if owner_idx < owner_list.len() { + owner_list[owner_idx].clone() + } else { + String::new() + }; + + let flag = bytes[*pos]; + *pos += 1; + + let suffix = match flag { + 0x01 => { + // Same as owner + owner.clone() + } + 0x02 => { + // Starts with owner + remainder + let null_pos = bytes[*pos..] + .iter() + .position(|&b| b == 0) + .unwrap_or(bytes.len() - *pos); + let remainder = String::from_utf8(bytes[*pos..*pos + null_pos].to_vec())?; + *pos += null_pos + 1; + format!("{}{}", owner, remainder) + } + _ => { + // Raw string + let null_pos = bytes[*pos..] 
+ .iter() + .position(|&b| b == 0) + .unwrap_or(bytes.len() - *pos); + let raw = String::from_utf8(bytes[*pos..*pos + null_pos].to_vec())?; + *pos += null_pos + 1; + raw + } + }; + + repo_list.push(RepoEntry { + id: repo_ids[i], + owner, + suffix, + }); + } + + Ok(repo_list) +} + +/// Encode error batch to buffer +fn encode_error_batch(batch: &ErrorBatch, base_id: u64, base_timestamp: u32, buf: &mut Vec) { + encode_varint(batch.insert_position as u64, buf); + encode_varint(batch.events.len() as u64, buf); + + // Group events by timestamp + let mut intervals: Vec = Vec::new(); + let mut current_ts: Option = None; + + for event in &batch.events { + if current_ts == Some(event.timestamp) { + intervals.last_mut().unwrap().event_count += 1; + } else { + intervals.push(ErrorBatchInterval { + timestamp_offset: event.timestamp.saturating_sub(base_timestamp), + event_count: 1, + }); + current_ts = Some(event.timestamp); + } + } + + // Write interval info + encode_varint(intervals.len() as u64, buf); + for interval in &intervals { + encode_varint(interval.timestamp_offset as u64, buf); + } + for interval in &intervals { + encode_varint(interval.event_count as u64, buf); + } + + // Write columnar data + // Type indices + for event in &batch.events { + buf.push(event.event_type); + } + + // Event ID deltas + let mut prev_id = base_id as i64; + for event in &batch.events { + let delta = event.id as i64 - prev_id; + encode_signed_varint(delta, buf); + prev_id = event.id as i64; + } + + // Repo indices + for event in &batch.events { + encode_varint(event.repo_idx as u64, buf); + } +} + +/// Decode error batch from buffer +fn decode_error_batch( + bytes: &[u8], + pos: &mut usize, + base_id: u64, + base_timestamp: u32, +) -> Result> { + let insert_position = decode_varint(bytes, pos) as usize; + let total_event_count = decode_varint(bytes, pos) as usize; + + // Read interval info + let interval_count = decode_varint(bytes, pos) as usize; + let mut timestamp_offsets: Vec = 
Vec::with_capacity(interval_count); + for _ in 0..interval_count { + timestamp_offsets.push(decode_varint(bytes, pos) as u32); + } + let mut events_per_interval: Vec = Vec::with_capacity(interval_count); + for _ in 0..interval_count { + events_per_interval.push(decode_varint(bytes, pos) as usize); + } + + // Read columnar data + // Type indices + let type_indices: Vec = bytes[*pos..*pos + total_event_count].to_vec(); + *pos += total_event_count; + + // Event ID deltas + let mut event_ids: Vec = Vec::with_capacity(total_event_count); + let mut prev_id = base_id as i64; + for _ in 0..total_event_count { + let delta = decode_signed_varint(bytes, pos); + prev_id += delta; + event_ids.push(prev_id as u64); + } + + // Repo indices + let mut repo_indices: Vec = Vec::with_capacity(total_event_count); + for _ in 0..total_event_count { + repo_indices.push(decode_varint(bytes, pos) as u32); + } + + // Reconstruct events with timestamps + let mut events: Vec = Vec::with_capacity(total_event_count); + let mut event_idx = 0; + for (interval_idx, &count) in events_per_interval.iter().enumerate() { + let timestamp = base_timestamp + timestamp_offsets[interval_idx]; + for _ in 0..count { + events.push(ParsedEvent { + id: event_ids[event_idx], + event_type: type_indices[event_idx], + repo_idx: repo_indices[event_idx], + timestamp, + }); + event_idx += 1; + } + } + + Ok(ErrorBatch { + insert_position, + events, + }) +} diff --git a/src/main.rs b/src/main.rs index caf292d..57de07d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,6 +5,7 @@ use std::io::{BufRead, BufReader}; mod agavra; mod codec; +mod cometkim; mod fabinout; mod fulmicoton; mod hachikuji; @@ -19,6 +20,7 @@ mod zstd; use agavra::AgavraCodec; use codec::EventCodec; +use cometkim::CometkimCodec; use fabinout::FabinoutCodec; use hachikuji::HachikujiCodec; use jakedgy::JakedgyCodec; @@ -204,6 +206,7 @@ fn main() -> Result<(), Box> { (Box::new(FulmicotonCodec), &sorted_events), (Box::new(XinyuzengCodec::new()), 
&sorted_events), (Box::new(KjcaoCodec::new()), &sorted_events), + (Box::new(CometkimCodec::new()), &events), ]; for (codec, expected) in codecs {