Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/vss-integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,4 @@ jobs:
cd ldk-node
export TEST_VSS_BASE_URL="http://localhost:8080/vss"
RUSTFLAGS="--cfg vss_test" cargo test io::vss_store
RUSTFLAGS="--cfg vss_test" cargo test --test integration_tests_vss
RUSTFLAGS="--cfg vss_test" cargo test --test integration_tests_vss -- --nocapture
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ serde = { version = "1.0.210", default-features = false, features = ["std", "der
serde_json = { version = "1.0.128", default-features = false, features = ["std"] }
log = { version = "0.4.22", default-features = false, features = ["std"]}

vss-client = "0.3"
#vss-client-ng = "0.3"
vss-client-ng = { git = "https://github.com/tnull/vss-client", rev = "7cf661b4ba45983ecad0f59e6d74050e2c84212f" }
prost = { version = "0.11.6", default-features = false}

[target.'cfg(windows)'.dependencies]
Expand Down
8 changes: 6 additions & 2 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use lightning::util::persist::{
use lightning::util::ser::ReadableArgs;
use lightning::util::sweep::OutputSweeper;
use lightning_persister::fs_store::FilesystemStore;
use vss_client::headers::{FixedHeaders, LnurlAuthToJwtProvider, VssHeaderProvider};
use vss_client_ng::headers::{FixedHeaders, LnurlAuthToJwtProvider, VssHeaderProvider};

use crate::chain::ChainSource;
use crate::config::{
Expand Down Expand Up @@ -732,7 +732,11 @@ impl NodeBuilder {
let vss_seed_bytes: [u8; 32] = vss_xprv.private_key.secret_bytes();

let vss_store =
VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider, Arc::clone(&runtime));
VssStore::new(vss_url, store_id, vss_seed_bytes, header_provider).map_err(|e| {
log_error!(logger, "Failed to setup store: {}", e);
BuildError::KVStoreSetupFailed
})?;

build_with_store_internal(
config,
self.chain_data_source_config.as_ref(),
Expand Down
2 changes: 1 addition & 1 deletion src/ffi/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub use lightning_liquidity::lsps1::msgs::{
};
pub use lightning_types::payment::{PaymentHash, PaymentPreimage, PaymentSecret};
pub use lightning_types::string::UntrustedString;
pub use vss_client::headers::{VssHeaderProvider, VssHeaderProviderError};
pub use vss_client_ng::headers::{VssHeaderProvider, VssHeaderProviderError};

use crate::builder::sanitize_alias;
pub use crate::config::{
Expand Down
189 changes: 82 additions & 107 deletions src/io/vss_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,49 +13,37 @@ use std::panic::RefUnwindSafe;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use bitcoin::hashes::{sha256, Hash, HashEngine, Hmac, HmacEngine};
use lightning::io::{self, Error, ErrorKind};
use lightning::util::persist::{KVStore, KVStoreSync};
use prost::Message;
use rand::RngCore;
use vss_client::client::VssClient;
use vss_client::error::VssError;
use vss_client::headers::VssHeaderProvider;
use vss_client::types::{
use vss_client_ng::client::VssClient;
use vss_client_ng::error::VssError;
use vss_client_ng::headers::VssHeaderProvider;
use vss_client_ng::types::{
DeleteObjectRequest, GetObjectRequest, KeyValue, ListKeyVersionsRequest, PutObjectRequest,
Storable,
};
use vss_client::util::key_obfuscator::KeyObfuscator;
use vss_client::util::retry::{
ExponentialBackoffRetryPolicy, FilteredRetryPolicy, JitteredRetryPolicy,
MaxAttemptsRetryPolicy, MaxTotalDelayRetryPolicy, RetryPolicy,
};
use vss_client::util::storable_builder::{EntropySource, StorableBuilder};
use vss_client_ng::util::key_obfuscator::KeyObfuscator;
use vss_client_ng::util::storable_builder::{EntropySource, StorableBuilder};

use crate::io::utils::check_namespace_key_validity;
use crate::runtime::Runtime;

type CustomRetryPolicy = FilteredRetryPolicy<
JitteredRetryPolicy<
MaxTotalDelayRetryPolicy<MaxAttemptsRetryPolicy<ExponentialBackoffRetryPolicy<VssError>>>,
>,
Box<dyn Fn(&VssError) -> bool + 'static + Send + Sync>,
>;

// We set this to a small number of threads that would still allow to make some progress if one
// would hit a blocking case
const INTERNAL_RUNTIME_WORKERS: usize = 2;
const VSS_IO_TIMEOUT: Duration = Duration::from_secs(5);

const HTTP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
const HTTP_RETRIES: u32 = 10;

/// A [`KVStoreSync`] implementation that writes to and reads from a [VSS](https://github.com/lightningdevkit/vss-server/blob/main/README.md) backend.
pub struct VssStore {
inner: Arc<VssStoreInner>,
// Version counter to ensure that writes are applied in the correct order. It is assumed that read and list
// operations aren't sensitive to the order of execution.
next_version: AtomicU64,
runtime: Arc<Runtime>,
// A VSS-internal runtime we use to avoid any deadlocks we could hit when waiting on a spawned
// blocking task to finish while the blocked thread had acquired the reactor. In particular,
// this works around a previously-hit case where a concurrent call to
Expand All @@ -68,9 +56,9 @@ pub struct VssStore {
impl VssStore {
pub(crate) fn new(
base_url: String, store_id: String, vss_seed: [u8; 32],
header_provider: Arc<dyn VssHeaderProvider>, runtime: Arc<Runtime>,
) -> Self {
let inner = Arc::new(VssStoreInner::new(base_url, store_id, vss_seed, header_provider));
header_provider: Arc<dyn VssHeaderProvider>,
) -> io::Result<Self> {
let inner = Arc::new(VssStoreInner::new(base_url, store_id, vss_seed, header_provider)?);
let next_version = AtomicU64::new(1);
let internal_runtime = Some(
tokio::runtime::Builder::new_multi_thread()
Expand All @@ -86,7 +74,7 @@ impl VssStore {
.unwrap(),
);

Self { inner, next_version, runtime, internal_runtime }
Ok(Self { inner, next_version, internal_runtime })
}

// Same logic as for the obfuscated keys below, but just for locking, using the plaintext keys
Expand Down Expand Up @@ -131,15 +119,7 @@ impl KVStoreSync for VssStore {
let inner = Arc::clone(&self.inner);
let fut =
async move { inner.read_internal(primary_namespace, secondary_namespace, key).await };
// TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
// times out.
let spawned_fut = internal_runtime.spawn(async move {
tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
let msg = "VssStore::read timed out";
Error::new(ErrorKind::Other, msg)
})
});
self.runtime.block_on(spawned_fut).expect("We should always finish")?
tokio::task::block_in_place(move || internal_runtime.block_on(fut))
}

fn write(
Expand Down Expand Up @@ -169,15 +149,7 @@ impl KVStoreSync for VssStore {
)
.await
};
// TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
// times out.
let spawned_fut = internal_runtime.spawn(async move {
tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
let msg = "VssStore::write timed out";
Error::new(ErrorKind::Other, msg)
})
});
self.runtime.block_on(spawned_fut).expect("We should always finish")?
tokio::task::block_in_place(move || internal_runtime.block_on(fut))
}

fn remove(
Expand Down Expand Up @@ -206,15 +178,7 @@ impl KVStoreSync for VssStore {
)
.await
};
// TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
// times out.
let spawned_fut = internal_runtime.spawn(async move {
tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
let msg = "VssStore::remove timed out";
Error::new(ErrorKind::Other, msg)
})
});
self.runtime.block_on(spawned_fut).expect("We should always finish")?
tokio::task::block_in_place(move || internal_runtime.block_on(fut))
}

fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> {
Expand All @@ -227,15 +191,7 @@ impl KVStoreSync for VssStore {
let secondary_namespace = secondary_namespace.to_string();
let inner = Arc::clone(&self.inner);
let fut = async move { inner.list_internal(primary_namespace, secondary_namespace).await };
// TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
// times out.
let spawned_fut = internal_runtime.spawn(async move {
tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
let msg = "VssStore::list timed out";
Error::new(ErrorKind::Other, msg)
})
});
self.runtime.block_on(spawned_fut).expect("We should always finish")?
tokio::task::block_in_place(move || internal_runtime.block_on(fut))
}
}

Expand Down Expand Up @@ -314,9 +270,9 @@ impl Drop for VssStore {
}

struct VssStoreInner {
client: VssClient<CustomRetryPolicy>,
client: VssClient,
store_id: String,
storable_builder: StorableBuilder<RandEntropySource>,
data_encryption_key: [u8; 32],
key_obfuscator: KeyObfuscator,
// Per-key locks that ensures that we don't have concurrent writes to the same namespace/key.
// The lock also encapsulates the latest written version per key.
Expand All @@ -327,27 +283,17 @@ impl VssStoreInner {
pub(crate) fn new(
base_url: String, store_id: String, vss_seed: [u8; 32],
header_provider: Arc<dyn VssHeaderProvider>,
) -> Self {
) -> io::Result<Self> {
let (data_encryption_key, obfuscation_master_key) =
derive_data_encryption_and_obfuscation_keys(&vss_seed);
let key_obfuscator = KeyObfuscator::new(obfuscation_master_key);
let storable_builder = StorableBuilder::new(data_encryption_key, RandEntropySource);
let retry_policy = ExponentialBackoffRetryPolicy::new(Duration::from_millis(10))
.with_max_attempts(10)
.with_max_total_delay(Duration::from_secs(15))
.with_max_jitter(Duration::from_millis(10))
.skip_retry_on_error(Box::new(|e: &VssError| {
matches!(
e,
VssError::NoSuchKeyError(..)
| VssError::InvalidRequestError(..)
| VssError::ConflictError(..)
)
}) as _);

let client = VssClient::new_with_headers(base_url, retry_policy, header_provider);
let reqwest_client = build_client(&base_url).map_err(|_| {
let msg = format!("Failed to setup HTTP client: invalid URL");
Error::new(ErrorKind::Other, msg)
})?;
let client = VssClient::from_client_and_headers(base_url, reqwest_client, header_provider);
let locks = Mutex::new(HashMap::new());
Self { client, store_id, storable_builder, key_obfuscator, locks }
Ok(Self { client, store_id, data_encryption_key, key_obfuscator, locks })
}

fn get_inner_lock_ref(&self, locking_key: String) -> Arc<tokio::sync::Mutex<u64>> {
Expand Down Expand Up @@ -413,9 +359,8 @@ impl VssStoreInner {
) -> io::Result<Vec<u8>> {
check_namespace_key_validity(&primary_namespace, &secondary_namespace, Some(&key), "read")?;

let obfuscated_key =
self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);
let request = GetObjectRequest { store_id: self.store_id.clone(), key: obfuscated_key };
let store_key = self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);
let request = GetObjectRequest { store_id: self.store_id.clone(), key: store_key.clone() };
let resp = self.client.get_object(&request).await.map_err(|e| {
let msg = format!(
"Failed to read from key {}/{}/{}: {}",
Expand All @@ -437,7 +382,11 @@ impl VssStoreInner {
Error::new(ErrorKind::Other, msg)
})?;

Ok(self.storable_builder.deconstruct(storable)?.0)
let storable_builder = StorableBuilder::new(RandEntropySource);
let decrypted = storable_builder
.deconstruct(storable, &self.data_encryption_key, store_key.as_bytes())?
.0;
Ok(decrypted)
}

async fn write_internal(
Expand All @@ -451,22 +400,27 @@ impl VssStoreInner {
"write",
)?;

self.execute_locked_write(inner_lock_ref, locking_key, version, async move || {
let obfuscated_key =
self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);
let vss_version = -1;
let storable = self.storable_builder.build(buf, vss_version);
let request = PutObjectRequest {
store_id: self.store_id.clone(),
global_version: None,
transaction_items: vec![KeyValue {
key: obfuscated_key,
version: vss_version,
value: storable.encode_to_vec(),
}],
delete_items: vec![],
};
let store_key = self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);
let vss_version = -1;
let storable_builder = StorableBuilder::new(RandEntropySource);
let storable = storable_builder.build(
buf.to_vec(),
vss_version,
&self.data_encryption_key,
store_key.as_bytes(),
);
let request = PutObjectRequest {
store_id: self.store_id.clone(),
global_version: None,
transaction_items: vec![KeyValue {
key: store_key,
version: vss_version,
value: storable.encode_to_vec(),
}],
delete_items: vec![],
};

self.execute_locked_write(inner_lock_ref, locking_key, version, async move || {
self.client.put_object(&request).await.map_err(|e| {
let msg = format!(
"Failed to write to key {}/{}/{}: {}",
Expand Down Expand Up @@ -574,6 +528,32 @@ impl VssStoreInner {
}
}

fn build_client(base_url: &str) -> Result<reqwest::Client, ()> {
let url = reqwest::Url::parse(base_url).map_err(|_| ())?;
let host_str = url.host_str().ok_or(())?.to_string();
let retry = reqwest::retry::for_host(host_str)
.max_retries_per_request(HTTP_RETRIES)
.classify_fn(|req_rep| match req_rep.status() {
// VSS uses INTERNAL_SERVER_ERROR when sending back error repsonses. These are
// currently still covered by our `RetryPolicy`, so we tell `reqwest` not to retry them.
Some(reqwest::StatusCode::INTERNAL_SERVER_ERROR) => req_rep.success(),
Some(reqwest::StatusCode::BAD_REQUEST) => req_rep.success(),
Some(reqwest::StatusCode::UNAUTHORIZED) => req_rep.success(),
Some(reqwest::StatusCode::NOT_FOUND) => req_rep.success(),
Some(reqwest::StatusCode::CONFLICT) => req_rep.success(),
Some(reqwest::StatusCode::OK) => req_rep.success(),
_ => req_rep.retryable(),
});
let client = reqwest::Client::builder()
.timeout(HTTP_TIMEOUT)
.connect_timeout(HTTP_TIMEOUT)
.read_timeout(HTTP_TIMEOUT)
.retry(retry)
.build()
.unwrap();
Ok(client)
}

fn derive_data_encryption_and_obfuscation_keys(vss_seed: &[u8; 32]) -> ([u8; 32], [u8; 32]) {
let hkdf = |initial_key_material: &[u8], salt: &[u8]| -> [u8; 32] {
let mut engine = HmacEngine::<sha256::Hash>::new(salt);
Expand Down Expand Up @@ -606,11 +586,10 @@ mod tests {

use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng, RngCore};
use vss_client::headers::FixedHeaders;
use vss_client_ng::headers::FixedHeaders;

use super::*;
use crate::io::test_utils::do_read_write_remove_list_persist;
use crate::logger::Logger;

#[test]
fn vss_read_write_remove_list_persist() {
Expand All @@ -620,10 +599,8 @@ mod tests {
let mut vss_seed = [0u8; 32];
rng.fill_bytes(&mut vss_seed);
let header_provider = Arc::new(FixedHeaders::new(HashMap::new()));
let logger = Arc::new(Logger::new_log_facade());
let runtime = Arc::new(Runtime::new(logger).unwrap());
let vss_store =
VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime);
VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider).unwrap();

do_read_write_remove_list_persist(&vss_store);
}
Expand All @@ -636,10 +613,8 @@ mod tests {
let mut vss_seed = [0u8; 32];
rng.fill_bytes(&mut vss_seed);
let header_provider = Arc::new(FixedHeaders::new(HashMap::new()));
let logger = Arc::new(Logger::new_log_facade());
let runtime = Arc::new(Runtime::new(logger).unwrap());
let vss_store =
VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime);
VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider).unwrap();

do_read_write_remove_list_persist(&vss_store);
drop(vss_store)
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ pub use types::{
};
pub use {
bip39, bitcoin, lightning, lightning_invoice, lightning_liquidity, lightning_types, tokio,
vss_client,
vss_client_ng,
};

use crate::scoring::setup_background_pathfinding_scores_sync;
Expand Down
Loading