Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions benches/elasticsearch-frozen/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,48 @@ python3 phase_b_frozen.py # cold frozen-search latency -> results
bash phase_d_recompact.sh bench-standard-default # snapshot zstd-3 -> s4 recompact zstd-19 -> verify restore
```

## 5. Revision phases (2026-06-19) — RTT / sidecar / HA / break-even

These are **non-destructive additions** layered on top of A–D; they don't change
the A–D outputs. All are env-parameterised (defaults reproduce the canonical
`localhost:9200`/`:9000` stack; override `ES_URL` / `MINIO_URL` etc. for an
isolated stack). Raw outputs + a one-page summary land in `results/`.

```bash
# B3 break-even model (pure arithmetic on the measured saved_ratio; no infra)
python3 breakeven.py --s4-host-usd-month 70 --instances 2 --out results/breakeven.json

# B1 cold latency under injected backend RTT (needs a toxiproxy in front of the
# object store + a dedicated S4 instance upstream of it + ES clients tdirect/ts4z3;
# the .sh prints the exact prereqs). Per-connection latency proxy is used on
# purpose — NOT a global `tc netem` (that would perturb co-tenant processes).
bash phase_b1_rtt.sh # -> results/rtt-injection.json

# B2 .s4index sidecar cold-path overhead (backend GETs per cold query, S4 vs a
# passthrough-codec baseline; counts ops from S4's structured op log — run the
# S4 instances with structured logging on stdout).
python3 phase_b2_sidecar.py # -> results/sidecar-overhead.json

# B4 HA failover smoke (starts 2 stateless S4 instances + an nginx round-robin
# upstream, registers a repo through the LB, kills one instance).
bash phase_b4_ha.sh # -> results/ha-failover.json

