Skip to content

Commit 7810d96

Browse files
authored
add aws kms client caching (#58)
* add aws kms client caching * don't inject cache into credential in signer abstraction * add justfile * refactor cache injection * script updates * remove bun declaration * remove unused code path * reduce noisy logs * atomic cache inserts * increase webhook workers count
1 parent 47dd4fd commit 7810d96

File tree

13 files changed

+329
-35
lines changed

13 files changed

+329
-35
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,4 @@ alloy-signer-aws = { version = "1.0.23", features = ["eip712"] }
2323
aws-config = "1.8.2"
2424
aws-sdk-kms = "1.79.0"
2525
aws-credential-types = "1.2.4"
26+
moka = { version = "0.12", features = ["future"] }

core/src/credentials.rs

Lines changed: 87 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,33 @@ use aws_config::{BehaviorVersion, Region};
55
use aws_credential_types::provider::future::ProvideCredentials as ProvideCredentialsFuture;
66
use aws_sdk_kms::config::{Credentials, ProvideCredentials};
77
use serde::{Deserialize, Serialize};
8+
use std::collections::hash_map::DefaultHasher;
9+
use std::hash::{Hash, Hasher};
10+
use std::ops::Deref;
811
use thirdweb_core::auth::ThirdwebAuth;
912
use thirdweb_core::iaw::AuthToken;
1013
use vault_types::enclave::auth::Auth as VaultAuth;
1114

1215
use crate::error::EngineError;
1316

17+
/// Cache for AWS KMS clients to avoid recreating connections
18+
pub type KmsClientCache = moka::future::Cache<u64, aws_sdk_kms::Client>;
19+
1420
impl SigningCredential {
1521
/// Create a random private key credential for testing
1622
pub fn random_local() -> Self {
1723
SigningCredential::PrivateKey(PrivateKeySigner::random())
1824
}
25+
26+
/// Inject KMS cache into AWS KMS credentials (useful after deserialization)
27+
pub fn with_aws_kms_cache(self, kms_client_cache: &KmsClientCache) -> Self {
28+
match self {
29+
SigningCredential::AwsKms(creds) => {
30+
SigningCredential::AwsKms(creds.with_cache(kms_client_cache.clone()))
31+
}
32+
other => other,
33+
}
34+
}
1935
}
2036

2137
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -38,6 +54,18 @@ pub struct AwsKmsCredential {
3854
pub secret_access_key: String,
3955
pub key_id: String,
4056
pub region: String,
57+
#[serde(skip)]
58+
pub kms_client_cache: Option<KmsClientCache>,
59+
}
60+
61+
impl Hash for AwsKmsCredential {
62+
fn hash<H: Hasher>(&self, state: &mut H) {
63+
self.access_key_id.hash(state);
64+
self.secret_access_key.hash(state);
65+
self.key_id.hash(state);
66+
self.region.hash(state);
67+
// Don't hash the cache - it's not part of the credential identity
68+
}
4169
}
4270

4371
impl ProvideCredentials for AwsKmsCredential {
@@ -57,14 +85,71 @@ impl ProvideCredentials for AwsKmsCredential {
5785
}
5886

5987
impl AwsKmsCredential {
60-
pub async fn get_signer(&self, chain_id: Option<ChainId>) -> Result<AwsSigner, EngineError> {
88+
/// Create a new AwsKmsCredential with cache
89+
pub fn new(
90+
access_key_id: String,
91+
secret_access_key: String,
92+
key_id: String,
93+
region: String,
94+
kms_client_cache: KmsClientCache,
95+
) -> Self {
96+
Self {
97+
access_key_id,
98+
secret_access_key,
99+
key_id,
100+
region,
101+
kms_client_cache: Some(kms_client_cache),
102+
}
103+
}
104+
105+
/// Inject cache into this credential (useful after deserialization)
106+
pub fn with_cache(mut self, kms_client_cache: KmsClientCache) -> Self {
107+
self.kms_client_cache = Some(kms_client_cache);
108+
self
109+
}
110+
111+
/// Create a cache key from the credential
112+
fn cache_key(&self) -> u64 {
113+
let mut hasher = DefaultHasher::new();
114+
self.hash(&mut hasher);
115+
hasher.finish()
116+
}
117+
118+
/// Create a new AWS KMS client (without caching)
119+
async fn create_kms_client(&self) -> Result<aws_sdk_kms::Client, EngineError> {
61120
let config = aws_config::defaults(BehaviorVersion::latest())
62121
.credentials_provider(self.clone())
63122
.region(Region::new(self.region.clone()))
64123
.load()
65124
.await;
66-
let client = aws_sdk_kms::Client::new(&config);
125+
Ok(aws_sdk_kms::Client::new(&config))
126+
}
67127

128+
/// Get a cached AWS KMS client, creating one if it doesn't exist
129+
async fn get_cached_kms_client(&self) -> Result<aws_sdk_kms::Client, EngineError> {
130+
match &self.kms_client_cache {
131+
Some(cache) => {
132+
let cache_key = self.cache_key();
133+
134+
cache
135+
.try_get_with(cache_key, async {
136+
tracing::debug!("Creating new KMS client for key: {}", cache_key);
137+
self.create_kms_client().await
138+
})
139+
.await
140+
.map_err(|e| e.deref().clone())
141+
}
142+
None => {
143+
// Fallback to creating a new client without caching
144+
tracing::debug!("No cache available, creating new KMS client");
145+
self.create_kms_client().await
146+
}
147+
}
148+
}
149+
150+
/// Get signer (uses cache if available)
151+
pub async fn get_signer(&self, chain_id: Option<ChainId>) -> Result<AwsSigner, EngineError> {
152+
let client = self.get_cached_kms_client().await?;
68153
let signer = AwsSigner::new(client, self.key_id.clone(), chain_id).await?;
69154
Ok(signer)
70155
}

executors/src/eoa/worker/mod.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use alloy::primitives::{Address, U256};
33
use alloy::providers::Provider;
44
use engine_core::{
55
chain::{Chain, ChainService},
6-
credentials::SigningCredential,
6+
credentials::{SigningCredential, KmsClientCache},
77
error::AlloyRpcErrorToEngineError,
88
signer::EoaSigner,
99
};
@@ -127,6 +127,9 @@ where
127127

128128
// EOA metrics abstraction with encapsulated configuration
129129
pub eoa_metrics: EoaMetrics,
130+
131+
// KMS client cache for AWS KMS credentials
132+
pub kms_client_cache: KmsClientCache,
130133
}
131134

132135
impl<CS> DurableExecution for EoaExecutorJobHandler<CS>
@@ -182,12 +185,15 @@ where
182185

183186
let chain_id = chain.chain_id();
184187

188+
// Inject KMS cache into the noop signing credential (after deserialization from Redis)
189+
let noop_signing_credential = data.noop_signing_credential.clone().with_aws_kms_cache(&self.kms_client_cache);
190+
185191
let worker = EoaExecutorWorker {
186192
store: scoped,
187193
chain,
188194
eoa: data.eoa_address,
189195
chain_id: data.chain_id,
190-
noop_signing_credential: data.noop_signing_credential.clone(),
196+
noop_signing_credential,
191197

192198
max_inflight: if is_minimal_account {
193199
1
@@ -199,6 +205,7 @@ where
199205
max_recycled_nonces: self.max_recycled_nonces,
200206
webhook_queue: self.webhook_queue.clone(),
201207
signer: self.eoa_signer.clone(),
208+
kms_client_cache: self.kms_client_cache.clone(),
202209
};
203210

204211
let job_start_time = current_timestamp_ms();
@@ -311,6 +318,7 @@ pub struct EoaExecutorWorker<C: Chain> {
311318

312319
pub webhook_queue: Arc<Queue<WebhookJobHandler>>,
313320
pub signer: Arc<EoaSigner>,
321+
pub kms_client_cache: KmsClientCache,
314322
}
315323

316324
impl<C: Chain> EoaExecutorWorker<C> {

executors/src/eoa/worker/transaction.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ impl<C: Chain> EoaExecutorWorker<C> {
176176
// Try EIP-1559 fees first, fall back to legacy if unsupported
177177
match self.chain.provider().estimate_eip1559_fees().await {
178178
Ok(eip1559_fees) => {
179-
tracing::debug!(
179+
tracing::trace!(
180180
"Using EIP-1559 fees: max_fee={}, max_priority_fee={}",
181181
eip1559_fees.max_fee_per_gas,
182182
eip1559_fees.max_priority_fee_per_gas
@@ -397,7 +397,11 @@ impl<C: Chain> EoaExecutorWorker<C> {
397397
nonce: u64,
398398
) -> Result<Signed<TypedTransaction>, EoaExecutorWorkerError> {
399399
let typed_tx = self.build_typed_transaction(request, nonce).await?;
400-
self.sign_transaction(typed_tx, &request.signing_credential)
400+
401+
// Inject KMS cache into the signing credential (after deserialization from Redis)
402+
let credential_with_cache = request.signing_credential.clone().with_aws_kms_cache(&self.kms_client_cache);
403+
404+
self.sign_transaction(typed_tx, &credential_with_cache)
401405
.await
402406
}
403407

justfile

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Start local chain with anvil + speedbump proxy (300ms variable latency)
2+
local-chain:
3+
#!/usr/bin/env bash
4+
set -e
5+
6+
echo "🧹 Cleaning up existing processes..."
7+
# Kill any existing anvil or speedbump processes
8+
pkill -f "anvil.*8546" || true
9+
pkill -f "speedbump.*8545" || true
10+
lsof -ti:8545 | xargs kill -9 2>/dev/null || true
11+
lsof -ti:8546 | xargs kill -9 2>/dev/null || true
12+
13+
echo "🔨 Starting anvil on port 8546 (1s blocktime)..."
14+
anvil --port 8546 --block-time 1 &
15+
ANVIL_PID=$!
16+
17+
# Cleanup function
18+
cleanup() {
19+
echo ""
20+
echo "🛑 Stopping services..."
21+
kill $ANVIL_PID 2>/dev/null || true
22+
pkill -f "speedbump.*8545" || true
23+
exit 0
24+
}
25+
26+
trap cleanup INT TERM EXIT
27+
28+
# Wait a moment for anvil to start
29+
sleep 2
30+
31+
echo "🐌 Starting speedbump proxy on port 8545 (→ localhost:8546)"
32+
echo " Latency: 300ms base + 150ms sine wave (150-450ms variable)"
33+
echo " Connect to: http://localhost:8545"
34+
echo ""
35+
speedbump --port=8545 --latency=300ms --sine-amplitude=150ms --sine-period=1m localhost:8546
36+
37+
# Fund an address with 1 ETH (bypasses speedbump for faster setup)
38+
fund address:
39+
@echo "💰 Funding {{address}} with 1 ETH..."
40+
@cast rpc anvil_setBalance {{address}} $(cast to-wei 1) --rpc-url http://localhost:8546
41+
@echo "✅ Done!"
42+

scripts/benchmarks/eoa.ts

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,3 @@
1-
/// <reference lib="dom" />
2-
3-
// Bun globals (runtime will provide these)
4-
declare const Bun: any;
5-
declare const process: any;
6-
7-
// Extend ImportMeta for Bun
8-
interface ImportMeta {
9-
dir: string;
10-
}
11-
121
// Types based on events.rs
132
interface WebhookEvent {
143
transactionId: string;

0 commit comments

Comments
 (0)