diff --git a/common/src/main/java/dev/opendata/common/AppendTimeoutException.java b/common/src/main/java/dev/opendata/common/AppendTimeoutException.java new file mode 100644 index 0000000..0b884b6 --- /dev/null +++ b/common/src/main/java/dev/opendata/common/AppendTimeoutException.java @@ -0,0 +1,13 @@ +package dev.opendata.common; + +/** + * Thrown when an append with a timeout expires before the write queue has space. + * + *

<p>The caller can retry the same batch of records. + */ +public class AppendTimeoutException extends OpenDataNativeException { + + public AppendTimeoutException(String message) { + super(message); + } +} diff --git a/common/src/main/java/dev/opendata/common/QueueFullException.java b/common/src/main/java/dev/opendata/common/QueueFullException.java new file mode 100644 index 0000000..4c2eade --- /dev/null +++ b/common/src/main/java/dev/opendata/common/QueueFullException.java @@ -0,0 +1,13 @@ +package dev.opendata.common; + +/** + * Thrown when a non-blocking append fails because the write queue is full. + * + *

The caller can retry the same batch of records after the queue drains. + */ +public class QueueFullException extends OpenDataNativeException { + + public QueueFullException(String message) { + super(message); + } +} diff --git a/log/native/Cargo.lock b/log/native/Cargo.lock index e2e3083..c29b898 100644 --- a/log/native/Cargo.lock +++ b/log/native/Cargo.lock @@ -47,15 +47,15 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.100" +version = "1.0.101" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" +checksum = "5f0e0fee31ef5ed1ba1316088939cea399010ed7731dba877ed44aeb407a75ea" [[package]] name = "arc-swap" -version = "1.8.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51d03449bb8ca2cc2ef70869af31463d1ae5ccc8fa3e334b307203fbf815207e" +checksum = "9ded5f9a03ac8f24d1b8a25101ee812cd32cdc8c50a4c50237de2c4915850e73" dependencies = [ "rustversion", ] @@ -199,24 +199,24 @@ checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510" [[package]] name = "bytemuck" -version = "1.24.0" +version = "1.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fbdf580320f38b612e485521afda1ee26d10cc9884efaaa750d383e13e3c5f4" +checksum = "c8efb64bd706a16a1bdde310ae86b351e4d21550d98d056f22f8a7f7a2183fec" [[package]] name = "bytes" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" +checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" dependencies = [ "serde", ] [[package]] name = "cc" -version = "1.2.54" +version = "1.2.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6354c81bbfd62d9cfa9cb3c773c2b7b2a3a482d569de977fd0e961f6e7c00583" +checksum = 
"47b26a0954ae34af09b50f0de26458fa95369a0d478d8236d3f93082b219bd29" dependencies = [ "find-msvc-tools", "jobserver", @@ -272,21 +272,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "common" -version = "0.1.0" -source = "git+https://github.com/opendata-oss/opendata.git#74d36908ffa729652ba665cd335f01b661bcfc0c" -dependencies = [ - "async-trait", - "bytes", - "futures", - "serde", - "slatedb", - "tokio", - "tracing", - "uuid", -] - [[package]] name = "concurrent-queue" version = "2.5.0" @@ -548,7 +533,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5666e8ca4ec174d896fb742789c29b1bea9319dcfd623c41bececc0a60c4939d" dependencies = [ - "log 0.4.29", + "log", "once_cell", "rand 0.8.5", ] @@ -577,9 +562,9 @@ dependencies = [ [[package]] name = "find-msvc-tools" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8591b0bcc8a98a64310a2fae1bb3e9b8564dd10e381e6e28010fde8e8e8568db" +checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" [[package]] name = "flatbuffers" @@ -1027,14 +1012,13 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.19" +version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f" +checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" dependencies = [ "base64", "bytes", "futures-channel", - "futures-core", "futures-util", "http", "http-body", @@ -1059,7 +1043,7 @@ dependencies = [ "core-foundation-sys", "iana-time-zone-haiku", "js-sys", - "log 0.4.29", + "log", "wasm-bindgen", "windows-core 0.62.2", ] @@ -1251,7 +1235,7 @@ dependencies = [ "cfg-if", "combine", "jni-sys", - "log 0.4.29", + "log", "thiserror 1.0.69", "walkdir", "windows-sys 0.45.0", @@ -1316,24 +1300,6 @@ dependencies = [ "scopeguard", ] -[[package]] -name = "log" -version = "0.1.0" -source = 
"git+https://github.com/opendata-oss/opendata.git#74d36908ffa729652ba665cd335f01b661bcfc0c" -dependencies = [ - "async-trait", - "bytes", - "common", - "prost", - "serde", - "serde_json", - "serde_with", - "slatedb", - "tokio", - "tracing", - "tracing-subscriber", -] - [[package]] name = "log" version = "0.4.29" @@ -1441,9 +1407,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.7.6" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" [[package]] name = "memoffset" @@ -1584,14 +1550,48 @@ version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +[[package]] +name = "opendata-common" +version = "0.1.6" +source = "git+https://github.com/opendata-oss/opendata.git#9ca32ec3ca66b9403efe9aa313c88d73cf96c5e6" +dependencies = [ + "async-trait", + "bytes", + "futures", + "serde", + "slatedb", + "tokio", + "tokio-util", + "tracing", + "uuid", +] + +[[package]] +name = "opendata-log" +version = "0.2.1" +source = "git+https://github.com/opendata-oss/opendata.git#9ca32ec3ca66b9403efe9aa313c88d73cf96c5e6" +dependencies = [ + "async-trait", + "bytes", + "opendata-common", + "prost", + "serde", + "serde_json", + "serde_with", + "slatedb", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "opendata-log-jni" version = "0.1.0" dependencies = [ "bytes", - "common", "jni", - "log 0.1.0", + "opendata-common", + "opendata-log", "tokio", ] @@ -1999,9 +1999,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.13" +version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5276caf25ac86c8d810222b3dbb938e512c55c6831a10f3e6ed1c93b84041f1c" +checksum = 
"6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" dependencies = [ "aho-corasick", "memchr", @@ -2010,9 +2010,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.8.8" +version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" +checksum = "a96887878f22d7bad8a3b6dc5b7440e0ada9a245242924394987b21cf2210a4c" [[package]] name = "reqwest" @@ -2032,7 +2032,7 @@ dependencies = [ "hyper-rustls", "hyper-util", "js-sys", - "log 0.4.29", + "log", "percent-encoding", "pin-project-lite", "quinn", @@ -2199,9 +2199,9 @@ dependencies = [ [[package]] name = "schemars" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54e910108742c57a770f492731f99be216a52fadd361b06c8fb59d74ccc267d2" +checksum = "a2b42f36aa1cd011945615b92222f6bf73c599a102a300334cd7f8dbeec726cc" dependencies = [ "dyn-clone", "ref-cast", @@ -2329,7 +2329,7 @@ dependencies = [ "indexmap 1.9.3", "indexmap 2.13.0", "schemars 0.9.0", - "schemars 1.2.0", + "schemars 1.2.1", "serde_core", "serde_json", "serde_with_macros", @@ -2394,14 +2394,15 @@ checksum = "b2aa850e253778c88a04c3d7323b043aeda9d3e30d5971937c1855769763678e" [[package]] name = "slab" -version = "0.4.11" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" +checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" [[package]] name = "slatedb" -version = "0.10.0" -source = "git+https://github.com/slatedb/slatedb.git?branch=main#c02305e2a80b26df7c2a3feae42c1c639b69baa9" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "588e9ae32019696205a05e54e4456486e8d11b69c72662739be853736833b3dd" dependencies = [ "anyhow", "async-trait", @@ -2420,7 +2421,7 @@ dependencies = [ "flatbuffers", "foyer", 
"futures", - "log 0.4.29", + "log", "object_store", "once_cell", "ouroboros", @@ -2430,8 +2431,6 @@ dependencies = [ "serde", "serde_json", "siphasher", - "slatedb-common", - "slatedb-txn-obj", "sysinfo", "thiserror 1.0.69", "thread_local", @@ -2444,30 +2443,6 @@ dependencies = [ "walkdir", ] -[[package]] -name = "slatedb-common" -version = "0.10.0" -source = "git+https://github.com/slatedb/slatedb.git?branch=main#c02305e2a80b26df7c2a3feae42c1c639b69baa9" -dependencies = [ - "chrono", - "tokio", -] - -[[package]] -name = "slatedb-txn-obj" -version = "0.10.0" -source = "git+https://github.com/slatedb/slatedb.git?branch=main#c02305e2a80b26df7c2a3feae42c1c639b69baa9" -dependencies = [ - "async-trait", - "bytes", - "chrono", - "futures", - "log 0.4.29", - "object_store", - "slatedb-common", - "thiserror 1.0.69", -] - [[package]] name = "smallvec" version = "1.15.1" @@ -2630,9 +2605,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.46" +version = "0.3.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9da98b7d9b7dad93488a84b8248efc35352b0b2657397d4167e7ad67e5d535e5" +checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c" dependencies = [ "deranged", "itoa", @@ -2651,9 +2626,9 @@ checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" [[package]] name = "time-macros" -version = "0.2.26" +version = "0.2.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78cc610bac2dcee56805c99642447d4c5dbde4d01f752ffea0199aee1f601dc4" +checksum = "2e70e4c5a0e0a8a4823ad65dfe1a6930e4f4d756dcd9dd7939022b5e8c501215" dependencies = [ "num-conv", "time-core", @@ -2868,7 +2843,7 @@ version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ - "log 0.4.29", + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -2901,7 +2876,7 @@ version = 
"0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" dependencies = [ - "log 0.4.29", + "log", "once_cell", "tracing-core", ] @@ -3622,18 +3597,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.36" +version = "0.8.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dafd85c832c1b68bbb4ec0c72c7f6f4fc5179627d2bc7c26b30e4c0cc11e76cc" +checksum = "db6d35d663eadb6c932438e763b262fe1a70987f9ae936e60158176d710cae4a" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.36" +version = "0.8.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cb7e4e8436d9db52fbd6625dbf2f45243ab84994a72882ec8227b99e72b439a" +checksum = "4122cd3169e94605190e77839c9a40d40ed048d305bfdc146e7df40ab0f3e517" dependencies = [ "proc-macro2", "quote", @@ -3702,9 +3677,9 @@ dependencies = [ [[package]] name = "zmij" -version = "1.0.17" +version = "1.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02aae0f83f69aafc94776e879363e9771d7ecbffe2c7fbb6c14c5e00dfe88439" +checksum = "3ff05f8caa9038894637571ae6b9e29466c1f4f829d26c9b28f869a29cbe3445" [[package]] name = "zstd" diff --git a/log/native/Cargo.toml b/log/native/Cargo.toml index fdf7685..3fb826c 100644 --- a/log/native/Cargo.toml +++ b/log/native/Cargo.toml @@ -11,5 +11,5 @@ jni = "0.21" bytes = "1" tokio = { version = "1", features = ["rt-multi-thread", "sync", "time"] } -log = { git = "https://github.com/opendata-oss/opendata.git" } -common = { git = "https://github.com/opendata-oss/opendata.git" } +log = { package = "opendata-log", git = "https://github.com/opendata-oss/opendata.git" } +common = { package = "opendata-common", git = "https://github.com/opendata-oss/opendata.git" } diff --git a/log/native/src/lib.rs b/log/native/src/lib.rs index 50c7b4a..cba1143 100644 --- a/log/native/src/lib.rs +++ 
b/log/native/src/lib.rs @@ -72,7 +72,8 @@ use common::storage::config::{ }; use common::StorageRuntime; use log::{ - AppendResult, Config, LogDb, LogDbBuilder, LogDbReader, LogEntry, LogRead, ReaderConfig, Record, + AppendError, AppendOutput, Config, LogDb, LogDbBuilder, LogDbReader, LogEntry, LogRead, + ReaderConfig, Record, SegmentConfig, }; /// Handle to a LogDb instance with its associated Tokio runtime. @@ -117,9 +118,18 @@ pub extern "system" fn Java_dev_opendata_LogDb_nativeCreate<'local>( } }; + // Extract segment config from LogDbConfig + let segment_config = match extract_segment_config(&mut env, &config) { + Ok(c) => c, + Err(e) => { + let _ = env.throw_new("java/lang/IllegalArgumentException", e); + return 0; + } + }; + let config = Config { storage: storage_config, - ..Config::default() + segmentation: segment_config, }; // Create a dedicated runtime for this LogDb instance (for user operations) @@ -346,6 +356,43 @@ fn extract_object_store_config( } } +/// Extracts SegmentConfig from a Java LogDbConfig object. +fn extract_segment_config( + env: &mut JNIEnv<'_>, + config: &JObject<'_>, +) -> Result { + let seg_obj = env + .call_method( + config, + "segmentation", + "()Ldev/opendata/SegmentConfig;", + &[], + ) + .map_err(|e| format!("Failed to get segmentation: {}", e))? + .l() + .map_err(|e| format!("Failed to get segmentation object: {}", e))?; + + // Get sealIntervalMs (nullable Long) + let interval_obj = env + .call_method(&seg_obj, "sealIntervalMs", "()Ljava/lang/Long;", &[]) + .map_err(|e| format!("Failed to get sealIntervalMs: {}", e))? + .l() + .map_err(|e| format!("Failed to get sealIntervalMs object: {}", e))?; + + let seal_interval = if interval_obj.is_null() { + None + } else { + let ms = env + .call_method(&interval_obj, "longValue", "()J", &[]) + .map_err(|e| format!("Failed to unbox sealIntervalMs: {}", e))? 
+ .j() + .map_err(|e| format!("Failed to get long value: {}", e))?; + Some(std::time::Duration::from_millis(ms as u64)) + }; + + Ok(SegmentConfig { seal_interval }) +} + /// Extracts StorageConfig from a Java LogDbReaderConfig object. fn extract_reader_storage_config( env: &mut JNIEnv<'_>, @@ -413,7 +460,7 @@ fn extract_refresh_interval( Ok(Some(std::time::Duration::from_millis(interval_ms as u64))) } -/// Appends a batch of records to the log with timestamp headers. +/// Appends a batch of records without blocking for queue space. /// /// Each value is stored as: `[8-byte timestamp (big-endian i64)] + [original payload]` /// The timestamp is read from each Java Record object (captured at submission time). @@ -429,7 +476,7 @@ fn extract_refresh_interval( /// JNI function - handle must be a valid pointer returned by nativeCreate. #[no_mangle] #[allow(clippy::not_unsafe_ptr_arg_deref)] -pub extern "system" fn Java_dev_opendata_LogDb_nativeAppend<'local>( +pub extern "system" fn Java_dev_opendata_LogDb_nativeTryAppend<'local>( mut env: JNIEnv<'local>, _class: JClass<'local>, handle: jlong, @@ -442,13 +489,94 @@ pub extern "system" fn Java_dev_opendata_LogDb_nativeAppend<'local>( let log_handle = unsafe { &*(handle as *const LogHandle) }; - // Convert Java Record[] to Rust Vec + let (rust_records, first_timestamp_ms) = match convert_java_records(&mut env, records) { + Ok(v) => v, + Err(()) => return std::ptr::null_mut(), // exception already thrown + }; + + let result = log_handle + .runtime_handle + .block_on(async { log_handle.log.try_append(rust_records).await }); + + match result { + Ok(append_output) => { + match create_append_result(&mut env, &append_output, first_timestamp_ms) { + Ok(obj) => obj.into_raw(), + Err(e) => { + let _ = + env.throw_new("dev/opendata/common/OpenDataNativeException", e.to_string()); + std::ptr::null_mut() + } + } + } + Err(e) => { + throw_append_error(&mut env, e); + std::ptr::null_mut() + } + } +} + +/// Appends a batch of records, 
blocking up to `timeoutMs` for queue space. +/// +/// # Safety +/// JNI function - handle must be a valid pointer returned by nativeCreate. +#[no_mangle] +#[allow(clippy::not_unsafe_ptr_arg_deref)] +pub extern "system" fn Java_dev_opendata_LogDb_nativeAppendTimeout<'local>( + mut env: JNIEnv<'local>, + _class: JClass<'local>, + handle: jlong, + records: jobjectArray, + timeout_ms: jlong, +) -> jobject { + if handle == 0 { + let _ = env.throw_new("java/lang/NullPointerException", "LogDb handle is null"); + return std::ptr::null_mut(); + } + + let log_handle = unsafe { &*(handle as *const LogHandle) }; + + let (rust_records, first_timestamp_ms) = match convert_java_records(&mut env, records) { + Ok(v) => v, + Err(()) => return std::ptr::null_mut(), + }; + + let timeout = std::time::Duration::from_millis(timeout_ms as u64); + let result = log_handle + .runtime_handle + .block_on(async { log_handle.log.append_timeout(rust_records, timeout).await }); + + match result { + Ok(append_output) => { + match create_append_result(&mut env, &append_output, first_timestamp_ms) { + Ok(obj) => obj.into_raw(), + Err(e) => { + let _ = + env.throw_new("dev/opendata/common/OpenDataNativeException", e.to_string()); + std::ptr::null_mut() + } + } + } + Err(e) => { + throw_append_error(&mut env, e); + std::ptr::null_mut() + } + } +} + +/// Converts a Java Record[] into a Rust `Vec` and the first record's timestamp. +/// +/// Returns `Err(())` if a JNI exception was thrown (caller should return null). 
+fn convert_java_records( + env: &mut JNIEnv<'_>, + records: jobjectArray, +) -> Result<(Vec, i64), ()> { let records_array = unsafe { JObjectArray::from_raw(records) }; let len = match env.get_array_length(&records_array) { Ok(l) => l as usize, Err(e) => { let _ = env.throw_new("dev/opendata/common/OpenDataNativeException", e.to_string()); - return std::ptr::null_mut(); + return Err(()); } }; @@ -457,7 +585,7 @@ pub extern "system" fn Java_dev_opendata_LogDb_nativeAppend<'local>( "java/lang/IllegalArgumentException", "Records array is empty", ); - return std::ptr::null_mut(); + return Err(()); } let mut rust_records = Vec::with_capacity(len); @@ -468,7 +596,7 @@ pub extern "system" fn Java_dev_opendata_LogDb_nativeAppend<'local>( Ok(obj) => obj, Err(e) => { let _ = env.throw_new("dev/opendata/common/OpenDataNativeException", e.to_string()); - return std::ptr::null_mut(); + return Err(()); } }; @@ -477,7 +605,7 @@ pub extern "system" fn Java_dev_opendata_LogDb_nativeAppend<'local>( Ok(v) => v.l().unwrap(), Err(e) => { let _ = env.throw_new("dev/opendata/common/OpenDataNativeException", e.to_string()); - return std::ptr::null_mut(); + return Err(()); } }; let key_array: JByteArray = key_obj.into(); @@ -485,7 +613,7 @@ pub extern "system" fn Java_dev_opendata_LogDb_nativeAppend<'local>( Ok(b) => Bytes::from(b), Err(e) => { let _ = env.throw_new("dev/opendata/common/OpenDataNativeException", e.to_string()); - return std::ptr::null_mut(); + return Err(()); } }; @@ -494,7 +622,7 @@ pub extern "system" fn Java_dev_opendata_LogDb_nativeAppend<'local>( Ok(v) => v.l().unwrap(), Err(e) => { let _ = env.throw_new("dev/opendata/common/OpenDataNativeException", e.to_string()); - return std::ptr::null_mut(); + return Err(()); } }; let value_array: JByteArray = value_obj.into(); @@ -504,7 +632,7 @@ pub extern "system" fn Java_dev_opendata_LogDb_nativeAppend<'local>( Ok(v) => v.j().unwrap(), Err(e) => { let _ = env.throw_new("dev/opendata/common/OpenDataNativeException", 
e.to_string()); - return std::ptr::null_mut(); + return Err(()); } }; @@ -513,11 +641,11 @@ pub extern "system" fn Java_dev_opendata_LogDb_nativeAppend<'local>( } // Convert value with timestamp header - let value_bytes = match copy_value_with_timestamp(&mut env, &value_array, timestamp_ms) { + let value_bytes = match copy_value_with_timestamp(env, &value_array, timestamp_ms) { Ok(b) => b, Err(e) => { let _ = env.throw_new("dev/opendata/common/OpenDataNativeException", e.to_string()); - return std::ptr::null_mut(); + return Err(()); } }; @@ -527,26 +655,35 @@ pub extern "system" fn Java_dev_opendata_LogDb_nativeAppend<'local>( }); } - // Use block_on with separate compaction runtime to avoid deadlocks - let result = log_handle - .runtime_handle - .block_on(async { log_handle.log.append(rust_records).await }); + Ok((rust_records, first_timestamp_ms)) +} - match result { - Ok(append_result) => { - // Create Java AppendResult object with first record's timestamp - match create_append_result(&mut env, &append_result, first_timestamp_ms) { - Ok(obj) => obj.into_raw(), - Err(e) => { - let _ = - env.throw_new("dev/opendata/common/OpenDataNativeException", e.to_string()); - std::ptr::null_mut() - } - } +/// Maps an `AppendError` to the appropriate Java exception and throws it. 
+fn throw_append_error(env: &mut JNIEnv<'_>, error: AppendError) { + match error { + AppendError::QueueFull(_) => { + let _ = env.throw_new( + "dev/opendata/common/QueueFullException", + "Write queue is full", + ); } - Err(e) => { - let _ = env.throw_new("dev/opendata/common/OpenDataNativeException", e.to_string()); - std::ptr::null_mut() + AppendError::Timeout(_) => { + let _ = env.throw_new( + "dev/opendata/common/AppendTimeoutException", + "Timed out waiting for queue space", + ); + } + AppendError::Shutdown => { + let _ = env.throw_new( + "dev/opendata/common/OpenDataNativeException", + "Writer has shut down", + ); + } + AppendError::InvalidRecord(msg) => { + let _ = env.throw_new( + "dev/opendata/common/OpenDataNativeException", + format!("Invalid record: {}", msg), + ); } } } @@ -628,8 +765,11 @@ pub extern "system" fn Java_dev_opendata_LogDb_nativeClose<'local>( compaction_runtime, } = *log_handle; - // Close the log using block_on - let result = runtime_handle.block_on(async { log.close().await }); + // Flush pending writes then close the log + let result = runtime_handle.block_on(async { + log.flush().await?; + log.close().await + }); if let Err(e) = result { let _ = env.throw_new("dev/opendata/common/OpenDataNativeException", e.to_string()); @@ -883,10 +1023,10 @@ pub extern "system" fn Java_dev_opendata_LogDbReader_nativeClose<'local>( // Helper Functions // ============================================================================= -/// Creates a Java AppendResult object from a Rust AppendResult. +/// Creates a Java AppendResult object from a Rust AppendOutput. 
fn create_append_result<'local>( env: &mut JNIEnv<'local>, - result: &AppendResult, + result: &AppendOutput, timestamp_ms: i64, ) -> Result, jni::errors::Error> { let class = env.find_class("dev/opendata/AppendResult")?; diff --git a/log/src/main/java/dev/opendata/LogDb.java b/log/src/main/java/dev/opendata/LogDb.java index 3b46a46..74fa46f 100644 --- a/log/src/main/java/dev/opendata/LogDb.java +++ b/log/src/main/java/dev/opendata/LogDb.java @@ -1,5 +1,8 @@ package dev.opendata; +import dev.opendata.common.AppendTimeoutException; +import dev.opendata.common.QueueFullException; + import java.io.Closeable; import java.util.List; @@ -53,31 +56,65 @@ public static LogDb openInMemory() { } /** - * Appends a batch of records to the log. + * Appends a batch of records without blocking for queue space. * - *

<p>This is a blocking call that returns when all records have been persisted. - * For better throughput, batch multiple records into a single call. + *

<p>Fails immediately with {@link QueueFullException} if the write queue is full. + * The caller can retry the same {@code records} array after backpressure clears. * * @param records the records to append * @return the result of the append operation (sequence of first record) + * @throws QueueFullException if the write queue is full */ - public AppendResult append(Record[] records) { + public AppendResult tryAppend(Record[] records) { checkNotClosed(); - return nativeAppend(handle, records); + return nativeTryAppend(handle, records); } /** - * Appends a single record to the log. + * Appends a single record without blocking for queue space. * *

<p>Convenience method for single-record appends. For better throughput, - * prefer {@link #append(Record[])} with batched records. + * prefer {@link #tryAppend(Record[])} with batched records. * * @param key the key to append under * @param value the value to append * @return the result of the append operation + * @throws QueueFullException if the write queue is full + */ + public AppendResult tryAppend(byte[] key, byte[] value) { + return tryAppend(new Record[]{new Record(key, value)}); + } + + /** + * Appends a batch of records, blocking up to {@code timeoutMs} for queue space. + * + *

<p>If the write queue does not drain within the deadline, throws + * {@link AppendTimeoutException}. The caller can retry the same {@code records} + * array. + * + * @param records the records to append + * @param timeoutMs maximum time to wait in milliseconds + * @return the result of the append operation (sequence of first record) + * @throws AppendTimeoutException if the timeout expires before queue space is available + * @throws QueueFullException if the write queue is full (unlikely with timeout) + */ + public AppendResult appendTimeout(Record[] records, long timeoutMs) { + checkNotClosed(); + return nativeAppendTimeout(handle, records, timeoutMs); + } + + /** + * Appends a single record, blocking up to {@code timeoutMs} for queue space. + * + * @param key the key to append under + * @param value the value to append + * @param timeoutMs maximum time to wait in milliseconds + * @return the result of the append operation + * @throws AppendTimeoutException if the timeout expires before queue space is available + * @throws QueueFullException if the write queue is full (unlikely with timeout) */ - public AppendResult append(byte[] key, byte[] value) { - return append(new Record[]{new Record(key, value)}); + public AppendResult appendTimeout(byte[] key, byte[] value, long timeoutMs) { + return appendTimeout(new Record[]{new Record(key, value)}, timeoutMs); } @Override @@ -119,7 +156,8 @@ long getHandle() { // Native methods private static native long nativeCreate(LogDbConfig config); - private static native AppendResult nativeAppend(long handle, Record[] records); + private static native AppendResult nativeTryAppend(long handle, Record[] records); + private static native AppendResult nativeAppendTimeout(long handle, Record[] records, long timeoutMs); private static native LogEntry[] nativeScan(long handle, byte[] key, long startSequence, long maxEntries); private static native void nativeFlush(long handle); private static native void nativeClose(long handle); diff
--git a/log/src/test/java/dev/opendata/LogDbIntegrationTest.java b/log/src/test/java/dev/opendata/LogDbIntegrationTest.java index 864d1ec..e18ae57 100644 --- a/log/src/test/java/dev/opendata/LogDbIntegrationTest.java +++ b/log/src/test/java/dev/opendata/LogDbIntegrationTest.java @@ -43,7 +43,7 @@ void shouldAppendAndReadSingleRecord() { byte[] key = "test-key".getBytes(StandardCharsets.UTF_8); byte[] value = "test-value".getBytes(StandardCharsets.UTF_8); - AppendResult result = log.append(key, value); + AppendResult result = log.tryAppend(key, value); assertThat(result.sequence()).isEqualTo(0); @@ -65,7 +65,7 @@ void shouldAppendBatchOfRecords() { new Record(key, "value-2".getBytes(StandardCharsets.UTF_8)), }; - AppendResult result = log.append(records); + AppendResult result = log.tryAppend(records); assertThat(result.sequence()).isEqualTo(0); @@ -85,9 +85,9 @@ void shouldAssignSequentialSequencesAcrossAppends() { try (LogDb log = LogDb.openInMemory()) { byte[] key = "seq-key".getBytes(StandardCharsets.UTF_8); - log.append(key, "first".getBytes(StandardCharsets.UTF_8)); - log.append(key, "second".getBytes(StandardCharsets.UTF_8)); - AppendResult third = log.append(key, "third".getBytes(StandardCharsets.UTF_8)); + log.tryAppend(key, "first".getBytes(StandardCharsets.UTF_8)); + log.tryAppend(key, "second".getBytes(StandardCharsets.UTF_8)); + AppendResult third = log.tryAppend(key, "third".getBytes(StandardCharsets.UTF_8)); assertThat(third.sequence()).isEqualTo(2); @@ -104,9 +104,9 @@ void shouldReadFromStartSequence() { try (LogDb log = LogDb.openInMemory()) { byte[] key = "offset-key".getBytes(StandardCharsets.UTF_8); - log.append(key, "value-0".getBytes(StandardCharsets.UTF_8)); - log.append(key, "value-1".getBytes(StandardCharsets.UTF_8)); - log.append(key, "value-2".getBytes(StandardCharsets.UTF_8)); + log.tryAppend(key, "value-0".getBytes(StandardCharsets.UTF_8)); + log.tryAppend(key, "value-1".getBytes(StandardCharsets.UTF_8)); + log.tryAppend(key, 
"value-2".getBytes(StandardCharsets.UTF_8)); // Read starting from sequence 1 List entries = log.scan(key, 1, 10); @@ -122,7 +122,7 @@ void shouldRespectMaxEntries() { byte[] key = "limit-key".getBytes(StandardCharsets.UTF_8); for (int i = 0; i < 10; i++) { - log.append(key, ("value-" + i).getBytes(StandardCharsets.UTF_8)); + log.tryAppend(key, ("value-" + i).getBytes(StandardCharsets.UTF_8)); } List entries = log.scan(key, 0, 3); @@ -134,7 +134,7 @@ void shouldRespectMaxEntries() { void shouldReturnEmptyListForUnknownKey() { try (LogDb log = LogDb.openInMemory()) { byte[] key = "known".getBytes(StandardCharsets.UTF_8); - log.append(key, "value".getBytes(StandardCharsets.UTF_8)); + log.tryAppend(key, "value".getBytes(StandardCharsets.UTF_8)); byte[] unknownKey = "unknown".getBytes(StandardCharsets.UTF_8); List entries = log.scan(unknownKey, 0, 10); @@ -148,9 +148,9 @@ void shouldIsolateEntriesByKey() { byte[] keyA = "key-a".getBytes(StandardCharsets.UTF_8); byte[] keyB = "key-b".getBytes(StandardCharsets.UTF_8); - log.append(keyA, "value-a-0".getBytes(StandardCharsets.UTF_8)); - log.append(keyB, "value-b-0".getBytes(StandardCharsets.UTF_8)); - log.append(keyA, "value-a-1".getBytes(StandardCharsets.UTF_8)); + log.tryAppend(keyA, "value-a-0".getBytes(StandardCharsets.UTF_8)); + log.tryAppend(keyB, "value-b-0".getBytes(StandardCharsets.UTF_8)); + log.tryAppend(keyA, "value-a-1".getBytes(StandardCharsets.UTF_8)); List entriesA = log.scan(keyA, 0, 10); assertThat(entriesA).hasSize(2); @@ -171,7 +171,7 @@ void shouldThrowWhenOperatingOnClosedLog() { byte[] key = "key".getBytes(StandardCharsets.UTF_8); byte[] value = "value".getBytes(StandardCharsets.UTF_8); - assertThatThrownBy(() -> log.append(key, value)) + assertThatThrownBy(() -> log.tryAppend(key, value)) .isInstanceOf(IllegalStateException.class) .hasMessageContaining("closed"); } @@ -189,7 +189,7 @@ void shouldOpenWithSlateDbLocalConfig(@TempDir Path tempDir) { byte[] key = 
"persistent-key".getBytes(StandardCharsets.UTF_8); byte[] value = "persistent-value".getBytes(StandardCharsets.UTF_8); - log.append(key, value); + log.tryAppend(key, value); List entries = log.scan(key, 0, 10); assertThat(entries).hasSize(1); @@ -206,7 +206,7 @@ void shouldHandleLargeValues() { largeValue[i] = (byte) (i % 256); } - log.append(key, largeValue); + log.tryAppend(key, largeValue); List entries = log.scan(key, 0, 10); assertThat(entries).hasSize(1); @@ -221,7 +221,7 @@ void shouldPreserveTimestamp() { byte[] value = "ts-value".getBytes(StandardCharsets.UTF_8); long beforeAppend = System.currentTimeMillis(); - log.append(key, value); + log.tryAppend(key, value); long afterAppend = System.currentTimeMillis(); List entries = log.scan(key, 0, 10); @@ -244,11 +244,12 @@ void shouldReadFromSeparateLogDbReader(@TempDir Path tempDir) { byte[] key = "e2e-key".getBytes(StandardCharsets.UTF_8); - // Write with LogDb + // Write with LogDb and flush to ensure durability before reader opens try (LogDb writer = LogDb.open(writerConfig)) { - writer.append(key, "value-0".getBytes(StandardCharsets.UTF_8)); - writer.append(key, "value-1".getBytes(StandardCharsets.UTF_8)); - writer.append(key, "value-2".getBytes(StandardCharsets.UTF_8)); + writer.tryAppend(key, "value-0".getBytes(StandardCharsets.UTF_8)); + writer.tryAppend(key, "value-1".getBytes(StandardCharsets.UTF_8)); + writer.tryAppend(key, "value-2".getBytes(StandardCharsets.UTF_8)); + writer.flush(); } // Read with separate LogDbReader @@ -279,7 +280,7 @@ void shouldCoexistWriterAndReaderWithoutFencingError(@TempDir Path tempDir) { // Open writer and keep it open try (LogDb writer = LogDb.open(writerConfig)) { // Write initial data - writer.append(key, "value-0".getBytes(StandardCharsets.UTF_8)); + writer.tryAppend(key, "value-0".getBytes(StandardCharsets.UTF_8)); // Open reader while writer is still open - this should NOT cause fencing error try (LogDbReader reader = LogDbReader.open(readerConfig)) { @@ -289,12 
+290,12 @@ void shouldCoexistWriterAndReaderWithoutFencingError(@TempDir Path tempDir) { assertThat(new String(entries.get(0).value(), StandardCharsets.UTF_8)).isEqualTo("value-0"); // Writer can still write more data while reader is open - writer.append(key, "value-1".getBytes(StandardCharsets.UTF_8)); - writer.append(key, "value-2".getBytes(StandardCharsets.UTF_8)); + writer.tryAppend(key, "value-1".getBytes(StandardCharsets.UTF_8)); + writer.tryAppend(key, "value-2".getBytes(StandardCharsets.UTF_8)); } // After reader closes, writer should still work - writer.append(key, "value-3".getBytes(StandardCharsets.UTF_8)); + writer.tryAppend(key, "value-3".getBytes(StandardCharsets.UTF_8)); List finalEntries = writer.scan(key, 0, 10); assertThat(finalEntries).hasSize(4); @@ -313,9 +314,10 @@ void shouldOpenReaderWithCustomRefreshInterval(@TempDir Path tempDir) { byte[] key = "refresh-key".getBytes(StandardCharsets.UTF_8); - // Write with LogDb + // Write with LogDb and flush to ensure durability before reader opens try (LogDb writer = LogDb.open(writerConfig)) { - writer.append(key, "value-0".getBytes(StandardCharsets.UTF_8)); + writer.tryAppend(key, "value-0".getBytes(StandardCharsets.UTF_8)); + writer.flush(); } // Read with LogDbReader using custom refresh interval