From 88d31034bee511a8dc45398c442d49ba3d17c192 Mon Sep 17 00:00:00 2001 From: S1ro1 Date: Tue, 2 Jun 2026 06:00:22 +0530 Subject: [PATCH 1/5] feat(inference): tiered KV cache offload (native + mooncake, CPU/disk) Replace the flat `inference.kv_cache_offload` (single `cpu_bytes`) with a typed, backend-swappable, tiered config discriminated on `type`: - native: vLLM `OffloadingConnector` (cpu) / `TieringOffloadingSpec` (cpu+disk, fs_python secondary). Fully self-contained. - mooncake: per-node standalone-store (`mooncake_master` + `mooncake_client`, RDMA), cpu and/or disk tiers. Composable `cpu`/`disk` tiers (cpu required, disk-only rejected); `disk.num_bytes` is mooncake-only (native fs tier is filesystem-bounded; warns if set on native). Centralize the vLLM `kv_transfer_config` build in `InferenceConfig.build_kv_transfer_config` (NIXL transfer + offload composed via `MultiConnector`); both SLURM templates stop hand-rolling the JSON and launch the node-local Mooncake store. New `src/prime_rl/inference/mooncake.py` launches the master+client and writes `MOONCAKE_CONFIG_PATH`; `server.py` brings it up for local runs. Verified: - Single-node (Qwen3-0.6B, H200): all four configs boot + serve; native-cpu and mooncake-rdma both round-trip KV (store ~1.1GB / reload ~205MB). - Multi-node 3-node SLURM disaggregated (Qwen3-30B-A3B): NIXL P/D transfer serves coherent output; with mooncake offload, MultiConnector[Nixl, MooncakeStore] + per-node RDMA store serves coherent output. Co-Authored-By: Claude Opus 4.8 (1M context) --- docs/inference.md | 32 ++- .../src/prime_rl/configs/inference.py | 139 ++++++++-- pyproject.toml | 1 + src/prime_rl/entrypoints/inference.py | 2 +- src/prime_rl/entrypoints/rl.py | 8 +- src/prime_rl/inference/mooncake.py | 255 ++++++++++++++++++ src/prime_rl/inference/vllm/server.py | 31 ++- src/prime_rl/templates/inference.sbatch.j2 | 33 +-- .../templates/multi_node_rl.sbatch.j2 | 40 ++- uv.lock | 17 +- 10 files changed, 485 insertions(+), 73 deletions(-) create mode 100644 src/prime_rl/inference/mooncake.py diff --git a/docs/inference.md b/docs/inference.md index 0a62d5b3bf..474131d297 100644 --- a/docs/inference.md +++ b/docs/inference.md @@ -192,16 +192,38 @@ X-Session-ID = "trajectory_id" # this is the default - each rollout has a unique ### KV Cache Offload -Maximizing KV-Cache space is crucial to support high-concurrency workloads. We allow you to offload the KV cache to CPU memory, which can increase the space 10-fold in some cases. You can configure the amount of CPU memory to use for the KV cache by setting `inference.deployment.kv_cache_offload.cpu_bytes`. +Maximizing KV-Cache space is crucial to support high-concurrency workloads. You can offload the KV cache to CPU memory (and, behind it, disk) by setting `inference.kv_cache_offload`. It is a discriminated config with two composable tiers, `cpu` and `disk`: a `cpu` tier is always required, and an optional `disk` tier is layered behind it (GPU → DRAM → disk). Disk-only is not supported. + +The `type` field selects the backend: + +- `native` — vLLM's built-in offloading. CPU-only uses `OffloadingConnector`; CPU+disk uses `TieringOffloadingSpec` (a CPU primary tier with a filesystem secondary tier). Fully self-contained — no extra processes. +- `mooncake` — a per-node [Mooncake](https://github.com/kvcache-ai/Mooncake) distributed store. The launcher starts a node-local `mooncake_master` + `mooncake_client`; the client owns the pool (a DRAM segment, plus a file-backed disk tier when `disk` is set). ```toml +# Native CPU offload (reserves 128GB of CPU KV cache for this instance) [inference.kv_cache_offload] -cpu_bytes = 128_000_000_000 # 128GB -``` +type = "native" +[inference.kv_cache_offload.cpu] +num_bytes = 128_000_000_000 # 128GB -This will reserve 128GB of CPU memory per worker. If you use dp=8, this will reserve 1TB of CPU memory per node. +# Native CPU + disk tiering (self-contained) +[inference.kv_cache_offload] +type = "native" +[inference.kv_cache_offload.cpu] +num_bytes = 128_000_000_000 +[inference.kv_cache_offload.disk] +path = "/scratch/kv" # disk capacity is bounded by the filesystem + +# Mooncake CPU + disk (per-node distributed store, RDMA) +[inference.kv_cache_offload] +type = "mooncake" +[inference.kv_cache_offload.cpu] +num_bytes = 128_000_000_000 +[inference.kv_cache_offload.disk] +path = "/scratch/kv" +``` -We aim to support more offloading options in the future, such as multi-tier offloading to also utilize disk-based KV cache, or distributed storage options like Mooncake Connector. +For `native`, `cpu.num_bytes` is the aggregate CPU KV pool for the instance (vLLM shards it across workers). For `mooncake`, `cpu.num_bytes` is the per-node store client's DRAM segment (the store uses RDMA, so it requires an RDMA-capable fabric). Enabling offload automatically enables prefix caching. ### Optimized P/D disaggregation deployment diff --git a/packages/prime-rl-configs/src/prime_rl/configs/inference.py b/packages/prime-rl-configs/src/prime_rl/configs/inference.py index 122363af51..84aa764235 100644 --- a/packages/prime-rl-configs/src/prime_rl/configs/inference.py +++ b/packages/prime-rl-configs/src/prime_rl/configs/inference.py @@ -1,3 +1,4 @@ +import warnings from argparse import Namespace from pathlib import Path from typing import Annotated, Any, Literal, TypeAlias @@ -86,9 +87,91 @@ class WeightBroadcastConfig(BaseConfig): """Weight broadcast transport.""" -class KVCacheOffloadConfig(BaseConfig): - cpu_bytes: int = Field(1_000_000_000, gt=0) - """CPU bytes available for KV cache offloading per worker.""" +class CPUOffloadTier(BaseConfig): + num_bytes: int = Field(..., gt=0) + """CPU/DRAM offload capacity. For the ``native`` backend this is vLLM's aggregate ``cpu_bytes_to_use`` (scaled across workers internally). For the ``mooncake`` backend this is the per-node store client's DRAM segment (``-global_segment_size``).""" + + +class DiskOffloadTier(BaseConfig): + path: Path + """Filesystem root for the disk tier. For ``native`` this is the ``fs_python`` secondary tier's ``root_dir``; for ``mooncake`` it is the store client's ``MOONCAKE_OFFLOAD_FILE_STORAGE_PATH``.""" + + num_bytes: int | None = Field(None, gt=0) + """Advisory per-node disk pool size. Neither backend enforces a hard disk quota — the native ``fs_python`` tier and Mooncake's file offload are both bounded by the filesystem at ``path``.""" + + +class BaseKVCacheOffloadConfig(BaseConfig): + cpu: CPUOffloadTier | None = None + """CPU/DRAM offload tier. Always required — disk-only offload is not supported.""" + + disk: DiskOffloadTier | None = None + """Optional disk tier, layered behind the CPU tier (GPU → DRAM → disk).""" + + @model_validator(mode="after") + def valid_tiers(self): + # Both backends support only two shapes: cpu-only or cpu+disk. Native disk + # tiering needs a CPU primary tier; Mooncake standalone-store needs a DRAM + # staging tier. Disk-only is rejected for both. + if self.cpu is None: + raise ValueError("inference.kv_cache_offload requires a cpu tier (disk-only offload is not supported).") + return self + + +class NativeKVCacheOffloadConfig(BaseKVCacheOffloadConfig): + type: Literal["native"] = "native" + """vLLM-native offloading. cpu-only uses ``OffloadingConnector`` + ``CPUOffloadingSpec``; cpu+disk uses ``TieringOffloadingSpec`` (CPU primary tier + ``fs_python`` disk secondary). Fully self-contained — no external processes.""" + + @model_validator(mode="after") + def warn_ignored_disk_capacity(self): + if self.disk is not None and self.disk.num_bytes is not None: + warnings.warn( + "inference.kv_cache_offload.disk.num_bytes is ignored by the native backend: the " + "fs_python disk tier has no quota and is bounded by the filesystem at disk.path. " + "Set num_bytes only with the mooncake backend.", + stacklevel=1, + ) + return self + + def to_connector_dict(self) -> dict[str, Any]: + assert self.cpu is not None + extra: dict[str, Any] = {"cpu_bytes_to_use": int(self.cpu.num_bytes)} + if self.disk is not None: + extra["spec_name"] = "TieringOffloadingSpec" + extra["secondary_tiers"] = [{"type": "fs_python", "root_dir": str(self.disk.path)}] + return { + "kv_connector": "OffloadingConnector", + "kv_role": "kv_both", + "kv_connector_extra_config": extra, + } + + +class MooncakeKVCacheOffloadConfig(BaseKVCacheOffloadConfig): + type: Literal["mooncake"] = "mooncake" + """Mooncake distributed store offloading (standalone-store topology). A per-node ``mooncake_master`` + ``mooncake_client`` own the pool and vLLM workers are clients. The cpu tier sets the client's DRAM segment; the optional disk tier adds the client's SSD tier.""" + + master_server_address: str | None = None + """Mooncake master address (``host:port``). If None, the launcher starts a node-local master.""" + + metadata_server: str | None = None + """Mooncake metadata-server connection string. If None, the launcher hosts an HTTP metadata server on the node-local master.""" + + device_name: str = "" + """RDMA device name(s) for the store (empty = auto-detect).""" + + def to_connector_dict(self) -> dict[str, Any]: + # Sizes/addresses/tiers are realized via the MOONCAKE_CONFIG_PATH JSON and the + # mooncake_client launch; the worker reads preferred_segment from the + # MOONCAKE_PREFERRED_SEGMENT env the launcher exports. + return { + "kv_connector": "MooncakeStoreConnector", + "kv_role": "kv_both", + "kv_connector_extra_config": {}, + } + + +KVCacheOffloadConfig: TypeAlias = Annotated[ + NativeKVCacheOffloadConfig | MooncakeKVCacheOffloadConfig, Field(discriminator="type") +] # Valid vLLM max_lora_rank values (from vllm/config/lora.py) @@ -261,7 +344,10 @@ class InferenceConfig(BaseConfig): weight_broadcast: WeightBroadcastConfig = WeightBroadcastConfig() kv_cache_offload: KVCacheOffloadConfig | None = None - """CPU KV cache offload for inference workers. Standard inference uses vLLM's ``OffloadingConnector``. Disaggregated P/D deployments combine it with NIXL through ``MultiConnector`` in the SLURM launcher.""" + """KV cache offload for inference workers, as composable CPU/disk tiers. Discriminated on ``type``: ``native`` (vLLM ``OffloadingConnector``/``TieringOffloadingSpec``, self-contained) or ``mooncake`` (per-node Mooncake distributed store). Disaggregated P/D combines the chosen connector with NIXL through ``MultiConnector``.""" + + use_pd_kv_transfer: bool = False + """Auto-set for disaggregated P/D: emit the NIXL transfer connector. Persisted into the per-node config (which drops ``deployment``) so the connector is still built per worker. Not meant to be set by hand.""" enable_return_routed_experts: bool = False """Return routed experts in responses. Forwarded as ``--enable-return-routed-experts``.""" @@ -307,6 +393,7 @@ def auto_setup_kv_cache_offload(self): def auto_setup_disaggregated(self): """Auto-configure inference for disaggregated P/D: enable EP and compute DP.""" if self.deployment.type == "disaggregated": + self.use_pd_kv_transfer = True if "enable_expert_parallel" not in self.model_fields_set: self.enable_expert_parallel = True if "enable_eplb" not in self.model_fields_set: @@ -368,6 +455,35 @@ def auto_setup_api_server_count(self): self.api_server_count = 1 # LoRA requires only one API server return self + def build_kv_transfer_config(self) -> dict[str, Any] | None: + """Build the single vLLM ``kv_transfer_config`` from the transfer + offload connectors. + + Disaggregated P/D always uses NIXL for prefill→decode transfer. KV cache offload (if + configured) contributes its own connector. When both are present they are composed via + ``MultiConnector``. Returns None when neither applies. + """ + connectors: list[dict[str, Any]] = [] + if self.use_pd_kv_transfer: + connectors.append( + { + "kv_connector": "NixlConnector", + "kv_role": "kv_both", + "kv_connector_extra_config": {"num_threads": 1}, + } + ) + if self.kv_cache_offload is not None: + connectors.append(self.kv_cache_offload.to_connector_dict()) + + if not connectors: + return None + if len(connectors) == 1: + return connectors[0] + return { + "kv_connector": "MultiConnector", + "kv_role": "kv_both", + "kv_connector_extra_config": {"connectors": connectors}, + } + def to_vllm(self) -> Namespace: """Convert InferenceConfig to vLLM-compatible Namespace.""" namespace = Namespace() @@ -411,18 +527,9 @@ def to_vllm(self) -> Namespace: # Set `logprobs_mode` to `processed_logprobs` by default rsetattr(namespace, "logprobs_mode", "processed_logprobs") - if self.kv_cache_offload is not None: - rsetattr( - namespace, - "kv_transfer_config", - { - "kv_connector": "OffloadingConnector", - "kv_role": "kv_both", - "kv_connector_extra_config": { - "cpu_bytes_to_use": int(self.kv_cache_offload.cpu_bytes), - }, - }, - ) + kv_transfer_config = self.build_kv_transfer_config() + if kv_transfer_config is not None: + rsetattr(namespace, "kv_transfer_config", kv_transfer_config) # Pass prime-rl-specific flags through vLLM's additional_config dict; # workers read these via get_current_vllm_config().additional_config. diff --git a/pyproject.toml b/pyproject.toml index 9e0dd3def0..ca3639b3aa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,6 +19,7 @@ dependencies = [ "torchdata>=0.11.0", "transformers", "vllm>=0.22.0", + "mooncake-transfer-engine>=0.3.10.post2", "wandb>=0.26.1", "ring-flash-attn>=0.1.8", "prime>=0.6.4", diff --git a/src/prime_rl/entrypoints/inference.py b/src/prime_rl/entrypoints/inference.py index a14ae26e63..563880fed3 100644 --- a/src/prime_rl/entrypoints/inference.py +++ b/src/prime_rl/entrypoints/inference.py @@ -46,6 +46,7 @@ def write_slurm_script(config: InferenceConfig, config_path: Path, script_path: port=config.server.port, disaggregated=is_disaggregated, kv_offload=config.kv_cache_offload is not None, + kv_offload_mooncake=config.kv_cache_offload is not None and config.kv_cache_offload.type == "mooncake", ) is_multi_node = config.deployment.type == "multi_node" @@ -64,7 +65,6 @@ def write_slurm_script(config: InferenceConfig, config_path: Path, script_path: use_deep_gemm=config.use_deep_gemm, prefill_env_overrides=config.deployment.prefill_env_overrides, decode_env_overrides=config.deployment.decode_env_overrides, - kv_offload_cpu_bytes=int(config.kv_cache_offload.cpu_bytes) if config.kv_cache_offload else 0, ) elif is_multi_node: template_vars.update( diff --git a/src/prime_rl/entrypoints/rl.py b/src/prime_rl/entrypoints/rl.py index 21afe03752..655cbd3ef3 100644 --- a/src/prime_rl/entrypoints/rl.py +++ b/src/prime_rl/entrypoints/rl.py @@ -371,9 +371,8 @@ def write_slurm_script(config: RLConfig, config_dir: Path, script_path: Path) -> decode_env_overrides=infer_deploy.decode_env_overrides, dp_per_node=config.deployment.gpus_per_node // config.inference.parallel.tp, kv_offload=config.inference.kv_cache_offload is not None, - kv_offload_cpu_bytes=int(config.inference.kv_cache_offload.cpu_bytes) - if config.inference.kv_cache_offload - else 0, + kv_offload_mooncake=config.inference.kv_cache_offload is not None + and config.inference.kv_cache_offload.type == "mooncake", use_nccl_broadcast=config.weight_broadcast is not None and config.weight_broadcast.type == "nccl", ranks_filter=",".join(map(str, config.trainer.log.ranks_filter)), ) @@ -396,6 +395,9 @@ def write_slurm_script(config: RLConfig, config_dir: Path, script_path: Path) -> inference_data_parallel_rpc_port=config.inference.data_parallel_rpc_port if config.inference else 29600, dp_per_node=(config.deployment.gpus_per_node // config.inference.parallel.tp) if config.inference else 1, kv_offload=config.inference is not None and config.inference.kv_cache_offload is not None, + kv_offload_mooncake=config.inference is not None + and config.inference.kv_cache_offload is not None + and config.inference.kv_cache_offload.type == "mooncake", use_nccl_broadcast=config.weight_broadcast is not None and config.weight_broadcast.type == "nccl", ranks_filter=",".join(map(str, config.trainer.log.ranks_filter)), ) diff --git a/src/prime_rl/inference/mooncake.py b/src/prime_rl/inference/mooncake.py new file mode 100644 index 0000000000..a9e9436d1b --- /dev/null +++ b/src/prime_rl/inference/mooncake.py @@ -0,0 +1,255 @@ +"""Node-local Mooncake store launcher for KV cache offload (standalone-store topology). + +Each node runs its own ``mooncake_master`` + one ``mooncake_client``; the node's vLLM +workers connect to that local store as clients (contributing no segment of their own). The +client owns the pool: a DRAM segment sized by the cpu tier, plus a file-backed disk tier +when a disk tier is configured. See ``MooncakeKVCacheOffloadConfig``. + +This module both (a) is imported by the local launch path in ``inference/vllm/server.py`` +and (b) exposes a small CLI used by the SLURM templates to bring the store up per node. +""" + +from __future__ import annotations + +import json +import os +import shutil +import socket +import subprocess +import sys +import time +from dataclasses import dataclass +from pathlib import Path + +from prime_rl.configs.inference import MooncakeKVCacheOffloadConfig + +DEFAULT_MASTER_RPC_PORT = 50051 +DEFAULT_METADATA_HTTP_PORT = 8080 +DEFAULT_CLIENT_PORT = 50052 +# Per-worker transfer staging buffer in the store client (mirrors Mooncake's default). +DEFAULT_LOCAL_BUFFER_SIZE = 4 * 1024**3 +# Mooncake transfer protocol (RDMA only). +MOONCAKE_PROTOCOL = "rdma" + + +def _bin(name: str) -> str: + """Resolve a Mooncake console-script, preferring the one in this interpreter's venv.""" + candidate = Path(sys.executable).parent / name + if candidate.exists(): + return str(candidate) + return shutil.which(name) or name + + +def _local_ip() -> str: + # Match the address the vLLM worker registers with the store (it uses vllm's get_ip()). + from vllm.utils.network_utils import get_ip + + return get_ip() + + +def _wait_for_port(host: str, port: int, timeout: float = 30.0) -> None: + """Block until ``host:port`` accepts a TCP connection, or raise on timeout.""" + deadline = time.monotonic() + timeout + last_err: OSError | None = None + while time.monotonic() < deadline: + try: + with socket.create_connection((host, port), timeout=1.0): + return + except OSError as e: + last_err = e + time.sleep(0.2) + raise TimeoutError(f"Mooncake endpoint {host}:{port} did not come up within {timeout}s: {last_err}") + + +@dataclass +class MooncakeStore: + """Handles for a running node-local Mooncake store and the env it needs. + + With one client per node there is exactly one segment in the store, so the master + routes all puts/gets to it — no ``preferred_segment`` pinning is needed (that hint only + matters when several client segments coexist in one pool). + """ + + master: subprocess.Popen + client: subprocess.Popen + config_path: Path + env: dict[str, str] + + def apply_env(self) -> None: + """Export the env vars the vLLM worker process needs (config path, segment, hash seed).""" + os.environ.update(self.env) + + def shutdown(self) -> None: + for proc in (self.client, self.master): + if proc.poll() is None: + proc.terminate() + try: + proc.wait(timeout=10) + except subprocess.TimeoutExpired: + proc.kill() + + +def _resolve_addresses(cfg: MooncakeKVCacheOffloadConfig) -> tuple[str, str, str, int]: + """Return (master_server_address, metadata_server, client_host, client_port). + + Defaults launch a node-local master that also hosts the HTTP metadata server. + """ + ip = _local_ip() + master_addr = cfg.master_server_address or f"{ip}:{DEFAULT_MASTER_RPC_PORT}" + metadata = cfg.metadata_server or f"http://{ip}:{DEFAULT_METADATA_HTTP_PORT}/metadata" + return master_addr, metadata, ip, DEFAULT_CLIENT_PORT + + +def write_config_file(cfg: MooncakeKVCacheOffloadConfig, master_addr: str, metadata: str, path: Path) -> None: + """Write the ``MOONCAKE_CONFIG_PATH`` JSON read per-worker by ``MooncakeStoreConfig.from_file``. + + Standalone-store: workers contribute no segment (``global_segment_size == 0``); the + node-local client owns the pool. ``enable_offload`` is on when a disk tier exists. + """ + config = { + "mode": "standalone-store", + "global_segment_size": 0, + "local_buffer_size": DEFAULT_LOCAL_BUFFER_SIZE, + "protocol": MOONCAKE_PROTOCOL, + "device_name": cfg.device_name, + "master_server_address": master_addr, + "metadata_server": metadata, + "enable_offload": cfg.disk is not None, + } + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(config, indent=2)) + + +def _offload_env(cfg: MooncakeKVCacheOffloadConfig) -> dict[str, str]: + """Offload (disk-tier) env shared by the master and client when a disk tier is set. + + The master serves the fsdir to the client (the client does ``get_fsdir`` from it), and + the client writes the offloaded block files there — so both processes need the path. + """ + if cfg.disk is None: + return {} + cfg.disk.path.mkdir(parents=True, exist_ok=True) + env = { + "MOONCAKE_OFFLOAD_FSDIR": str(cfg.disk.path), + "MOONCAKE_OFFLOAD_FILE_STORAGE_PATH": str(cfg.disk.path), + "MOONCAKE_OFFLOAD_ENABLE_EVICTION": "true", + } + if cfg.disk.num_bytes is not None: + env["MOONCAKE_OFFLOAD_TOTAL_SIZE_LIMIT_BYTES"] = str(int(cfg.disk.num_bytes)) + return env + + +def _launch_master(cfg: MooncakeKVCacheOffloadConfig, ip: str, log_dir: Path, host_metadata: bool) -> subprocess.Popen: + args = [_bin("mooncake_master"), f"-rpc_port={DEFAULT_MASTER_RPC_PORT}"] + if host_metadata: + args += [ + "-enable_http_metadata_server", + f"-http_metadata_server_host={ip}", + f"-http_metadata_server_port={DEFAULT_METADATA_HTTP_PORT}", + ] + if cfg.disk is not None: + # The master owns the file-storage backend config the client mounts against. + args += ["-enable_offload=true", f"-root_fs_dir={cfg.disk.path}"] + log = (log_dir / "mooncake_master.log").open("w") + return subprocess.Popen(args, stdout=log, stderr=subprocess.STDOUT, env={**os.environ, **_offload_env(cfg)}) + + +def _launch_client( + cfg: MooncakeKVCacheOffloadConfig, ip: str, master_addr: str, metadata: str, log_dir: Path +) -> subprocess.Popen: + assert cfg.cpu is not None # enforced by config validation + args = [ + _bin("mooncake_client"), + f"-host={ip}", + f"-port={DEFAULT_CLIENT_PORT}", + f"-master_server_address={master_addr}", + f"-metadata_server={metadata}", + f"-protocol={MOONCAKE_PROTOCOL}", + f"-global_segment_size={int(cfg.cpu.num_bytes)}", + ] + if cfg.device_name: + args.append(f"-device_names={cfg.device_name}") + if cfg.disk is not None: + args.append("-enable_offload=true") + log = (log_dir / "mooncake_client.log").open("w") + return subprocess.Popen(args, stdout=log, stderr=subprocess.STDOUT, env={**os.environ, **_offload_env(cfg)}) + + +def start_mooncake_store(cfg: MooncakeKVCacheOffloadConfig, output_dir: Path) -> MooncakeStore: + """Launch the node-local master + client, write the config JSON, and return handles + env. + + The caller must export the returned env (``apply_env``) before the vLLM engine process + starts (``PYTHONHASHSEED`` must precede the interpreter that hashes KV blocks). + """ + log_dir = output_dir + log_dir.mkdir(parents=True, exist_ok=True) + + master_addr, metadata, client_ip, client_port = _resolve_addresses(cfg) + host_metadata = cfg.metadata_server is None # we host it on the master unless overridden + + master = None + if cfg.master_server_address is None: + master = _launch_master(cfg, client_ip, log_dir, host_metadata=host_metadata) + master_host, master_port = master_addr.split(":") + _wait_for_port(master_host, int(master_port)) + if host_metadata: + _wait_for_port(client_ip, DEFAULT_METADATA_HTTP_PORT) + + client = _launch_client(cfg, client_ip, master_addr, metadata, log_dir) + _wait_for_port(client_ip, client_port) + + config_path = output_dir / "mooncake_config.json" + write_config_file(cfg, master_addr, metadata, config_path) + + env = { + "MOONCAKE_CONFIG_PATH": str(config_path), + # Reproducible KV block hashes across the store client and vLLM worker processes. + "PYTHONHASHSEED": "0", + } + # If no separate master was launched (external master), there is no master Popen; use the + # client handle as a stand-in so shutdown() stays simple. + return MooncakeStore( + master=master if master is not None else client, + client=client, + config_path=config_path, + env=env, + ) + + +def _main() -> None: + """CLI used by the SLURM templates: launch the node-local store and print env exports. + + Usage: ``python -m prime_rl.inference.mooncake --config --output-dir ``. + Launches the node-local master + client (which outlive this process) and prints shell + ``export`` lines for the template to ``eval`` before starting vLLM on that node. + """ + import argparse + import tomllib + + from pydantic import TypeAdapter + + from prime_rl.configs.inference import KVCacheOffloadConfig + + parser = argparse.ArgumentParser(description="Launch a node-local Mooncake store.") + parser.add_argument("--config", type=Path, required=True, help="Path to the inference TOML.") + parser.add_argument("--output-dir", type=Path, required=True, help="Directory for the store config + logs.") + args = parser.parse_args() + + raw = tomllib.loads(args.config.read_text()) + offload_raw = raw.get("kv_cache_offload") + if offload_raw is None: + raise ValueError("inference config has no kv_cache_offload section") + offload = TypeAdapter(KVCacheOffloadConfig).validate_python(offload_raw) + if not isinstance(offload, MooncakeKVCacheOffloadConfig): + raise ValueError("mooncake CLI requires inference.kv_cache_offload.type == 'mooncake'") + + store = start_mooncake_store(offload, args.output_dir) + # Write the env as a sourceable file so the SLURM template can `source` it without + # capturing stdout (which may carry unrelated import logging). + env_sh = args.output_dir / "env.sh" + env_sh.write_text("".join(f"export {key}={value}\n" for key, value in store.env.items())) + print(f"wrote {env_sh}") + + +if __name__ == "__main__": + _main() diff --git a/src/prime_rl/inference/vllm/server.py b/src/prime_rl/inference/vllm/server.py index f411695c78..c146d21fef 100644 --- a/src/prime_rl/inference/vllm/server.py +++ b/src/prime_rl/inference/vllm/server.py @@ -192,6 +192,19 @@ def server(config: InferenceConfig, vllm_extra: dict[str, Any] | None = None): if config.lora_target_modules and not any("expert" in m for m in config.lora_target_modules): os.environ["PRIME_NO_MOE_LORA"] = "1" + # Launch the node-local Mooncake store for local runs. The SLURM launcher does this per + # node and exports MOONCAKE_CONFIG_PATH itself, so skip when it is already set. The store + # env (incl. PYTHONHASHSEED) must be applied before the engine/worker processes spawn. + from prime_rl.configs.inference import MooncakeKVCacheOffloadConfig + + mooncake_store = None + if isinstance(config.kv_cache_offload, MooncakeKVCacheOffloadConfig) and "MOONCAKE_CONFIG_PATH" not in os.environ: + from prime_rl.inference.mooncake import start_mooncake_store + + mooncake_store = start_mooncake_store(config.kv_cache_offload, config.output_dir / "mooncake") + mooncake_store.apply_env() + logger.info(f"Started node-local Mooncake store (config: {mooncake_store.config_path})") + namespace = config.to_vllm() if vllm_extra: for key, value in vllm_extra.items(): @@ -206,11 +219,15 @@ def server(config: InferenceConfig, vllm_extra: dict[str, Any] | None = None): # Set the worker extension class based on the broadcast backend args.worker_extension_cls = WORKER_EXTENSION_CLS[config.weight_broadcast.type] - if args.headless or args.api_server_count < 1: - run_headless(args) - else: - if args.api_server_count > 1: - run_multi_api_server(args) + try: + if args.headless or args.api_server_count < 1: + run_headless(args) else: - # Single API server (this process). - uvloop.run(run_server(args)) + if args.api_server_count > 1: + run_multi_api_server(args) + else: + # Single API server (this process). + uvloop.run(run_server(args)) + finally: + if mooncake_store is not None: + mooncake_store.shutdown() diff --git a/src/prime_rl/templates/inference.sbatch.j2 b/src/prime_rl/templates/inference.sbatch.j2 index 5ba6a37631..91eadc40c7 100755 --- a/src/prime_rl/templates/inference.sbatch.j2 +++ b/src/prime_rl/templates/inference.sbatch.j2 @@ -130,9 +130,13 @@ srun bash -s <<'CLEANUP_SH' # which pkill -f does not see because -f only matches the cmdline. pkill -9 "vllm" 2>/dev/null pkill -9 "vllm::.*" 2>/dev/null + # Mooncake store (master + client) left over from a prior offload run. -x matches + # the exact process name, so it never hits the worktree path (e.g. mooncake-baby). + pkill -9 -x mooncake_master 2>/dev/null + pkill -9 -x mooncake_client 2>/dev/null sleep 2 rm -rf /dev/shm/vllm-* /dev/shm/vllm_* /tmp/vllm-* /tmp/vllm_* /tmp/torch-* /tmp/torchelastic_* 2>/dev/null - procs=$(ps -eo comm,args | grep -E "python|torchrun|vllm|vllm::" | grep -v grep | wc -l) + procs=$(ps -eo comm,args | grep -E "python|torchrun|vllm|vllm::|mooncake_" | grep -v grep | wc -l) gpu=$(nvidia-smi --query-gpu=memory.used --format=csv,noheader,nounits | awk "{s+=\$1} END {print s}") echo "[node-cleanup] $(hostname) procs=$procs gpu_mem=${gpu}MiB" CLEANUP_SH @@ -161,6 +165,12 @@ srun --kill-on-bad-exit=1 bash -c ' {%- if kv_offload %} ulimit -l unlimited {%- endif %} +{%- if kv_offload_mooncake %} + # Launch the node-local Mooncake store master + client, then source the env vars + # MOONCAKE_CONFIG_PATH and PYTHONHASHSEED so the vLLM process picks them up. + uv run python -m prime_rl.inference.mooncake --config $CONFIG_PATH --output-dir $OUTPUT_DIR/mooncake/node_${INFER_NODE_RANK} + source $OUTPUT_DIR/mooncake/node_${INFER_NODE_RANK}/env.sh +{%- endif %} {% if disaggregated -%} # NVSHMEM libs for DeepEP — ensure the rpath target exists on all nodes @@ -187,17 +197,8 @@ srun --kill-on-bad-exit=1 bash -c ' export VLLM_NIXL_SIDE_CHANNEL_HOST=$LOCAL_IP export VLLM_NIXL_SIDE_CHANNEL_PORT=5600 -{%- if kv_offload %} - PREFILL_KV_CFG='"'"'{"kv_connector":"MultiConnector","kv_role":"kv_both","kv_connector_extra_config":{"connectors":[{"kv_connector":"NixlConnector","kv_role":"kv_both","kv_connector_extra_config":{"num_threads":1}},{"kv_connector":"OffloadingConnector","kv_role":"kv_both","kv_connector_extra_config":{"cpu_bytes_to_use":{{ kv_offload_cpu_bytes }}}}]}}'"'"' -{%- else %} - PREFILL_KV_CFG='"'"'{"kv_connector":"NixlConnector","kv_role":"kv_both","kv_connector_extra_config":{"num_threads":1}}'"'"' -{%- endif %} - -{%- if kv_offload %} - DECODE_KV_CFG='"'"'{"kv_connector":"MultiConnector","kv_role":"kv_both","kv_connector_extra_config":{"connectors":[{"kv_connector":"NixlConnector","kv_role":"kv_both","kv_connector_extra_config":{"num_threads":1}},{"kv_connector":"OffloadingConnector","kv_role":"kv_both","kv_connector_extra_config":{"cpu_bytes_to_use":{{ kv_offload_cpu_bytes }}}}]}}'"'"' -{%- else %} - DECODE_KV_CFG='"'"'{"kv_connector":"NixlConnector","kv_role":"kv_both","kv_connector_extra_config":{"num_threads":1}}'"'"' -{%- endif %} + # The kv_transfer_config (NIXL transfer + any KV offload connector) is built in + # InferenceConfig.to_vllm() from the per-node config, so it is no longer assembled here. DECODE_COMPILE_CFG='"'"'{"cudagraph_mode":"FULL_DECODE_ONLY"}'"'"' @@ -222,9 +223,9 @@ srun --kill-on-bad-exit=1 bash -c ' ROLE_EXTRA="\"all2all_backend\": \"deepep_high_throughput\"" if [ "$ROLE_RANK" -eq 0 ]; then - VLLM_EXTRA="{\"data_parallel_address\": \"$LOCAL_IP\", \"data_parallel_hybrid_lb\": true, \"kv_transfer_config\": $PREFILL_KV_CFG, $ROLE_EXTRA}" + VLLM_EXTRA="{\"data_parallel_address\": \"$LOCAL_IP\", \"data_parallel_hybrid_lb\": true, $ROLE_EXTRA}" else - VLLM_EXTRA="{\"data_parallel_address\": \"$HEAD_HOST\", \"data_parallel_start_rank\": $START_RANK, \"data_parallel_hybrid_lb\": true, \"kv_transfer_config\": $PREFILL_KV_CFG, $ROLE_EXTRA}" + VLLM_EXTRA="{\"data_parallel_address\": \"$HEAD_HOST\", \"data_parallel_start_rank\": $START_RANK, \"data_parallel_hybrid_lb\": true, $ROLE_EXTRA}" fi else # ── Decode node ── @@ -244,9 +245,9 @@ srun --kill-on-bad-exit=1 bash -c ' ROLE_EXTRA="\"all2all_backend\": \"deepep_low_latency\", \"compilation_config\": $DECODE_COMPILE_CFG" if [ "$ROLE_RANK" -eq 0 ]; then - VLLM_EXTRA="{\"data_parallel_address\": \"$LOCAL_IP\", \"data_parallel_hybrid_lb\": true, \"kv_transfer_config\": $DECODE_KV_CFG, $ROLE_EXTRA}" + VLLM_EXTRA="{\"data_parallel_address\": \"$LOCAL_IP\", \"data_parallel_hybrid_lb\": true, $ROLE_EXTRA}" else - VLLM_EXTRA="{\"data_parallel_address\": \"$HEAD_HOST\", \"data_parallel_start_rank\": $START_RANK, \"data_parallel_hybrid_lb\": true, \"kv_transfer_config\": $DECODE_KV_CFG, $ROLE_EXTRA}" + VLLM_EXTRA="{\"data_parallel_address\": \"$HEAD_HOST\", \"data_parallel_start_rank\": $START_RANK, \"data_parallel_hybrid_lb\": true, $ROLE_EXTRA}" fi fi diff --git a/src/prime_rl/templates/multi_node_rl.sbatch.j2 b/src/prime_rl/templates/multi_node_rl.sbatch.j2 index 4b923060a7..5bd2340b4b 100755 --- a/src/prime_rl/templates/multi_node_rl.sbatch.j2 +++ b/src/prime_rl/templates/multi_node_rl.sbatch.j2 @@ -172,9 +172,13 @@ srun bash -s <<'CLEANUP_SH' # which pkill -f does not see because -f only matches the cmdline. pkill -9 "vllm" 2>/dev/null pkill -9 "vllm::.*" 2>/dev/null + # Mooncake store (master + client) left over from a prior offload run. -x matches + # the exact process name, so it never hits the worktree path (e.g. mooncake-baby). + pkill -9 -x mooncake_master 2>/dev/null + pkill -9 -x mooncake_client 2>/dev/null sleep 2 rm -rf /dev/shm/vllm-* /dev/shm/vllm_* /tmp/vllm-* /tmp/vllm_* /tmp/torch-* /tmp/torchelastic_* 2>/dev/null - procs=$(ps -eo comm,args | grep -E "python|torchrun|vllm|vllm::" | grep -v grep | wc -l) + procs=$(ps -eo comm,args | grep -E "python|torchrun|vllm|vllm::|mooncake_" | grep -v grep | wc -l) gpu=$(nvidia-smi --query-gpu=memory.used --format=csv,noheader,nounits | awk "{s+=\$1} END {print s}") echo "[node-cleanup] $(hostname) procs=$procs gpu_mem=${gpu}MiB" CLEANUP_SH @@ -214,6 +218,12 @@ if [ "$SLURM_PROCID" -lt "$NUM_INFER_NODES" ]; then {%- if kv_offload %} ulimit -l unlimited {%- endif %} +{%- if kv_offload_mooncake %} + # Launch the node-local Mooncake store master + client, then source the env vars + # MOONCAKE_CONFIG_PATH and PYTHONHASHSEED so the vLLM process picks them up. + uv run python -m prime_rl.inference.mooncake --config $CONFIG_DIR/inference.toml --output-dir $OUTPUT_DIR/mooncake/node_${INFER_NODE_RANK} + source $OUTPUT_DIR/mooncake/node_${INFER_NODE_RANK}/env.sh +{%- endif %} REPLICA_IDX=$((INFER_NODE_RANK / NODES_PER_INFER_REPLICA)) RANK_IN_REPLICA=$((INFER_NODE_RANK % NODES_PER_INFER_REPLICA)) @@ -237,17 +247,9 @@ if [ "$SLURM_PROCID" -lt "$NUM_INFER_NODES" ]; then export VLLM_NIXL_SIDE_CHANNEL_HOST=$LOCAL_IP export VLLM_NIXL_SIDE_CHANNEL_PORT=5600 -{%- if kv_offload %} - PREFILL_KV_CFG='"'"'{"kv_connector":"MultiConnector","kv_role":"kv_both","kv_connector_extra_config":{"connectors":[{"kv_connector":"NixlConnector","kv_role":"kv_both","kv_connector_extra_config":{"num_threads":1}},{"kv_connector":"OffloadingConnector","kv_role":"kv_both","kv_connector_extra_config":{"cpu_bytes_to_use":{{ kv_offload_cpu_bytes }}}}]}}'"'"' -{%- else %} - PREFILL_KV_CFG='"'"'{"kv_connector":"NixlConnector","kv_role":"kv_both","kv_connector_extra_config":{"num_threads":1}}'"'"' -{%- endif %} + # The kv_transfer_config (NIXL transfer + any KV offload connector) is built in + # InferenceConfig.to_vllm() from the per-node config, so it is no longer assembled here. -{%- if kv_offload %} - DECODE_KV_CFG='"'"'{"kv_connector":"MultiConnector","kv_role":"kv_both","kv_connector_extra_config":{"connectors":[{"kv_connector":"NixlConnector","kv_role":"kv_both","kv_connector_extra_config":{"num_threads":1}},{"kv_connector":"OffloadingConnector","kv_role":"kv_both","kv_connector_extra_config":{"cpu_bytes_to_use":{{ kv_offload_cpu_bytes }}}}]}}'"'"' -{%- else %} - DECODE_KV_CFG='"'"'{"kv_connector":"NixlConnector","kv_role":"kv_both","kv_connector_extra_config":{"num_threads":1}}'"'"' -{%- endif %} DECODE_COMPILE_CFG='"'"'{"cudagraph_mode":"FULL_DECODE_ONLY"}'"'"' REPLICA_BASE=$((REPLICA_IDX * NODES_PER_INFER_REPLICA)) @@ -284,23 +286,13 @@ if [ "$SLURM_PROCID" -lt "$NUM_INFER_NODES" ]; then | tee -a $OUTPUT_DIR/logs/inference/node_${INFER_NODE_RANK}.log if [ "$DP" -gt 1 ]; then - if [ "$ROLE" = "prefill" ]; then - KV_CFG_TO_USE="$PREFILL_KV_CFG" - else - KV_CFG_TO_USE="$DECODE_KV_CFG" - fi - if [ "$ROLE_RANK" -eq 0 ]; then - VLLM_EXTRA="{\"data_parallel_address\": \"$LOCAL_IP\", \"data_parallel_hybrid_lb\": true, \"kv_transfer_config\": $KV_CFG_TO_USE, $ROLE_EXTRA}" + VLLM_EXTRA="{\"data_parallel_address\": \"$LOCAL_IP\", \"data_parallel_hybrid_lb\": true, $ROLE_EXTRA}" else - VLLM_EXTRA="{\"data_parallel_address\": \"$HEAD_HOST\", \"data_parallel_start_rank\": $START_RANK, \"data_parallel_hybrid_lb\": true, \"kv_transfer_config\": $KV_CFG_TO_USE, $ROLE_EXTRA}" + VLLM_EXTRA="{\"data_parallel_address\": \"$HEAD_HOST\", \"data_parallel_start_rank\": $START_RANK, \"data_parallel_hybrid_lb\": true, $ROLE_EXTRA}" fi else - if [ "$ROLE" = "prefill" ]; then - VLLM_EXTRA="{\"kv_transfer_config\": $PREFILL_KV_CFG, $ROLE_EXTRA}" - else - VLLM_EXTRA="{\"kv_transfer_config\": $DECODE_KV_CFG, $ROLE_EXTRA}" - fi + VLLM_EXTRA="{$ROLE_EXTRA}" fi # Start vllm-router on the first node of each replica (PD mode) diff --git a/uv.lock b/uv.lock index 6e6375c968..a3e998ef50 100644 --- a/uv.lock +++ b/uv.lock @@ -11,7 +11,7 @@ supported-markers = [ ] [options] -exclude-newer = "2026-05-25T22:45:49.773564147Z" +exclude-newer = "2026-05-25T23:50:03.973690224Z" exclude-newer-span = "P7D" [options.exclude-newer-package] @@ -2481,6 +2481,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/7c/b5/12f940a41940fd83b9e50ce46124695b68a80feab02612779f8fb584db09/modelexpress-0.3.0-py3-none-any.whl", hash = "sha256:6f93d8de74903f6c11ebf9b4621fa02e3c493a5e05207c38fb2c98395a774618", size = 44333, upload-time = "2026-04-17T20:17:29.478Z" }, ] +[[package]] +name = "mooncake-transfer-engine" +version = "0.3.11.post1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "aiohttp", marker = "(platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, + { name = "requests", marker = "(platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, +] +wheels = [ + { url = "https://files.pythonhosted.org/packages/82/6c/92d517ba157a02b2612b06a00c7e3f366d1050e41cc50e1bdefa382a7dc0/mooncake_transfer_engine-0.3.11.post1-cp312-cp312-manylinux_2_35_x86_64.whl", hash = "sha256:5ea50146d0d65f0274a406db49a570d187ad04760e054a49627bf84158038ac0", size = 42661963, upload-time = "2026-05-24T12:08:40.453Z" }, + { url = "https://files.pythonhosted.org/packages/3a/a4/a187adcd485ff27bdbdb5c2b4d9cf210427bc74bcaacfc8226409db17535/mooncake_transfer_engine-0.3.11.post1-cp312-cp312-manylinux_2_39_aarch64.whl", hash = "sha256:1ccad9f44cf1a67f4e0494bd02f505503139ab606ecbe76cd6050d7a069247d5", size = 18089789, upload-time = "2026-05-24T16:19:01.828Z" }, +] + [[package]] name = "mpmath" version = "1.3.0" @@ -3457,6 +3470,7 @@ dependencies = [ { name = "jaxtyping", marker = "(platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, { name = "liger-kernel", marker = "(platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, { name = "loguru", marker = "(platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, + { name = "mooncake-transfer-engine", marker = "(platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, { name = "numpy", marker = "(platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, { name = "nvidia-ml-py", marker = "(platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, { name = "openai", marker = "(platform_machine == 'aarch64' and sys_platform == 'linux') or (platform_machine == 'x86_64' and sys_platform == 'linux')" }, @@ -3601,6 +3615,7 @@ requires-dist = [ { name = "mini-swe-agent-plus-rlm", marker = "extra == 'envs'", editable = "deps/research-environments/environments/mini_swe_agent_plus_rlm" }, { name = "mmlu-pro", marker = "extra == 'envs'", editable = "deps/research-environments/environments/mmlu_pro" }, { name = "modelexpress", marker = "extra == 'modelexpress'", specifier = "==0.3.0" }, + { name = "mooncake-transfer-engine", specifier = ">=0.3.10.post2" }, { name = "nixl", marker = "extra == 'disagg'" }, { name = "nixl-cu12", marker = "platform_machine == 'x86_64' and extra == 'disagg'", url = "https://github.com/PrimeIntellect-ai/prime-rl/releases/download/v0.5.0/nixl_cu12-0.10.1-cp312-cp312-linux_x86_64.whl" }, { name = "numpy", specifier = ">=2.2.6" }, From c13f9e0aca3e10c7fdb338dba8ed204b4629b8a0 Mon Sep 17 00:00:00 2001 From: S1ro1 Date: Tue, 2 Jun 2026 06:31:03 +0530 Subject: [PATCH 2/5] chore(inference): drop unused mooncake disk num_bytes and metadata_server - DiskOffloadTier.num_bytes: removed. Neither backend enforces a disk byte quota (capacity is bounded by the filesystem at disk.path), so it was a no-op knob. - MooncakeKVCacheOffloadConfig.metadata_server: removed. The launcher always hosts the HTTP metadata server on the node-local master and auto-derives the URL from the master host. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../src/prime_rl/configs/inference.py | 20 +--------- src/prime_rl/inference/mooncake.py | 37 +++++++++---------- 2 files changed, 18 insertions(+), 39 deletions(-) diff --git a/packages/prime-rl-configs/src/prime_rl/configs/inference.py b/packages/prime-rl-configs/src/prime_rl/configs/inference.py index 84aa764235..9f88e2185b 100644 --- a/packages/prime-rl-configs/src/prime_rl/configs/inference.py +++ b/packages/prime-rl-configs/src/prime_rl/configs/inference.py @@ -1,4 +1,3 @@ -import warnings from argparse import Namespace from pathlib import Path from typing import Annotated, Any, Literal, TypeAlias @@ -94,10 +93,7 @@ class CPUOffloadTier(BaseConfig): class DiskOffloadTier(BaseConfig): path: Path - """Filesystem root for the disk tier. For ``native`` this is the ``fs_python`` secondary tier's ``root_dir``; for ``mooncake`` it is the store client's ``MOONCAKE_OFFLOAD_FILE_STORAGE_PATH``.""" - - num_bytes: int | None = Field(None, gt=0) - """Advisory per-node disk pool size. Neither backend enforces a hard disk quota — the native ``fs_python`` tier and Mooncake's file offload are both bounded by the filesystem at ``path``.""" + """Filesystem root for the disk tier. For ``native`` this is the ``fs_python`` secondary tier's ``root_dir``; for ``mooncake`` it is the store client's ``MOONCAKE_OFFLOAD_FILE_STORAGE_PATH``. Capacity is bounded by the filesystem at ``path`` (neither backend enforces a byte quota).""" class BaseKVCacheOffloadConfig(BaseConfig): @@ -121,17 +117,6 @@ class NativeKVCacheOffloadConfig(BaseKVCacheOffloadConfig): type: Literal["native"] = "native" """vLLM-native offloading. cpu-only uses ``OffloadingConnector`` + ``CPUOffloadingSpec``; cpu+disk uses ``TieringOffloadingSpec`` (CPU primary tier + ``fs_python`` disk secondary). Fully self-contained — no external processes.""" - @model_validator(mode="after") - def warn_ignored_disk_capacity(self): - if self.disk is not None and self.disk.num_bytes is not None: - warnings.warn( - "inference.kv_cache_offload.disk.num_bytes is ignored by the native backend: the " - "fs_python disk tier has no quota and is bounded by the filesystem at disk.path. " - "Set num_bytes only with the mooncake backend.", - stacklevel=1, - ) - return self - def to_connector_dict(self) -> dict[str, Any]: assert self.cpu is not None extra: dict[str, Any] = {"cpu_bytes_to_use": int(self.cpu.num_bytes)} @@ -152,9 +137,6 @@ class MooncakeKVCacheOffloadConfig(BaseKVCacheOffloadConfig): master_server_address: str | None = None """Mooncake master address (``host:port``). If None, the launcher starts a node-local master.""" - metadata_server: str | None = None - """Mooncake metadata-server connection string. If None, the launcher hosts an HTTP metadata server on the node-local master.""" - device_name: str = "" """RDMA device name(s) for the store (empty = auto-detect).""" diff --git a/src/prime_rl/inference/mooncake.py b/src/prime_rl/inference/mooncake.py index a9e9436d1b..ab8251752f 100644 --- a/src/prime_rl/inference/mooncake.py +++ b/src/prime_rl/inference/mooncake.py @@ -92,11 +92,13 @@ def shutdown(self) -> None: def _resolve_addresses(cfg: MooncakeKVCacheOffloadConfig) -> tuple[str, str, str, int]: """Return (master_server_address, metadata_server, client_host, client_port). - Defaults launch a node-local master that also hosts the HTTP metadata server. + The metadata server is always the HTTP server hosted by the master (auto-detected to the + master host); the node-local master hosts it unless an external master is configured. """ ip = _local_ip() master_addr = cfg.master_server_address or f"{ip}:{DEFAULT_MASTER_RPC_PORT}" - metadata = cfg.metadata_server or f"http://{ip}:{DEFAULT_METADATA_HTTP_PORT}/metadata" + master_host = master_addr.split(":")[0] + metadata = f"http://{master_host}:{DEFAULT_METADATA_HTTP_PORT}/metadata" return master_addr, metadata, ip, DEFAULT_CLIENT_PORT @@ -129,24 +131,21 @@ def _offload_env(cfg: MooncakeKVCacheOffloadConfig) -> dict[str, str]: if cfg.disk is None: return {} cfg.disk.path.mkdir(parents=True, exist_ok=True) - env = { + return { "MOONCAKE_OFFLOAD_FSDIR": str(cfg.disk.path), "MOONCAKE_OFFLOAD_FILE_STORAGE_PATH": str(cfg.disk.path), "MOONCAKE_OFFLOAD_ENABLE_EVICTION": "true", } - if cfg.disk.num_bytes is not None: - env["MOONCAKE_OFFLOAD_TOTAL_SIZE_LIMIT_BYTES"] = str(int(cfg.disk.num_bytes)) - return env - - -def _launch_master(cfg: MooncakeKVCacheOffloadConfig, ip: str, log_dir: Path, host_metadata: bool) -> subprocess.Popen: - args = [_bin("mooncake_master"), f"-rpc_port={DEFAULT_MASTER_RPC_PORT}"] - if host_metadata: - args += [ - "-enable_http_metadata_server", - f"-http_metadata_server_host={ip}", - f"-http_metadata_server_port={DEFAULT_METADATA_HTTP_PORT}", - ] + + +def _launch_master(cfg: MooncakeKVCacheOffloadConfig, ip: str, log_dir: Path) -> subprocess.Popen: + args = [ + _bin("mooncake_master"), + f"-rpc_port={DEFAULT_MASTER_RPC_PORT}", + "-enable_http_metadata_server", + f"-http_metadata_server_host={ip}", + f"-http_metadata_server_port={DEFAULT_METADATA_HTTP_PORT}", + ] if cfg.disk is not None: # The master owns the file-storage backend config the client mounts against. args += ["-enable_offload=true", f"-root_fs_dir={cfg.disk.path}"] @@ -185,15 +184,13 @@ def start_mooncake_store(cfg: MooncakeKVCacheOffloadConfig, output_dir: Path) -> log_dir.mkdir(parents=True, exist_ok=True) master_addr, metadata, client_ip, client_port = _resolve_addresses(cfg) - host_metadata = cfg.metadata_server is None # we host it on the master unless overridden master = None if cfg.master_server_address is None: - master = _launch_master(cfg, client_ip, log_dir, host_metadata=host_metadata) + master = _launch_master(cfg, client_ip, log_dir) master_host, master_port = master_addr.split(":") _wait_for_port(master_host, int(master_port)) - if host_metadata: - _wait_for_port(client_ip, DEFAULT_METADATA_HTTP_PORT) + _wait_for_port(client_ip, DEFAULT_METADATA_HTTP_PORT) client = _launch_client(cfg, client_ip, master_addr, metadata, log_dir) _wait_for_port(client_ip, client_port) From cb7b4d4950fb9d5206e08a01ed68361397ba18b2 Mon Sep 17 00:00:00 2001 From: S1ro1 Date: Tue, 2 Jun 2026 06:56:41 +0530 Subject: [PATCH 3/5] refactor(inference): launch Mooncake store in sbatch bash; add CHANGELOG MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address PR review: replace the Python launcher CLI (+ env.sh round-trip) with an explicit bash launch in the SLURM templates, and document the breaking config change. - New `templates/_mooncake_store.sh.j2` partial (shared via `{% include %}` by `inference.sbatch.j2` and `multi_node_rl.sbatch.j2`): launches the node-local `mooncake_master` + `mooncake_client`, writes the config JSON, and exports `PYTHONHASHSEED` / `MOONCAKE_CONFIG_PATH` inline — visible in the rendered script. Removes the hacky orphan-process CLI, the `print`, and the unquoted-`env.sh` round-trip. - Delete `src/prime_rl/inference/mooncake.py` and the `server.py` store-launch hook. - Mooncake offload is now SLURM-only: `inference_local` errors (when no store env is present) and an `RLConfig` validator rejects local RL + mooncake. Native offload is unchanged and still works locally. Entrypoints pass `kv_offload_{cpu_bytes,disk_path, device_name}` template vars. - CHANGELOG.md: document the `inference.kv_cache_offload.cpu_bytes` -> discriminated `type` breaking change (per .cursor/BUGBOT.md). Verified single-node SLURM (Qwen3-0.6B): store launches via the template bash, MooncakeStoreConnector (standalone-store) loads, and the server serves coherent output. Co-Authored-By: Claude Opus 4.8 (1M context) --- CHANGELOG.md | 1 + .../src/prime_rl/configs/rl.py | 14 + src/prime_rl/entrypoints/inference.py | 25 +- src/prime_rl/entrypoints/rl.py | 19 +- src/prime_rl/inference/mooncake.py | 252 ------------------ src/prime_rl/inference/vllm/server.py | 31 +-- src/prime_rl/templates/_mooncake_store.sh.j2 | 30 +++ src/prime_rl/templates/inference.sbatch.j2 | 5 +- .../templates/multi_node_rl.sbatch.j2 | 5 +- 9 files changed, 89 insertions(+), 293 deletions(-) delete mode 100644 src/prime_rl/inference/mooncake.py create mode 100644 src/prime_rl/templates/_mooncake_store.sh.j2 diff --git a/CHANGELOG.md b/CHANGELOG.md index b3167b9f0d..9da33aa490 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ Documenting **breaking** configuration changes — renamed, removed, or moved fields that require users to update existing configs. +- **`inference.kv_cache_offload.cpu_bytes` removed → discriminated `type` config**: The flat `[inference.kv_cache_offload]` block with a single `cpu_bytes` field is replaced by a backend-discriminated union with composable `cpu`/`disk` tiers. Migrate native CPU offload from `[inference.kv_cache_offload]\ncpu_bytes = N` to `[inference.kv_cache_offload]\ntype = "native"` plus `[inference.kv_cache_offload.cpu]\nnum_bytes = N`. A `type = "mooncake"` backend (per-node distributed store; multi-node/SLURM only) and an optional `[inference.kv_cache_offload.disk]\npath = "..."` tier (layered behind cpu) are also available. `extra="forbid"` rejects the old `cpu_bytes` key, so existing configs must migrate. (2026-06-02) - **Orchestrator async-pipeline rewrite** (collection of removals/renames). The orchestrator was rewritten to overlap train/eval rollouts on a shared concurrency limiter; several config fields were removed or renamed. - **`orchestrator.seed` removed**: was only consumed by the deleted buffer; no replacement. - **`orchestrator.eval.eval_base_model` → `orchestrator.eval.skip_first_step`** (semantics inverted): `eval_base_model = true` becomes `skip_first_step = false` (the default — run the step-0 eval before any train rollouts). No alias; configs setting `eval_base_model` must rename. diff --git a/packages/prime-rl-configs/src/prime_rl/configs/rl.py b/packages/prime-rl-configs/src/prime_rl/configs/rl.py index a10e6b0291..795c9e1b31 100644 --- a/packages/prime-rl-configs/src/prime_rl/configs/rl.py +++ b/packages/prime-rl-configs/src/prime_rl/configs/rl.py @@ -448,6 +448,20 @@ def validate_router_replay_without_kv_offload(self): ) return self + @model_validator(mode="after") + def validate_mooncake_offload_requires_slurm(self): + if ( + self.slurm is None + and self.inference is not None + and self.inference.kv_cache_offload is not None + and self.inference.kv_cache_offload.type == "mooncake" + ): + raise ValueError( + "Mooncake KV offload requires SLURM — the per-node store is launched by the sbatch " + "template. Use inference.kv_cache_offload.type='native' for local runs." + ) + return self + @model_validator(mode="after") def auto_setup_deployment(self): if self.deployment.type == "single_node": # single-node diff --git a/src/prime_rl/entrypoints/inference.py b/src/prime_rl/entrypoints/inference.py index 563880fed3..decb08b2d5 100644 --- a/src/prime_rl/entrypoints/inference.py +++ b/src/prime_rl/entrypoints/inference.py @@ -36,6 +36,9 @@ def write_slurm_script(config: InferenceConfig, config_path: Path, script_path: is_disaggregated = config.deployment.type == "disaggregated" dp_per_node = config.deployment.gpus_per_node // config.parallel.tp + offload = config.kv_cache_offload + is_mooncake = offload is not None and offload.type == "mooncake" + template_vars = dict( **config.slurm.template_vars, config_path=config_path, @@ -45,8 +48,11 @@ def write_slurm_script(config: InferenceConfig, config_path: Path, script_path: num_nodes=getattr(config.deployment, "num_nodes", 1), port=config.server.port, disaggregated=is_disaggregated, - kv_offload=config.kv_cache_offload is not None, - kv_offload_mooncake=config.kv_cache_offload is not None and config.kv_cache_offload.type == "mooncake", + kv_offload=offload is not None, + kv_offload_mooncake=is_mooncake, + kv_offload_cpu_bytes=int(offload.cpu.num_bytes) if is_mooncake else 0, + kv_offload_disk_path=str(offload.disk.path) if (is_mooncake and offload.disk is not None) else "", + kv_offload_device_name=offload.device_name if is_mooncake else "", ) is_multi_node = config.deployment.type == "multi_node" @@ -117,10 +123,25 @@ def inference_slurm(config: InferenceConfig): def inference_local(config: InferenceConfig): """Run inference locally.""" + import os + from prime_rl.inference.server import setup_vllm_env logger = setup_logger("info") + # Mooncake offload relies on the per-node store the sbatch template launches (which also + # exports MOONCAKE_CONFIG_PATH). When that env is absent, this is a bare local run with no + # store, so fail fast instead of letting the worker error on a missing config. + if ( + config.kv_cache_offload is not None + and config.kv_cache_offload.type == "mooncake" + and "MOONCAKE_CONFIG_PATH" not in os.environ + ): + raise ValueError( + "Mooncake KV offload requires SLURM — the per-node store (master + client) is launched " + "by the sbatch template. Use inference.kv_cache_offload.type='native' for local runs." + ) + if config.dry_run: logger.success("Dry run complete. To start inference locally, remove --dry-run from your command.") return diff --git a/src/prime_rl/entrypoints/rl.py b/src/prime_rl/entrypoints/rl.py index 655cbd3ef3..d98749f9e8 100644 --- a/src/prime_rl/entrypoints/rl.py +++ b/src/prime_rl/entrypoints/rl.py @@ -336,6 +336,16 @@ def write_slurm_script(config: RLConfig, config_dir: Path, script_path: Path) -> env = Environment(loader=FileSystemLoader(config.slurm.template_path.parent), keep_trailing_newline=True) template = env.get_template(config.slurm.template_path.name) + offload = config.inference.kv_cache_offload if config.inference is not None else None + is_mooncake = offload is not None and offload.type == "mooncake" + mooncake_vars = dict( + kv_offload=offload is not None, + kv_offload_mooncake=is_mooncake, + kv_offload_cpu_bytes=int(offload.cpu.num_bytes) if is_mooncake else 0, + kv_offload_disk_path=str(offload.disk.path) if (is_mooncake and offload.disk is not None) else "", + kv_offload_device_name=offload.device_name if is_mooncake else "", + ) + if config.deployment.type == "single_node": script = template.render( **config.slurm.template_vars, @@ -370,9 +380,7 @@ def write_slurm_script(config: RLConfig, config_dir: Path, script_path: Path) -> prefill_env_overrides=infer_deploy.prefill_env_overrides, decode_env_overrides=infer_deploy.decode_env_overrides, dp_per_node=config.deployment.gpus_per_node // config.inference.parallel.tp, - kv_offload=config.inference.kv_cache_offload is not None, - kv_offload_mooncake=config.inference.kv_cache_offload is not None - and config.inference.kv_cache_offload.type == "mooncake", + **mooncake_vars, use_nccl_broadcast=config.weight_broadcast is not None and config.weight_broadcast.type == "nccl", ranks_filter=",".join(map(str, config.trainer.log.ranks_filter)), ) @@ -394,10 +402,7 @@ def write_slurm_script(config: RLConfig, config_dir: Path, script_path: Path) -> inference_enable_expert_parallel=config.inference.enable_expert_parallel if config.inference else False, inference_data_parallel_rpc_port=config.inference.data_parallel_rpc_port if config.inference else 29600, dp_per_node=(config.deployment.gpus_per_node // config.inference.parallel.tp) if config.inference else 1, - kv_offload=config.inference is not None and config.inference.kv_cache_offload is not None, - kv_offload_mooncake=config.inference is not None - and config.inference.kv_cache_offload is not None - and config.inference.kv_cache_offload.type == "mooncake", + **mooncake_vars, use_nccl_broadcast=config.weight_broadcast is not None and config.weight_broadcast.type == "nccl", ranks_filter=",".join(map(str, config.trainer.log.ranks_filter)), ) diff --git a/src/prime_rl/inference/mooncake.py b/src/prime_rl/inference/mooncake.py deleted file mode 100644 index ab8251752f..0000000000 --- a/src/prime_rl/inference/mooncake.py +++ /dev/null @@ -1,252 +0,0 @@ -"""Node-local Mooncake store launcher for KV cache offload (standalone-store topology). - -Each node runs its own ``mooncake_master`` + one ``mooncake_client``; the node's vLLM -workers connect to that local store as clients (contributing no segment of their own). The -client owns the pool: a DRAM segment sized by the cpu tier, plus a file-backed disk tier -when a disk tier is configured. See ``MooncakeKVCacheOffloadConfig``. - -This module both (a) is imported by the local launch path in ``inference/vllm/server.py`` -and (b) exposes a small CLI used by the SLURM templates to bring the store up per node. -""" - -from __future__ import annotations - -import json -import os -import shutil -import socket -import subprocess -import sys -import time -from dataclasses import dataclass -from pathlib import Path - -from prime_rl.configs.inference import MooncakeKVCacheOffloadConfig - -DEFAULT_MASTER_RPC_PORT = 50051 -DEFAULT_METADATA_HTTP_PORT = 8080 -DEFAULT_CLIENT_PORT = 50052 -# Per-worker transfer staging buffer in the store client (mirrors Mooncake's default). -DEFAULT_LOCAL_BUFFER_SIZE = 4 * 1024**3 -# Mooncake transfer protocol (RDMA only). -MOONCAKE_PROTOCOL = "rdma" - - -def _bin(name: str) -> str: - """Resolve a Mooncake console-script, preferring the one in this interpreter's venv.""" - candidate = Path(sys.executable).parent / name - if candidate.exists(): - return str(candidate) - return shutil.which(name) or name - - -def _local_ip() -> str: - # Match the address the vLLM worker registers with the store (it uses vllm's get_ip()). - from vllm.utils.network_utils import get_ip - - return get_ip() - - -def _wait_for_port(host: str, port: int, timeout: float = 30.0) -> None: - """Block until ``host:port`` accepts a TCP connection, or raise on timeout.""" - deadline = time.monotonic() + timeout - last_err: OSError | None = None - while time.monotonic() < deadline: - try: - with socket.create_connection((host, port), timeout=1.0): - return - except OSError as e: - last_err = e - time.sleep(0.2) - raise TimeoutError(f"Mooncake endpoint {host}:{port} did not come up within {timeout}s: {last_err}") - - -@dataclass -class MooncakeStore: - """Handles for a running node-local Mooncake store and the env it needs. - - With one client per node there is exactly one segment in the store, so the master - routes all puts/gets to it — no ``preferred_segment`` pinning is needed (that hint only - matters when several client segments coexist in one pool). - """ - - master: subprocess.Popen - client: subprocess.Popen - config_path: Path - env: dict[str, str] - - def apply_env(self) -> None: - """Export the env vars the vLLM worker process needs (config path, segment, hash seed).""" - os.environ.update(self.env) - - def shutdown(self) -> None: - for proc in (self.client, self.master): - if proc.poll() is None: - proc.terminate() - try: - proc.wait(timeout=10) - except subprocess.TimeoutExpired: - proc.kill() - - -def _resolve_addresses(cfg: MooncakeKVCacheOffloadConfig) -> tuple[str, str, str, int]: - """Return (master_server_address, metadata_server, client_host, client_port). - - The metadata server is always the HTTP server hosted by the master (auto-detected to the - master host); the node-local master hosts it unless an external master is configured. - """ - ip = _local_ip() - master_addr = cfg.master_server_address or f"{ip}:{DEFAULT_MASTER_RPC_PORT}" - master_host = master_addr.split(":")[0] - metadata = f"http://{master_host}:{DEFAULT_METADATA_HTTP_PORT}/metadata" - return master_addr, metadata, ip, DEFAULT_CLIENT_PORT - - -def write_config_file(cfg: MooncakeKVCacheOffloadConfig, master_addr: str, metadata: str, path: Path) -> None: - """Write the ``MOONCAKE_CONFIG_PATH`` JSON read per-worker by ``MooncakeStoreConfig.from_file``. - - Standalone-store: workers contribute no segment (``global_segment_size == 0``); the - node-local client owns the pool. ``enable_offload`` is on when a disk tier exists. - """ - config = { - "mode": "standalone-store", - "global_segment_size": 0, - "local_buffer_size": DEFAULT_LOCAL_BUFFER_SIZE, - "protocol": MOONCAKE_PROTOCOL, - "device_name": cfg.device_name, - "master_server_address": master_addr, - "metadata_server": metadata, - "enable_offload": cfg.disk is not None, - } - path.parent.mkdir(parents=True, exist_ok=True) - path.write_text(json.dumps(config, indent=2)) - - -def _offload_env(cfg: MooncakeKVCacheOffloadConfig) -> dict[str, str]: - """Offload (disk-tier) env shared by the master and client when a disk tier is set. - - The master serves the fsdir to the client (the client does ``get_fsdir`` from it), and - the client writes the offloaded block files there — so both processes need the path. - """ - if cfg.disk is None: - return {} - cfg.disk.path.mkdir(parents=True, exist_ok=True) - return { - "MOONCAKE_OFFLOAD_FSDIR": str(cfg.disk.path), - "MOONCAKE_OFFLOAD_FILE_STORAGE_PATH": str(cfg.disk.path), - "MOONCAKE_OFFLOAD_ENABLE_EVICTION": "true", - } - - -def _launch_master(cfg: MooncakeKVCacheOffloadConfig, ip: str, log_dir: Path) -> subprocess.Popen: - args = [ - _bin("mooncake_master"), - f"-rpc_port={DEFAULT_MASTER_RPC_PORT}", - "-enable_http_metadata_server", - f"-http_metadata_server_host={ip}", - f"-http_metadata_server_port={DEFAULT_METADATA_HTTP_PORT}", - ] - if cfg.disk is not None: - # The master owns the file-storage backend config the client mounts against. - args += ["-enable_offload=true", f"-root_fs_dir={cfg.disk.path}"] - log = (log_dir / "mooncake_master.log").open("w") - return subprocess.Popen(args, stdout=log, stderr=subprocess.STDOUT, env={**os.environ, **_offload_env(cfg)}) - - -def _launch_client( - cfg: MooncakeKVCacheOffloadConfig, ip: str, master_addr: str, metadata: str, log_dir: Path -) -> subprocess.Popen: - assert cfg.cpu is not None # enforced by config validation - args = [ - _bin("mooncake_client"), - f"-host={ip}", - f"-port={DEFAULT_CLIENT_PORT}", - f"-master_server_address={master_addr}", - f"-metadata_server={metadata}", - f"-protocol={MOONCAKE_PROTOCOL}", - f"-global_segment_size={int(cfg.cpu.num_bytes)}", - ] - if cfg.device_name: - args.append(f"-device_names={cfg.device_name}") - if cfg.disk is not None: - args.append("-enable_offload=true") - log = (log_dir / "mooncake_client.log").open("w") - return subprocess.Popen(args, stdout=log, stderr=subprocess.STDOUT, env={**os.environ, **_offload_env(cfg)}) - - -def start_mooncake_store(cfg: MooncakeKVCacheOffloadConfig, output_dir: Path) -> MooncakeStore: - """Launch the node-local master + client, write the config JSON, and return handles + env. - - The caller must export the returned env (``apply_env``) before the vLLM engine process - starts (``PYTHONHASHSEED`` must precede the interpreter that hashes KV blocks). - """ - log_dir = output_dir - log_dir.mkdir(parents=True, exist_ok=True) - - master_addr, metadata, client_ip, client_port = _resolve_addresses(cfg) - - master = None - if cfg.master_server_address is None: - master = _launch_master(cfg, client_ip, log_dir) - master_host, master_port = master_addr.split(":") - _wait_for_port(master_host, int(master_port)) - _wait_for_port(client_ip, DEFAULT_METADATA_HTTP_PORT) - - client = _launch_client(cfg, client_ip, master_addr, metadata, log_dir) - _wait_for_port(client_ip, client_port) - - config_path = output_dir / "mooncake_config.json" - write_config_file(cfg, master_addr, metadata, config_path) - - env = { - "MOONCAKE_CONFIG_PATH": str(config_path), - # Reproducible KV block hashes across the store client and vLLM worker processes. - "PYTHONHASHSEED": "0", - } - # If no separate master was launched (external master), there is no master Popen; use the - # client handle as a stand-in so shutdown() stays simple. - return MooncakeStore( - master=master if master is not None else client, - client=client, - config_path=config_path, - env=env, - ) - - -def _main() -> None: - """CLI used by the SLURM templates: launch the node-local store and print env exports. - - Usage: ``python -m prime_rl.inference.mooncake --config --output-dir ``. - Launches the node-local master + client (which outlive this process) and prints shell - ``export`` lines for the template to ``eval`` before starting vLLM on that node. - """ - import argparse - import tomllib - - from pydantic import TypeAdapter - - from prime_rl.configs.inference import KVCacheOffloadConfig - - parser = argparse.ArgumentParser(description="Launch a node-local Mooncake store.") - parser.add_argument("--config", type=Path, required=True, help="Path to the inference TOML.") - parser.add_argument("--output-dir", type=Path, required=True, help="Directory for the store config + logs.") - args = parser.parse_args() - - raw = tomllib.loads(args.config.read_text()) - offload_raw = raw.get("kv_cache_offload") - if offload_raw is None: - raise ValueError("inference config has no kv_cache_offload section") - offload = TypeAdapter(KVCacheOffloadConfig).validate_python(offload_raw) - if not isinstance(offload, MooncakeKVCacheOffloadConfig): - raise ValueError("mooncake CLI requires inference.kv_cache_offload.type == 'mooncake'") - - store = start_mooncake_store(offload, args.output_dir) - # Write the env as a sourceable file so the SLURM template can `source` it without - # capturing stdout (which may carry unrelated import logging). - env_sh = args.output_dir / "env.sh" - env_sh.write_text("".join(f"export {key}={value}\n" for key, value in store.env.items())) - print(f"wrote {env_sh}") - - -if __name__ == "__main__": - _main() diff --git a/src/prime_rl/inference/vllm/server.py b/src/prime_rl/inference/vllm/server.py index c146d21fef..f411695c78 100644 --- a/src/prime_rl/inference/vllm/server.py +++ b/src/prime_rl/inference/vllm/server.py @@ -192,19 +192,6 @@ def server(config: InferenceConfig, vllm_extra: dict[str, Any] | None = None): if config.lora_target_modules and not any("expert" in m for m in config.lora_target_modules): os.environ["PRIME_NO_MOE_LORA"] = "1" - # Launch the node-local Mooncake store for local runs. The SLURM launcher does this per - # node and exports MOONCAKE_CONFIG_PATH itself, so skip when it is already set. The store - # env (incl. PYTHONHASHSEED) must be applied before the engine/worker processes spawn. - from prime_rl.configs.inference import MooncakeKVCacheOffloadConfig - - mooncake_store = None - if isinstance(config.kv_cache_offload, MooncakeKVCacheOffloadConfig) and "MOONCAKE_CONFIG_PATH" not in os.environ: - from prime_rl.inference.mooncake import start_mooncake_store - - mooncake_store = start_mooncake_store(config.kv_cache_offload, config.output_dir / "mooncake") - mooncake_store.apply_env() - logger.info(f"Started node-local Mooncake store (config: {mooncake_store.config_path})") - namespace = config.to_vllm() if vllm_extra: for key, value in vllm_extra.items(): @@ -219,15 +206,11 @@ def server(config: InferenceConfig, vllm_extra: dict[str, Any] | None = None): # Set the worker extension class based on the broadcast backend args.worker_extension_cls = WORKER_EXTENSION_CLS[config.weight_broadcast.type] - try: - if args.headless or args.api_server_count < 1: - run_headless(args) + if args.headless or args.api_server_count < 1: + run_headless(args) + else: + if args.api_server_count > 1: + run_multi_api_server(args) else: - if args.api_server_count > 1: - run_multi_api_server(args) - else: - # Single API server (this process). - uvloop.run(run_server(args)) - finally: - if mooncake_store is not None: - mooncake_store.shutdown() + # Single API server (this process). + uvloop.run(run_server(args)) diff --git a/src/prime_rl/templates/_mooncake_store.sh.j2 b/src/prime_rl/templates/_mooncake_store.sh.j2 new file mode 100644 index 0000000000..7ad3f226da --- /dev/null +++ b/src/prime_rl/templates/_mooncake_store.sh.j2 @@ -0,0 +1,30 @@ +{#- Node-local Mooncake store (master + client, standalone-store) for KV cache offload. + Included by the SLURM templates inside the per-node `bash -c` body, so this file must + contain NO single quotes / apostrophes (they would close the wrapping quote). + Requires shell vars: LOCAL_IP, INFER_NODE_RANK, OUTPUT_DIR. + Requires jinja vars: kv_offload_cpu_bytes, kv_offload_disk_path, kv_offload_device_name. + Exports PYTHONHASHSEED + MOONCAKE_CONFIG_PATH for the vLLM process on this node. -#} + # ---- Node-local Mooncake store (standalone-store) ---- + MC_DIR="$OUTPUT_DIR/mooncake/node_${INFER_NODE_RANK}" + mkdir -p "$MC_DIR" + export PYTHONHASHSEED=0 + export MOONCAKE_CONFIG_PATH="$MC_DIR/config.json" + MC_MASTER="$LOCAL_IP:50051" + MC_METADATA="http://$LOCAL_IP:8080/metadata" +{%- if kv_offload_disk_path %} + mkdir -p "{{ kv_offload_disk_path }}" + export MOONCAKE_OFFLOAD_FSDIR="{{ kv_offload_disk_path }}" + export MOONCAKE_OFFLOAD_FILE_STORAGE_PATH="{{ kv_offload_disk_path }}" + export MOONCAKE_OFFLOAD_ENABLE_EVICTION=true + mooncake_master -rpc_port=50051 -enable_http_metadata_server -http_metadata_server_host=$LOCAL_IP -http_metadata_server_port=8080 -enable_offload=true -root_fs_dir={{ kv_offload_disk_path }} > "$MC_DIR/master.log" 2>&1 & +{%- else %} + mooncake_master -rpc_port=50051 -enable_http_metadata_server -http_metadata_server_host=$LOCAL_IP -http_metadata_server_port=8080 > "$MC_DIR/master.log" 2>&1 & +{%- endif %} + for _ in $(seq 1 60); do timeout 1 bash -c "echo > /dev/tcp/$LOCAL_IP/50051" 2>/dev/null && break; sleep 1; done + for _ in $(seq 1 60); do timeout 1 bash -c "echo > /dev/tcp/$LOCAL_IP/8080" 2>/dev/null && break; sleep 1; done + mooncake_client -host=$LOCAL_IP -port=50052 -master_server_address=$MC_MASTER -metadata_server=$MC_METADATA -protocol=rdma -global_segment_size={{ kv_offload_cpu_bytes }}{% if kv_offload_device_name %} -device_names={{ kv_offload_device_name }}{% endif %}{% if kv_offload_disk_path %} -enable_offload=true{% endif %} > "$MC_DIR/client.log" 2>&1 & + for _ in $(seq 1 60); do timeout 1 bash -c "echo > /dev/tcp/$LOCAL_IP/50052" 2>/dev/null && break; sleep 1; done + cat > "$MOONCAKE_CONFIG_PATH" < Date: Tue, 2 Jun 2026 07:36:35 +0530 Subject: [PATCH 4/5] feat(inference): Mooncake shared distributed KV pool across nodes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Switch the Mooncake offload backend from per-node isolated stores to a single shared distributed pool, so all inference nodes' CPU RAM (and disk) form one KV cache and a prefix cached on any node is reusable by every other node/replica. - `_mooncake_store.sh.j2`: the head inference node (first in the SLURM nodelist) runs the one master + HTTP metadata server; every node runs a client pointing at the head and contributes its segment. Store keys are model + parallel-rank + content-hash (no instance id), so the pool is reused across nodes and replicas. - Drop the now-unused `MooncakeKVCacheOffloadConfig.master_server_address` (auto head). - docs: describe the shared pool (total pool ≈ num_bytes × #inference-nodes). Verified on a 2-node SLURM job (Qwen3-0.6B): one master on the head, both clients join (2 segments), and a prefix cached on node A is served from node B over RDMA (load_get ~160 MB cross-node hit). Co-Authored-By: Claude Opus 4.8 (1M context) --- docs/inference.md | 4 +-- .../src/prime_rl/configs/inference.py | 11 +++---- src/prime_rl/templates/_mooncake_store.sh.j2 | 33 ++++++++++++------- 3 files changed, 27 insertions(+), 21 deletions(-) diff --git a/docs/inference.md b/docs/inference.md index 474131d297..0552c8f245 100644 --- a/docs/inference.md +++ b/docs/inference.md @@ -197,7 +197,7 @@ Maximizing KV-Cache space is crucial to support high-concurrency workloads. You The `type` field selects the backend: - `native` — vLLM's built-in offloading. CPU-only uses `OffloadingConnector`; CPU+disk uses `TieringOffloadingSpec` (a CPU primary tier with a filesystem secondary tier). Fully self-contained — no extra processes. -- `mooncake` — a per-node [Mooncake](https://github.com/kvcache-ai/Mooncake) distributed store. The launcher starts a node-local `mooncake_master` + `mooncake_client`; the client owns the pool (a DRAM segment, plus a file-backed disk tier when `disk` is set). +- `mooncake` — a [Mooncake](https://github.com/kvcache-ai/Mooncake) **shared distributed store** (SLURM only). One `mooncake_master` + metadata server runs on the head inference node; every inference node runs a `mooncake_client` that contributes its DRAM (and, with `disk`, SSD) segment to that *single* pool. Because blocks are keyed by model + parallel rank + content hash (no instance id), a prefix cached by one node/replica is reusable by all of them over RDMA — pooling every node's CPU RAM into one KV cache. Use `native` for local/single-process runs. ```toml # Native CPU offload (reserves 128GB of CPU KV cache for this instance) @@ -223,7 +223,7 @@ num_bytes = 128_000_000_000 path = "/scratch/kv" ``` -For `native`, `cpu.num_bytes` is the aggregate CPU KV pool for the instance (vLLM shards it across workers). For `mooncake`, `cpu.num_bytes` is the per-node store client's DRAM segment (the store uses RDMA, so it requires an RDMA-capable fabric). Enabling offload automatically enables prefix caching. +For `native`, `cpu.num_bytes` is the aggregate CPU KV pool for the instance (vLLM shards it across workers). For `mooncake`, `cpu.num_bytes` is the DRAM each node contributes to the shared pool (so the total pool ≈ `num_bytes × #inference-nodes`); the store uses RDMA, so it requires an RDMA-capable fabric. Enabling offload automatically enables prefix caching. ### Optimized P/D disaggregation deployment diff --git a/packages/prime-rl-configs/src/prime_rl/configs/inference.py b/packages/prime-rl-configs/src/prime_rl/configs/inference.py index 9f88e2185b..93a0860bcf 100644 --- a/packages/prime-rl-configs/src/prime_rl/configs/inference.py +++ b/packages/prime-rl-configs/src/prime_rl/configs/inference.py @@ -132,18 +132,15 @@ def to_connector_dict(self) -> dict[str, Any]: class MooncakeKVCacheOffloadConfig(BaseKVCacheOffloadConfig): type: Literal["mooncake"] = "mooncake" - """Mooncake distributed store offloading (standalone-store topology). A per-node ``mooncake_master`` + ``mooncake_client`` own the pool and vLLM workers are clients. The cpu tier sets the client's DRAM segment; the optional disk tier adds the client's SSD tier.""" - - master_server_address: str | None = None - """Mooncake master address (``host:port``). If None, the launcher starts a node-local master.""" + """Mooncake distributed store offloading (SLURM only). One ``mooncake_master`` + metadata server runs on the head inference node; every node runs a ``mooncake_client`` contributing its segment to the single shared pool, so prefixes cached on any node are reusable by all. The cpu tier sizes each node's DRAM segment; the optional disk tier adds an SSD tier.""" device_name: str = "" """RDMA device name(s) for the store (empty = auto-detect).""" def to_connector_dict(self) -> dict[str, Any]: - # Sizes/addresses/tiers are realized via the MOONCAKE_CONFIG_PATH JSON and the - # mooncake_client launch; the worker reads preferred_segment from the - # MOONCAKE_PREFERRED_SEGMENT env the launcher exports. + # Addresses/sizes/tiers are realized by the per-node store launch in the sbatch + # template (MOONCAKE_CONFIG_PATH JSON); blocks are keyed by model + parallel rank + + # content hash (no instance id), so the shared pool is reused across nodes/replicas. return { "kv_connector": "MooncakeStoreConnector", "kv_role": "kv_both", diff --git a/src/prime_rl/templates/_mooncake_store.sh.j2 b/src/prime_rl/templates/_mooncake_store.sh.j2 index 7ad3f226da..6068fd06d1 100644 --- a/src/prime_rl/templates/_mooncake_store.sh.j2 +++ b/src/prime_rl/templates/_mooncake_store.sh.j2 @@ -1,30 +1,39 @@ -{#- Node-local Mooncake store (master + client, standalone-store) for KV cache offload. - Included by the SLURM templates inside the per-node `bash -c` body, so this file must - contain NO single quotes / apostrophes (they would close the wrapping quote). - Requires shell vars: LOCAL_IP, INFER_NODE_RANK, OUTPUT_DIR. +{#- Mooncake distributed store (shared pool across inference nodes) for KV cache offload. + One master + HTTP metadata server runs on the head inference node (first in the SLURM + nodelist); every inference node runs a client that contributes its DRAM/SSD segment to + that single pool, so any node can reuse prefixes cached by any other. Included by the + SLURM templates inside the per-node `bash -c` body, so NO single quotes / apostrophes. + Requires shell vars: LOCAL_IP, INFER_NODE_RANK, OUTPUT_DIR (and SLURM env). Requires jinja vars: kv_offload_cpu_bytes, kv_offload_disk_path, kv_offload_device_name. Exports PYTHONHASHSEED + MOONCAKE_CONFIG_PATH for the vLLM process on this node. -#} - # ---- Node-local Mooncake store (standalone-store) ---- + # ---- Mooncake distributed store (shared pool across inference nodes) ---- MC_DIR="$OUTPUT_DIR/mooncake/node_${INFER_NODE_RANK}" mkdir -p "$MC_DIR" export PYTHONHASHSEED=0 export MOONCAKE_CONFIG_PATH="$MC_DIR/config.json" - MC_MASTER="$LOCAL_IP:50051" - MC_METADATA="http://$LOCAL_IP:8080/metadata" + # The head inference node (first in the nodelist) hosts the one master + metadata server. + MC_HEAD=$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -1) + MC_MASTER="$MC_HEAD:50051" + MC_METADATA="http://$MC_HEAD:8080/metadata" {%- if kv_offload_disk_path %} mkdir -p "{{ kv_offload_disk_path }}" export MOONCAKE_OFFLOAD_FSDIR="{{ kv_offload_disk_path }}" export MOONCAKE_OFFLOAD_FILE_STORAGE_PATH="{{ kv_offload_disk_path }}" export MOONCAKE_OFFLOAD_ENABLE_EVICTION=true - mooncake_master -rpc_port=50051 -enable_http_metadata_server -http_metadata_server_host=$LOCAL_IP -http_metadata_server_port=8080 -enable_offload=true -root_fs_dir={{ kv_offload_disk_path }} > "$MC_DIR/master.log" 2>&1 & +{%- endif %} + if [ "$SLURMD_NODENAME" = "$MC_HEAD" ]; then +{%- if kv_offload_disk_path %} + mooncake_master -rpc_port=50051 -enable_http_metadata_server -http_metadata_server_host=0.0.0.0 -http_metadata_server_port=8080 -enable_offload=true -root_fs_dir={{ kv_offload_disk_path }} > "$MC_DIR/master.log" 2>&1 & {%- else %} - mooncake_master -rpc_port=50051 -enable_http_metadata_server -http_metadata_server_host=$LOCAL_IP -http_metadata_server_port=8080 > "$MC_DIR/master.log" 2>&1 & + mooncake_master -rpc_port=50051 -enable_http_metadata_server -http_metadata_server_host=0.0.0.0 -http_metadata_server_port=8080 > "$MC_DIR/master.log" 2>&1 & {%- endif %} - for _ in $(seq 1 60); do timeout 1 bash -c "echo > /dev/tcp/$LOCAL_IP/50051" 2>/dev/null && break; sleep 1; done - for _ in $(seq 1 60); do timeout 1 bash -c "echo > /dev/tcp/$LOCAL_IP/8080" 2>/dev/null && break; sleep 1; done + fi + # Every node waits for the shared master + metadata, then contributes its segment. + for _ in $(seq 1 120); do timeout 1 bash -c "echo > /dev/tcp/$MC_HEAD/50051" 2>/dev/null && break; sleep 1; done + for _ in $(seq 1 120); do timeout 1 bash -c "echo > /dev/tcp/$MC_HEAD/8080" 2>/dev/null && break; sleep 1; done mooncake_client -host=$LOCAL_IP -port=50052 -master_server_address=$MC_MASTER -metadata_server=$MC_METADATA -protocol=rdma -global_segment_size={{ kv_offload_cpu_bytes }}{% if kv_offload_device_name %} -device_names={{ kv_offload_device_name }}{% endif %}{% if kv_offload_disk_path %} -enable_offload=true{% endif %} > "$MC_DIR/client.log" 2>&1 & for _ in $(seq 1 60); do timeout 1 bash -c "echo > /dev/tcp/$LOCAL_IP/50052" 2>/dev/null && break; sleep 1; done cat > "$MOONCAKE_CONFIG_PATH" < Date: Tue, 2 Jun 2026 21:22:44 +0530 Subject: [PATCH 5/5] chore(inference): drop runtime mooncake-requires-slurm check The RLConfig `validate_mooncake_offload_requires_slurm` config validator already guards the misuse case; the per-request runtime check in `inference_local` (and its `import os`) was redundant. Remove it. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/prime_rl/entrypoints/inference.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/src/prime_rl/entrypoints/inference.py b/src/prime_rl/entrypoints/inference.py index decb08b2d5..1d76a35a3d 100644 --- a/src/prime_rl/entrypoints/inference.py +++ b/src/prime_rl/entrypoints/inference.py @@ -123,25 +123,10 @@ def inference_slurm(config: InferenceConfig): def inference_local(config: InferenceConfig): """Run inference locally.""" - import os - from prime_rl.inference.server import setup_vllm_env logger = setup_logger("info") - # Mooncake offload relies on the per-node store the sbatch template launches (which also - # exports MOONCAKE_CONFIG_PATH). When that env is absent, this is a bare local run with no - # store, so fail fast instead of letting the worker error on a missing config. - if ( - config.kv_cache_offload is not None - and config.kv_cache_offload.type == "mooncake" - and "MOONCAKE_CONFIG_PATH" not in os.environ - ): - raise ValueError( - "Mooncake KV offload requires SLURM — the per-node store (master + client) is launched " - "by the sbatch template. Use inference.kv_cache_offload.type='native' for local runs." - ) - if config.dry_run: logger.success("Dry run complete. To start inference locally, remove --dry-run from your command.") return