# B5 recompact concurrency: documented-not-tested (running recompact concurrently
# with ES snapshot/_cleanup on the same repo is unsafe by the tool's own TOCTOU
# admission — see results/recompact-concurrency.json; no script, by design).
```

> **nginx + SigV4:** the B4 LB must preserve the client Host header
> (`proxy_set_header Host $http_host`) — AWS SigV4 signs Host, so rewriting it to
> the upstream name returns 403 SignatureDoesNotMatch. `phase_b4_ha.sh` does this.

See `results/REVISION-NOTES.md` for what each phase found and what's still TODO.

## Cleanup

```bash
docker rm -f esfrozen-es esfrozen-minio
pkill -f 'target/release/s4 .*--port=801'
docker rm -f esfrozen-es esfrozen-minio esfrozen-toxiproxy esfrozen-nginx
pkill -f 'target/release/s4 .*--port=80'
```

## Notes / gotchas (encountered building this)
Expand Down
152 changes: 152 additions & 0 deletions benches/elasticsearch-frozen/breakeven.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
#!/usr/bin/env python3
"""Break-even model for fronting an Elasticsearch frozen-tier S3 repository with S4.

S4 saves storage but costs a host (the gateway is a separate line item not modeled
by the raw byte counts). This computes how large the frozen repository has to be
before the storage saving covers the host cost.

Model
-----
Monthly storage saving: saved_ratio * size_TB * PRICE_PER_TB_MONTH ($/mo)
Monthly host cost: host_usd_month * instances ($/mo)

Break-even (net zero):
saved_ratio * TB * PRICE_PER_TB_MONTH = host_usd_month * instances
=> break_even_TB = (host_usd_month * instances) / (saved_ratio * PRICE_PER_TB_MONTH)

PRICE_PER_TB_MONTH defaults to 23 ($0.023/GB-month S3 Standard, us-east-1, * 1000).
`saved_ratio` is the *measured* repository-byte saving from this harness
(phase_a.json), NOT a codec micro-benchmark figure.

Everything here is arithmetic on the measured saved_ratio + an explicit,
parameterised host price. No new measurement is invented. Output is raw JSON to
stdout (and optionally a file) plus a human-readable summary on stderr.
"""
import argparse
import json
import sys

# Measured repository-byte savings from benches/.../results/phase_a.json
# (S4 zstd-3 vs direct, the default PUT codec). These are the authoritative
# 2026-06-18 numbers; keep them in sync with phase_a.json.
# Each value is a fraction in [0, 1]: 1 - (bytes via S4 / bytes direct). The
# trailing comment on every entry gives the before -> after repository size
# (MB) the fraction was derived from.
SAVED_RATIO = {
"standard-default": 0.270, # 1440.8 -> 1051.2 MB
"best_compression": 0.148, # 1057.6 -> 901.3 MB
"logsdb": 0.222, # 660.9 -> 514.3 MB
}
# zstd-19 via `s4 recompact` on the standard-default repo (phase_d evidence).
# Kept in a separate table because it comes from a different phase; main()
# merges both tables into one scenario list.
SAVED_RATIO_RECOMPACT = {
"standard-default-zstd19": 0.332, # 1440.8 -> 962.0 MB
}


def break_even_tb(host_usd_month, instances, saved_ratio, price_per_tb_month):
    """Return the repository size (TB) at which storage savings equal host cost.

    Args:
        host_usd_month: Monthly cost of one S4 gateway host, in USD.
        instances: Number of S4 hosts in the fleet.
        saved_ratio: Fraction of repository bytes saved (0..1).
        price_per_tb_month: Object-storage price in USD per TB-month.

    Returns:
        The break-even size in TB, or ``float("inf")`` when the per-TB saving
        is zero or negative (the fleet can never pay for itself).
    """
    fleet_cost = host_usd_month * instances
    saving_per_tb = saved_ratio * price_per_tb_month
    if saving_per_tb <= 0:
        # No positive saving per TB: no finite size covers the host cost.
        return float("inf")
    return fleet_cost / saving_per_tb


def net_savings_per_month(size_tb, saved_ratio, price_per_tb_month,
                          host_usd_month, instances):
    """Compute the monthly net saving for a repository of ``size_tb`` TB.

    Args:
        size_tb: Repository size in TB.
        saved_ratio: Fraction of repository bytes saved (0..1).
        price_per_tb_month: Object-storage price in USD per TB-month.
        host_usd_month: Monthly cost of one S4 gateway host, in USD.
        instances: Number of S4 hosts in the fleet.

    Returns:
        A ``(net, storage_saved, host_cost)`` tuple, all in USD per month,
        where ``net = storage_saved - host_cost``.
    """
    gross_saving = saved_ratio * size_tb * price_per_tb_month
    fleet_cost = host_usd_month * instances
    net = gross_saving - fleet_cost
    return (net, gross_saving, fleet_cost)


def main():
    """CLI entry point: print the break-even model as JSON plus a summary.

    Parses the host price, fleet size, storage price and repository scales,
    then, for every measured ``saved_ratio`` scenario, computes the break-even
    repository size and the net monthly/yearly saving at each scale for both a
    single instance and the requested ``--instances`` fleet.

    The JSON document goes to stdout (and to ``--out`` when given); a
    human-readable summary goes to stderr so it never pollutes the JSON.
    """
    ap = argparse.ArgumentParser(description=__doc__,
                                 formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--s4-host-usd-month", type=float, default=70.0,
                    help="Monthly cost of ONE S4 gateway host (default 70; "
                         "~a t3.large/c-family on-demand, parameterise to your fleet)")
    ap.add_argument("--instances", type=int, default=1,
                    help="S4 instances in the fleet (HA uses >=2 stateless instances "
                         "behind a load balancer; sidecars live in S3 so instances are "
                         "stateless). Default 1 (non-HA).")
    ap.add_argument("--price-per-tb-month", type=float, default=23.0,
                    help="S3 storage $/TB-month (default 23 = $0.023/GB-mo Standard)")
    ap.add_argument("--scales-tb", type=float, nargs="+", default=[50, 100, 500, 1000],
                    help="Repository sizes (TB) to report net savings for "
                         "(default 50 100 500 1000)")
    ap.add_argument("--out", help="Also write the JSON to this path")
    args = ap.parse_args()

    def scale_label(tb):
        # Whole-number sizes keep the historical "50TB" key. Fractional sizes
        # keep their fraction: truncating with int() mapped e.g. 0.5 and 0.9
        # both to "0TB", so one scale silently overwrote the other.
        if float(tb).is_integer():
            return f"{int(tb)}TB"
        return f"{tb}TB"

    ha_key = f"ha_{args.instances}_instances"
    ha_break_even_key = f"break_even_TB_ha_{args.instances}_instances"

    scenarios = {}
    all_ratios = {**SAVED_RATIO, **SAVED_RATIO_RECOMPACT}
    for name, ratio in all_ratios.items():
        be1 = break_even_tb(args.s4_host_usd_month, 1, ratio, args.price_per_tb_month)
        beN = break_even_tb(args.s4_host_usd_month, args.instances, ratio,
                            args.price_per_tb_month)
        scales = {}
        for tb in args.scales_tb:
            net1, saved1, host1 = net_savings_per_month(
                tb, ratio, args.price_per_tb_month, args.s4_host_usd_month, 1)
            netN, savedN, hostN = net_savings_per_month(
                tb, ratio, args.price_per_tb_month, args.s4_host_usd_month, args.instances)
            scales[scale_label(tb)] = {
                "non_ha_1_instance": {
                    "storage_saved_usd_mo": round(saved1, 2),
                    "host_cost_usd_mo": round(host1, 2),
                    "net_usd_mo": round(net1, 2),
                    "net_usd_yr": round(net1 * 12, 2),
                    "net_positive": net1 > 0,
                },
                ha_key: {
                    "storage_saved_usd_mo": round(savedN, 2),
                    "host_cost_usd_mo": round(hostN, 2),
                    "net_usd_mo": round(netN, 2),
                    "net_usd_yr": round(netN * 12, 2),
                    "net_positive": netN > 0,
                },
            }
        scenarios[name] = {
            "saved_ratio": ratio,
            "break_even_TB_non_ha_1_instance": round(be1, 2),
            ha_break_even_key: round(beN, 2),
            "scales": scales,
        }

    out = {
        "model": "break_even_TB = host_usd_month * instances / (saved_ratio * price_per_TB_month)",
        "params": {
            "s4_host_usd_month": args.s4_host_usd_month,
            "instances": args.instances,
            "price_per_tb_month": args.price_per_tb_month,
            "scales_tb": args.scales_tb,
        },
        "source": "saved_ratio from benches/elasticsearch-frozen/results/phase_a.json "
                  "(2026-06-18); host price is the parameterised --s4-host-usd-month",
        "scenarios": scenarios,
    }

    js = json.dumps(out, indent=2)
    print(js)
    if args.out:
        with open(args.out, "w") as f:
            f.write(js + "\n")

    # Human summary on stderr (doesn't pollute the JSON on stdout).
    def fmt(name, sc):
        be1 = sc["break_even_TB_non_ha_1_instance"]
        beN = sc[ha_break_even_key]
        print(f" {name:26s} saved={sc['saved_ratio']*100:4.1f}% "
              f"break-even: {be1:6.2f} TB (1 inst) / {beN:6.2f} TB ({args.instances} inst)",
              file=sys.stderr)

    print(f"\nbreak-even @ ${args.s4_host_usd_month:.0f}/mo per host, "
          f"${args.price_per_tb_month:.0f}/TB-mo S3:", file=sys.stderr)
    for name, sc in scenarios.items():
        fmt(name, sc)
    print("\nnet savings (HA = {} instances):".format(args.instances), file=sys.stderr)
    # Only the two largest default scales are summarised; any others are
    # still present in the JSON.
    for name, sc in scenarios.items():
        for scale in ("500TB", "1000TB"):
            if scale in sc["scales"]:
                ha = sc["scales"][scale][ha_key]
                print(f" {name:26s} {scale:>7}: ${ha['net_usd_mo']:>10,.0f}/mo "
                      f"= ${ha['net_usd_yr']:>12,.0f}/yr net "
                      f"({'positive' if ha['net_positive'] else 'NEGATIVE'})",
                      file=sys.stderr)


if __name__ == "__main__":
main()
195 changes: 195 additions & 0 deletions benches/elasticsearch-frozen/phase_b1_rtt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
#!/usr/bin/env python3
"""Phase B1: cold frozen-search latency under injected backend RTT.

The headline cold-search numbers in phase_b.json are measured against a
co-located MinIO with effectively zero network RTT, so their *absolute* values
(2-4 ms for analytics, ~1.7-7.2 s for cold top-N+sort) are a property of THIS
host, not of any real S3 deployment. The transferable metric is S4's *relative*
overhead. This phase injects a one-way delay on the S4<->backend path and shows
the relative overhead stays ~RTT-invariant: S4 adds a roughly fixed fraction
regardless of how slow the backend leg is.

Method
------
A toxiproxy proxy sits in front of the object store. We compare two repository
clients that BOTH traverse that proxy (so both eat the same injected RTT):
- tdirect : ES -> toxiproxy -> MinIO (no S4)
- ts4z3 : ES -> S4 (zstd-3) -> toxiproxy -> MinIO
For each injected one-way delay we re-mount the snapshot, clear the shared
cache before every query (cold), and record server-side `took`. Relative
overhead = (s4 - direct) / direct.

Env (defaults reproduce the canonical local run):
ES_URL default http://localhost:9200
TOXI_URL default http://localhost:8474 (toxiproxy admin API)
TOXI_PROXY default minio (proxy name to add latency to)
REPO_TDIRECT default tdirect (s3 client traversing toxiproxy, no S4)
REPO_TS4Z3 default ts4z3 (s3 client: S4 zstd-3 upstream=toxiproxy)
BUCKET_TDIRECT default esrev-direct
BUCKET_TS4Z3 default esrev-s4z3
RTT_MS default "0,5,20,50" one-way delays (ms) to sweep
INDEX default bench-standard-default
COLD_REPS default 5
"""
import json, os, time, statistics, subprocess, urllib.request, urllib.error

# Runtime configuration, all overridable from the environment.
ES = os.environ.get("ES_URL", "http://localhost:9200")
TOXI = os.environ.get("TOXI_URL", "http://localhost:8474")
PROXY = os.environ.get("TOXI_PROXY", "minio")
REPO_TDIRECT_CLIENT = os.environ.get("REPO_TDIRECT", "tdirect")
REPO_TS4_CLIENT = os.environ.get("REPO_TS4Z3", "ts4z3")
BUCKET_TDIRECT = os.environ.get("BUCKET_TDIRECT", "esrev-direct")
BUCKET_TS4 = os.environ.get("BUCKET_TS4Z3", "esrev-s4z3")
# Comma-separated one-way delays in ms; every entry must parse as an int.
RTT_MS = [int(x) for x in os.environ.get("RTT_MS", "0,5,20,50").split(",")]
INDEX = os.environ.get("INDEX", "bench-standard-default")
COLD_REPS = int(os.environ.get("COLD_REPS", "5"))
# Object-store endpoint used only by the `aws` CLI helper to empty buckets.
# NOTE(review): MINIO_URL is not listed in the module docstring's Env section.
MINIO = os.environ.get("MINIO_URL", "http://localhost:9100")
# Environment for the `aws` subprocess: the caller's environment plus fixed
# credentials/region. NOTE(review): "minioadmin" looks like the stock MinIO
# root credential — confirm before pointing this at anything non-local.
ENV = dict(os.environ, AWS_ACCESS_KEY_ID="minioadmin", AWS_SECRET_ACCESS_KEY="minioadmin",
AWS_REGION="us-east-1", AWS_REQUEST_CHECKSUM_CALCULATION="when_required",
AWS_RESPONSE_CHECKSUM_VALIDATION="when_required")


def es(method, path, body=None):
    """Send one JSON request to Elasticsearch and return the decoded reply.

    Args:
        method: HTTP verb, e.g. ``"GET"`` or ``"PUT"``.
        path: Request path (with any query string) appended to ``ES``.
        body: Optional JSON-serialisable payload; ``None`` sends no body.

    Returns:
        The parsed JSON response. On an HTTP error status, a dict of the form
        ``{"_error": <status code>, "_body": <first 300 chars of the body>}``
        is returned instead of raising.
    """
    payload = None if body is None else json.dumps(body).encode()
    request = urllib.request.Request(
        f"{ES}{path}",
        data=payload,
        headers={"Content-Type": "application/json"},
        method=method,
    )
    try:
        with urllib.request.urlopen(request) as resp:
            return json.load(resp)
    except urllib.error.HTTPError as err:
        # Callers inspect "_error" rather than catching exceptions.
        return {"_error": err.code, "_body": err.read().decode()[:300]}


def toxi(method, path, body=None):
    """Send one JSON request to the toxiproxy admin API.

    Args:
        method: HTTP verb, e.g. ``"POST"`` or ``"DELETE"``.
        path: Request path appended to ``TOXI``.
        body: Optional JSON-serialisable payload; ``None`` sends no body.

    Returns:
        The parsed JSON response, ``{}`` when the response reports a zero
        length, or ``{"_error": <status code>, "_body": <first 300 chars>}``
        on an HTTP error status.
    """
    payload = None if body is None else json.dumps(body).encode()
    request = urllib.request.Request(
        f"{TOXI}{path}",
        data=payload,
        headers={"Content-Type": "application/json"},
        method=method,
    )
    try:
        with urllib.request.urlopen(request) as resp:
            if resp.length == 0:
                # Empty body: there is nothing to parse.
                return {}
            return json.load(resp)
    except urllib.error.HTTPError as err:
        return {"_error": err.code, "_body": err.read().decode()[:300]}


def set_latency(one_way_ms):
    """Replace the proxy's latency toxics with a symmetric one-way delay.

    Both named toxics are deleted first (errors from deleting a toxic that
    does not exist are returned by ``toxi`` and ignored here), then re-created
    for the downstream and upstream streams when ``one_way_ms`` is positive.

    Args:
        one_way_ms: Delay in milliseconds applied to each direction; a value
            that is not greater than zero leaves the proxy without latency.
    """
    toxics = (("latency_down", "downstream"), ("latency_up", "upstream"))
    base = f"/proxies/{PROXY}/toxics"
    for toxic_name, _stream in toxics:
        toxi("DELETE", f"{base}/{toxic_name}")
    if not one_way_ms > 0:
        return
    for toxic_name, stream in toxics:
        toxi("POST", base,
             {"name": toxic_name, "type": "latency", "stream": stream,
              "attributes": {"latency": one_way_ms, "jitter": 0}})


def aws(*args):
    """Run the ``aws`` CLI against the ``MINIO`` endpoint.

    Args:
        *args: CLI arguments placed after ``--endpoint-url <MINIO>``.

    Returns:
        The ``subprocess.CompletedProcess`` with captured text stdout/stderr;
        the exit status is not checked here.
    """
    command = ["aws", "--endpoint-url", MINIO]
    command.extend(args)
    return subprocess.run(command, env=ENV, capture_output=True, text=True)


# Search bodies executed cold against each frozen mount, keyed by the label
# that appears in the result rows. The first three request no hits (size 0);
# the last fetches the 20 newest ERROR documents sorted by @timestamp.
QUERIES = {
"term_rare(status:500)": {"size": 0, "track_total_hits": True,
"query": {"term": {"http.response.status_code": 500}}},
"agg(date_hist+terms)": {"size": 0,
"aggs": {"svc": {"terms": {"field": "service.name", "size": 10}},
"t": {"date_histogram": {"field": "@timestamp", "fixed_interval": "1h"}}}},
"fulltext(message:items)": {"size": 0, "track_total_hits": True,
"query": {"match": {"message": "items"}}},
"topN(level:ERROR sort ts)": {"size": 20, "query": {"term": {"log.level": "ERROR"}},
"sort": [{"@timestamp": "desc"}]},
}


def clear_cache():
    """Ask Elasticsearch to clear the searchable-snapshots cache.

    The response (including any error dict from ``es``) is discarded.
    """
    endpoint = "/_searchable_snapshots/cache/clear"
    es("POST", endpoint)


def run_query(idx, body):
    """Run one search against ``idx`` and return the server-side ``took`` (ms).

    Args:
        idx: Index name to search.
        body: Search request body.

    Returns:
        The response's ``took`` value, or ``-1`` when the response has no
        ``took`` field (for example the error dict produced by ``es``).
    """
    response = es("GET", f"/{idx}/_search", body)
    if "took" in response:
        return response["took"]
    return -1


def mount(repo_name, client, bucket, label):
    """Snapshot ``INDEX`` through one repository client and frozen-mount it.

    The repository registration is dropped, the bucket emptied, the repository
    re-registered, a fresh snapshot taken, and that snapshot mounted with
    ``storage=shared_cache`` under a derived index name.

    Args:
        repo_name: Snapshot repository name to (re)register.
        client: Name of the ES s3 client the repository should use.
        bucket: Bucket backing the repository; emptied before use.
        label: Arm label, used in the mounted index name and in log output.

    Returns:
        The mounted index name, or ``None`` when the mount request returned
        an error. The name is returned even if the index never reports
        yellow/green within the polling window.
    """
    # Start from a clean slate: forget the repository and wipe its bucket.
    es("DELETE", f"/_snapshot/{repo_name}")
    aws("s3", "rm", f"s3://{bucket}", "--recursive")
    repo_settings = {
        "bucket": bucket, "client": client,
        "max_snapshot_bytes_per_sec": "-1", "max_restore_bytes_per_sec": "-1"}
    es("PUT", f"/_snapshot/{repo_name}", {"type": "s3", "settings": repo_settings})

    snapshot_name = f"b1-{INDEX}"
    es("PUT", f"/_snapshot/{repo_name}/{snapshot_name}?wait_for_completion=true",
       {"indices": INDEX, "include_global_state": False})

    # Derived index name: lower-cased, spaces dropped, hyphens turned into "x".
    squash = str.maketrans({" ": None, "-": "x"})
    frozen_index = f"frozenb1-{label}-{INDEX}".lower().translate(squash)
    es("DELETE", f"/{frozen_index}")
    mounted = es(
        "POST",
        f"/_snapshot/{repo_name}/{snapshot_name}/_mount?wait_for_completion=true&storage=shared_cache",
        {"index": INDEX, "renamed_index": frozen_index})
    if "_error" in mounted:
        print(f"MOUNT FAIL {label}: {mounted}", flush=True)
        return None

    # Poll up to 30 times for the mounted index to become searchable.
    health_path = f"/_cluster/health/{frozen_index}?wait_for_status=yellow&timeout=5s"
    attempts_left = 30
    while attempts_left > 0:
        attempts_left -= 1
        health = es("GET", health_path)
        if health.get("status") in ("yellow", "green"):
            break
        time.sleep(1)
    return frozen_index


# Sweep driver: for each injected one-way delay, mount the index through both
# arms, measure the cold median `took` per query, and derive S4's relative
# overhead against the direct arm.
results = []
try:
    for rtt in RTT_MS:
        set_latency(rtt)
        time.sleep(0.5)  # give the proxy a moment to apply the new toxics
        per_arm = {}
        for client, bucket, label in [(REPO_TDIRECT_CLIENT, BUCKET_TDIRECT, "direct"),
                                      (REPO_TS4_CLIENT, BUCKET_TS4, "S4 zstd-3")]:
            repo = f"b1_{label.replace(' ', '').replace('-', '')}".lower()
            frozen = mount(repo, client, bucket, label)
            if frozen is None:
                # Mount failed: record the arm as missing and keep sweeping.
                per_arm[label] = None
                continue
            qmeds = {}
            for qname, qbody in QUERIES.items():
                cold = []
                for _ in range(COLD_REPS):
                    # Clear the cache before every repetition so each
                    # query is measured cold.
                    clear_cache()
                    time.sleep(0.3)
                    t = run_query(frozen, qbody)
                    if t >= 0:  # -1 marks a failed query; drop it
                        cold.append(t)
                qmeds[qname] = round(statistics.median(cold), 1) if cold else None
            per_arm[label] = qmeds
            es("DELETE", f"/{frozen}")
            clear_cache()
        # compute relative overhead per query
        direct = per_arm.get("direct") or {}
        s4 = per_arm.get("S4 zstd-3") or {}
        for qname in QUERIES:
            d = direct.get(qname)
            s = s4.get(qname)
            rel = None
            # Compare against None explicitly: a 0 ms S4 median is a real
            # measurement, not a missing one. Only the direct baseline must
            # be strictly positive because it is the divisor.
            if d is not None and s is not None and d > 0:
                rel = round((s - d) / d * 100, 1)
            rec = {"rtt_one_way_ms": rtt, "index": INDEX, "query": qname,
                   "direct_cold_ms_median": d, "s4_cold_ms_median": s,
                   "rel_overhead_pct": rel}
            results.append(rec)
            rel_text = "" if rel is None else f"{rel:+.1f}%"
            print(f"rtt={rtt:3d}ms {qname:26s} direct={str(d):>7} s4={str(s):>7} "
                  f"rel={rel_text}", flush=True)
finally:
    # clean up latency toxics, even when the sweep aborts part-way, so a
    # failed run does not leave injected latency on the shared proxy
    set_latency(0)

out = {
    "measurement": "B1 - cold frozen-search latency under injected backend RTT",
    "method": ("toxiproxy injects symmetric one-way delay on the object-store leg; "
               "both arms (direct, S4 zstd-3) traverse it so they eat the same RTT. "
               "Shared cache cleared before every query (cold). Relative overhead = "
               "(s4 - direct)/direct. Local MinIO base, no AWS billing."),
    "host": "AMD Ryzen 9 9950X, ES 9.4.2, MinIO RELEASE.2025-09-07, S4 v1.2.2",
    "rtt_levels_one_way_ms": RTT_MS,
    "rows": results,
}
os.makedirs("./results", exist_ok=True)
with open("./results/rtt-injection.json", "w") as f:
    json.dump(out, f, indent=2)
print("\nwrote results/rtt-injection.json", flush=True)
Loading
Loading