From afc681a70b8be181af289e6c879a9cb0c8b11f44 Mon Sep 17 00:00:00 2001 From: Ilia Zyrin Date: Tue, 21 Apr 2026 20:48:14 +0200 Subject: [PATCH 1/7] healthcheck fix --- src/utils/healthcheck.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/utils/healthcheck.rs b/src/utils/healthcheck.rs index ded5b4e1..879154be 100644 --- a/src/utils/healthcheck.rs +++ b/src/utils/healthcheck.rs @@ -67,10 +67,12 @@ impl HealthCheckServer { let response = match request.url() { "/healthz" => { debug!("Received the '/healthz' request"); - self.state_to_http_response() + // Liveness endpoint: return OK as long as the process is up. + Response::from_string("OK").with_status_code(HTTP_STATUS_OK) } "/readyz" => { debug!("Received the '/readyz' request"); + // Readiness endpoint: only ready when initialization is complete. self.state_to_http_response() } _ => Response::from_string("Not Found").with_status_code(HTTP_STATUS_NOT_FOUND), From d05b5dc21ef4ec92855f5a2bbdf1738b6cfb76fd Mon Sep 17 00:00:00 2001 From: Ilia Zyrin Date: Tue, 21 Apr 2026 21:05:58 +0200 Subject: [PATCH 2/7] healthcheck port -> port --- bin/env.template | 2 +- src/config.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/bin/env.template b/bin/env.template index 7ff6912e..9f3ec35b 100644 --- a/bin/env.template +++ b/bin/env.template @@ -31,7 +31,7 @@ CROSSBAR_API_URL= # Port to check liquidator health state via HTTP. Example: http://localhost:3000/healthz # Defaults to 3000 if not set -HEALTHCHECK_PORT=3000 +PORT=3000 # Liquidation is skipped if there is less than this amount of tokens-to-be-used-for-repayment in the wallet. In USD. # Defaults to 0.001 if not set diff --git a/src/config.rs b/src/config.rs index 513f21a4..af43eb53 100644 --- a/src/config.rs +++ b/src/config.rs @@ -76,10 +76,10 @@ impl Eva01Config { .parse() .expect("Invalid MIN_PROFIT number"); - let healthcheck_port: u16 = std::env::var("HEALTHCHECK_PORT") + let healthcheck_port: u16 = std::env::var("PORT") .unwrap_or("3000".to_string()) .parse() - .expect("Invalid HEALTHCHECK_PORT number"); + .expect("Invalid PORT number"); let metrics_bind_addr = std::env::var("METRICS_BIND_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string()); @@ -248,7 +248,7 @@ mod tests { jail.set_env("MARGINFI_GROUP_KEY", &marginfi_group_key); jail.set_env("ADDRESS_LOOKUP_TABLES", &address_lookup_tables); jail.set_env("MIN_PROFIT", min_profit); - jail.set_env("HEALTHCHECK_PORT", healthcheck_port); + jail.set_env("PORT", healthcheck_port); jail.set_env("METRICS_BIND_ADDR", "127.0.0.1"); jail.set_env("METRICS_PORT", metrics_port); jail.set_env("DEFAULT_TOKEN_MAX_THRESHOLD", default_token_max_threshold); From 1c7368145866e67e8815778fccc1721e2c126fb9 Mon Sep 17 00:00:00 2001 From: Ilia Zyrin Date: Tue, 21 Apr 2026 22:14:27 +0200 Subject: [PATCH 3/7] update logs format --- src/main.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/main.rs b/src/main.rs index 3ad399b1..7b423c23 100644 --- a/src/main.rs +++ b/src/main.rs @@ -93,16 +93,15 @@ fn init_logging() { .format(|buf, record| { use serde_json::json; - let sev = match record.level() { - Level::Error => "ERROR", - Level::Warn => "WARNING", - Level::Info => "INFO", - Level::Debug => "DEBUG", - Level::Trace => "DEBUG", + let level = match record.level() { + Level::Error => "error", + Level::Warn => "warn", + Level::Info => "info", + Level::Debug | Level::Trace => "debug", }; let line = json!({ - "logging.googleapis.com/severity": sev, + "level": level, "message": record.args().to_string(), "target": record.target(), }); From 8f892e91546dc7cc8e26e636081b303e7e81a0c3 Mon Sep 17 00:00:00 2001 From: Ilia Zyrin Date: Tue, 21 Apr 2026 22:33:24 +0200 Subject: [PATCH 4/7] split prod & staging builds --- Cargo.toml | 15 +++++++-------- Dockerfile | 6 +++--- Dockerfile.staging | 22 ++++++++++++++++++++++ README.md | 37 ++++++++++++++++++++++++++++--------- bin/env.template | 3 --- bin/start.sh | 18 ------------------ run-eva.sh | 4 ++++ src/main.rs | 6 ++++++ 8 files changed, 70 insertions(+), 41 deletions(-) create mode 100644 Dockerfile.staging delete mode 100644 bin/start.sh diff --git a/Cargo.toml b/Cargo.toml index 6b082ef9..0f4e1a89 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,10 @@ version = "0.1.0" edition = "2021" [features] +default = ["network-mainnet"] pretty_logs = [] +network-mainnet = ["marginfi/mainnet-beta", "marginfi_type_crate/mainnet-beta"] +network-staging = ["marginfi/staging", "marginfi_type_crate/staging"] [dependencies] solana-account-decoder = "=2.1.20" @@ -57,19 +60,15 @@ git = "https://github.com/mrgnlabs/marginfi-v2" # tag = "mrgn-0.1.8-rc3" rev = "4ce8a15e70b4108059b74e1aedbfde1e3e564ace" package = "marginfi-type-crate" -#STAGE -#default-features = false -#features = ["staging", "client"] +default-features = false +features = ["client"] [dependencies.marginfi] git = "https://github.com/mrgnlabs/marginfi-v2" # tag = "mrgn-0.1.8-rc3" rev = "4ce8a15e70b4108059b74e1aedbfde1e3e564ace" -#PROD -features = ["mainnet-beta", "client", "no-entrypoint"] -#STAGE -#default-features = false -#features = ["staging", "client", "no-entrypoint"] +default-features = false +features = ["client", "no-entrypoint", "custom-heap"] [dependencies.jupiter-swap-api-client] git = "https://github.com/IliaZyrin/jupiter-swap-api-client" diff --git a/Dockerfile b/Dockerfile index 024ad3be..b107752c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,11 +7,11 @@ RUN cargo chef prepare --recipe-path recipe.json FROM chef AS builder COPY --from=planner /app/recipe.json recipe.json -RUN cargo chef cook --release --recipe-path recipe.json +RUN cargo chef cook --release --recipe-path recipe.json --no-default-features --features network-mainnet COPY . . -RUN cargo build --release --bin eva01 +RUN cargo build --release --bin eva01 --no-default-features --features network-mainnet FROM debian:bookworm-slim AS runner @@ -19,4 +19,4 @@ RUN apt-get update && apt-get install -y ca-certificates COPY --from=builder /app/target/release/eva01 /usr/local/bin/ -ENTRYPOINT ["/usr/local/bin/eva01"] \ No newline at end of file +ENTRYPOINT ["/usr/local/bin/eva01"] diff --git a/Dockerfile.staging b/Dockerfile.staging new file mode 100644 index 00000000..e51f7b3a --- /dev/null +++ b/Dockerfile.staging @@ -0,0 +1,22 @@ +FROM lukemathwalker/cargo-chef:latest-rust-1.85 AS chef +WORKDIR /app + +FROM chef AS planner +COPY . . +RUN cargo chef prepare --recipe-path recipe.json + +FROM chef AS builder +COPY --from=planner /app/recipe.json recipe.json +RUN cargo chef cook --release --recipe-path recipe.json --no-default-features --features network-staging + +COPY . . + +RUN cargo build --release --bin eva01 --no-default-features --features network-staging + +FROM debian:bookworm-slim AS runner + +RUN apt-get update && apt-get install -y ca-certificates + +COPY --from=builder /app/target/release/eva01 /usr/local/bin/ + +ENTRYPOINT ["/usr/local/bin/eva01"] diff --git a/README.md b/README.md index 0e5641cd..d0084949 100644 --- a/README.md +++ b/README.md @@ -2,10 +2,11 @@ # Eva01 - the Project 0 Liquidator ## Structure -* `bin` - shell scripts and environment configuration file templare +* `bin` - environment configuration template * `idls` - IDLs for Project 0 integrations (Kamino, Drift, Juplend) * `src` - source code -* eva.Dockerfile - the Docker configuration for building an image to run on Kubernetes +* `Dockerfile` - production (`network-mainnet`) image build +* `Dockerfile.staging` - staging (`network-staging`) image build ### Configuration The [env.template](bin/env.template) file is a template for the required and optional environment variables that are used by Eva. @@ -16,17 +17,35 @@ The [env.template](bin/env.template) file is a template for the required and opt * Protoc: https://grpc.io/docs/protoc-installation/#install-pre-compiled-binaries-any-os; * Rust: `curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh` 1. Clone the Git repo. -1. Create and configure the `.env` file by using [env.template](bin/env.template) as prototype. - - > VSCode: create the VSCode launch configuration and add the configured `.env` to it. +1. Create and configure your environment file by using [env.template](bin/env.template) as prototype. +1. If you run through `run-eva.sh`, copy values into `docker.prod.env` (or change `run-eva.sh` to source a different file). ## Run -1. Source the `.env` file. Example: `source src/eva01/bin/prod.env` -1. Optionally Rotate logs: `mv ~/log/liquidator.log ~/log/liquidator.log.$(date +'%Y%m%dT%H%M%S')` -1. Run the Liquidator: `nohup bash $LIQUIDATOR_SRC_PATH/bin/start.sh >> ~/log/liquidator.log 2>&1 &` +1. Configure `docker.prod.env`. +1. Run from VS Code task `Run (with all checks)` or execute `bash ./run-eva.sh`. +1. Optional background run: `nohup bash ./run-eva.sh >> ~/log/liquidator.log 2>&1 &` > Initial Loading Time The initial loading phase can take some time, depending on your RPC. Eva will load everything needed into the state, including all Marginfi Accounts. Expect the loading time to be between 1-3 minutes depending on the RPC. -> Local Docker: Run `docker build -f -t eva:latest .` to build an image and `docker run --env-file docker.staging.env --rm -v : eva` to run it. \ No newline at end of file +> Local Docker: Run `docker build -f -t eva:latest .` to build an image and `docker run --env-file docker.staging.env --rm -v : eva` to run it. + +## Build Profiles + +Cargo now supports network-specific build features: + +* `network-mainnet` (default) +* `network-staging` + +### Local builds + +* Mainnet: `cargo build --release --bin eva01 --no-default-features --features network-mainnet` +* Staging: `cargo build --release --bin eva01 --no-default-features --features network-staging` + +### Railway builds + +Use different Dockerfiles per Railway environment: + +* Production environment: leave default `Dockerfile` (or set `RAILWAY_DOCKERFILE_PATH=Dockerfile`) +* Staging environment: set `RAILWAY_DOCKERFILE_PATH=Dockerfile.staging` diff --git a/bin/env.template b/bin/env.template index 9f3ec35b..39d3b386 100644 --- a/bin/env.template +++ b/bin/env.template @@ -1,6 +1,3 @@ -# Path to the Eva repo. Only used to run on Linux. -LIQUIDATOR_SRC_PATH= - ## Mandatory part ## RPC_URL= YELLOWSTONE_ENDPOINT= diff --git a/bin/start.sh b/bin/start.sh deleted file mode 100644 index 3bb1f620..00000000 --- a/bin/start.sh +++ /dev/null @@ -1,18 +0,0 @@ -#!/bin/bash - -if [ -z "$LIQUIDATOR_SRC_PATH" ]; then - echo "ERROR: LIQUIDATOR_SRC_PATH is not set." - exit 1 -else - echo "LIQUIDATOR_SRC_PATH is set to: $LIQUIDATOR_SRC_PATH" -fi - -pushd $LIQUIDATOR_SRC_PATH > /dev/null - -export RUST_LOG=debug,hyper=info,h2::codec=info,eva01::geyser_processor=info,eva01::clock_manager=info -export RUST_BACKTRACDE=1 -cargo run - -popd > /dev/null - - diff --git a/run-eva.sh b/run-eva.sh index 1359960b..a0cba840 100755 --- a/run-eva.sh +++ b/run-eva.sh @@ -15,3 +15,7 @@ cargo fmt # cargo clippy -- -D warnings cargo build --bin eva01 --package eva01 cargo run --bin eva01 --features pretty_logs + +# Staging network build/run example: +# cargo build --bin eva01 --package eva01 --no-default-features --features network-staging +# cargo run --bin eva01 --no-default-features --features "network-staging,pretty_logs" diff --git a/src/main.rs b/src/main.rs index 7b423c23..fb142fca 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,6 +12,12 @@ use std::{ #[cfg(not(feature = "pretty_logs"))] use {env_logger::Target, log::Level, std::io::Write as _}; +#[cfg(all(feature = "network-mainnet", feature = "network-staging"))] +compile_error!("Enable only one network feature: network-mainnet or network-staging"); + +#[cfg(not(any(feature = "network-mainnet", feature = "network-staging")))] +compile_error!("Missing network feature. Enable network-mainnet or network-staging"); + mod cache; mod cache_loader; mod cli; From 8cbbbd44494df1942565c1dbd95cf0f37c25dd3d Mon Sep 17 00:00:00 2001 From: Ilia Zyrin Date: Wed, 22 Apr 2026 15:50:38 +0200 Subject: [PATCH 5/7] significantly improve sim logic and clean up some redundant config vars --- Cargo.lock | 65 +--- Cargo.toml | 1 - bin/env.template | 6 +- src/cache/accounts.rs | 55 +-- src/config.rs | 69 ++-- src/liquidator.rs | 113 +++--- src/main.rs | 3 - src/utils/simulation_cache.rs | 532 ++++++++++++++++++++++++++++- src/utils/swb_cranker.rs | 336 ++++++++++++++---- src/wrappers/liquidator_account.rs | 373 ++------------------ 10 files changed, 994 insertions(+), 559 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index df25d6b8..6d0d2ed8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -794,12 +794,6 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" -[[package]] -name = "base64ct" -version = "1.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89e25b6adfb930f02d1981565a6e5d9c547ac15a96606256d3b59040e5cd4ca3" - [[package]] name = "bincode" version = "1.3.3" @@ -1873,7 +1867,6 @@ dependencies = [ "solana-sdk", "spl-associated-token-account 6.0.0", "spl-token 7.0.0", - "switchboard-on-demand 0.11.3", "switchboard-on-demand-client", "tiny_http", "tokio", @@ -1930,16 +1923,6 @@ dependencies = [ "wide", ] -[[package]] -name = "faster-hex" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7223ae2d2f179b803433d9c830478527e92b8117eab39460edae7f1614d9fb73" -dependencies = [ - "heapless", - "serde", -] - [[package]] name = "fastrand" version = "2.3.0" @@ -2319,15 +2302,6 @@ dependencies = [ "byteorder", ] -[[package]] -name = "hash32" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47d60b12902ba28e2730cd37e95b8c9223af2808df9e902d4df49588d1470606" -dependencies = [ - "byteorder", -] - [[package]] name = "hashbrown" version = "0.11.2" @@ -2391,16 +2365,6 @@ dependencies = [ "http 1.3.1", ] -[[package]] -name = "heapless" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bfb9eb618601c89945a70e254898da93b13be0388091d42117462b265bb3fad" -dependencies = [ - "hash32 0.3.1", - "stable_deref_trait", -] - [[package]] name = "heck" version = "0.3.3" @@ -3171,14 +3135,11 @@ dependencies = [ "arrayref", "base64 0.22.1", "digest 0.9.0", - "hmac-drbg", "libsecp256k1-core 0.3.0", "libsecp256k1-gen-ecmult 0.3.0", "libsecp256k1-gen-genmult 0.3.0", "rand 0.8.5", "serde", - "sha2 0.9.9", - "typenum", ] [[package]] @@ -3297,7 +3258,7 @@ dependencies = [ "solend-mocks", "spl-transfer-hook-interface 0.9.0", "static_assertions", - "switchboard-on-demand 0.3.8", + "switchboard-on-demand", ] [[package]] @@ -6840,7 +6801,7 @@ checksum = "1c1941b5ef0c3ce8f2ac5dd984d0fb1a97423c4ff2a02eec81e3913f02e2ac2b" dependencies = [ "byteorder", "combine 3.8.1", - "hash32 0.2.1", + "hash32", "libc", "log", "rand 0.8.5", @@ -7432,28 +7393,6 @@ dependencies = [ "spl-token 7.0.0", ] -[[package]] -name = "switchboard-on-demand" -version = "0.11.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "038c5374a2b8caf01dcccc8a95e7789cddea056e0f34a4d67876fdbb743ddc3d" -dependencies = [ - "anyhow", - "arrayref", - "base64ct", - "bincode", - "borsh 1.5.7", - "bytemuck", - "cc", - "faster-hex", - "libsecp256k1 0.7.2", - "rust_decimal", - "serde", - "sha2 0.10.9", - "solana-program", - "switchboard-protos", -] - [[package]] name = "switchboard-on-demand-client" version = "0.4.1-alpha0.0.0" diff --git a/Cargo.toml b/Cargo.toml index 0f4e1a89..81efadb5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,6 @@ ctrlc = "3.5.2" yellowstone-grpc-client = { git = "https://github.com/rpcpool/yellowstone-grpc", branch = "v2.1" } yellowstone-grpc-proto = { git = "https://github.com/rpcpool/yellowstone-grpc", branch = "v2.1" } -switchboard-on-demand = "0.11.3" # Includes a fix to the way results are parsed. switchboard-on-demand-client = { git = "https://github.com/abdulrabbani00/switchboard-sdk", rev = "99cf3f29143b81fd3ad17649725a3f994b253fbb" } solana-dex-superagg = { git = "https://github.com/0dotxyz/solana-dex-superagg", rev = "3ab0b4f4de424a0044e983576d113e3b349b3d94" } diff --git a/bin/env.template b/bin/env.template index 39d3b386..5f137aa7 100644 --- a/bin/env.template +++ b/bin/env.template @@ -1,10 +1,8 @@ ## Mandatory part ## -RPC_URL= YELLOWSTONE_ENDPOINT= YELLOWSTONE_X_TOKEN= WALLET_KEYPAIR= COMPUTE_UNIT_PRICE_MICRO_LAMPORTS=1000 -MARGINFI_PROGRAM_ID=MFv2hWf31Z9kbCa1snEPYctwafyhdvnV7FZnsebVacA MARGINFI_GROUP_KEY=4qp6Fx6tnZkY5Wropq9wUYgtFxXKwE6viZxFHg3rdAG8 ADDRESS_LOOKUP_TABLES=HGmknUTUmeovMc9ryERNWG6UFZDFDVr9xrum3ZhyL4fC,5FuKF7C1tJji2mXZuJ14U9oDb37is5mmvYLf4KwojoF1,FEFhAFKz48P3w82Ds5VhvyEDwhRqu2FejmnuxEPZ8wNR @@ -22,6 +20,10 @@ JUP_SWAP_API_KEY='axvs' SLIPPAGE_BPS=100 ## Optional part ## +# Optional override for JSON-RPC endpoint. If not set, derived as +# `${YELLOWSTONE_ENDPOINT}/${YELLOWSTONE_X_TOKEN}` (or just endpoint if token is empty). +# RPC_URL= + # Switchboard feeds that should not be cranked (cause they fail too often) UNSTABLE_SWB_FEEDS=<4YMdFbV8FaUv4FpE2gbZBReSFVzFKNPJZHirHwc2Psza,...> CROSSBAR_API_URL= diff --git a/src/cache/accounts.rs b/src/cache/accounts.rs index 972f4c71..2299ec89 100644 --- a/src/cache/accounts.rs +++ b/src/cache/accounts.rs @@ -34,33 +34,34 @@ impl MarginfiAccountsCache { .cloned() } - pub fn try_get_account_by_index(&self, index: usize) -> Result { - self.accounts - .read() - .map_err(|e| { - anyhow!( - "Failed to lock the Marginfi accounts map for for search by index: {}", - e - ) - })? - .get_index(index) - .map(|(_, account)| account.clone()) - .ok_or(anyhow!( - "Failed to find the Marginfi account with index: {}", - index - )) - } + pub fn try_get_account_batch( + &self, + start_index: usize, + batch_size: usize, + ) -> Result> { + if batch_size == 0 { + return Ok(Vec::new()); + } - pub fn len(&self) -> Result { - Ok(self - .accounts - .read() - .map_err(|e| { - anyhow!( - "Failed to lock the marginfi accounts map for getting it's size: {}", - e - ) - })? - .len()) + let accounts = self.accounts.read().map_err(|e| { + anyhow!( + "Failed to lock the marginfi accounts map for batch snapshot: {}", + e + ) + })?; + + if start_index >= accounts.len() { + return Ok(Vec::new()); + } + + let end_index = (start_index + batch_size).min(accounts.len()); + let mut out = Vec::with_capacity(end_index - start_index); + for index in start_index..end_index { + if let Some((_, account)) = accounts.get_index(index) { + out.push(account.clone()); + } + } + + Ok(out) } } diff --git a/src/config.rs b/src/config.rs index af43eb53..e3472132 100644 --- a/src/config.rs +++ b/src/config.rs @@ -39,11 +39,11 @@ pub struct Eva01Config { impl Eva01Config { pub fn new() -> anyhow::Result { //General configuration - let rpc_url = std::env::var("RPC_URL").expect("RPC_URL environment variable is not set"); - let yellowstone_endpoint = std::env::var("YELLOWSTONE_ENDPOINT") .expect("YELLOWSTONE_ENDPOINT environment variable is not set"); let yellowstone_x_token = std::env::var("YELLOWSTONE_X_TOKEN").ok(); + let derived_rpc_url = derive_rpc_url(&yellowstone_endpoint, yellowstone_x_token.as_deref()); + let rpc_url = std::env::var("RPC_URL").unwrap_or(derived_rpc_url); let wallet_keypair_env = std::env::var("WALLET_KEYPAIR") .expect("WALLET_KEYPAIR environment variable is not set"); @@ -56,11 +56,7 @@ impl Eva01Config { .parse() .expect("Invalid COMPUTE_UNIT_PRICE_MICRO_LAMPORTS number"); - let marginfi_program_id = Pubkey::from_str( - &std::env::var("MARGINFI_PROGRAM_ID") - .expect("MARGINFI_PROGRAM_ID environment variable is not set"), - ) - .expect("Invalid MARGINFI_PROGRAM_ID Pubkey"); + let marginfi_program_id = Pubkey::new_from_array(marginfi_type_crate::ID.to_bytes()); let marginfi_group_key = Pubkey::from_str( &std::env::var("MARGINFI_GROUP_KEY") @@ -161,6 +157,14 @@ impl Eva01Config { } } +fn derive_rpc_url(yellowstone_endpoint: &str, yellowstone_x_token: Option<&str>) -> String { + let endpoint = yellowstone_endpoint.trim_end_matches('/'); + match yellowstone_x_token.map(str::trim).filter(|token| !token.is_empty()) { + Some(token) => format!("{endpoint}/{token}"), + None => endpoint.to_string(), + } +} + pub fn load_token_thresholds_from_env() -> anyhow::Result> { match std::env::var("TOKEN_THRESHOLDS") { Ok(s) if !s.trim().is_empty() => { @@ -223,11 +227,9 @@ mod tests { fn setup_general_env(jail: &mut Jail) { let keypair = serde_json::to_string(&Keypair::new().to_bytes().to_vec()).unwrap(); - let rpc_url = "http://dummy:1234"; - let yellowstone_endpoint = "http://dummy:1234"; + let yellowstone_endpoint = "https://dummy"; let yellowstone_x_token = "token"; let compute_unit_price_micro_lamports = "1000"; - let marginfi_program_id = Pubkey::new_unique().to_string(); let marginfi_group_key = Pubkey::new_unique().to_string(); let address_lookup_tables = Pubkey::new_unique().to_string(); let min_profit = "0.01"; @@ -236,7 +238,6 @@ mod tests { let metrics_port = "9898"; let healthcheck_port = "3000"; - jail.set_env("RPC_URL", rpc_url); jail.set_env("YELLOWSTONE_ENDPOINT", yellowstone_endpoint); jail.set_env("YELLOWSTONE_X_TOKEN", yellowstone_x_token); jail.set_env("WALLET_KEYPAIR", &keypair); @@ -244,7 +245,6 @@ mod tests { "COMPUTE_UNIT_PRICE_MICRO_LAMPORTS", compute_unit_price_micro_lamports, ); - jail.set_env("MARGINFI_PROGRAM_ID", &marginfi_program_id); jail.set_env("MARGINFI_GROUP_KEY", &marginfi_group_key); jail.set_env("ADDRESS_LOOKUP_TABLES", &address_lookup_tables); jail.set_env("MIN_PROFIT", min_profit); @@ -320,24 +320,51 @@ mod tests { #[test] #[serial] - #[should_panic(expected = "RPC_URL environment variable is not set")] - fn test_eva01_config_new_missing_env() { - Jail::expect_with(|_jail| { - // RPC_URL is not set in the jail, so it should panic - let _result = Eva01Config::new(); + fn test_eva01_config_new_uses_default_marginfi_program_id() { + Jail::expect_with(|mut jail| { + setup_general_env(&mut jail); + setup_rebalancer_env(&mut jail); + let config = Eva01Config::new().unwrap(); + assert_eq!( + config.marginfi_program_id, + Pubkey::new_from_array(marginfi_type_crate::ID.to_bytes()) + ); Ok(()) }); } #[test] #[serial] - #[should_panic(expected = "Invalid MARGINFI_PROGRAM_ID Pubkey")] - fn test_eva01_config_new_invalid_pubkey_env() { + fn test_eva01_config_new_derives_rpc_url_from_yellowstone() { Jail::expect_with(|mut jail| { setup_general_env(&mut jail); setup_rebalancer_env(&mut jail); - jail.set_env("MARGINFI_PROGRAM_ID", "not_a_pubkey"); - Eva01Config::new().unwrap(); + let config = Eva01Config::new().unwrap(); + assert_eq!(config.rpc_url, "https://dummy/token"); + Ok(()) + }); + } + + #[test] + #[serial] + fn test_eva01_config_new_prefers_explicit_rpc_url_override() { + Jail::expect_with(|mut jail| { + setup_general_env(&mut jail); + setup_rebalancer_env(&mut jail); + jail.set_env("RPC_URL", "https://explicit-rpc.example"); + let config = Eva01Config::new().unwrap(); + assert_eq!(config.rpc_url, "https://explicit-rpc.example"); + Ok(()) + }); + } + + #[test] + #[serial] + #[should_panic(expected = "YELLOWSTONE_ENDPOINT environment variable is not set")] + fn test_eva01_config_new_missing_env() { + Jail::expect_with(|_jail| { + // YELLOWSTONE_ENDPOINT is not set in the jail, so it should panic + let _result = Eva01Config::new(); Ok(()) }); } diff --git a/src/liquidator.rs b/src/liquidator.rs index e1f8a454..1cf75ec9 100644 --- a/src/liquidator.rs +++ b/src/liquidator.rs @@ -11,6 +11,7 @@ use crate::{ rebalancer::Rebalancer, utils::{ format_error_chain, + simulation_cache::is_transient_rpc_anyhow_error, swb_cranker::{SwbCranker, SWB_STALE_HANDLED_ERROR, SWB_STALE_PRICE_ERROR_CODE_NUMBER}, }, wrappers::{ @@ -45,6 +46,7 @@ use std::{ use std::{sync::atomic::Ordering, thread}; const DECLARED_VALUE_RANGE: f64 = 0.2; +const ACCOUNT_SCAN_BATCH_SIZE: usize = 4_096; pub struct Liquidator { liquidator_account: Arc, @@ -108,9 +110,9 @@ impl Liquidator { } pub fn start(&mut self) -> Result<()> { - if let Err(err) = self.simulate_oracles_and_integrations() { + if let Err(err) = self.simulate_oracles_and_integrations(false) { warn!( - "Failed pre-rebalancing simulation round: {}", + "Failed startup pre-rebalancing simulation round: {}", format_error_chain(&err) ); } @@ -127,9 +129,9 @@ impl Liquidator { info!("Running the Liquidation process..."); self.run_liquidation.store(false, Ordering::Relaxed); - if let Err(err) = self.simulate_oracles_and_integrations() { + if let Err(err) = self.simulate_oracles_and_integrations(true) { error!( - "Failed pre-liquidation simulation round: {}", + "Failed pre-liquidation simulation: {}", format_error_chain(&err) ); continue; @@ -137,7 +139,10 @@ impl Liquidator { let mut missing_tokens: HashMap = HashMap::new(); let mut stale_swb_oracles: HashSet = HashSet::new(); - if let Ok(mut accounts) = self.evaluate_all_accounts(&mut stale_swb_oracles) { + let evaluated_accounts = self.evaluate_all_accounts(&mut stale_swb_oracles); + let mut cranked_stale_oracles = false; + + if let Ok(mut accounts) = evaluated_accounts { // Accounts are sorted from the highest profit to the lowest accounts.sort_by(|a, b| a.profit.cmp(&b.profit)); accounts.reverse(); @@ -202,6 +207,7 @@ impl Liquidator { } if !stale_swb_oracles.is_empty() { info!("Cranking Swb Oracles {:#?}", stale_swb_oracles); + cranked_stale_oracles = true; if let Err(err) = self .swb_cranker .crank_oracles(stale_swb_oracles.into_iter().collect()) @@ -210,16 +216,26 @@ impl Liquidator { } info!("Completed cranking Swb Oracles."); }; + } else if let Err(err) = evaluated_accounts { + error!("Failed to evaluate accounts in liquidation: {}", err); + ERROR_COUNT.inc(); } info!("The Liquidation process is complete."); - if let Err(err) = self.simulate_oracles_and_integrations() { - warn!( - "Failed pre-rebalancing simulation round: {}", - format_error_chain(&err) + if cranked_stale_oracles { + if let Err(err) = self.simulate_oracles_and_integrations(false) { + warn!( + "Failed pre-rebalancing simulation: {}", + format_error_chain(&err) + ); + } + } else { + debug!( + "Skipping pre-rebalancing oracle simulation because no stale oracles were cranked" ); } + if let Err(error) = self.rebalancer.run(missing_tokens) { error!("Rebalancing failed: {:?}", error); ERROR_COUNT.inc(); @@ -230,17 +246,26 @@ impl Liquidator { Ok(()) } - fn simulate_oracles_and_integrations(&self) -> Result<()> { - self.liquidator_account - .simulate_refresh_integrations() - .context("simulate_refresh_integrations failed")?; + fn simulate_oracles_and_integrations(&self, refresh_integrations: bool) -> Result<()> { + if refresh_integrations { + if let Err(err) = self.liquidator_account.simulate_refresh_integrations() { + if is_transient_rpc_anyhow_error(&err) { + warn!( + "Transient RPC failure while simulating integrations refresh; proceeding with cached integration state: {}", + format_error_chain(&err) + ); + } else { + return Err(err).context("simulate_refresh_integrations failed"); + } + } + } let swb_oracle_count = self.cache.banks.get_swb_oracles().len(); self.swb_cranker .simulate_oracles(self.cache.as_ref()) .with_context(|| { format!( - "simulate_oracles failed (switchboard feed count: {})", - swb_oracle_count + "simulate_oracles failed (switchboard feed count: {}, refresh_integrations: {})", + swb_oracle_count, refresh_integrations ) }) } @@ -256,42 +281,40 @@ impl Liquidator { let clock = clock_manager::get_clock(&self.cache.clock)?; let evaluation_result = { - let mut index: usize = 0; + let mut next_index: usize = 0; let mut result: Vec = vec![]; - while index < self.cache.marginfi_accounts.len()? { - total_scanned += 1; - match self.cache.marginfi_accounts.try_get_account_by_index(index) { - Ok(account) => { - if account.address == self.liquidator_account.liquidator_address { - index += 1; - continue; - } - match self.process_account(&account, clock.clone(), stale_swb_oracles) { - Ok(acc_opt) => { - if let Some(acc) = acc_opt { - result.push(acc); - } + loop { + let accounts_batch = self + .cache + .marginfi_accounts + .try_get_account_batch(next_index, ACCOUNT_SCAN_BATCH_SIZE)?; + if accounts_batch.is_empty() { + break; + } + next_index += accounts_batch.len(); + + for account in accounts_batch { + total_scanned += 1; + if account.address == self.liquidator_account.liquidator_address { + continue; + } + match self.process_account(&account, clock.clone(), stale_swb_oracles) { + Ok(acc_opt) => { + if let Some(acc) = acc_opt { + result.push(acc); } - Err(err) => { - if !err.to_string().contains(SWB_STALE_HANDLED_ERROR) { - debug!( - "Failed to process account {:?}: {:?}", - account.address, err - ); - ERROR_COUNT.inc(); - } + } + Err(err) => { + if !err.to_string().contains(SWB_STALE_HANDLED_ERROR) { + debug!( + "Failed to process account {:?}: {:?}", + account.address, err + ); + ERROR_COUNT.inc(); } } } - Err(err) => { - error!( - "Failed to get Marginfi account by index {}: {:?}", - index, err - ); - ERROR_COUNT.inc(); - } } - index += 1; } Ok(result) diff --git a/src/main.rs b/src/main.rs index fb142fca..4d8ee6bb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -50,9 +50,6 @@ use drift_idl::*; fn main() -> Result<(), Box> { init_logging(); - //FIX MINTING TO WSOL !!!! - //FIX MINTING TO WSOL !!!! - //FIX MINTING TO WSOL !!!! std::panic::set_hook(Box::new(|panic_info| { eprintln!("Panic occurred: {:#?}", panic_info); diff --git a/src/utils/simulation_cache.rs b/src/utils/simulation_cache.rs index 9c44dad3..c0a4e994 100644 --- a/src/utils/simulation_cache.rs +++ b/src/utils/simulation_cache.rs @@ -1,6 +1,48 @@ -use anyhow::{anyhow, Result}; -use solana_account_decoder::UiAccount; -use solana_sdk::{account::Account, pubkey::Pubkey}; +use anyhow::{anyhow, Context, Result}; +use log::{debug, warn}; +use solana_account_decoder::{UiAccount, UiAccountEncoding}; +use solana_client::{ + client_error::{ClientError, ClientErrorKind}, + rpc_client::RpcClient, + rpc_config::{RpcSimulateTransactionAccountsConfig, RpcSimulateTransactionConfig}, + rpc_request::RpcError, +}; +use solana_sdk::{ + account::Account, + address_lookup_table::AddressLookupTableAccount, + commitment_config::CommitmentConfig, + instruction::Instruction, + message::{v0::Message, VersionedMessage}, + pubkey::Pubkey, + signature::Keypair, + signer::Signer, + transaction::{TransactionError, VersionedTransaction}, +}; +use std::{thread, time::Duration}; + +const SIMULATION_LOG_LINE_LIMIT: usize = 40; +const SIMULATION_LOG_CHAR_LIMIT: usize = 8_000; + +#[derive(Clone, Debug)] +pub struct SimulatedInstructionEntry { + pub kind: &'static str, + pub address: Pubkey, + pub instruction: Instruction, +} + +#[derive(Debug, Clone)] +pub struct SimulatedBatchRunSummary { + pub skipped_entries: Vec, + pub refreshed_batches: usize, + pub preferred_batch_size: usize, + pub resized_down: bool, +} + +enum SimulateBatchError { + TooLarge(anyhow::Error), + TooManyAccountLocks(anyhow::Error), + Other(anyhow::Error), +} pub fn decode_and_apply_simulated_accounts( addresses: &[Pubkey], @@ -34,3 +76,487 @@ where Ok(()) } + +pub fn simulate_instruction_batches( + rpc_client: &RpcClient, + signer: &Keypair, + luts: &[AddressLookupTableAccount], + cu_limit_ix: &Instruction, + entries: &[SimulatedInstructionEntry], + initial_batch_size: usize, + max_rpc_retries: usize, + rpc_retry_base_delay: Duration, + mut apply: F, +) -> Result +where + F: FnMut(&Pubkey, Account) -> Result<()>, +{ + if entries.is_empty() { + return Ok(SimulatedBatchRunSummary { + skipped_entries: vec![], + refreshed_batches: 0, + preferred_batch_size: 1, + resized_down: false, + }); + } + + let mut skipped_entries: Vec = vec![]; + let mut refreshed_batches = 0usize; + let mut offset = 0usize; + let mut preferred_batch_size = initial_batch_size.min(entries.len()).max(1); + let mut resized_down = false; + + while offset < entries.len() { + let remaining = entries.len() - offset; + let mut batch_size = preferred_batch_size.min(remaining).max(1); + + loop { + let batch_entries = entries[offset..offset + batch_size].to_vec(); + match simulate_instruction_batch( + rpc_client, + signer, + luts, + cu_limit_ix, + batch_entries, + max_rpc_retries, + rpc_retry_base_delay, + &mut skipped_entries, + &mut apply, + ) { + Ok(batch_refreshed_any) => { + if batch_refreshed_any { + refreshed_batches += 1; + } + offset += batch_size; + break; + } + Err(SimulateBatchError::TooLarge(_err)) if batch_size > 1 => { + let new_batch_size = (batch_size / 2).max(1); + debug!( + "Integrations refresh simulation tx too large with {} instructions; retrying with {} instructions", + batch_size, new_batch_size + ); + batch_size = new_batch_size; + preferred_batch_size = new_batch_size; + resized_down = true; + } + Err(SimulateBatchError::TooManyAccountLocks(_err)) if batch_size > 1 => { + let new_batch_size = (batch_size / 2).max(1); + debug!( + "Integrations refresh simulation hit TooManyAccountLocks with {} instructions; retrying with {} instructions", + batch_size, new_batch_size + ); + batch_size = new_batch_size; + preferred_batch_size = new_batch_size; + resized_down = true; + } + Err(SimulateBatchError::TooLarge(err)) => { + let failed_entry = entries[offset].clone(); + warn!( + "Skipping integration refresh instruction {} for {} because tx is too large even as a single instruction: {}", + failed_entry.kind, + failed_entry.address, + err + ); + skipped_entries.push(failed_entry); + offset += 1; + break; + } + Err(SimulateBatchError::TooManyAccountLocks(err)) => { + let failed_entry = entries[offset].clone(); + warn!( + "Skipping integration refresh instruction {} for {} because TooManyAccountLocks persisted even as a single instruction: {}", + failed_entry.kind, + failed_entry.address, + err + ); + skipped_entries.push(failed_entry); + offset += 1; + break; + } + Err(SimulateBatchError::Other(err)) => { + let batch_summary = + format_instruction_entries(&entries[offset..offset + batch_size], 10); + return Err(err).with_context(|| { + format!( + "Integrations refresh simulation failed for batch [{}..{}) (size {}, entries: {})", + offset, + offset + batch_size, + batch_size, + batch_summary, + ) + }); + } + } + } + } + + if refreshed_batches == 0 { + return Err(anyhow!( + "Integrations refresh simulation failed for all integrations; skipped entries: {}", + format_skipped_instruction_entries(&skipped_entries) + )); + } + + Ok(SimulatedBatchRunSummary { + skipped_entries, + refreshed_batches, + preferred_batch_size, + resized_down, + }) +} + +fn simulate_instruction_batch( + rpc_client: &RpcClient, + signer: &Keypair, + luts: &[AddressLookupTableAccount], + cu_limit_ix: &Instruction, + batch_entries: Vec, + max_rpc_retries: usize, + rpc_retry_base_delay: Duration, + skipped_entries: &mut Vec, + apply: &mut F, +) -> std::result::Result +where + F: FnMut(&Pubkey, Account) -> Result<()>, +{ + let mut entries = batch_entries; + + loop { + if entries.is_empty() { + return Ok(false); + } + + let integration_addresses: Vec = + entries.iter().map(|entry| entry.address).collect(); + let entry_summary = format_instruction_entries(&entries, 10); + let mut ixs: Vec = Vec::with_capacity(entries.len() + 1); + ixs.push(cu_limit_ix.clone()); + ixs.extend(entries.iter().map(|entry| entry.instruction.clone())); + + let recent_blockhash = retry_transient_rpc( + "getLatestBlockhash for integrations refresh simulation", + max_rpc_retries, + rpc_retry_base_delay, + || rpc_client.get_latest_blockhash(), + ) + .map_err(|err| SimulateBatchError::Other(err.into()))?; + + let signer_pk = signer.pubkey(); + let msg = Message::try_compile(&signer_pk, &ixs, luts, recent_blockhash) + .map_err(|err| SimulateBatchError::Other(err.into()))?; + let tx = VersionedTransaction::try_new(VersionedMessage::V0(msg), &[signer]) + .map_err(|err| SimulateBatchError::Other(err.into()))?; + + let simulation = retry_transient_rpc( + "simulateTransaction for integrations refresh simulation", + max_rpc_retries, + rpc_retry_base_delay, + || { + rpc_client.simulate_transaction_with_config( + &tx, + RpcSimulateTransactionConfig { + sig_verify: false, + replace_recent_blockhash: true, + commitment: Some(CommitmentConfig::confirmed()), + accounts: Some(RpcSimulateTransactionAccountsConfig { + encoding: Some(UiAccountEncoding::Base64), + addresses: integration_addresses + .iter() + .map(|pk| pk.to_string()) + .collect(), + }), + ..Default::default() + }, + ) + }, + ) + .map_err(|err| { + if is_tx_too_large_client(&err) { + SimulateBatchError::TooLarge(err.into()) + } else if is_tx_too_many_account_locks_client(&err) { + SimulateBatchError::TooManyAccountLocks(err.into()) + } else { + SimulateBatchError::Other(err.into()) + } + })?; + + if let Some(err) = simulation.value.err.clone() { + if matches!(err, TransactionError::TooManyAccountLocks) { + let logs = format_simulation_logs( + simulation.value.logs.as_deref(), + SIMULATION_LOG_LINE_LIMIT, + SIMULATION_LOG_CHAR_LIMIT, + ); + return Err(SimulateBatchError::TooManyAccountLocks(anyhow!( + "Integrations refresh simulation hit TooManyAccountLocks for batch of {} entries [{}]; logs={}", + entries.len(), + entry_summary, + logs + ))); + } + + if let TransactionError::InstructionError(ix_index, instruction_error) = &err { + let ix_index = usize::from(*ix_index); + if ix_index > 0 && ix_index <= entries.len() { + let failed_entry = entries.remove(ix_index - 1); + warn!( + "Skipping failing integration refresh instruction {} for {} (tx instruction index {}, error {:?})", + failed_entry.kind, + failed_entry.address, + ix_index, + instruction_error + ); + skipped_entries.push(failed_entry); + continue; + } + } + + let logs = format_simulation_logs( + simulation.value.logs.as_deref(), + SIMULATION_LOG_LINE_LIMIT, + SIMULATION_LOG_CHAR_LIMIT, + ); + return Err(SimulateBatchError::Other(anyhow!( + "Integrations refresh simulation failed with transaction error: {:?}; batch_size={}; entries=[{}]; logs={}", + err, + entries.len(), + entry_summary, + logs + ))); + } + + let simulated_accounts = simulation.value.accounts.ok_or_else(|| { + SimulateBatchError::Other(anyhow!( + "Integrations refresh simulation did not return post-simulation accounts" + )) + })?; + + if simulated_accounts.len() != integration_addresses.len() { + return Err(SimulateBatchError::Other(anyhow!( + "Integrations refresh simulation returned {} accounts, expected {}", + simulated_accounts.len(), + integration_addresses.len() + ))); + } + + decode_and_apply_simulated_accounts( + &integration_addresses, + &simulated_accounts, + "simulateTransaction integrations refresh", + |address, account| apply(address, account), + ) + .map_err(SimulateBatchError::Other)?; + + return Ok(true); + } +} + +fn retry_transient_rpc( + operation_name: &str, + max_retries: usize, + retry_base_delay: Duration, + mut operation: F, +) -> std::result::Result +where + F: FnMut() -> std::result::Result, +{ + let max_attempts = max_retries.max(1); + let mut attempt: usize = 1; + loop { + match operation() { + Ok(value) => return Ok(value), + Err(err) => { + if attempt >= max_attempts || !is_transient_rpc_client_error(&err) { + return Err(err); + } + + let multiplier = 1_u32.checked_shl((attempt - 1) as u32).unwrap_or(u32::MAX); + let backoff_duration = retry_base_delay.saturating_mul(multiplier); + warn!( + "Transient RPC error during {} (attempt {}/{}): {}. Retrying in {:?}", + operation_name, attempt, max_attempts, err, backoff_duration + ); + thread::sleep(backoff_duration); + attempt += 1; + } + } + } +} + +pub fn format_skipped_instruction_entries(entries: &[SimulatedInstructionEntry]) -> String { + if entries.is_empty() { + return "none".to_string(); + } + + entries + .iter() + .map(|entry| format!("{}:{}", entry.kind, entry.address)) + .collect::>() + .join(", ") +} + +pub fn format_instruction_entries( + entries: &[SimulatedInstructionEntry], + max_entries: usize, +) -> String { + if entries.is_empty() { + return "none".to_string(); + } + + let mut out = entries + .iter() + .take(max_entries) + .map(|entry| format!("{}:{}", entry.kind, entry.address)) + .collect::>(); + + if entries.len() > max_entries { + out.push(format!( + "...<{} additional entries>", + entries.len() - max_entries + )); + } + + out.join(", ") +} + +fn format_simulation_logs(logs: Option<&[String]>, max_lines: usize, max_chars: usize) -> String { + let Some(logs) = logs else { + return "none".to_string(); + }; + if logs.is_empty() { + return "none".to_string(); + } + + let mut lines: Vec = logs.iter().take(max_lines).cloned().collect(); + if logs.len() > max_lines { + lines.push(format!( + "...<{} additional log lines truncated>", + logs.len() - max_lines + )); + } + + let mut joined = lines.join(" | "); + if joined.len() > max_chars { + joined.truncate(max_chars); + joined.push_str("..."); + } + + joined +} + +pub fn is_tx_too_large_client(err: &ClientError) -> bool { + match err.kind() { + ClientErrorKind::RpcError(rpc) => match rpc { + RpcError::RpcResponseError { code, message, .. } => { + *code == -32602 && message.contains("too large") + } + RpcError::RpcRequestError(msg) | RpcError::ForUser(msg) => { + // Some nodes may proxy this as a plain string + msg.contains("too large") + } + _ => false, + }, + _ => false, + } +} + +fn is_tx_too_many_account_locks_client(err: &ClientError) -> bool { + match err.kind() { + ClientErrorKind::RpcError(rpc) => match rpc { + RpcError::RpcResponseError { message, .. } => { + message.contains("TooManyAccountLocks") + || message + .to_ascii_lowercase() + .contains("too many account locks") + } + RpcError::RpcRequestError(msg) | RpcError::ForUser(msg) => { + msg.contains("TooManyAccountLocks") + || msg.to_ascii_lowercase().contains("too many account locks") + } + _ => false, + }, + _ => false, + } +} + +pub fn is_transient_rpc_anyhow_error(err: &anyhow::Error) -> bool { + err.chain().any(|cause| { + if let Some(client_error) = cause.downcast_ref::() { + return is_transient_rpc_client_error(client_error); + } + + is_transient_rpc_message(&cause.to_string()) + }) +} + +fn is_transient_rpc_client_error(err: &ClientError) -> bool { + match err.kind() { + ClientErrorKind::Io(io_err) => matches!( + io_err.kind(), + std::io::ErrorKind::TimedOut + | std::io::ErrorKind::ConnectionReset + | std::io::ErrorKind::ConnectionAborted + | std::io::ErrorKind::UnexpectedEof + | std::io::ErrorKind::BrokenPipe + | std::io::ErrorKind::WouldBlock + | std::io::ErrorKind::Interrupted + ), + ClientErrorKind::RpcError(rpc_err) => match rpc_err { + RpcError::RpcResponseError { code, message, .. } => { + matches!(*code, 408 | 429 | 500 | 502 | 503 | 504 | -32005) + || is_transient_rpc_message(message) + } + RpcError::RpcRequestError(message) | RpcError::ForUser(message) => { + is_transient_rpc_message(message) + } + RpcError::ParseError(message) => is_transient_rpc_message(message), + }, + _ => is_transient_rpc_message(&err.to_string()), + } +} + +fn is_transient_rpc_message(message: &str) -> bool { + let normalized = message.to_ascii_lowercase(); + normalized.contains("connection closed before message completed") + || normalized.contains("operation timed out") + || normalized.contains("request timeout") + || normalized.contains("408 request timeout") + || normalized.contains("429 too many requests") + || normalized.contains("502 bad gateway") + || normalized.contains("503 service unavailable") + || normalized.contains("504 gateway timeout") + || normalized.contains("temporarily unavailable") + || normalized.contains("connection reset") + || normalized.contains("broken pipe") + || normalized.contains("deadline has elapsed") + || normalized.contains("timeout") +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_is_transient_rpc_anyhow_error_true_for_timeout_io() { + let err = ClientError { + request: None, + kind: ClientErrorKind::Io(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "operation timed out", + )), + }; + let anyhow_err = anyhow!(err); + assert!(is_transient_rpc_anyhow_error(&anyhow_err)); + } + + #[test] + fn test_is_transient_rpc_anyhow_error_false_for_non_transient() { + let err = ClientError { + request: None, + kind: ClientErrorKind::Custom("deterministic failure".to_string()), + }; + let anyhow_err = anyhow!(err); + assert!(!is_transient_rpc_anyhow_error(&anyhow_err)); + } +} diff --git a/src/utils/swb_cranker.rs b/src/utils/swb_cranker.rs index b27fad13..101a13d5 100644 --- a/src/utils/swb_cranker.rs +++ b/src/utils/swb_cranker.rs @@ -1,5 +1,6 @@ use anyhow::{anyhow, Context, Result}; use base64::{prelude::BASE64_STANDARD, Engine}; +use futures::stream::{self, StreamExt, TryStreamExt}; use log::warn; use serde::Deserialize; use serde_json::{json, Value}; @@ -23,6 +24,11 @@ use solana_sdk::{ signer::Signer, transaction::VersionedTransaction, }; +use std::{ + collections::HashMap, + sync::Mutex, + time::{Duration, Instant}, +}; use switchboard_on_demand_client::{ CrossbarClient, FetchUpdateManyParams, Gateway, PullFeed, QueueAccountData, SbContext, }; @@ -39,6 +45,9 @@ const JITO_SIMULATE_BUNDLE_METHOD: &str = "simulateBundle"; const RAW_SIMULATE_BUNDLE_RESPONSE_LOG_LIMIT: usize = 8_000; const SIMULATION_LOG_LINE_LIMIT: usize = 30; const SIMULATION_LOG_CHAR_LIMIT: usize = 8_000; +const SIM_BUILD_TX_CONCURRENCY: usize = 4; +const SIM_TX_FALLBACK_CONCURRENCY: usize = 16; +const ORACLE_QUARANTINE_DURATION: Duration = Duration::from_secs(60 * 60); struct SimulateBundleTx { encoded_tx: String, @@ -69,6 +78,7 @@ pub struct SwbCranker { crossbar: Option, payer: Keypair, all_swb_oracles: Vec, + oracle_quarantine: Mutex>, } impl SwbCranker { @@ -78,7 +88,7 @@ impl SwbCranker { let tokio_rt = Builder::new_multi_thread() .thread_name("SwbCranker") - .worker_threads(2) + .worker_threads(4) .enable_all() .build()?; @@ -115,10 +125,16 @@ impl SwbCranker { crossbar, payer, all_swb_oracles, + oracle_quarantine: Mutex::new(HashMap::new()), }) } pub fn crank_oracles(&self, swb_oracles: Vec) -> Result<()> { + let swb_oracles = self.filter_quarantined_oracles(&swb_oracles, "crank"); + if swb_oracles.is_empty() { + return Ok(()); + } + // Run simulations to get more details on potential failures, if crossbar is available. if let Some(crossbar) = self.crossbar.as_ref() { let result = self @@ -129,8 +145,47 @@ impl SwbCranker { } } - for chunk in swb_oracles.chunks(CHUNK_SIZE) { - self.crank_oracles_internal(chunk.to_vec())?; + for (chunk_index, chunk) in swb_oracles.chunks(CHUNK_SIZE).enumerate() { + let chunk_oracles = chunk.to_vec(); + if let Err(err) = self.crank_oracles_internal(chunk_oracles.clone()) { + warn!( + "SWB crank failed for chunk {} ({} feeds): {}. Retrying feeds individually.", + chunk_index, + chunk_oracles.len(), + err + ); + + let mut recovered_count = 0usize; + let mut failed_individual: Vec<(Pubkey, anyhow::Error)> = Vec::new(); + for oracle in chunk_oracles { + match self.crank_oracles_internal(vec![oracle]) { + Ok(()) => recovered_count += 1, + Err(single_err) => failed_individual.push((oracle, single_err)), + } + } + + if failed_individual.is_empty() { + continue; + } + + if recovered_count > 0 { + let failed_oracles: Vec = failed_individual + .iter() + .map(|(oracle, _)| *oracle) + .collect(); + self.quarantine_oracles( + &failed_oracles, + "crank", + "individual crank failures after partial recovery", + ); + } else { + warn!( + "SWB crank chunk {} failed for all feeds even individually ({} feeds). Skipping this chunk without quarantine.", + chunk_index, + failed_individual.len() + ); + } + } } Ok(()) } @@ -140,30 +195,60 @@ impl SwbCranker { return Ok(()); } - let bundle_txs: Vec = self - .all_swb_oracles + let active_oracles = self.filter_quarantined_oracles(&self.all_swb_oracles, "simulation"); + if active_oracles.is_empty() { + warn!("All SWB oracles are currently quarantined; skipping simulation."); + return Ok(()); + } + + let chunked_oracles: Vec> = active_oracles .chunks(CHUNK_SIZE) - .enumerate() - .map(|(chunk_index, chunk)| { - let chunk_oracles = chunk.to_vec(); - let tx = self - .build_crank_transaction(chunk_oracles.clone()) - .with_context(|| { - format!( - "failed to build SWB simulation transaction for chunk {} ({} feeds): {:?}", - chunk_index, - chunk_oracles.len(), - chunk_oracles - ) - })?; - let encoded_tx = BASE64_STANDARD.encode(bincode::serialize(&tx)?); - Ok(SimulateBundleTx { - encoded_tx, - oracle_addresses: chunk_oracles, - transaction: tx, + .map(|chunk| chunk.to_vec()) + .collect(); + + let mut build_outcomes = self.tokio_rt.block_on(async { + stream::iter(chunked_oracles.into_iter().enumerate()) + .map(|(chunk_index, chunk_oracles)| async move { + let tx_result = self + .build_crank_transaction_async(chunk_oracles.clone()) + .await; + (chunk_index, chunk_oracles, tx_result) }) - }) - .collect::>>()?; + .buffer_unordered(SIM_BUILD_TX_CONCURRENCY) + .collect::>() + .await + }); + build_outcomes.sort_by_key(|(chunk_index, _, _)| *chunk_index); + + let mut bundle_txs: Vec = Vec::new(); + for (chunk_index, chunk_oracles, tx_result) in build_outcomes { + match tx_result { + Ok(tx) => { + let encoded_tx = + BASE64_STANDARD.encode(bincode::serialize(&tx).with_context(|| { + format!( + "failed to serialize SWB simulation transaction for chunk {}", + chunk_index + ) + })?); + bundle_txs.push(SimulateBundleTx { + encoded_tx, + oracle_addresses: chunk_oracles, + transaction: tx, + }); + } + Err(err) => { + let recovered_txs = + self.recover_failed_simulation_chunk(chunk_index, chunk_oracles, &err)?; + bundle_txs.extend(recovered_txs); + } + } + } + + if bundle_txs.is_empty() { + warn!("No buildable SWB simulation transactions for this round; skipping oracle simulation."); + return Ok(()); + } let (simulation_result, raw_response) = self.simulate_bundle(&bundle_txs)?; @@ -225,17 +310,19 @@ impl SwbCranker { } } - // Keep bundle simulation as the authoritative bundle-level check, but capture account - // states via simulateTransaction because some RPCs omit bundle pre/post account payloads. + // Keep bundle simulation as the authoritative bundle-level check and + // capture post-execution accounts via simulateTransaction. let tx_accounts = self - .simulate_transactions_for_accounts(&bundle_txs) + .tokio_rt + .block_on(self.simulate_transactions_for_accounts(&bundle_txs)) .context("failed to capture simulated accounts via simulateTransaction")?; + let account_source = "simulateTransaction"; for (bundle_tx, post_execution_accounts) in bundle_txs.iter().zip(tx_accounts.iter()) { decode_and_apply_simulated_accounts( &bundle_tx.oracle_addresses, post_execution_accounts, - "simulateTransaction", + account_source, |oracle_address, account| cache.oracles.try_update(oracle_address, account), )?; } @@ -261,7 +348,15 @@ impl SwbCranker { } fn build_crank_transaction(&self, swb_oracles: Vec) -> Result { - let (crank_ix, crank_lut) = self.tokio_rt.block_on(PullFeed::fetch_update_consensus_ix( + self.tokio_rt + .block_on(self.build_crank_transaction_async(swb_oracles)) + } + + async fn build_crank_transaction_async( + &self, + swb_oracles: Vec, + ) -> Result { + let (crank_ix, crank_lut) = PullFeed::fetch_update_consensus_ix( SbContext::new(), &self.non_blocking_rpc_client, FetchUpdateManyParams { @@ -272,11 +367,13 @@ impl SwbCranker { num_signatures: Some(1), ..Default::default() }, - ))?; + ) + .await?; let blockhash = self - .rpc_client - .get_latest_blockhash_with_commitment(CommitmentConfig::confirmed())? + .non_blocking_rpc_client + .get_latest_blockhash_with_commitment(CommitmentConfig::confirmed()) + .await? .0; let tx = VersionedTransaction::try_new( @@ -292,26 +389,147 @@ impl SwbCranker { Ok(tx) } + fn filter_quarantined_oracles(&self, oracles: &[Pubkey], context: &str) -> Vec { + let now = Instant::now(); + let mut quarantine_guard = match self.oracle_quarantine.lock() { + Ok(guard) => guard, + Err(poisoned) => { + warn!( + "SWB oracle quarantine lock poisoned while filtering for {}. Continuing with recovered state.", + context + ); + poisoned.into_inner() + } + }; + + quarantine_guard.retain(|_, until| *until > now); + + let mut active_oracles: Vec = Vec::with_capacity(oracles.len()); + let mut skipped_count = 0usize; + for oracle in oracles { + if quarantine_guard.contains_key(oracle) { + skipped_count += 1; + } else { + active_oracles.push(*oracle); + } + } + + if skipped_count > 0 { + warn!( + "Skipping {} quarantined SWB feeds for {} (cooldown {}s).", + skipped_count, + context, + ORACLE_QUARANTINE_DURATION.as_secs() + ); + } + + active_oracles + } + + fn quarantine_oracles(&self, oracles: &[Pubkey], context: &str, reason: &str) { + if oracles.is_empty() { + return; + } + let until = Instant::now() + ORACLE_QUARANTINE_DURATION; + + let mut quarantine_guard = match self.oracle_quarantine.lock() { + Ok(guard) => guard, + Err(poisoned) => { + warn!( + "SWB oracle quarantine lock poisoned while quarantining for {}. Continuing with recovered state.", + context + ); + poisoned.into_inner() + } + }; + + for oracle in oracles { + quarantine_guard.insert(*oracle, until); + } + + warn!( + "Quarantined {} SWB feeds for {} ({}s cooldown): {:?}", + oracles.len(), + context, + ORACLE_QUARANTINE_DURATION.as_secs(), + oracles + ); + warn!("SWB quarantine reason for {}: {}", context, reason); + } + + fn recover_failed_simulation_chunk( + &self, + chunk_index: usize, + chunk_oracles: Vec, + batch_error: &anyhow::Error, + ) -> Result> { + warn!( + "SWB simulation build failed for chunk {} ({} feeds). Trying individual feeds. Batch error: {}", + chunk_index, + chunk_oracles.len(), + batch_error + ); + + let mut recovered_txs: Vec = Vec::new(); + let mut failed_oracles: Vec<(Pubkey, String)> = Vec::new(); + + for oracle in chunk_oracles { + match self.build_crank_transaction(vec![oracle]) { + Ok(tx) => { + let encoded_tx = + BASE64_STANDARD.encode(bincode::serialize(&tx).with_context(|| { + format!("failed to serialize recovered tx for {}", oracle) + })?); + recovered_txs.push(SimulateBundleTx { + encoded_tx, + oracle_addresses: vec![oracle], + transaction: tx, + }); + } + Err(err) => { + failed_oracles.push((oracle, err.to_string())); + } + } + } + + if failed_oracles.is_empty() { + return Ok(recovered_txs); + } + + if recovered_txs.is_empty() { + warn!( + "SWB simulation chunk {} could not recover any feed individually ({} feeds). Skipping this chunk this round without quarantine.", + chunk_index, + failed_oracles.len() + ); + return Ok(Vec::new()); + } + + let failed_feed_keys: Vec = + failed_oracles.iter().map(|(oracle, _)| *oracle).collect(); + let first_error = failed_oracles + .first() + .map(|(_, err)| err.as_str()) + .unwrap_or("unknown"); + self.quarantine_oracles( + &failed_feed_keys, + "simulation", + &format!( + "chunk {} partial recovery: {} failed feeds; first error: {}", + chunk_index, + failed_feed_keys.len(), + first_error + ), + ); + + Ok(recovered_txs) + } + fn simulate_bundle( &self, bundle_txs: &[SimulateBundleTx], ) -> Result<(RpcSimulateBundleResult, Value)> { let encoded_txs: Vec = bundle_txs.iter().map(|tx| tx.encoded_tx.clone()).collect(); - let pre_execution_accounts_configs: Vec = - (0..bundle_txs.len()).map(|_| Value::Null).collect(); - let post_execution_accounts_configs: Vec = bundle_txs - .iter() - .map(|tx| { - json!({ - "encoding": "base64", - "addresses": tx - .oracle_addresses - .iter() - .map(|pk| pk.to_string()) - .collect::>() - }) - }) - .collect(); let request = RpcRequest::Custom { method: JITO_SIMULATE_BUNDLE_METHOD, @@ -321,8 +539,6 @@ impl SwbCranker { { "encodedTransactions": encoded_txs, "config": { - "preExecutionAccountsConfigs": pre_execution_accounts_configs, - "postExecutionAccountsConfigs": post_execution_accounts_configs, "transactionEncoding": "base64", "skipSigVerify": true, "replaceRecentBlockhash": true @@ -355,14 +571,12 @@ impl SwbCranker { Ok((parsed, raw_result)) } - fn simulate_transactions_for_accounts( + async fn simulate_transactions_for_accounts( &self, bundle_txs: &[SimulateBundleTx], ) -> Result>>> { - bundle_txs - .iter() - .enumerate() - .map(|(chunk_index, bundle_tx)| { + let mut indexed = stream::iter(bundle_txs.iter().enumerate()) + .map(|(chunk_index, bundle_tx)| async move { let accounts_config = RpcSimulateTransactionAccountsConfig { encoding: Some(UiAccountEncoding::Base64), addresses: bundle_tx @@ -381,8 +595,9 @@ impl SwbCranker { }; let response = self - .rpc_client + .non_blocking_rpc_client .simulate_transaction_with_config(&bundle_tx.transaction, config) + .await .with_context(|| { format!( "simulateTransaction RPC failed for chunk {} ({} feeds): {:?}", @@ -429,9 +644,14 @@ impl SwbCranker { )); } - Ok(accounts) + Ok((chunk_index, accounts)) }) - .collect() + .buffer_unordered(SIM_TX_FALLBACK_CONCURRENCY) + .try_collect::>() + .await?; + + indexed.sort_by_key(|(chunk_index, _)| *chunk_index); + Ok(indexed.into_iter().map(|(_, accounts)| accounts).collect()) } } diff --git a/src/wrappers/liquidator_account.rs b/src/wrappers/liquidator_account.rs index 264f62aa..1574c38a 100644 --- a/src/wrappers/liquidator_account.rs +++ b/src/wrappers/liquidator_account.rs @@ -16,7 +16,11 @@ use crate::{ }, metrics::{LIQUIDATION_ATTEMPTS, LIQUIDATION_LATENCY_SECONDS, LIQUIDATION_SUCCESSES}, utils::{ - self, marginfi_account_by_authority, simulation_cache::decode_and_apply_simulated_accounts, + self, marginfi_account_by_authority, + simulation_cache::{ + format_skipped_instruction_entries, is_tx_too_large_client, + simulate_instruction_batches, SimulatedInstructionEntry, + }, swb_cranker::is_stale_swb_price_error, }, wrappers::oracle::OracleWrapper, @@ -31,15 +35,8 @@ use marginfi_type_crate::{ pdas::derive_drift_spot_market, types::{validate_asset_tags, Bank}, }; -use solana_account_decoder::UiAccountEncoding; use solana_client::{ - client_error::{ClientError, ClientErrorKind}, - rpc_client::RpcClient, - rpc_config::{ - RpcSendTransactionConfig, RpcSimulateTransactionAccountsConfig, - RpcSimulateTransactionConfig, - }, - rpc_request::RpcError, + client_error::ClientError, rpc_client::RpcClient, rpc_config::RpcSendTransactionConfig, }; use solana_program::pubkey::Pubkey; @@ -54,7 +51,7 @@ use solana_sdk::{ signature::{Keypair, Signature}, signer::{Signer, SignerError}, system_instruction::transfer, - transaction::{TransactionError, VersionedTransaction}, + transaction::VersionedTransaction, }; use std::{ collections::HashSet, @@ -65,6 +62,8 @@ use std::{ pub const PROFIT_SHARE: f64 = 0.085; const DEFAULT_INTEGRATION_REFRESH_BATCH_HINT: usize = 12; +const INTEGRATION_REFRESH_RPC_MAX_RETRIES: usize = 3; +const INTEGRATION_REFRESH_RPC_RETRY_BASE_DELAY_MS: u64 = 250; #[derive(Debug)] pub enum LiquidationError { @@ -114,18 +113,7 @@ pub struct LiquidatorAccount { integration_refresh_batch_hint: Mutex, } -#[derive(Clone, Debug)] -struct IntegrationRefreshEntry { - kind: &'static str, - address: Pubkey, - instruction: Instruction, -} - -enum RefreshBatchError { - TooLarge(anyhow::Error), - TooManyAccountLocks(anyhow::Error), - Other(anyhow::Error), -} +type IntegrationRefreshEntry = SimulatedInstructionEntry; impl LiquidatorAccount { pub fn new( @@ -302,8 +290,9 @@ impl LiquidatorAccount { I80F48::from_num(self.get_token_balance_for_mint(&liab_mint).unwrap()); if liab_token_balance < dust_liab_threshold { - tokens_in_shortage.insert(liab_mint); - info!("No tokens: {}", liab_mint); + if tokens_in_shortage.insert(liab_mint) { + warn!("No tokens: {}", liab_mint); + } return Err(LiquidationError::NotEnoughFunds); } @@ -902,300 +891,47 @@ impl LiquidatorAccount { if entries.is_empty() { return Ok(()); } - - let mut skipped_entries: Vec = vec![]; - let mut refreshed_batches = 0usize; - let mut offset = 0usize; let stored_batch_hint = *self.integration_refresh_batch_hint.lock().unwrap(); - let mut preferred_batch_size = stored_batch_hint.min(entries.len()).max(1); - let mut resized_down = false; - - while offset < entries.len() { - let remaining = entries.len() - offset; - let mut batch_size = preferred_batch_size.min(remaining).max(1); - - loop { - let batch_entries = entries[offset..offset + batch_size].to_vec(); - match self.simulate_refresh_integrations_batch( - &luts, - batch_entries, - &mut skipped_entries, - ) { - Ok(batch_refreshed_any) => { - if batch_refreshed_any { - refreshed_batches += 1; - } - offset += batch_size; - break; - } - Err(RefreshBatchError::TooLarge(_err)) if batch_size > 1 => { - let new_batch_size = (batch_size / 2).max(1); - debug!( - "Integrations refresh simulation tx too large with {} instructions; retrying with {} instructions", - batch_size, new_batch_size - ); - batch_size = new_batch_size; - preferred_batch_size = new_batch_size; - resized_down = true; - } - Err(RefreshBatchError::TooManyAccountLocks(_err)) if batch_size > 1 => { - let new_batch_size = (batch_size / 2).max(1); - debug!( - "Integrations refresh simulation hit TooManyAccountLocks with {} instructions; retrying with {} instructions", - batch_size, new_batch_size - ); - batch_size = new_batch_size; - preferred_batch_size = new_batch_size; - resized_down = true; - } - Err(RefreshBatchError::TooLarge(err)) => { - let failed_entry = entries[offset].clone(); - warn!( - "Skipping integration refresh instruction {} for {} because tx is too large even as a single instruction: {}", - failed_entry.kind, - failed_entry.address, - err - ); - skipped_entries.push(failed_entry); - offset += 1; - break; - } - Err(RefreshBatchError::TooManyAccountLocks(err)) => { - let failed_entry = entries[offset].clone(); - warn!( - "Skipping integration refresh instruction {} for {} because TooManyAccountLocks persisted even as a single instruction: {}", - failed_entry.kind, - failed_entry.address, - err - ); - skipped_entries.push(failed_entry); - offset += 1; - break; - } - Err(RefreshBatchError::Other(err)) => { - let batch_summary = - format_integration_entries(&entries[offset..offset + batch_size], 10); - return Err(err).with_context(|| { - format!( - "Integrations refresh simulation failed for batch [{}..{}) (size {}, entries: {})", - offset, - offset + batch_size, - batch_size, - batch_summary, - ) - }); - } - } - } - } + let summary = simulate_instruction_batches( + &self.rpc_client, + &self.signer, + &luts, + &self.cu_limit_ix, + &entries, + stored_batch_hint, + INTEGRATION_REFRESH_RPC_MAX_RETRIES, + Duration::from_millis(INTEGRATION_REFRESH_RPC_RETRY_BASE_DELAY_MS), + |address, account| self.cache.oracles.try_update(address, account), + ) + .context("simulate_instruction_batches for integrations refresh failed")?; - if refreshed_batches == 0 { - return Err(anyhow!( - "Integrations refresh simulation failed for all integrations; skipped entries: {}", - format_skipped_integration_entries(&skipped_entries) - )); - } + debug!( + "Integrations refresh simulation completed: refreshed_batches={}, skipped_entries={}", + summary.refreshed_batches, + summary.skipped_entries.len() + ); - if !skipped_entries.is_empty() { + if !summary.skipped_entries.is_empty() { warn!( "Integrations refresh simulation completed while skipping {} failing integrations: {}", - skipped_entries.len(), - format_skipped_integration_entries(&skipped_entries) + summary.skipped_entries.len(), + format_skipped_instruction_entries(&summary.skipped_entries) ); } - if resized_down { + if summary.resized_down { let mut batch_hint = self.integration_refresh_batch_hint.lock().unwrap(); - if preferred_batch_size < *batch_hint { + if summary.preferred_batch_size < *batch_hint { info!( "Reduced integrations refresh batch-size hint from {} to {} after simulation backoff", - *batch_hint, preferred_batch_size + *batch_hint, summary.preferred_batch_size ); - *batch_hint = preferred_batch_size; + *batch_hint = summary.preferred_batch_size; } } Ok(()) } - - fn simulate_refresh_integrations_batch( - &self, - luts: &[AddressLookupTableAccount], - batch_entries: Vec, - skipped_entries: &mut Vec, - ) -> std::result::Result { - let mut entries = batch_entries; - - loop { - if entries.is_empty() { - return Ok(false); - } - - let integration_addresses: Vec = - entries.iter().map(|entry| entry.address).collect(); - let entry_summary = format_integration_entries(&entries, 10); - let mut ixs: Vec = Vec::with_capacity(entries.len() + 1); - ixs.push(self.cu_limit_ix.clone()); - ixs.extend(entries.iter().map(|entry| entry.instruction.clone())); - - let recent_blockhash = self - .rpc_client - .get_latest_blockhash() - .map_err(|err| RefreshBatchError::Other(err.into()))?; - let signer_pk = self.signer.pubkey(); - let msg = Message::try_compile(&signer_pk, &ixs, luts, recent_blockhash) - .map_err(|err| RefreshBatchError::Other(err.into()))?; - let tx = VersionedTransaction::try_new(VersionedMessage::V0(msg), &[&self.signer]) - .map_err(|err| RefreshBatchError::Other(err.into()))?; - - let simulation = self - .rpc_client - .simulate_transaction_with_config( - &tx, - RpcSimulateTransactionConfig { - sig_verify: false, - replace_recent_blockhash: true, - commitment: Some(CommitmentConfig::confirmed()), - accounts: Some(RpcSimulateTransactionAccountsConfig { - encoding: Some(UiAccountEncoding::Base64), - addresses: integration_addresses - .iter() - .map(|pk| pk.to_string()) - .collect(), - }), - ..Default::default() - }, - ) - .map_err(|err| { - if is_tx_too_large_client(&err) { - RefreshBatchError::TooLarge(err.into()) - } else if is_tx_too_many_account_locks_client(&err) { - RefreshBatchError::TooManyAccountLocks(err.into()) - } else { - RefreshBatchError::Other(err.into()) - } - })?; - - if let Some(err) = simulation.value.err.clone() { - if matches!(err, TransactionError::TooManyAccountLocks) { - let logs = format_simulation_logs(simulation.value.logs.as_deref(), 40, 8_000); - return Err(RefreshBatchError::TooManyAccountLocks(anyhow!( - "Integrations refresh simulation hit TooManyAccountLocks for batch of {} entries [{}]; logs={}", - entries.len(), - entry_summary, - logs - ))); - } - - if let TransactionError::InstructionError(ix_index, instruction_error) = &err { - let ix_index = usize::from(*ix_index); - if ix_index > 0 && ix_index <= entries.len() { - let failed_entry = entries.remove(ix_index - 1); - warn!( - "Skipping failing integration refresh instruction {} for {} (tx instruction index {}, error {:?})", - failed_entry.kind, - failed_entry.address, - ix_index, - instruction_error - ); - skipped_entries.push(failed_entry); - continue; - } - } - - let logs = format_simulation_logs(simulation.value.logs.as_deref(), 40, 8_000); - return Err(RefreshBatchError::Other(anyhow!( - "Integrations refresh simulation failed with transaction error: {:?}; batch_size={}; entries=[{}]; logs={}", - err, - entries.len(), - entry_summary, - logs - ))); - } - - let simulated_accounts = simulation.value.accounts.ok_or_else(|| { - RefreshBatchError::Other(anyhow!( - "Integrations refresh simulation did not return post-simulation accounts" - )) - })?; - - if simulated_accounts.len() != integration_addresses.len() { - return Err(RefreshBatchError::Other(anyhow!( - "Integrations refresh simulation returned {} accounts, expected {}", - simulated_accounts.len(), - integration_addresses.len() - ))); - } - - decode_and_apply_simulated_accounts( - &integration_addresses, - &simulated_accounts, - "simulateTransaction integrations refresh", - |address, account| self.cache.oracles.try_update(address, account), - ) - .map_err(RefreshBatchError::Other)?; - - return Ok(true); - } - } -} - -fn format_skipped_integration_entries(entries: &[IntegrationRefreshEntry]) -> String { - if entries.is_empty() { - return "none".to_string(); - } - - entries - .iter() - .map(|entry| format!("{}:{}", entry.kind, entry.address)) - .collect::>() - .join(", ") -} - -fn format_integration_entries(entries: &[IntegrationRefreshEntry], max_entries: usize) -> String { - if entries.is_empty() { - return "none".to_string(); - } - - let mut out = entries - .iter() - .take(max_entries) - .map(|entry| format!("{}:{}", entry.kind, entry.address)) - .collect::>(); - - if entries.len() > max_entries { - out.push(format!( - "...<{} additional entries>", - entries.len() - max_entries - )); - } - - out.join(", ") -} - -fn format_simulation_logs(logs: Option<&[String]>, max_lines: usize, max_chars: usize) -> String { - let Some(logs) = logs else { - return "none".to_string(); - }; - if logs.is_empty() { - return "none".to_string(); - } - - let mut lines: Vec = logs.iter().take(max_lines).cloned().collect(); - if logs.len() > max_lines { - lines.push(format!( - "...<{} additional log lines truncated>", - logs.len() - max_lines - )); - } - - let mut joined = lines.join(" | "); - if joined.len() > max_chars { - joined.truncate(max_chars); - joined.push_str("..."); - } - - joined } fn contains_stale_oracles(stale_oracles: &HashSet, account_oracles: &[Pubkey]) -> bool { @@ -1264,38 +1000,3 @@ mod tests { assert!(contains_stale_oracles(&stale_oracles, &account_oracles)); } } - -pub fn is_tx_too_large_client(err: &ClientError) -> bool { - match err.kind() { - ClientErrorKind::RpcError(rpc) => match rpc { - RpcError::RpcResponseError { code, message, .. } => { - *code == -32602 && message.contains("too large") - } - RpcError::RpcRequestError(msg) | RpcError::ForUser(msg) => { - // Some nodes may proxy this as a plain string - msg.contains("too large") - } - _ => false, - }, - _ => false, - } -} - -fn is_tx_too_many_account_locks_client(err: &ClientError) -> bool { - match err.kind() { - ClientErrorKind::RpcError(rpc) => match rpc { - RpcError::RpcResponseError { message, .. } => { - message.contains("TooManyAccountLocks") - || message - .to_ascii_lowercase() - .contains("too many account locks") - } - RpcError::RpcRequestError(msg) | RpcError::ForUser(msg) => { - msg.contains("TooManyAccountLocks") - || msg.to_ascii_lowercase().contains("too many account locks") - } - _ => false, - }, - _ => false, - } -} From 70d4bc3aab5ad5edf337109d761746cde13b90a8 Mon Sep 17 00:00:00 2001 From: Ilia Zyrin Date: Wed, 22 Apr 2026 16:01:24 +0200 Subject: [PATCH 6/7] clean up marginfi program id usage --- src/cache.rs | 5 +-- src/cache_loader.rs | 13 +++---- src/cli/entrypoints.rs | 7 +--- src/config.rs | 24 +++---------- src/geyser.rs | 16 +++------ src/marginfi_ixs.rs | 58 ++++++++++++------------------ src/utils/mod.rs | 3 +- src/wrappers/liquidator_account.rs | 29 +++------------ 8 files changed, 44 insertions(+), 111 deletions(-) diff --git a/src/cache.rs b/src/cache.rs index de0d63d1..d5f793fb 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -44,7 +44,6 @@ const NEW_ADDRESSES_MAX: usize = 20usize; pub struct Cache { pub signer_pk: Pubkey, - pub marginfi_program_id: Pubkey, pub marginfi_group_address: Pubkey, pub marginfi_accounts: MarginfiAccountsCache, pub banks: BanksCache, @@ -74,16 +73,14 @@ pub struct DriftSpotMarket { impl Cache { pub fn new( signer_pk: Pubkey, - marginfi_program_id: Pubkey, marginfi_group_address: Pubkey, clock: Arc>, ) -> Self { let (global_fee_state_key, _) = - Pubkey::find_program_address(&[FEE_STATE_SEED.as_bytes()], &marginfi_program_id); + Pubkey::find_program_address(&[FEE_STATE_SEED.as_bytes()], &marginfi_type_crate::ID); let luts = Arc::new(Mutex::new(vec![])); Self { signer_pk, - marginfi_program_id, marginfi_group_address, marginfi_accounts: MarginfiAccountsCache::default(), banks: BanksCache::default(), diff --git a/src/cache_loader.rs b/src/cache_loader.rs index 50230b22..1dd69251 100644 --- a/src/cache_loader.rs +++ b/src/cache_loader.rs @@ -105,10 +105,8 @@ impl CacheLoader { fn load_marginfi_accounts(&self, cache: &mut Cache) -> anyhow::Result<()> { info!("Loading marginfi accounts, this may take a few minutes, please be patient!"); let start = std::time::Instant::now(); - let marginfi_accounts_pubkeys = self.load_marginfi_account_addresses( - &cache.marginfi_program_id, - &cache.marginfi_group_address, - )?; + let marginfi_accounts_pubkeys = + self.load_marginfi_account_addresses(&cache.marginfi_group_address)?; info!("Loading marginfi accounts..."); let marginfi_accounts = batch_get_multiple_accounts( @@ -145,12 +143,11 @@ impl CacheLoader { fn load_marginfi_account_addresses( &self, - marginfi_program_id: &Pubkey, marginfi_group_address: &Pubkey, ) -> anyhow::Result> { info!("Loading marginfi account addresses..."); let marginfi_account_addresses = &self.rpc_client.get_program_accounts_with_config( - marginfi_program_id, + &marginfi_type_crate::ID, RpcProgramAccountsConfig { account_config: RpcAccountInfoConfig { encoding: Some(UiAccountEncoding::Base64), @@ -195,11 +192,11 @@ impl CacheLoader { Arc::new(Keypair::new()), ); - let program: Program> = anchor_client.program(cache.marginfi_program_id)?; + let program: Program> = anchor_client.program(marginfi_type_crate::ID)?; info!("Loading banks..."); let bank_accounts = program.rpc().get_program_accounts_with_config( - &cache.marginfi_program_id, + &marginfi_type_crate::ID, RpcProgramAccountsConfig { account_config: RpcAccountInfoConfig { encoding: Some(UiAccountEncoding::Base64), diff --git a/src/cli/entrypoints.rs b/src/cli/entrypoints.rs index 6ffae27c..2b1434d4 100644 --- a/src/cli/entrypoints.rs +++ b/src/cli/entrypoints.rs @@ -34,12 +34,7 @@ pub fn run_liquidator(config: Eva01Config, stop_liquidator: Arc) -> let mut clock_manager = ClockManager::new(clock.clone(), config.rpc_url.clone())?; info!("Loading Cache..."); - let mut cache = Cache::new( - wallet_pubkey, - config.marginfi_program_id, - config.marginfi_group_key, - clock.clone(), - ); + let mut cache = Cache::new(wallet_pubkey, config.marginfi_group_key, clock.clone()); let cache_loader = CacheLoader::new( &config.wallet_keypair, diff --git a/src/config.rs b/src/config.rs index e3472132..4905477c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -16,7 +16,6 @@ pub struct Eva01Config { pub yellowstone_x_token: Option, pub wallet_keypair: Vec, pub compute_unit_price_micro_lamports: u64, - pub marginfi_program_id: Pubkey, pub marginfi_group_key: Pubkey, pub address_lookup_tables: Vec, pub min_profit: f64, @@ -56,8 +55,6 @@ impl Eva01Config { .parse() .expect("Invalid COMPUTE_UNIT_PRICE_MICRO_LAMPORTS number"); - let marginfi_program_id = Pubkey::new_from_array(marginfi_type_crate::ID.to_bytes()); - let marginfi_group_key = Pubkey::from_str( &std::env::var("MARGINFI_GROUP_KEY") .expect("MARGINFI_GROUP_KEY environment variable is not set"), @@ -135,7 +132,6 @@ impl Eva01Config { yellowstone_x_token, wallet_keypair, compute_unit_price_micro_lamports, - marginfi_program_id, marginfi_group_key, address_lookup_tables, min_profit, @@ -159,7 +155,10 @@ impl Eva01Config { fn derive_rpc_url(yellowstone_endpoint: &str, yellowstone_x_token: Option<&str>) -> String { let endpoint = yellowstone_endpoint.trim_end_matches('/'); - match yellowstone_x_token.map(str::trim).filter(|token| !token.is_empty()) { + match yellowstone_x_token + .map(str::trim) + .filter(|token| !token.is_empty()) + { Some(token) => format!("{endpoint}/{token}"), None => endpoint.to_string(), } @@ -318,21 +317,6 @@ mod tests { }); } - #[test] - #[serial] - fn test_eva01_config_new_uses_default_marginfi_program_id() { - Jail::expect_with(|mut jail| { - setup_general_env(&mut jail); - setup_rebalancer_env(&mut jail); - let config = Eva01Config::new().unwrap(); - assert_eq!( - config.marginfi_program_id, - Pubkey::new_from_array(marginfi_type_crate::ID.to_bytes()) - ); - Ok(()) - }); - } - #[test] #[serial] fn test_eva01_config_new_derives_rpc_url_from_yellowstone() { diff --git a/src/geyser.rs b/src/geyser.rs index 7746d298..cec76e12 100644 --- a/src/geyser.rs +++ b/src/geyser.rs @@ -133,7 +133,6 @@ pub struct GeyserService { endpoint: String, x_token: Option, tracked_accounts: HashMap, - marginfi_program_id: Pubkey, marginfi_group_pk: Pubkey, geyser_tx: Sender, tokio_rt: Runtime, @@ -160,7 +159,6 @@ impl GeyserService { endpoint: config.yellowstone_endpoint, x_token: config.yellowstone_x_token, tracked_accounts, - marginfi_program_id: config.marginfi_program_id, marginfi_group_pk: config.marginfi_group_key, geyser_tx, tokio_rt, @@ -178,10 +176,7 @@ impl GeyserService { while !self.stop.load(Ordering::Relaxed) { info!("Connecting to Geyser..."); - let sub_req = Self::build_geyser_subscribe_request( - &tracked_accounts_vec, - &self.marginfi_program_id, - ); + let sub_req = Self::build_geyser_subscribe_request(&tracked_accounts_vec); let mut client = self.tokio_rt.block_on( GeyserGrpcClient::build_from_shared(self.endpoint.clone())? .x_token(self.x_token.clone())? @@ -215,7 +210,7 @@ impl GeyserService { continue ); - if account.owner == self.marginfi_program_id + if account.owner == marginfi_type_crate::ID && account_update.data.len() == MARGIN_ACCOUNT_SIZE { let marginfi_account = ward!( @@ -276,10 +271,7 @@ impl GeyserService { } /// Builds a geyser subscription request payload - fn build_geyser_subscribe_request( - tracked_accounts: &[Pubkey], - marginfi_program_id: &Pubkey, - ) -> SubscribeRequest { + fn build_geyser_subscribe_request(tracked_accounts: &[Pubkey]) -> SubscribeRequest { let mut request = SubscribeRequest { ..Default::default() }; @@ -290,7 +282,7 @@ impl GeyserService { }; let marginfi_account_subscription = SubscribeRequestFilterAccounts { - owner: vec![marginfi_program_id.to_string()], + owner: vec![marginfi_type_crate::ID.to_string()], ..Default::default() }; diff --git a/src/marginfi_ixs.rs b/src/marginfi_ixs.rs index 3042eb1f..d422c7de 100644 --- a/src/marginfi_ixs.rs +++ b/src/marginfi_ixs.rs @@ -33,7 +33,6 @@ use crate::{ }; pub fn make_init_liquidation_record_ix( - marginfi_program_id: Pubkey, liquidatee_account: Pubkey, fee_payer: Pubkey, ) -> (Instruction, Pubkey) { @@ -42,7 +41,7 @@ pub fn make_init_liquidation_record_ix( LIQUIDATION_RECORD_SEED.as_bytes(), liquidatee_account.as_ref(), ], - &marginfi_program_id, + &marginfi_type_crate::ID, ); let mut accounts = marginfi::accounts::InitLiquidationRecord { marginfi_account: liquidatee_account, @@ -55,7 +54,7 @@ pub fn make_init_liquidation_record_ix( ( Instruction { - program_id: marginfi_program_id, + program_id: marginfi_type_crate::ID, accounts, data: marginfi::instruction::MarginfiAccountInitLiqRecord.data(), }, @@ -64,7 +63,6 @@ pub fn make_init_liquidation_record_ix( } pub fn make_start_liquidate_ix( - marginfi_program_id: Pubkey, liquidatee_account: Pubkey, liquidator_account: Pubkey, liquidation_record: Pubkey, @@ -92,14 +90,13 @@ pub fn make_start_liquidate_ix( participating_accounts.extend(accounts.iter().map(|a| a.pubkey)); Instruction { - program_id: marginfi_program_id, + program_id: marginfi_type_crate::ID, accounts, data: marginfi::instruction::StartLiquidation.data(), } } pub fn make_deposit_ix( - marginfi_program_id: Pubkey, marginfi_group: Pubkey, marginfi_account: Pubkey, signer: Pubkey, @@ -122,7 +119,7 @@ pub fn make_deposit_ix( mark_signer(&mut accounts, signer); Instruction { - program_id: marginfi_program_id, + program_id: marginfi_type_crate::ID, accounts, data: marginfi::instruction::LendingAccountDeposit { amount, @@ -134,7 +131,6 @@ pub fn make_deposit_ix( #[allow(clippy::too_many_arguments)] pub fn make_repay_ix( - marginfi_program_id: Pubkey, marginfi_group: Pubkey, marginfi_account: Pubkey, signer: Pubkey, @@ -160,7 +156,7 @@ pub fn make_repay_ix( participating_accounts.extend(accounts.iter().map(|a| a.pubkey)); Instruction { - program_id: marginfi_program_id, + program_id: marginfi_type_crate::ID, accounts, data: marginfi::instruction::LendingAccountRepay { amount, @@ -172,7 +168,6 @@ pub fn make_repay_ix( #[allow(clippy::too_many_arguments)] pub fn make_withdraw_ix( - marginfi_program_id: Pubkey, marginfi_group: Pubkey, marginfi_account: Pubkey, signer: Pubkey, @@ -191,7 +186,7 @@ pub fn make_withdraw_ix( authority: signer, bank_liquidity_vault_authority: find_bank_liquidity_vault_authority( &bank.address, - &marginfi_program_id, + &marginfi_type_crate::ID, ), bank: bank.address, group: marginfi_group, @@ -216,7 +211,7 @@ pub fn make_withdraw_ix( } Instruction { - program_id: marginfi_program_id, + program_id: marginfi_type_crate::ID, accounts, data: marginfi::instruction::LendingAccountWithdraw { amount, @@ -227,7 +222,6 @@ pub fn make_withdraw_ix( } pub fn make_end_liquidate_ix( - marginfi_program_id: Pubkey, liquidatee_account: Pubkey, liquidator_account: Pubkey, liquidation_record: Pubkey, @@ -252,7 +246,7 @@ pub fn make_end_liquidate_ix( participating_accounts.extend(accounts.iter().map(|a| a.pubkey)); Instruction { - program_id: marginfi_program_id, + program_id: marginfi_type_crate::ID, accounts, data: marginfi::instruction::EndLiquidation.data(), } @@ -266,13 +260,12 @@ fn maybe_add_bank_mint(accounts: &mut Vec, mint: Pubkey, token_prog } pub fn make_create_ix( - marginfi_program_id: Pubkey, marginfi_group: Pubkey, marginfi_account: Pubkey, signer: Pubkey, ) -> Instruction { Instruction { - program_id: marginfi_program_id, + program_id: marginfi_type_crate::ID, accounts: marginfi::accounts::MarginfiAccountInitialize { marginfi_group, marginfi_account, @@ -296,14 +289,12 @@ fn mark_signer( pub fn initialize_marginfi_account( rpc_client: &RpcClient, - marginfi_program_id: Pubkey, marginfi_group: Pubkey, signer_keypair: &Keypair, ) -> anyhow::Result { let marginfi_account_key = Keypair::new(); let ix = make_create_ix( - marginfi_program_id, marginfi_group, marginfi_account_key.pubkey(), signer_keypair.pubkey(), @@ -336,7 +327,6 @@ pub fn initialize_marginfi_account( #[allow(clippy::too_many_arguments)] pub fn make_kamino_withdraw_ix( - marginfi_program_id: Pubkey, group: Pubkey, marginfi_account: Pubkey, authority: Pubkey, @@ -351,7 +341,7 @@ pub fn make_kamino_withdraw_ix( ) -> Instruction { let (reserve_farm_state, obligation_farm_user_state) = if kamino_reserve.reserve.farm_collateral == Pubkey::default() { - (Some(marginfi_program_id), Some(marginfi_program_id)) + (Some(marginfi_type_crate::ID), Some(marginfi_type_crate::ID)) } else { ( Some(kamino_reserve.reserve.farm_collateral), @@ -374,7 +364,7 @@ pub fn make_kamino_withdraw_ix( destination_token_account: mint_wrapper.token, liquidity_vault_authority: find_bank_liquidity_vault_authority( &bank.address, - &marginfi_program_id, + &marginfi_type_crate::ID, ), liquidity_vault: bank.bank.liquidity_vault, integration_acc_2: kamino_obligation, @@ -404,7 +394,7 @@ pub fn make_kamino_withdraw_ix( participating_accounts.extend(accounts.iter().map(|a| a.pubkey)); Instruction { - program_id: marginfi_program_id, + program_id: marginfi_type_crate::ID, accounts, data: marginfi::instruction::KaminoWithdraw { amount, @@ -416,7 +406,6 @@ pub fn make_kamino_withdraw_ix( #[allow(clippy::too_many_arguments)] pub fn make_drift_withdraw_ix( - marginfi_program_id: Pubkey, group: Pubkey, marginfi_account: Pubkey, authority: Pubkey, @@ -433,7 +422,7 @@ pub fn make_drift_withdraw_ix( let drift_oracle = if bank.bank.mint == Pubkey::from_str_const("EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v") { - Some(marginfi_program_id) + Some(marginfi_type_crate::ID) } else { Some(drift_spot_market.market.oracle) }; @@ -446,7 +435,7 @@ pub fn make_drift_withdraw_ix( drift_oracle, liquidity_vault_authority: find_bank_liquidity_vault_authority( &bank.address, - &marginfi_program_id, + &marginfi_type_crate::ID, ), liquidity_vault: bank.bank.liquidity_vault, destination_token_account: mint_wrapper.token, @@ -457,22 +446,22 @@ pub fn make_drift_withdraw_ix( drift_spot_market_vault: drift_spot_market.market.vault, drift_reward_oracle: reward_spot_market .map(|m| m.market.oracle) - .or(Some(marginfi_program_id)), + .or(Some(marginfi_type_crate::ID)), drift_reward_spot_market: reward_spot_market .map(|m| m.address) - .or(Some(marginfi_program_id)), + .or(Some(marginfi_type_crate::ID)), drift_reward_mint: reward_spot_market .map(|m| m.market.mint) - .or(Some(marginfi_program_id)), + .or(Some(marginfi_type_crate::ID)), drift_reward_oracle_2: reward_spot_market_2 .map(|m| m.market.oracle) - .or(Some(marginfi_program_id)), + .or(Some(marginfi_type_crate::ID)), drift_reward_spot_market_2: reward_spot_market_2 .map(|m| m.address) - .or(Some(marginfi_program_id)), + .or(Some(marginfi_type_crate::ID)), drift_reward_mint_2: reward_spot_market_2 .map(|m| m.market.mint) - .or(Some(marginfi_program_id)), + .or(Some(marginfi_type_crate::ID)), drift_signer: derive_drift_signer().0, mint: bank.bank.mint, drift_program: Drift::id(), @@ -492,7 +481,7 @@ pub fn make_drift_withdraw_ix( participating_accounts.extend(accounts.iter().map(|a| a.pubkey)); Instruction { - program_id: marginfi_program_id, + program_id: marginfi_type_crate::ID, accounts, data: marginfi::instruction::DriftWithdraw { amount, @@ -504,7 +493,6 @@ pub fn make_drift_withdraw_ix( #[allow(clippy::too_many_arguments)] pub fn make_juplend_withdraw_ix( - marginfi_program_id: Pubkey, group: Pubkey, marginfi_account: Pubkey, authority: Pubkey, @@ -524,7 +512,7 @@ pub fn make_juplend_withdraw_ix( destination_token_account: mint_wrapper.token, liquidity_vault_authority: find_bank_liquidity_vault_authority( &bank.address, - &marginfi_program_id, + &marginfi_type_crate::ID, ), mint: bank.bank.mint, f_token_mint: lending_state.f_token_mint, @@ -557,7 +545,7 @@ pub fn make_juplend_withdraw_ix( participating_accounts.extend(accounts.iter().map(|a| a.pubkey)); Instruction { - program_id: marginfi_program_id, + program_id: marginfi_type_crate::ID, accounts, data: marginfi::instruction::JuplendWithdraw { amount, diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 09ef3a07..63300691 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -252,11 +252,10 @@ pub fn format_error_chain(err: &Error) -> String { pub fn marginfi_account_by_authority( authority: Pubkey, rpc_client: &RpcClient, - marginfi_program_id: Pubkey, marginfi_group_id: Pubkey, ) -> anyhow::Result> { let marginfi_account_address = rpc_client.get_program_accounts_with_config( - &marginfi_program_id, + &marginfi_type_crate::ID, RpcProgramAccountsConfig { account_config: RpcAccountInfoConfig { encoding: Some(UiAccountEncoding::Base64), diff --git a/src/wrappers/liquidator_account.rs b/src/wrappers/liquidator_account.rs index 1574c38a..8cb6978e 100644 --- a/src/wrappers/liquidator_account.rs +++ b/src/wrappers/liquidator_account.rs @@ -104,7 +104,6 @@ pub struct PreparedLiquidatableAccount { pub struct LiquidatorAccount { pub liquidator_address: Pubkey, pub signer: Keypair, - program_id: Pubkey, group: Pubkey, preferred_mint_bank: Pubkey, rpc_client: RpcClient, @@ -126,12 +125,8 @@ impl LiquidatorAccount { let rpc_client = RpcClient::new_with_commitment(config.rpc_url.clone(), CommitmentConfig::confirmed()); - let accounts = marginfi_account_by_authority( - signer.pubkey(), - &rpc_client, - config.marginfi_program_id, - marginfi_group_id, - )?; + let accounts = + marginfi_account_by_authority(signer.pubkey(), &rpc_client, marginfi_group_id)?; info!( "Found {} MarginFi accounts for the provided signer: {:?}", accounts.len(), @@ -140,12 +135,8 @@ impl LiquidatorAccount { let liquidator_address = if accounts.is_empty() { info!("No MarginFi account found for the provided signer. Creating it..."); - let liquidator_marginfi_account = initialize_marginfi_account( - &rpc_client, - config.marginfi_program_id, - marginfi_group_id, - &signer, - )?; + let liquidator_marginfi_account = + initialize_marginfi_account(&rpc_client, marginfi_group_id, &signer)?; while cache .marginfi_accounts @@ -166,7 +157,6 @@ impl LiquidatorAccount { Ok(Self { liquidator_address, signer, - program_id: config.marginfi_program_id, group: marginfi_group_id, preferred_mint_bank, rpc_client, @@ -184,7 +174,7 @@ impl LiquidatorAccount { let signer_pk = self.signer.pubkey(); let (init_ix, liquidation_record) = - make_init_liquidation_record_ix(self.program_id, liquidatee_account.address, signer_pk); + make_init_liquidation_record_ix(liquidatee_account.address, signer_pk); let recent_blockhash = self .rpc_client @@ -356,7 +346,6 @@ impl LiquidatorAccount { // TODO: think about posting an swb_crank ix here let start_ix = make_start_liquidate_ix( - self.program_id, liquidatee_account_address, signer_pk, liquidation_record, @@ -421,7 +410,6 @@ impl LiquidatorAccount { let withdraw_ix = match asset_bank_wrapper.bank.config.asset_tag { ASSET_TAG_DEFAULT | ASSET_TAG_SOL => make_withdraw_ix( - self.program_id, self.group, liquidatee_account_address, signer_pk, @@ -447,7 +435,6 @@ impl LiquidatorAccount { ixs.push(refresh_obligation_ix); make_kamino_withdraw_ix( - self.program_id, self.group, liquidatee_account_address, signer_pk, @@ -467,7 +454,6 @@ impl LiquidatorAccount { .map_err(LiquidationError::from_anyhow)?; make_drift_withdraw_ix( - self.program_id, self.group, liquidatee_account_address, signer_pk, @@ -489,7 +475,6 @@ impl LiquidatorAccount { .map_err(LiquidationError::from_anyhow)?; make_juplend_withdraw_ix( - self.program_id, self.group, liquidatee_account_address, signer_pk, @@ -518,7 +503,6 @@ impl LiquidatorAccount { .try_get_account(&liab_mint) .map_err(LiquidationError::from_anyhow)?; let repay_ix = make_repay_ix( - self.program_id, self.group, liquidatee_account_address, signer_pk, @@ -531,7 +515,6 @@ impl LiquidatorAccount { ixs.push(repay_ix); let end_ix = make_end_liquidate_ix( - self.program_id, liquidatee_account_address, signer_pk, liquidation_record, @@ -678,7 +661,6 @@ impl LiquidatorAccount { let mint_wrapper = self.cache.mints.try_get_account(&bank.bank.mint)?; let withdraw_ix = make_withdraw_ix( - self.program_id, self.group, marginfi_account, signer_pk, @@ -732,7 +714,6 @@ impl LiquidatorAccount { let mint = bank.bank.mint; let token_account = self.cache.tokens.try_get_token_for_mint(&mint)?; let deposit_ix = make_deposit_ix( - self.program_id, self.group, marginfi_account, signer_pk, From dafa6fd79606f40c7dac1ac62c134c0864a46c4b Mon Sep 17 00:00:00 2001 From: Ilia Zyrin Date: Wed, 22 Apr 2026 16:04:22 +0200 Subject: [PATCH 7/7] re-enable clippy --- run-eva.sh | 2 +- src/utils/simulation_cache.rs | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/run-eva.sh b/run-eva.sh index a0cba840..851b7515 100755 --- a/run-eva.sh +++ b/run-eva.sh @@ -12,7 +12,7 @@ export RUST_BACKTRACE=0 # Run all steps cargo fmt -# cargo clippy -- -D warnings +cargo clippy -- -D warnings cargo build --bin eva01 --package eva01 cargo run --bin eva01 --features pretty_logs diff --git a/src/utils/simulation_cache.rs b/src/utils/simulation_cache.rs index c0a4e994..cfbfd5be 100644 --- a/src/utils/simulation_cache.rs +++ b/src/utils/simulation_cache.rs @@ -77,6 +77,7 @@ where Ok(()) } +#[allow(clippy::too_many_arguments)] pub fn simulate_instruction_batches( rpc_client: &RpcClient, signer: &Keypair, @@ -206,6 +207,7 @@ where }) } +#[allow(clippy::too_many_arguments)] fn simulate_instruction_batch( rpc_client: &RpcClient, signer: &Keypair,